use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebugInfo {
pub workflow_id: String,
pub execution_id: String,
pub timestamp: DateTime<Utc>,
pub state: DebugState,
pub task_states: HashMap<String, TaskDebugState>,
pub breakpoints: Vec<Breakpoint>,
pub variables: HashMap<String, serde_json::Value>,
pub call_stack: Vec<StackFrame>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DebugState {
Running,
Paused {
task_id: String,
reason: String,
},
Stepping,
Completed,
Failed {
error: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskDebugState {
pub task_id: String,
pub status: String,
pub inputs: HashMap<String, serde_json::Value>,
pub outputs: Option<HashMap<String, serde_json::Value>>,
pub duration_ms: Option<u64>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Breakpoint {
pub id: String,
pub task_id: String,
pub condition: Option<String>,
pub enabled: bool,
pub hit_count: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StackFrame {
pub task_id: String,
pub task_name: String,
pub index: usize,
}
pub struct DebugSession {
execution_id: String,
info: Arc<RwLock<DebugInfo>>,
breakpoints: Arc<DashMap<String, Breakpoint>>,
}
impl DebugSession {
pub fn new(workflow_id: String, execution_id: String) -> Self {
Self {
execution_id: execution_id.clone(),
info: Arc::new(RwLock::new(DebugInfo {
workflow_id,
execution_id,
timestamp: Utc::now(),
state: DebugState::Running,
task_states: HashMap::new(),
breakpoints: Vec::new(),
variables: HashMap::new(),
call_stack: Vec::new(),
})),
breakpoints: Arc::new(DashMap::new()),
}
}
pub fn add_breakpoint(&self, task_id: String, condition: Option<String>) -> String {
let id = uuid::Uuid::new_v4().to_string();
let breakpoint = Breakpoint {
id: id.clone(),
task_id,
condition,
enabled: true,
hit_count: 0,
};
self.breakpoints.insert(id.clone(), breakpoint);
id
}
pub fn remove_breakpoint(&self, breakpoint_id: &str) -> Option<Breakpoint> {
self.breakpoints.remove(breakpoint_id).map(|(_, bp)| bp)
}
pub fn enable_breakpoint(&self, breakpoint_id: &str) {
if let Some(mut bp) = self.breakpoints.get_mut(breakpoint_id) {
bp.enabled = true;
}
}
pub fn disable_breakpoint(&self, breakpoint_id: &str) {
if let Some(mut bp) = self.breakpoints.get_mut(breakpoint_id) {
bp.enabled = false;
}
}
pub async fn should_pause(&self, task_id: &str) -> bool {
for entry in self.breakpoints.iter() {
let bp = entry.value();
if bp.enabled && bp.task_id == task_id {
if bp.condition.is_some() {
}
return true;
}
}
false
}
pub async fn pause(&self, task_id: String, reason: String) {
let mut info = self.info.write().await;
info.state = DebugState::Paused { task_id, reason };
info.timestamp = Utc::now();
}
pub async fn resume(&self) {
let mut info = self.info.write().await;
info.state = DebugState::Running;
info.timestamp = Utc::now();
}
pub async fn step(&self) {
let mut info = self.info.write().await;
info.state = DebugState::Stepping;
info.timestamp = Utc::now();
}
pub async fn update_task_state(&self, task_id: String, state: TaskDebugState) {
let mut info = self.info.write().await;
info.task_states.insert(task_id, state);
info.timestamp = Utc::now();
}
pub async fn set_variable(&self, name: String, value: serde_json::Value) {
let mut info = self.info.write().await;
info.variables.insert(name, value);
}
pub async fn get_info(&self) -> DebugInfo {
self.info.read().await.clone()
}
pub fn execution_id(&self) -> &str {
&self.execution_id
}
}
pub struct Debugger {
sessions: Arc<DashMap<String, Arc<DebugSession>>>,
}
impl Debugger {
pub fn new() -> Self {
Self {
sessions: Arc::new(DashMap::new()),
}
}
pub fn start_session(&self, workflow_id: String, execution_id: String) -> Arc<DebugSession> {
let session = Arc::new(DebugSession::new(workflow_id, execution_id.clone()));
self.sessions.insert(execution_id, session.clone());
session
}
pub fn get_session(&self, execution_id: &str) -> Option<Arc<DebugSession>> {
self.sessions.get(execution_id).map(|entry| entry.clone())
}
pub fn end_session(&self, execution_id: &str) {
self.sessions.remove(execution_id);
}
pub fn get_all_sessions(&self) -> Vec<Arc<DebugSession>> {
self.sessions
.iter()
.map(|entry| entry.value().clone())
.collect()
}
pub fn session_count(&self) -> usize {
self.sessions.len()
}
}
impl Default for Debugger {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DebugCommand {
Continue,
Step,
StepOver,
StepInto,
StepOut,
Pause,
AddBreakpoint {
task_id: String,
condition: Option<String>,
},
RemoveBreakpoint {
breakpoint_id: String,
},
InspectVariable {
name: String,
},
SetVariable {
name: String,
value: serde_json::Value,
},
GetCallStack,
Terminate,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_debug_session_creation() {
let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());
assert_eq!(session.execution_id(), "exec1");
let info = session.get_info().await;
assert_eq!(info.workflow_id, "workflow1");
}
#[tokio::test]
async fn test_breakpoints() {
let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());
let bp_id = session.add_breakpoint("task1".to_string(), None);
assert!(session.should_pause("task1").await);
session.remove_breakpoint(&bp_id);
assert!(!session.should_pause("task1").await);
}
#[tokio::test]
async fn test_pause_resume() {
let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());
session
.pause("task1".to_string(), "breakpoint".to_string())
.await;
let info = session.get_info().await;
assert!(matches!(info.state, DebugState::Paused { .. }));
session.resume().await;
let info = session.get_info().await;
assert!(matches!(info.state, DebugState::Running));
}
#[tokio::test]
async fn test_debugger() {
let debugger = Debugger::new();
let _session = debugger.start_session("workflow1".to_string(), "exec1".to_string());
assert_eq!(debugger.session_count(), 1);
let retrieved = debugger.get_session("exec1");
assert!(retrieved.is_some());
debugger.end_session("exec1");
assert_eq!(debugger.session_count(), 0);
}
#[tokio::test]
async fn test_variables() {
let session = DebugSession::new("workflow1".to_string(), "exec1".to_string());
session
.set_variable("test_var".to_string(), serde_json::json!(42))
.await;
let info = session.get_info().await;
assert_eq!(info.variables.get("test_var"), Some(&serde_json::json!(42)));
}
}