oxigdal_workflow/monitoring/
debugging.rs1use chrono::{DateTime, Utc};
4use dashmap::DashMap;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct DebugInfo {
13 pub workflow_id: String,
15 pub execution_id: String,
17 pub timestamp: DateTime<Utc>,
19 pub state: DebugState,
21 pub task_states: HashMap<String, TaskDebugState>,
23 pub breakpoints: Vec<Breakpoint>,
25 pub variables: HashMap<String, serde_json::Value>,
27 pub call_stack: Vec<StackFrame>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub enum DebugState {
34 Running,
36 Paused {
38 task_id: String,
40 reason: String,
42 },
43 Stepping,
45 Completed,
47 Failed {
49 error: String,
51 },
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct TaskDebugState {
57 pub task_id: String,
59 pub status: String,
61 pub inputs: HashMap<String, serde_json::Value>,
63 pub outputs: Option<HashMap<String, serde_json::Value>>,
65 pub duration_ms: Option<u64>,
67 pub error: Option<String>,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct Breakpoint {
74 pub id: String,
76 pub task_id: String,
78 pub condition: Option<String>,
80 pub enabled: bool,
82 pub hit_count: usize,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct StackFrame {
89 pub task_id: String,
91 pub task_name: String,
93 pub index: usize,
95}
96
97pub struct DebugSession {
99 execution_id: String,
100 info: Arc<RwLock<DebugInfo>>,
101 breakpoints: Arc<DashMap<String, Breakpoint>>,
102}
103
104impl DebugSession {
105 pub fn new(workflow_id: String, execution_id: String) -> Self {
107 Self {
108 execution_id: execution_id.clone(),
109 info: Arc::new(RwLock::new(DebugInfo {
110 workflow_id,
111 execution_id,
112 timestamp: Utc::now(),
113 state: DebugState::Running,
114 task_states: HashMap::new(),
115 breakpoints: Vec::new(),
116 variables: HashMap::new(),
117 call_stack: Vec::new(),
118 })),
119 breakpoints: Arc::new(DashMap::new()),
120 }
121 }
122
123 pub fn add_breakpoint(&self, task_id: String, condition: Option<String>) -> String {
125 let id = uuid::Uuid::new_v4().to_string();
126 let breakpoint = Breakpoint {
127 id: id.clone(),
128 task_id,
129 condition,
130 enabled: true,
131 hit_count: 0,
132 };
133
134 self.breakpoints.insert(id.clone(), breakpoint);
135 id
136 }
137
138 pub fn remove_breakpoint(&self, breakpoint_id: &str) -> Option<Breakpoint> {
140 self.breakpoints.remove(breakpoint_id).map(|(_, bp)| bp)
141 }
142
143 pub fn enable_breakpoint(&self, breakpoint_id: &str) {
145 if let Some(mut bp) = self.breakpoints.get_mut(breakpoint_id) {
146 bp.enabled = true;
147 }
148 }
149
150 pub fn disable_breakpoint(&self, breakpoint_id: &str) {
152 if let Some(mut bp) = self.breakpoints.get_mut(breakpoint_id) {
153 bp.enabled = false;
154 }
155 }
156
157 pub async fn should_pause(&self, task_id: &str) -> bool {
159 for entry in self.breakpoints.iter() {
160 let bp = entry.value();
161 if bp.enabled && bp.task_id == task_id {
162 if bp.condition.is_some() {
164 }
167 return true;
168 }
169 }
170 false
171 }
172
173 pub async fn pause(&self, task_id: String, reason: String) {
175 let mut info = self.info.write().await;
176 info.state = DebugState::Paused { task_id, reason };
177 info.timestamp = Utc::now();
178 }
179
180 pub async fn resume(&self) {
182 let mut info = self.info.write().await;
183 info.state = DebugState::Running;
184 info.timestamp = Utc::now();
185 }
186
187 pub async fn step(&self) {
189 let mut info = self.info.write().await;
190 info.state = DebugState::Stepping;
191 info.timestamp = Utc::now();
192 }
193
194 pub async fn update_task_state(&self, task_id: String, state: TaskDebugState) {
196 let mut info = self.info.write().await;
197 info.task_states.insert(task_id, state);
198 info.timestamp = Utc::now();
199 }
200
201 pub async fn set_variable(&self, name: String, value: serde_json::Value) {
203 let mut info = self.info.write().await;
204 info.variables.insert(name, value);
205 }
206
207 pub async fn get_info(&self) -> DebugInfo {
209 self.info.read().await.clone()
210 }
211
212 pub fn execution_id(&self) -> &str {
214 &self.execution_id
215 }
216}
217
218pub struct Debugger {
220 sessions: Arc<DashMap<String, Arc<DebugSession>>>,
221}
222
223impl Debugger {
224 pub fn new() -> Self {
226 Self {
227 sessions: Arc::new(DashMap::new()),
228 }
229 }
230
231 pub fn start_session(&self, workflow_id: String, execution_id: String) -> Arc<DebugSession> {
233 let session = Arc::new(DebugSession::new(workflow_id, execution_id.clone()));
234 self.sessions.insert(execution_id, session.clone());
235 session
236 }
237
238 pub fn get_session(&self, execution_id: &str) -> Option<Arc<DebugSession>> {
240 self.sessions.get(execution_id).map(|entry| entry.clone())
241 }
242
243 pub fn end_session(&self, execution_id: &str) {
245 self.sessions.remove(execution_id);
246 }
247
248 pub fn get_all_sessions(&self) -> Vec<Arc<DebugSession>> {
250 self.sessions
251 .iter()
252 .map(|entry| entry.value().clone())
253 .collect()
254 }
255
256 pub fn session_count(&self) -> usize {
258 self.sessions.len()
259 }
260}
261
262impl Default for Debugger {
263 fn default() -> Self {
264 Self::new()
265 }
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize)]
270pub enum DebugCommand {
271 Continue,
273 Step,
275 StepOver,
277 StepInto,
279 StepOut,
281 Pause,
283 AddBreakpoint {
285 task_id: String,
287 condition: Option<String>,
289 },
290 RemoveBreakpoint {
292 breakpoint_id: String,
294 },
295 InspectVariable {
297 name: String,
299 },
300 SetVariable {
302 name: String,
304 value: serde_json::Value,
306 },
307 GetCallStack,
309 Terminate,
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the session used by the single-session tests.
    fn new_session() -> DebugSession {
        DebugSession::new("workflow1".to_string(), "exec1".to_string())
    }

    /// A new session reports the ids it was created with.
    #[tokio::test]
    async fn test_debug_session_creation() {
        let session = new_session();
        assert_eq!(session.execution_id(), "exec1");

        let snapshot = session.get_info().await;
        assert_eq!(snapshot.workflow_id, "workflow1");
    }

    /// A breakpoint triggers a pause for its task until it is removed.
    #[tokio::test]
    async fn test_breakpoints() {
        let session = new_session();

        let breakpoint_id = session.add_breakpoint("task1".to_string(), None);
        let pauses_with_breakpoint = session.should_pause("task1").await;
        assert!(pauses_with_breakpoint);

        session.remove_breakpoint(&breakpoint_id);
        let pauses_after_removal = session.should_pause("task1").await;
        assert!(!pauses_after_removal);
    }

    /// Pausing and resuming are reflected in the reported state.
    #[tokio::test]
    async fn test_pause_resume() {
        let session = new_session();

        session
            .pause("task1".to_string(), "breakpoint".to_string())
            .await;
        let state_after_pause = session.get_info().await.state;
        assert!(matches!(state_after_pause, DebugState::Paused { .. }));

        session.resume().await;
        let state_after_resume = session.get_info().await.state;
        assert!(matches!(state_after_resume, DebugState::Running));
    }

    /// The debugger registers, finds and unregisters sessions by execution id.
    #[tokio::test]
    async fn test_debugger() {
        let debugger = Debugger::new();

        let _session = debugger.start_session("workflow1".to_string(), "exec1".to_string());
        assert_eq!(debugger.session_count(), 1);

        let found = debugger.get_session("exec1");
        assert!(found.is_some());

        debugger.end_session("exec1");
        assert_eq!(debugger.session_count(), 0);
    }

    /// A variable set on the session shows up in the snapshot.
    #[tokio::test]
    async fn test_variables() {
        let session = new_session();
        let expected = serde_json::json!(42);

        session
            .set_variable("test_var".to_string(), serde_json::json!(42))
            .await;

        let snapshot = session.get_info().await;
        assert_eq!(snapshot.variables.get("test_var"), Some(&expected));
    }
}