Skip to main content

oxigdal_workflow/monitoring/
debugging.rs

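//! Workflow debugging utilities: per-execution debug sessions with breakpoints,
//! pause/resume/step control, and debug-state snapshots.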
2
3use chrono::{DateTime, Utc};
4use dashmap::DashMap;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10/// Debug information for a workflow execution.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DebugInfo {
13    /// Workflow ID.
14    pub workflow_id: String,
15    /// Execution ID.
16    pub execution_id: String,
17    /// Debug timestamp.
18    pub timestamp: DateTime<Utc>,
19    /// Current execution state.
20    pub state: DebugState,
21    /// Task states.
22    pub task_states: HashMap<String, TaskDebugState>,
23    /// Breakpoints.
24    pub breakpoints: Vec<Breakpoint>,
25    /// Variables snapshot.
26    pub variables: HashMap<String, serde_json::Value>,
27    /// Call stack.
28    pub call_stack: Vec<StackFrame>,
29}
30
31/// Debug state for workflow execution.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub enum DebugState {
34    /// Execution is running.
35    Running,
36    /// Execution is paused at a breakpoint.
37    Paused {
38        /// Current task ID.
39        task_id: String,
40        /// Reason for pause.
41        reason: String,
42    },
43    /// Execution is stepping through tasks.
44    Stepping,
45    /// Execution has completed.
46    Completed,
47    /// Execution has failed.
48    Failed {
49        /// Error message.
50        error: String,
51    },
52}
53
54/// Debug state for a single task.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct TaskDebugState {
57    /// Task ID.
58    pub task_id: String,
59    /// Task status.
60    pub status: String,
61    /// Task inputs.
62    pub inputs: HashMap<String, serde_json::Value>,
63    /// Task outputs.
64    pub outputs: Option<HashMap<String, serde_json::Value>>,
65    /// Execution duration.
66    pub duration_ms: Option<u64>,
67    /// Error information.
68    pub error: Option<String>,
69}
70
71/// Breakpoint definition.
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct Breakpoint {
74    /// Breakpoint ID.
75    pub id: String,
76    /// Task ID where breakpoint is set.
77    pub task_id: String,
78    /// Breakpoint condition (optional).
79    pub condition: Option<String>,
80    /// Whether the breakpoint is enabled.
81    pub enabled: bool,
82    /// Hit count.
83    pub hit_count: usize,
84}
85
86/// Stack frame for call stack.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct StackFrame {
89    /// Task ID.
90    pub task_id: String,
91    /// Task name.
92    pub task_name: String,
93    /// Frame index.
94    pub index: usize,
95}
96
97/// Debug session for a workflow execution.
98pub struct DebugSession {
99    execution_id: String,
100    info: Arc<RwLock<DebugInfo>>,
101    breakpoints: Arc<DashMap<String, Breakpoint>>,
102}
103
104impl DebugSession {
105    /// Create a new debug session.
106    pub fn new(workflow_id: String, execution_id: String) -> Self {
107        Self {
108            execution_id: execution_id.clone(),
109            info: Arc::new(RwLock::new(DebugInfo {
110                workflow_id,
111                execution_id,
112                timestamp: Utc::now(),
113                state: DebugState::Running,
114                task_states: HashMap::new(),
115                breakpoints: Vec::new(),
116                variables: HashMap::new(),
117                call_stack: Vec::new(),
118            })),
119            breakpoints: Arc::new(DashMap::new()),
120        }
121    }
122
123    /// Add a breakpoint.
124    pub fn add_breakpoint(&self, task_id: String, condition: Option<String>) -> String {
125        let id = uuid::Uuid::new_v4().to_string();
126        let breakpoint = Breakpoint {
127            id: id.clone(),
128            task_id,
129            condition,
130            enabled: true,
131            hit_count: 0,
132        };
133
134        self.breakpoints.insert(id.clone(), breakpoint);
135        id
136    }
137
138    /// Remove a breakpoint.
139    pub fn remove_breakpoint(&self, breakpoint_id: &str) -> Option<Breakpoint> {
140        self.breakpoints.remove(breakpoint_id).map(|(_, bp)| bp)
141    }
142
143    /// Enable a breakpoint.
144    pub fn enable_breakpoint(&self, breakpoint_id: &str) {
145        if let Some(mut bp) = self.breakpoints.get_mut(breakpoint_id) {
146            bp.enabled = true;
147        }
148    }
149
150    /// Disable a breakpoint.
151    pub fn disable_breakpoint(&self, breakpoint_id: &str) {
152        if let Some(mut bp) = self.breakpoints.get_mut(breakpoint_id) {
153            bp.enabled = false;
154        }
155    }
156
157    /// Check if execution should pause at this task.
158    pub async fn should_pause(&self, task_id: &str) -> bool {
159        for entry in self.breakpoints.iter() {
160            let bp = entry.value();
161            if bp.enabled && bp.task_id == task_id {
162                // Check condition if present
163                if bp.condition.is_some() {
164                    // Condition evaluation would go here
165                    // For now, just pause
166                }
167                return true;
168            }
169        }
170        false
171    }
172
173    /// Pause execution at a task.
174    pub async fn pause(&self, task_id: String, reason: String) {
175        let mut info = self.info.write().await;
176        info.state = DebugState::Paused { task_id, reason };
177        info.timestamp = Utc::now();
178    }
179
180    /// Resume execution.
181    pub async fn resume(&self) {
182        let mut info = self.info.write().await;
183        info.state = DebugState::Running;
184        info.timestamp = Utc::now();
185    }
186
187    /// Step to next task.
188    pub async fn step(&self) {
189        let mut info = self.info.write().await;
190        info.state = DebugState::Stepping;
191        info.timestamp = Utc::now();
192    }
193
194    /// Update task state.
195    pub async fn update_task_state(&self, task_id: String, state: TaskDebugState) {
196        let mut info = self.info.write().await;
197        info.task_states.insert(task_id, state);
198        info.timestamp = Utc::now();
199    }
200
201    /// Set a variable value.
202    pub async fn set_variable(&self, name: String, value: serde_json::Value) {
203        let mut info = self.info.write().await;
204        info.variables.insert(name, value);
205    }
206
207    /// Get current debug info.
208    pub async fn get_info(&self) -> DebugInfo {
209        self.info.read().await.clone()
210    }
211
212    /// Get execution ID.
213    pub fn execution_id(&self) -> &str {
214        &self.execution_id
215    }
216}
217
218/// Workflow debugger.
219pub struct Debugger {
220    sessions: Arc<DashMap<String, Arc<DebugSession>>>,
221}
222
223impl Debugger {
224    /// Create a new debugger.
225    pub fn new() -> Self {
226        Self {
227            sessions: Arc::new(DashMap::new()),
228        }
229    }
230
231    /// Start a debug session.
232    pub fn start_session(&self, workflow_id: String, execution_id: String) -> Arc<DebugSession> {
233        let session = Arc::new(DebugSession::new(workflow_id, execution_id.clone()));
234        self.sessions.insert(execution_id, session.clone());
235        session
236    }
237
238    /// Get a debug session.
239    pub fn get_session(&self, execution_id: &str) -> Option<Arc<DebugSession>> {
240        self.sessions.get(execution_id).map(|entry| entry.clone())
241    }
242
243    /// End a debug session.
244    pub fn end_session(&self, execution_id: &str) {
245        self.sessions.remove(execution_id);
246    }
247
248    /// Get all active sessions.
249    pub fn get_all_sessions(&self) -> Vec<Arc<DebugSession>> {
250        self.sessions
251            .iter()
252            .map(|entry| entry.value().clone())
253            .collect()
254    }
255
256    /// Get session count.
257    pub fn session_count(&self) -> usize {
258        self.sessions.len()
259    }
260}
261
262impl Default for Debugger {
263    fn default() -> Self {
264        Self::new()
265    }
266}
267
268/// Debug command for interactive debugging.
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub enum DebugCommand {
271    /// Continue execution.
272    Continue,
273    /// Step to next task.
274    Step,
275    /// Step over (skip subtasks).
276    StepOver,
277    /// Step into subtask.
278    StepInto,
279    /// Step out of current context.
280    StepOut,
281    /// Pause execution.
282    Pause,
283    /// Add breakpoint.
284    AddBreakpoint {
285        /// Task ID.
286        task_id: String,
287        /// Condition.
288        condition: Option<String>,
289    },
290    /// Remove breakpoint.
291    RemoveBreakpoint {
292        /// Breakpoint ID.
293        breakpoint_id: String,
294    },
295    /// Inspect variable.
296    InspectVariable {
297        /// Variable name.
298        name: String,
299    },
300    /// Set variable value.
301    SetVariable {
302        /// Variable name.
303        name: String,
304        /// Variable value.
305        value: serde_json::Value,
306    },
307    /// Get call stack.
308    GetCallStack,
309    /// Terminate execution.
310    Terminate,
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh session reports its IDs and starts in the `Running` state.
    #[tokio::test]
    async fn test_debug_session_creation() {
        let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());
        assert_eq!(session.execution_id(), "exec1");

        let info = session.get_info().await;
        assert_eq!(info.workflow_id, "workflow1");
        assert_eq!(info.execution_id, "exec1");
        assert!(matches!(info.state, DebugState::Running));
    }

    /// Adding a breakpoint makes the task pause; removing it undoes that.
    #[tokio::test]
    async fn test_breakpoints() {
        let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());

        let bp_id = session.add_breakpoint("task1".to_string(), None);
        assert!(session.should_pause("task1").await);

        assert!(session.remove_breakpoint(&bp_id).is_some());
        assert!(!session.should_pause("task1").await);
    }

    /// Disabled breakpoints do not trigger, and breakpoints only match their own task.
    #[tokio::test]
    async fn test_enable_disable_breakpoint() {
        let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());
        let bp_id = session.add_breakpoint("task1".to_string(), None);

        session.disable_breakpoint(&bp_id);
        assert!(!session.should_pause("task1").await);

        session.enable_breakpoint(&bp_id);
        assert!(session.should_pause("task1").await);
        assert!(!session.should_pause("task2").await);
    }

    /// Operations on unknown breakpoint IDs are harmless no-ops.
    #[tokio::test]
    async fn test_unknown_breakpoint_ids() {
        let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());
        session.enable_breakpoint("missing");
        session.disable_breakpoint("missing");
        assert!(session.remove_breakpoint("missing").is_none());
        assert!(!session.should_pause("task1").await);
    }

    /// Pause, resume, and step move the session between states.
    #[tokio::test]
    async fn test_pause_resume() {
        let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());

        session
            .pause("task1".to_string(), "breakpoint".to_string())
            .await;
        let info = session.get_info().await;
        assert!(matches!(info.state, DebugState::Paused { .. }));

        session.resume().await;
        let info = session.get_info().await;
        assert!(matches!(info.state, DebugState::Running));

        session.step().await;
        let info = session.get_info().await;
        assert!(matches!(info.state, DebugState::Stepping));
    }

    /// Task states are stored under their task ID.
    #[tokio::test]
    async fn test_task_state() {
        let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());

        session
            .update_task_state(
                "task1".to_string(),
                TaskDebugState {
                    task_id: "task1".to_string(),
                    status: "running".to_string(),
                    inputs: Default::default(),
                    outputs: None,
                    duration_ms: None,
                    error: None,
                },
            )
            .await;

        let info = session.get_info().await;
        assert_eq!(
            info.task_states.get("task1").map(|s| s.status.as_str()),
            Some("running")
        );
    }

    /// The debugger registers, looks up, lists, and removes sessions.
    #[tokio::test]
    async fn test_debugger() {
        let debugger = Debugger::new();

        let _session = debugger.start_session("workflow1".to_string(), "exec1".to_string());
        assert_eq!(debugger.session_count(), 1);
        assert_eq!(debugger.get_all_sessions().len(), 1);

        let retrieved = debugger.get_session("exec1");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.map(|s| s.execution_id().to_string()), Some("exec1".to_string()));
        assert!(debugger.get_session("missing").is_none());

        debugger.end_session("exec1");
        assert_eq!(debugger.session_count(), 0);
        assert!(debugger.get_all_sessions().is_empty());
    }

    /// Variables set on the session appear in the snapshot.
    #[tokio::test]
    async fn test_variables() {
        let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());

        session
            .set_variable("test_var".to_string(), serde_json::json!(42))
            .await;

        let info = session.get_info().await;
        assert_eq!(info.variables.get("test_var"), Some(&serde_json::json!(42)));
    }
}