durable_execution_sdk_testing/checkpoint_server/
execution_manager.rs1use std::collections::HashMap;
7
8use uuid::Uuid;
9
10use super::checkpoint_manager::{CheckpointManager, OperationEvents};
11use super::checkpoint_token::{
12 decode_checkpoint_token, encode_checkpoint_token, CheckpointTokenData,
13};
14use super::event_processor::{EventType, HistoryEvent};
15use super::types::{
16 CompleteInvocationRequest, ExecutionId, InvocationId, StartDurableExecutionRequest,
17 StartInvocationRequest,
18};
19use crate::error::TestError;
20
21#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
23pub struct InvocationResult {
24 pub checkpoint_token: String,
26 pub execution_id: ExecutionId,
28 pub invocation_id: InvocationId,
30 pub operation_events: Vec<OperationEvents>,
32}
33
34#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
36pub struct CompleteInvocationResponse {
37 pub event: HistoryEvent,
39 pub has_dirty_operations: bool,
41}
42
43#[derive(Debug, Clone)]
45pub struct StartExecutionParams {
46 pub execution_id: ExecutionId,
48 pub invocation_id: InvocationId,
50 pub payload: Option<String>,
52}
53
54#[derive(Debug, Default)]
56pub struct ExecutionManager {
57 executions: HashMap<ExecutionId, CheckpointManager>,
59}
60
61impl ExecutionManager {
62 pub fn new() -> Self {
64 Self {
65 executions: HashMap::new(),
66 }
67 }
68
69 pub fn start_execution(&mut self, params: StartExecutionParams) -> InvocationResult {
71 let execution_id = params.execution_id;
72 let invocation_id = params.invocation_id;
73 let payload = params.payload.unwrap_or_else(|| "{}".to_string());
74
75 let mut checkpoint_manager = CheckpointManager::new(&execution_id);
76
77 let initial_operation = checkpoint_manager.initialize(&payload);
79
80 checkpoint_manager.start_invocation(&invocation_id);
82
83 let checkpoint_token = encode_checkpoint_token(&CheckpointTokenData {
85 execution_id: execution_id.clone(),
86 token: Uuid::new_v4().to_string(),
87 invocation_id: invocation_id.clone(),
88 });
89
90 self.executions
91 .insert(execution_id.clone(), checkpoint_manager);
92
93 InvocationResult {
94 checkpoint_token,
95 execution_id,
96 invocation_id,
97 operation_events: vec![initial_operation],
98 }
99 }
100
101 pub fn start_execution_from_request(
103 &mut self,
104 request: StartDurableExecutionRequest,
105 ) -> InvocationResult {
106 let execution_id = Uuid::new_v4().to_string();
107 self.start_execution(StartExecutionParams {
108 execution_id,
109 invocation_id: request.invocation_id,
110 payload: request.payload,
111 })
112 }
113
114 pub fn start_invocation(
116 &mut self,
117 params: StartInvocationRequest,
118 ) -> Result<InvocationResult, TestError> {
119 let checkpoint_manager = self
120 .executions
121 .get_mut(¶ms.execution_id)
122 .ok_or_else(|| TestError::ExecutionNotFound(params.execution_id.clone()))?;
123
124 if checkpoint_manager.is_execution_completed() {
125 return Err(TestError::ExecutionNotFound(format!(
126 "Execution {} is already completed",
127 params.execution_id
128 )));
129 }
130
131 let operation_events = checkpoint_manager.start_invocation(¶ms.invocation_id);
132
133 let checkpoint_token = encode_checkpoint_token(&CheckpointTokenData {
134 execution_id: params.execution_id.clone(),
135 token: Uuid::new_v4().to_string(),
136 invocation_id: params.invocation_id.clone(),
137 });
138
139 Ok(InvocationResult {
140 checkpoint_token,
141 execution_id: params.execution_id,
142 invocation_id: params.invocation_id,
143 operation_events,
144 })
145 }
146
147 pub fn complete_invocation(
149 &mut self,
150 request: CompleteInvocationRequest,
151 ) -> Result<CompleteInvocationResponse, TestError> {
152 let checkpoint_manager = self
153 .executions
154 .get_mut(&request.execution_id)
155 .ok_or_else(|| TestError::ExecutionNotFound(request.execution_id.clone()))?;
156
157 let timestamps = checkpoint_manager.complete_invocation(&request.invocation_id)?;
158
159 let event = HistoryEvent {
161 id: checkpoint_manager.event_processor().event_count() + 1,
162 event_type: EventType::InvocationCompleted,
163 timestamp: timestamps.end_timestamp,
164 operation_id: None,
165 details_type: "InvocationCompletedDetails".to_string(),
166 details: serde_json::json!({
167 "start_timestamp": timestamps.start_timestamp,
168 "end_timestamp": timestamps.end_timestamp,
169 "error": request.error,
170 "request_id": request.invocation_id,
171 }),
172 };
173
174 Ok(CompleteInvocationResponse {
175 event,
176 has_dirty_operations: checkpoint_manager.has_dirty_operations(),
177 })
178 }
179
180 pub fn get_checkpoints_by_execution(&self, execution_id: &str) -> Option<&CheckpointManager> {
182 self.executions.get(execution_id)
183 }
184
185 pub fn get_checkpoints_by_execution_mut(
187 &mut self,
188 execution_id: &str,
189 ) -> Option<&mut CheckpointManager> {
190 self.executions.get_mut(execution_id)
191 }
192
193 pub fn get_checkpoints_by_token(
195 &self,
196 token: &str,
197 ) -> Result<Option<&CheckpointManager>, TestError> {
198 let token_data = decode_checkpoint_token(token)?;
199 Ok(self.executions.get(&token_data.execution_id))
200 }
201
202 pub fn get_checkpoints_by_token_mut(
204 &mut self,
205 token: &str,
206 ) -> Result<Option<&mut CheckpointManager>, TestError> {
207 let token_data = decode_checkpoint_token(token)?;
208 Ok(self.executions.get_mut(&token_data.execution_id))
209 }
210
211 pub fn get_execution_ids(&self) -> Vec<&ExecutionId> {
213 self.executions.keys().collect()
214 }
215
216 pub fn has_execution(&self, execution_id: &str) -> bool {
218 self.executions.contains_key(execution_id)
219 }
220
221 pub fn remove_execution(&mut self, execution_id: &str) -> Option<CheckpointManager> {
223 self.executions.remove(execution_id)
224 }
225
226 pub fn clear(&mut self) {
228 self.executions.clear();
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235
236 #[test]
237 fn test_start_execution() {
238 let mut manager = ExecutionManager::new();
239
240 let result = manager.start_execution(StartExecutionParams {
241 execution_id: "exec-1".to_string(),
242 invocation_id: "inv-1".to_string(),
243 payload: Some(r#"{"test": true}"#.to_string()),
244 });
245
246 assert_eq!(result.execution_id, "exec-1");
247 assert_eq!(result.invocation_id, "inv-1");
248 assert!(!result.checkpoint_token.is_empty());
249 assert_eq!(result.operation_events.len(), 1);
250 }
251
252 #[test]
253 fn test_start_execution_from_request() {
254 let mut manager = ExecutionManager::new();
255
256 let result = manager.start_execution_from_request(StartDurableExecutionRequest {
257 invocation_id: "inv-1".to_string(),
258 payload: Some("{}".to_string()),
259 });
260
261 assert!(!result.execution_id.is_empty());
262 assert_eq!(result.invocation_id, "inv-1");
263 }
264
265 #[test]
266 fn test_start_invocation() {
267 let mut manager = ExecutionManager::new();
268
269 manager.start_execution(StartExecutionParams {
270 execution_id: "exec-1".to_string(),
271 invocation_id: "inv-1".to_string(),
272 payload: None,
273 });
274
275 manager
277 .complete_invocation(CompleteInvocationRequest {
278 execution_id: "exec-1".to_string(),
279 invocation_id: "inv-1".to_string(),
280 error: None,
281 })
282 .unwrap();
283
284 let result = manager
286 .start_invocation(StartInvocationRequest {
287 execution_id: "exec-1".to_string(),
288 invocation_id: "inv-2".to_string(),
289 })
290 .unwrap();
291
292 assert_eq!(result.execution_id, "exec-1");
293 assert_eq!(result.invocation_id, "inv-2");
294 }
295
296 #[test]
297 fn test_start_invocation_unknown_execution() {
298 let mut manager = ExecutionManager::new();
299
300 let result = manager.start_invocation(StartInvocationRequest {
301 execution_id: "unknown".to_string(),
302 invocation_id: "inv-1".to_string(),
303 });
304
305 assert!(matches!(result, Err(TestError::ExecutionNotFound(_))));
306 }
307
308 #[test]
309 fn test_complete_invocation() {
310 let mut manager = ExecutionManager::new();
311
312 manager.start_execution(StartExecutionParams {
313 execution_id: "exec-1".to_string(),
314 invocation_id: "inv-1".to_string(),
315 payload: None,
316 });
317
318 let response = manager
319 .complete_invocation(CompleteInvocationRequest {
320 execution_id: "exec-1".to_string(),
321 invocation_id: "inv-1".to_string(),
322 error: None,
323 })
324 .unwrap();
325
326 assert_eq!(response.event.event_type, EventType::InvocationCompleted);
327 }
328
329 #[test]
330 fn test_get_checkpoints_by_token() {
331 let mut manager = ExecutionManager::new();
332
333 let result = manager.start_execution(StartExecutionParams {
334 execution_id: "exec-1".to_string(),
335 invocation_id: "inv-1".to_string(),
336 payload: None,
337 });
338
339 let checkpoint_manager = manager
340 .get_checkpoints_by_token(&result.checkpoint_token)
341 .unwrap()
342 .unwrap();
343
344 assert_eq!(checkpoint_manager.execution_id(), "exec-1");
345 }
346}