Skip to main content

durable_execution_sdk_testing/checkpoint_server/
execution_manager.rs

1//! Execution manager for managing the state of all executions.
2//!
3//! This module implements the ExecutionManager which manages the state of all
4//! executions, with each execution having its own CheckpointManager.
5
6use std::collections::HashMap;
7
8use uuid::Uuid;
9
10use super::checkpoint_manager::{CheckpointManager, OperationEvents};
11use super::checkpoint_token::{
12    decode_checkpoint_token, encode_checkpoint_token, CheckpointTokenData,
13};
14use super::event_processor::{EventType, HistoryEvent};
15use super::types::{
16    CompleteInvocationRequest, ExecutionId, InvocationId, StartDurableExecutionRequest,
17    StartInvocationRequest,
18};
19use crate::error::TestError;
20
21/// Result of starting an invocation.
22#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
23pub struct InvocationResult {
24    /// The checkpoint token for this invocation
25    pub checkpoint_token: String,
26    /// The execution ID
27    pub execution_id: ExecutionId,
28    /// The invocation ID
29    pub invocation_id: InvocationId,
30    /// The operation events for this execution
31    pub operation_events: Vec<OperationEvents>,
32}
33
34/// Response from completing an invocation.
35#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
36pub struct CompleteInvocationResponse {
37    /// The history event for the invocation completion
38    pub event: HistoryEvent,
39    /// Whether there are dirty operations
40    pub has_dirty_operations: bool,
41}
42
43/// Parameters for starting an execution.
44#[derive(Debug, Clone)]
45pub struct StartExecutionParams {
46    /// The execution ID
47    pub execution_id: ExecutionId,
48    /// The invocation ID
49    pub invocation_id: InvocationId,
50    /// Optional payload
51    pub payload: Option<String>,
52}
53
54/// Manages all execution states.
55#[derive(Debug, Default)]
56pub struct ExecutionManager {
57    /// Map of execution ID to checkpoint manager
58    executions: HashMap<ExecutionId, CheckpointManager>,
59}
60
61impl ExecutionManager {
62    /// Create a new execution manager.
63    pub fn new() -> Self {
64        Self {
65            executions: HashMap::new(),
66        }
67    }
68
69    /// Start a new execution.
70    pub fn start_execution(&mut self, params: StartExecutionParams) -> InvocationResult {
71        let execution_id = params.execution_id;
72        let invocation_id = params.invocation_id;
73        let payload = params.payload.unwrap_or_else(|| "{}".to_string());
74
75        let mut checkpoint_manager = CheckpointManager::new(&execution_id);
76
77        // Initialize with the first operation
78        let initial_operation = checkpoint_manager.initialize(&payload);
79
80        // Start the invocation
81        checkpoint_manager.start_invocation(&invocation_id);
82
83        // Generate checkpoint token
84        let checkpoint_token = encode_checkpoint_token(&CheckpointTokenData {
85            execution_id: execution_id.clone(),
86            token: Uuid::new_v4().to_string(),
87            invocation_id: invocation_id.clone(),
88        });
89
90        self.executions
91            .insert(execution_id.clone(), checkpoint_manager);
92
93        InvocationResult {
94            checkpoint_token,
95            execution_id,
96            invocation_id,
97            operation_events: vec![initial_operation],
98        }
99    }
100
101    /// Start a new execution from a request.
102    pub fn start_execution_from_request(
103        &mut self,
104        request: StartDurableExecutionRequest,
105    ) -> InvocationResult {
106        let execution_id = Uuid::new_v4().to_string();
107        self.start_execution(StartExecutionParams {
108            execution_id,
109            invocation_id: request.invocation_id,
110            payload: request.payload,
111        })
112    }
113
114    /// Start an invocation for an existing execution.
115    pub fn start_invocation(
116        &mut self,
117        params: StartInvocationRequest,
118    ) -> Result<InvocationResult, TestError> {
119        let checkpoint_manager = self
120            .executions
121            .get_mut(&params.execution_id)
122            .ok_or_else(|| TestError::ExecutionNotFound(params.execution_id.clone()))?;
123
124        if checkpoint_manager.is_execution_completed() {
125            return Err(TestError::ExecutionNotFound(format!(
126                "Execution {} is already completed",
127                params.execution_id
128            )));
129        }
130
131        let operation_events = checkpoint_manager.start_invocation(&params.invocation_id);
132
133        let checkpoint_token = encode_checkpoint_token(&CheckpointTokenData {
134            execution_id: params.execution_id.clone(),
135            token: Uuid::new_v4().to_string(),
136            invocation_id: params.invocation_id.clone(),
137        });
138
139        Ok(InvocationResult {
140            checkpoint_token,
141            execution_id: params.execution_id,
142            invocation_id: params.invocation_id,
143            operation_events,
144        })
145    }
146
147    /// Complete an invocation.
148    pub fn complete_invocation(
149        &mut self,
150        request: CompleteInvocationRequest,
151    ) -> Result<CompleteInvocationResponse, TestError> {
152        let checkpoint_manager = self
153            .executions
154            .get_mut(&request.execution_id)
155            .ok_or_else(|| TestError::ExecutionNotFound(request.execution_id.clone()))?;
156
157        let timestamps = checkpoint_manager.complete_invocation(&request.invocation_id)?;
158
159        // Create the completion event
160        let event = HistoryEvent {
161            id: checkpoint_manager.event_processor().event_count() + 1,
162            event_type: EventType::InvocationCompleted,
163            timestamp: timestamps.end_timestamp,
164            operation_id: None,
165            details_type: "InvocationCompletedDetails".to_string(),
166            details: serde_json::json!({
167                "start_timestamp": timestamps.start_timestamp,
168                "end_timestamp": timestamps.end_timestamp,
169                "error": request.error,
170                "request_id": request.invocation_id,
171            }),
172        };
173
174        Ok(CompleteInvocationResponse {
175            event,
176            has_dirty_operations: checkpoint_manager.has_dirty_operations(),
177        })
178    }
179
180    /// Get checkpoint manager for an execution.
181    pub fn get_checkpoints_by_execution(&self, execution_id: &str) -> Option<&CheckpointManager> {
182        self.executions.get(execution_id)
183    }
184
185    /// Get mutable checkpoint manager for an execution.
186    pub fn get_checkpoints_by_execution_mut(
187        &mut self,
188        execution_id: &str,
189    ) -> Option<&mut CheckpointManager> {
190        self.executions.get_mut(execution_id)
191    }
192
193    /// Get checkpoint manager by checkpoint token.
194    pub fn get_checkpoints_by_token(
195        &self,
196        token: &str,
197    ) -> Result<Option<&CheckpointManager>, TestError> {
198        let token_data = decode_checkpoint_token(token)?;
199        Ok(self.executions.get(&token_data.execution_id))
200    }
201
202    /// Get mutable checkpoint manager by checkpoint token.
203    pub fn get_checkpoints_by_token_mut(
204        &mut self,
205        token: &str,
206    ) -> Result<Option<&mut CheckpointManager>, TestError> {
207        let token_data = decode_checkpoint_token(token)?;
208        Ok(self.executions.get_mut(&token_data.execution_id))
209    }
210
211    /// Get all execution IDs.
212    pub fn get_execution_ids(&self) -> Vec<&ExecutionId> {
213        self.executions.keys().collect()
214    }
215
216    /// Check if an execution exists.
217    pub fn has_execution(&self, execution_id: &str) -> bool {
218        self.executions.contains_key(execution_id)
219    }
220
221    /// Remove an execution.
222    pub fn remove_execution(&mut self, execution_id: &str) -> Option<CheckpointManager> {
223        self.executions.remove(execution_id)
224    }
225
226    /// Clear all executions.
227    pub fn clear(&mut self) {
228        self.executions.clear();
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235
236    #[test]
237    fn test_start_execution() {
238        let mut manager = ExecutionManager::new();
239
240        let result = manager.start_execution(StartExecutionParams {
241            execution_id: "exec-1".to_string(),
242            invocation_id: "inv-1".to_string(),
243            payload: Some(r#"{"test": true}"#.to_string()),
244        });
245
246        assert_eq!(result.execution_id, "exec-1");
247        assert_eq!(result.invocation_id, "inv-1");
248        assert!(!result.checkpoint_token.is_empty());
249        assert_eq!(result.operation_events.len(), 1);
250    }
251
252    #[test]
253    fn test_start_execution_from_request() {
254        let mut manager = ExecutionManager::new();
255
256        let result = manager.start_execution_from_request(StartDurableExecutionRequest {
257            invocation_id: "inv-1".to_string(),
258            payload: Some("{}".to_string()),
259        });
260
261        assert!(!result.execution_id.is_empty());
262        assert_eq!(result.invocation_id, "inv-1");
263    }
264
265    #[test]
266    fn test_start_invocation() {
267        let mut manager = ExecutionManager::new();
268
269        manager.start_execution(StartExecutionParams {
270            execution_id: "exec-1".to_string(),
271            invocation_id: "inv-1".to_string(),
272            payload: None,
273        });
274
275        // Complete first invocation
276        manager
277            .complete_invocation(CompleteInvocationRequest {
278                execution_id: "exec-1".to_string(),
279                invocation_id: "inv-1".to_string(),
280                error: None,
281            })
282            .unwrap();
283
284        // Start second invocation
285        let result = manager
286            .start_invocation(StartInvocationRequest {
287                execution_id: "exec-1".to_string(),
288                invocation_id: "inv-2".to_string(),
289            })
290            .unwrap();
291
292        assert_eq!(result.execution_id, "exec-1");
293        assert_eq!(result.invocation_id, "inv-2");
294    }
295
296    #[test]
297    fn test_start_invocation_unknown_execution() {
298        let mut manager = ExecutionManager::new();
299
300        let result = manager.start_invocation(StartInvocationRequest {
301            execution_id: "unknown".to_string(),
302            invocation_id: "inv-1".to_string(),
303        });
304
305        assert!(matches!(result, Err(TestError::ExecutionNotFound(_))));
306    }
307
308    #[test]
309    fn test_complete_invocation() {
310        let mut manager = ExecutionManager::new();
311
312        manager.start_execution(StartExecutionParams {
313            execution_id: "exec-1".to_string(),
314            invocation_id: "inv-1".to_string(),
315            payload: None,
316        });
317
318        let response = manager
319            .complete_invocation(CompleteInvocationRequest {
320                execution_id: "exec-1".to_string(),
321                invocation_id: "inv-1".to_string(),
322                error: None,
323            })
324            .unwrap();
325
326        assert_eq!(response.event.event_type, EventType::InvocationCompleted);
327    }
328
329    #[test]
330    fn test_get_checkpoints_by_token() {
331        let mut manager = ExecutionManager::new();
332
333        let result = manager.start_execution(StartExecutionParams {
334            execution_id: "exec-1".to_string(),
335            invocation_id: "inv-1".to_string(),
336            payload: None,
337        });
338
339        let checkpoint_manager = manager
340            .get_checkpoints_by_token(&result.checkpoint_token)
341            .unwrap()
342            .unwrap();
343
344        assert_eq!(checkpoint_manager.execution_id(), "exec-1");
345    }
346}