Skip to main content

cortexai_crew/
time_travel.rs

1//! Time Travel / State History
2//!
3//! Provides the ability to replay, fork, and debug workflow executions:
4//! - Full execution history with state snapshots at each step
5//! - Replay from any point in history
6//! - Fork execution to explore alternative paths
7//! - Compare states across different branches
8//! - Debug mode with step-by-step execution
9//!
10//! ## Example
11//!
12//! ```rust,ignore
13//! use cortexai_crew::time_travel::{TimeTravelGraph, HistoryBrowser};
14//!
15//! // Wrap a graph with time travel capabilities
16//! let tt_graph = TimeTravelGraph::new(graph);
17//!
18//! // Execute and record full history
19//! let execution = tt_graph.execute(initial_state).await?;
20//!
21//! // Browse history
22//! let browser = execution.browser();
23//! println!("Steps: {}", browser.len());
24//!
25//! // Replay from step 3
26//! let forked = tt_graph.replay_from(&execution, 3).await?;
27//!
28//! // Fork and modify state at step 5
29//! let mut state_at_5 = browser.state_at(5)?;
30//! state_at_5.set("override", true);
31//! let branched = tt_graph.fork_from(&execution, 5, state_at_5).await?;
32//!
33//! // Compare branches
//! let diff = browser.diff(5, &branched.browser(), 5)?;
35//! ```
36
37use crate::graph::{Graph, GraphResult, GraphState, GraphStatus, END};
38use chrono::{DateTime, Utc};
39use cortexai_core::errors::CrewError;
40use serde::{Deserialize, Serialize};
41use std::collections::HashMap;
42use std::sync::Arc;
43use tokio::sync::RwLock;
44
45/// A single step in execution history
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct HistoryStep {
48    /// Step number (0-indexed)
49    pub step: usize,
50    /// Node that was executed
51    pub node_id: String,
52    /// State before node execution
53    pub state_before: GraphState,
54    /// State after node execution
55    pub state_after: GraphState,
56    /// Execution duration in microseconds
57    pub duration_us: u64,
58    /// Timestamp when this step executed
59    pub timestamp: DateTime<Utc>,
60    /// Any error that occurred
61    pub error: Option<String>,
62}
63
64impl HistoryStep {
65    /// Get the state changes made by this step
66    pub fn changes(&self) -> StateChanges {
67        diff_states(&self.state_before, &self.state_after)
68    }
69}
70
71/// Represents changes between two states
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct StateChanges {
74    /// Keys that were added
75    pub added: Vec<String>,
76    /// Keys that were removed
77    pub removed: Vec<String>,
78    /// Keys that were modified (old_value, new_value)
79    pub modified: HashMap<String, (serde_json::Value, serde_json::Value)>,
80}
81
82impl StateChanges {
83    /// Check if there are any changes
84    pub fn is_empty(&self) -> bool {
85        self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty()
86    }
87
88    /// Get a summary of changes
89    pub fn summary(&self) -> String {
90        let mut parts = Vec::new();
91        if !self.added.is_empty() {
92            parts.push(format!("+{} added", self.added.len()));
93        }
94        if !self.removed.is_empty() {
95            parts.push(format!("-{} removed", self.removed.len()));
96        }
97        if !self.modified.is_empty() {
98            parts.push(format!("~{} modified", self.modified.len()));
99        }
100        if parts.is_empty() {
101            "no changes".to_string()
102        } else {
103            parts.join(", ")
104        }
105    }
106}
107
108/// Compare two states and return their differences
109pub fn diff_states(before: &GraphState, after: &GraphState) -> StateChanges {
110    let mut changes = StateChanges {
111        added: Vec::new(),
112        removed: Vec::new(),
113        modified: HashMap::new(),
114    };
115
116    let before_obj = before.data.as_object();
117    let after_obj = after.data.as_object();
118
119    if let (Some(b), Some(a)) = (before_obj, after_obj) {
120        // Find added and modified
121        for (key, new_val) in a {
122            match b.get(key) {
123                None => changes.added.push(key.clone()),
124                Some(old_val) if old_val != new_val => {
125                    changes
126                        .modified
127                        .insert(key.clone(), (old_val.clone(), new_val.clone()));
128                }
129                _ => {}
130            }
131        }
132
133        // Find removed
134        for key in b.keys() {
135            if !a.contains_key(key) {
136                changes.removed.push(key.clone());
137            }
138        }
139    }
140
141    changes
142}
143
144/// Full execution history
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct ExecutionHistory {
147    /// Unique execution ID
148    pub id: String,
149    /// Graph ID
150    pub graph_id: String,
151    /// All steps in order
152    pub steps: Vec<HistoryStep>,
153    /// Final result
154    pub result: GraphResult,
155    /// Branch ID (for forked executions)
156    pub branch_id: Option<String>,
157    /// Parent execution ID (if forked)
158    pub parent_id: Option<String>,
159    /// Step in parent where fork occurred
160    pub fork_step: Option<usize>,
161    /// Execution start time
162    pub started_at: DateTime<Utc>,
163    /// Execution end time
164    pub ended_at: DateTime<Utc>,
165}
166
167impl ExecutionHistory {
168    /// Create a browser for this history
169    pub fn browser(&self) -> HistoryBrowser<'_> {
170        HistoryBrowser { history: self }
171    }
172
173    /// Get total execution time in milliseconds
174    pub fn duration_ms(&self) -> i64 {
175        self.ended_at
176            .signed_duration_since(self.started_at)
177            .num_milliseconds()
178    }
179
180    /// Check if this is a forked execution
181    pub fn is_fork(&self) -> bool {
182        self.parent_id.is_some()
183    }
184}
185
186/// Browser for navigating execution history
187#[derive(Debug)]
188pub struct HistoryBrowser<'a> {
189    history: &'a ExecutionHistory,
190}
191
192impl<'a> HistoryBrowser<'a> {
193    /// Get number of steps
194    pub fn len(&self) -> usize {
195        self.history.steps.len()
196    }
197
198    /// Check if history is empty
199    pub fn is_empty(&self) -> bool {
200        self.history.steps.is_empty()
201    }
202
203    /// Get step at index
204    pub fn step_at(&self, index: usize) -> Option<&HistoryStep> {
205        self.history.steps.get(index)
206    }
207
208    /// Get state at a specific step (after execution)
209    pub fn state_at(&self, index: usize) -> Result<GraphState, CrewError> {
210        self.history
211            .steps
212            .get(index)
213            .map(|s| s.state_after.clone())
214            .ok_or_else(|| CrewError::TaskNotFound(format!("Step {} not found", index)))
215    }
216
217    /// Get state before a specific step
218    pub fn state_before(&self, index: usize) -> Result<GraphState, CrewError> {
219        self.history
220            .steps
221            .get(index)
222            .map(|s| s.state_before.clone())
223            .ok_or_else(|| CrewError::TaskNotFound(format!("Step {} not found", index)))
224    }
225
226    /// Get all nodes visited in order
227    pub fn visited_nodes(&self) -> Vec<&str> {
228        self.history
229            .steps
230            .iter()
231            .map(|s| s.node_id.as_str())
232            .collect()
233    }
234
235    /// Find steps where a specific key changed
236    pub fn find_key_changes(&self, key: &str) -> Vec<usize> {
237        self.history
238            .steps
239            .iter()
240            .enumerate()
241            .filter(|(_, step)| {
242                let changes = step.changes();
243                changes.added.contains(&key.to_string())
244                    || changes.removed.contains(&key.to_string())
245                    || changes.modified.contains_key(key)
246            })
247            .map(|(i, _)| i)
248            .collect()
249    }
250
251    /// Find steps that executed a specific node
252    pub fn find_node_executions(&self, node_id: &str) -> Vec<usize> {
253        self.history
254            .steps
255            .iter()
256            .enumerate()
257            .filter(|(_, step)| step.node_id == node_id)
258            .map(|(i, _)| i)
259            .collect()
260    }
261
262    /// Get a summary of the execution
263    pub fn summary(&self) -> ExecutionSummary {
264        let mut node_counts: HashMap<String, usize> = HashMap::new();
265        let mut total_duration_us = 0u64;
266
267        for step in &self.history.steps {
268            *node_counts.entry(step.node_id.clone()).or_insert(0) += 1;
269            total_duration_us += step.duration_us;
270        }
271
272        ExecutionSummary {
273            total_steps: self.history.steps.len(),
274            unique_nodes: node_counts.len(),
275            node_execution_counts: node_counts,
276            total_duration_us,
277            status: self.history.result.status,
278            is_fork: self.history.is_fork(),
279        }
280    }
281
282    /// Compare this execution with another at specific steps
283    pub fn diff(
284        &self,
285        self_step: usize,
286        other: &HistoryBrowser,
287        other_step: usize,
288    ) -> Result<StateChanges, CrewError> {
289        let self_state = self.state_at(self_step)?;
290        let other_state = other.state_at(other_step)?;
291        Ok(diff_states(&self_state, &other_state))
292    }
293
294    /// Get execution timeline as a formatted string
295    pub fn timeline(&self) -> String {
296        let mut lines = Vec::new();
297        lines.push(format!("Execution {} Timeline:", self.history.id));
298        lines.push("─".repeat(50));
299
300        for step in &self.history.steps {
301            let changes = step.changes();
302            let status = if step.error.is_some() { "✗" } else { "✓" };
303            lines.push(format!(
304                "{} Step {}: {} ({:.2}ms) - {}",
305                status,
306                step.step,
307                step.node_id,
308                step.duration_us as f64 / 1000.0,
309                changes.summary()
310            ));
311        }
312
313        lines.push("─".repeat(50));
314        lines.push(format!("Status: {:?}", self.history.result.status));
315        lines.join("\n")
316    }
317}
318
319/// Summary of an execution
320#[derive(Debug, Clone, Serialize, Deserialize)]
321pub struct ExecutionSummary {
322    /// Total number of steps
323    pub total_steps: usize,
324    /// Number of unique nodes executed
325    pub unique_nodes: usize,
326    /// How many times each node was executed
327    pub node_execution_counts: HashMap<String, usize>,
328    /// Total execution time in microseconds
329    pub total_duration_us: u64,
330    /// Final status
331    pub status: GraphStatus,
332    /// Whether this was a forked execution
333    pub is_fork: bool,
334}
335
336/// Storage for execution histories
337#[async_trait::async_trait]
338pub trait HistoryStore: Send + Sync {
339    /// Save an execution history
340    async fn save(&self, history: ExecutionHistory) -> Result<(), CrewError>;
341    /// Load an execution history by ID
342    async fn load(&self, id: &str) -> Result<Option<ExecutionHistory>, CrewError>;
343    /// List all executions for a graph
344    async fn list_for_graph(&self, graph_id: &str) -> Result<Vec<String>, CrewError>;
345    /// List all forks of an execution
346    async fn list_forks(&self, parent_id: &str) -> Result<Vec<String>, CrewError>;
347    /// Delete an execution history
348    async fn delete(&self, id: &str) -> Result<(), CrewError>;
349}
350
351/// In-memory history store
352#[derive(Default)]
353pub struct InMemoryHistoryStore {
354    histories: RwLock<HashMap<String, ExecutionHistory>>,
355}
356
357impl InMemoryHistoryStore {
358    /// Create a new in-memory store
359    pub fn new() -> Self {
360        Self::default()
361    }
362}
363
364#[async_trait::async_trait]
365impl HistoryStore for InMemoryHistoryStore {
366    async fn save(&self, history: ExecutionHistory) -> Result<(), CrewError> {
367        self.histories
368            .write()
369            .await
370            .insert(history.id.clone(), history);
371        Ok(())
372    }
373
374    async fn load(&self, id: &str) -> Result<Option<ExecutionHistory>, CrewError> {
375        Ok(self.histories.read().await.get(id).cloned())
376    }
377
378    async fn list_for_graph(&self, graph_id: &str) -> Result<Vec<String>, CrewError> {
379        Ok(self
380            .histories
381            .read()
382            .await
383            .values()
384            .filter(|h| h.graph_id == graph_id)
385            .map(|h| h.id.clone())
386            .collect())
387    }
388
389    async fn list_forks(&self, parent_id: &str) -> Result<Vec<String>, CrewError> {
390        Ok(self
391            .histories
392            .read()
393            .await
394            .values()
395            .filter(|h| h.parent_id.as_deref() == Some(parent_id))
396            .map(|h| h.id.clone())
397            .collect())
398    }
399
400    async fn delete(&self, id: &str) -> Result<(), CrewError> {
401        self.histories.write().await.remove(id);
402        Ok(())
403    }
404}
405
406/// Graph wrapper with time travel capabilities
407pub struct TimeTravelGraph {
408    graph: Arc<Graph>,
409    store: Arc<dyn HistoryStore>,
410}
411
412impl TimeTravelGraph {
413    /// Create a new time travel graph with in-memory storage
414    pub fn new(graph: Graph) -> Self {
415        Self {
416            graph: Arc::new(graph),
417            store: Arc::new(InMemoryHistoryStore::new()),
418        }
419    }
420
421    /// Create with custom history store
422    pub fn with_store(graph: Graph, store: Arc<dyn HistoryStore>) -> Self {
423        Self {
424            graph: Arc::new(graph),
425            store,
426        }
427    }
428
429    /// Get the underlying graph
430    pub fn graph(&self) -> &Graph {
431        &self.graph
432    }
433
434    /// Execute the graph and record full history
435    pub async fn execute(&self, initial_state: GraphState) -> Result<ExecutionHistory, CrewError> {
436        let execution_id = uuid::Uuid::new_v4().to_string();
437        let started_at = Utc::now();
438        let mut steps = Vec::new();
439        let mut state = initial_state;
440        state.metadata.started_at = Some(started_at);
441        state.metadata.iterations = 0;
442
443        let mut current_node = self.graph.entry_node.clone();
444        let mut step_num = 0;
445
446        let result = loop {
447            // Check max iterations
448            if state.metadata.iterations >= self.graph.config.max_iterations {
449                break GraphResult {
450                    state: state.clone(),
451                    status: GraphStatus::MaxIterations,
452                    error: Some(format!(
453                        "Hit maximum iterations: {}",
454                        self.graph.config.max_iterations
455                    )),
456                };
457            }
458
459            // Check for END node
460            if current_node == END {
461                state.metadata.execution_time_ms = started_at
462                    .signed_duration_since(Utc::now())
463                    .num_milliseconds()
464                    .unsigned_abs();
465                break GraphResult {
466                    state: state.clone(),
467                    status: GraphStatus::Success,
468                    error: None,
469                };
470            }
471
472            // Get node
473            let node = match self.graph.nodes.get(&current_node) {
474                Some(n) => n,
475                None => {
476                    break GraphResult {
477                        state: state.clone(),
478                        status: GraphStatus::Failed,
479                        error: Some(format!("Node not found: {}", current_node)),
480                    };
481                }
482            };
483
484            // Record state before
485            let state_before = state.clone();
486            let step_start = std::time::Instant::now();
487
488            // Execute node
489            state.metadata.visited_nodes.push(current_node.clone());
490            state.metadata.iterations += 1;
491
492            let (new_state, error) = match node.executor.call(state).await {
493                Ok(s) => (s, None),
494                Err(e) => {
495                    let err_msg = e.to_string();
496                    // Record the failed step
497                    steps.push(HistoryStep {
498                        step: step_num,
499                        node_id: current_node.clone(),
500                        state_before: state_before.clone(),
501                        state_after: state_before.clone(),
502                        duration_us: step_start.elapsed().as_micros() as u64,
503                        timestamp: Utc::now(),
504                        error: Some(err_msg.clone()),
505                    });
506
507                    break GraphResult {
508                        state: state_before,
509                        status: GraphStatus::Failed,
510                        error: Some(err_msg),
511                    };
512                }
513            };
514
515            let duration_us = step_start.elapsed().as_micros() as u64;
516
517            // Record step
518            steps.push(HistoryStep {
519                step: step_num,
520                node_id: current_node.clone(),
521                state_before,
522                state_after: new_state.clone(),
523                duration_us,
524                timestamp: Utc::now(),
525                error,
526            });
527
528            state = new_state;
529            step_num += 1;
530
531            // Find next node
532            current_node = self.graph.find_next_node(&current_node, &state)?;
533        };
534
535        let history = ExecutionHistory {
536            id: execution_id,
537            graph_id: self.graph.id.clone(),
538            steps,
539            result,
540            branch_id: None,
541            parent_id: None,
542            fork_step: None,
543            started_at,
544            ended_at: Utc::now(),
545        };
546
547        // Store history
548        self.store.save(history.clone()).await?;
549
550        Ok(history)
551    }
552
553    /// Replay execution from a specific step
554    pub async fn replay_from(
555        &self,
556        history: &ExecutionHistory,
557        from_step: usize,
558    ) -> Result<ExecutionHistory, CrewError> {
559        // Get state at the specified step
560        let state = history
561            .steps
562            .get(from_step)
563            .map(|s| s.state_after.clone())
564            .ok_or_else(|| CrewError::TaskNotFound(format!("Step {} not found", from_step)))?;
565
566        // Get the next node after that step
567        let next_node = if from_step + 1 < history.steps.len() {
568            history.steps[from_step + 1].node_id.clone()
569        } else {
570            // Re-evaluate the conditional edge
571            let current_node = &history.steps[from_step].node_id;
572            self.graph.find_next_node(current_node, &state)?
573        };
574
575        self.execute_from_state(state, &next_node, Some(history.id.clone()), Some(from_step))
576            .await
577    }
578
579    /// Fork execution from a specific step with modified state
580    pub async fn fork_from(
581        &self,
582        history: &ExecutionHistory,
583        from_step: usize,
584        modified_state: GraphState,
585    ) -> Result<ExecutionHistory, CrewError> {
586        // Get the next node after that step
587        let next_node = if from_step + 1 < history.steps.len() {
588            history.steps[from_step + 1].node_id.clone()
589        } else {
590            let current_node = &history.steps[from_step].node_id;
591            self.graph.find_next_node(current_node, &modified_state)?
592        };
593
594        self.execute_from_state(
595            modified_state,
596            &next_node,
597            Some(history.id.clone()),
598            Some(from_step),
599        )
600        .await
601    }
602
603    /// Execute from a specific state and node (internal helper)
604    async fn execute_from_state(
605        &self,
606        mut state: GraphState,
607        start_node: &str,
608        parent_id: Option<String>,
609        fork_step: Option<usize>,
610    ) -> Result<ExecutionHistory, CrewError> {
611        let execution_id = uuid::Uuid::new_v4().to_string();
612        let branch_id = if parent_id.is_some() {
613            Some(uuid::Uuid::new_v4().to_string())
614        } else {
615            None
616        };
617        let started_at = Utc::now();
618        let mut steps = Vec::new();
619        state.metadata.started_at = Some(started_at);
620
621        let mut current_node = start_node.to_string();
622        let mut step_num = 0;
623
624        let result = loop {
625            if state.metadata.iterations >= self.graph.config.max_iterations {
626                break GraphResult {
627                    state: state.clone(),
628                    status: GraphStatus::MaxIterations,
629                    error: Some(format!(
630                        "Hit maximum iterations: {}",
631                        self.graph.config.max_iterations
632                    )),
633                };
634            }
635
636            if current_node == END {
637                break GraphResult {
638                    state: state.clone(),
639                    status: GraphStatus::Success,
640                    error: None,
641                };
642            }
643
644            let node = match self.graph.nodes.get(&current_node) {
645                Some(n) => n,
646                None => {
647                    break GraphResult {
648                        state: state.clone(),
649                        status: GraphStatus::Failed,
650                        error: Some(format!("Node not found: {}", current_node)),
651                    };
652                }
653            };
654
655            let state_before = state.clone();
656            let step_start = std::time::Instant::now();
657
658            state.metadata.visited_nodes.push(current_node.clone());
659            state.metadata.iterations += 1;
660
661            let (new_state, error) = match node.executor.call(state).await {
662                Ok(s) => (s, None),
663                Err(e) => {
664                    let err_msg = e.to_string();
665                    steps.push(HistoryStep {
666                        step: step_num,
667                        node_id: current_node.clone(),
668                        state_before: state_before.clone(),
669                        state_after: state_before.clone(),
670                        duration_us: step_start.elapsed().as_micros() as u64,
671                        timestamp: Utc::now(),
672                        error: Some(err_msg.clone()),
673                    });
674
675                    break GraphResult {
676                        state: state_before,
677                        status: GraphStatus::Failed,
678                        error: Some(err_msg),
679                    };
680                }
681            };
682
683            let duration_us = step_start.elapsed().as_micros() as u64;
684
685            steps.push(HistoryStep {
686                step: step_num,
687                node_id: current_node.clone(),
688                state_before,
689                state_after: new_state.clone(),
690                duration_us,
691                timestamp: Utc::now(),
692                error,
693            });
694
695            state = new_state;
696            step_num += 1;
697            current_node = self.graph.find_next_node(&current_node, &state)?;
698        };
699
700        let history = ExecutionHistory {
701            id: execution_id,
702            graph_id: self.graph.id.clone(),
703            steps,
704            result,
705            branch_id,
706            parent_id,
707            fork_step,
708            started_at,
709            ended_at: Utc::now(),
710        };
711
712        self.store.save(history.clone()).await?;
713
714        Ok(history)
715    }
716
717    /// Load an execution history by ID
718    pub async fn load_history(&self, id: &str) -> Result<Option<ExecutionHistory>, CrewError> {
719        self.store.load(id).await
720    }
721
722    /// List all executions for this graph
723    pub async fn list_executions(&self) -> Result<Vec<String>, CrewError> {
724        self.store.list_for_graph(&self.graph.id).await
725    }
726
727    /// List all forks of an execution
728    pub async fn list_forks(&self, parent_id: &str) -> Result<Vec<String>, CrewError> {
729        self.store.list_forks(parent_id).await
730    }
731
732    /// Compare two executions
733    pub async fn compare_executions(
734        &self,
735        id1: &str,
736        id2: &str,
737    ) -> Result<ExecutionComparison, CrewError> {
738        let h1 = self
739            .store
740            .load(id1)
741            .await?
742            .ok_or_else(|| CrewError::TaskNotFound(format!("Execution {} not found", id1)))?;
743
744        let h2 = self
745            .store
746            .load(id2)
747            .await?
748            .ok_or_else(|| CrewError::TaskNotFound(format!("Execution {} not found", id2)))?;
749
750        Ok(ExecutionComparison::new(&h1, &h2))
751    }
752}
753
754/// Comparison between two executions
755#[derive(Debug, Clone, Serialize, Deserialize)]
756pub struct ExecutionComparison {
757    /// ID of first execution
758    pub execution1_id: String,
759    /// ID of second execution
760    pub execution2_id: String,
761    /// Steps that diverged (first differing step)
762    pub divergence_step: Option<usize>,
763    /// Number of steps in execution 1
764    pub steps1: usize,
765    /// Number of steps in execution 2
766    pub steps2: usize,
767    /// Status of execution 1
768    pub status1: GraphStatus,
769    /// Status of execution 2
770    pub status2: GraphStatus,
771    /// Final state differences
772    pub final_state_diff: StateChanges,
773    /// Whether they share a common ancestor (fork relationship)
774    pub is_fork_comparison: bool,
775}
776
777impl ExecutionComparison {
778    /// Create a comparison between two executions
779    pub fn new(h1: &ExecutionHistory, h2: &ExecutionHistory) -> Self {
780        let divergence_step = Self::find_divergence(h1, h2);
781        let final_state_diff = diff_states(&h1.result.state, &h2.result.state);
782        let is_fork_comparison =
783            h1.parent_id == Some(h2.id.clone()) || h2.parent_id == Some(h1.id.clone());
784
785        Self {
786            execution1_id: h1.id.clone(),
787            execution2_id: h2.id.clone(),
788            divergence_step,
789            steps1: h1.steps.len(),
790            steps2: h2.steps.len(),
791            status1: h1.result.status,
792            status2: h2.result.status,
793            final_state_diff,
794            is_fork_comparison,
795        }
796    }
797
798    fn find_divergence(h1: &ExecutionHistory, h2: &ExecutionHistory) -> Option<usize> {
799        let min_len = h1.steps.len().min(h2.steps.len());
800        for i in 0..min_len {
801            if h1.steps[i].node_id != h2.steps[i].node_id {
802                return Some(i);
803            }
804            // Also check if state diverged even with same node
805            let diff = diff_states(&h1.steps[i].state_after, &h2.steps[i].state_after);
806            if !diff.is_empty() {
807                return Some(i);
808            }
809        }
810        // If one is longer, divergence is at the end of the shorter
811        if h1.steps.len() != h2.steps.len() {
812            Some(min_len)
813        } else {
814            None // Identical executions
815        }
816    }
817
818    /// Get a summary of the comparison
819    pub fn summary(&self) -> String {
820        let mut lines = Vec::new();
821        lines.push(format!(
822            "Comparing {} vs {}",
823            &self.execution1_id[..8],
824            &self.execution2_id[..8]
825        ));
826
827        if self.is_fork_comparison {
828            lines.push("  (Fork relationship detected)".to_string());
829        }
830
831        lines.push(format!("  Steps: {} vs {}", self.steps1, self.steps2));
832        lines.push(format!(
833            "  Status: {:?} vs {:?}",
834            self.status1, self.status2
835        ));
836
837        if let Some(step) = self.divergence_step {
838            lines.push(format!("  Diverged at step: {}", step));
839        } else {
840            lines.push("  No divergence (identical executions)".to_string());
841        }
842
843        lines.push(format!(
844            "  Final state diff: {}",
845            self.final_state_diff.summary()
846        ));
847
848        lines.join("\n")
849    }
850}
851
852/// Debug mode for step-by-step execution
853pub struct DebugSession {
854    graph: Arc<Graph>,
855    state: GraphState,
856    current_node: String,
857    step_count: usize,
858    history: Vec<HistoryStep>,
859    breakpoints: Vec<String>,
860    started_at: DateTime<Utc>,
861}
862
863impl DebugSession {
864    /// Create a new debug session
865    pub fn new(graph: Arc<Graph>, initial_state: GraphState) -> Self {
866        let entry_node = graph.entry_node.clone();
867        Self {
868            graph,
869            state: initial_state,
870            current_node: entry_node,
871            step_count: 0,
872            history: Vec::new(),
873            breakpoints: Vec::new(),
874            started_at: Utc::now(),
875        }
876    }
877
878    /// Add a breakpoint at a node
879    pub fn add_breakpoint(&mut self, node_id: impl Into<String>) {
880        self.breakpoints.push(node_id.into());
881    }
882
883    /// Remove a breakpoint
884    pub fn remove_breakpoint(&mut self, node_id: &str) {
885        self.breakpoints.retain(|n| n != node_id);
886    }
887
888    /// Get current state
889    pub fn current_state(&self) -> &GraphState {
890        &self.state
891    }
892
893    /// Get current node
894    pub fn current_node(&self) -> &str {
895        &self.current_node
896    }
897
898    /// Check if execution is finished
899    pub fn is_finished(&self) -> bool {
900        self.current_node == END
901            || self.state.metadata.iterations >= self.graph.config.max_iterations
902    }
903
904    /// Execute a single step
905    pub async fn step(&mut self) -> Result<StepResult, CrewError> {
906        if self.is_finished() {
907            return Ok(StepResult::Finished);
908        }
909
910        let node = self
911            .graph
912            .nodes
913            .get(&self.current_node)
914            .ok_or_else(|| CrewError::TaskNotFound(self.current_node.clone()))?;
915
916        let state_before = self.state.clone();
917        let step_start = std::time::Instant::now();
918
919        self.state
920            .metadata
921            .visited_nodes
922            .push(self.current_node.clone());
923        self.state.metadata.iterations += 1;
924
925        let new_state = node.executor.call(self.state.clone()).await?;
926        let duration_us = step_start.elapsed().as_micros() as u64;
927
928        let step = HistoryStep {
929            step: self.step_count,
930            node_id: self.current_node.clone(),
931            state_before,
932            state_after: new_state.clone(),
933            duration_us,
934            timestamp: Utc::now(),
935            error: None,
936        };
937
938        self.history.push(step.clone());
939        self.state = new_state;
940        self.step_count += 1;
941
942        let next_node = self.graph.find_next_node(&self.current_node, &self.state)?;
943        self.current_node = next_node.clone();
944
945        if next_node == END {
946            Ok(StepResult::Finished)
947        } else if self.breakpoints.contains(&next_node) {
948            Ok(StepResult::Breakpoint(next_node))
949        } else {
950            Ok(StepResult::Continue(step))
951        }
952    }
953
954    /// Run until a breakpoint or completion
955    pub async fn run(&mut self) -> Result<StepResult, CrewError> {
956        loop {
957            let result = self.step().await?;
958            match &result {
959                StepResult::Continue(_) => continue,
960                _ => return Ok(result),
961            }
962        }
963    }
964
965    /// Modify current state
966    pub fn modify_state<F>(&mut self, f: F)
967    where
968        F: FnOnce(&mut GraphState),
969    {
970        f(&mut self.state);
971    }
972
973    /// Get execution history so far
974    pub fn history(&self) -> &[HistoryStep] {
975        &self.history
976    }
977
978    /// Convert to final ExecutionHistory
979    pub fn into_history(self, status: GraphStatus, error: Option<String>) -> ExecutionHistory {
980        ExecutionHistory {
981            id: uuid::Uuid::new_v4().to_string(),
982            graph_id: self.graph.id.clone(),
983            steps: self.history,
984            result: GraphResult {
985                state: self.state,
986                status,
987                error,
988            },
989            branch_id: None,
990            parent_id: None,
991            fork_step: None,
992            started_at: self.started_at,
993            ended_at: Utc::now(),
994        }
995    }
996}
997
998/// Result of a single debug step
999#[derive(Debug)]
1000pub enum StepResult {
1001    /// Continue to next step (includes the completed step)
1002    Continue(HistoryStep),
1003    /// Hit a breakpoint (includes the node ID)
1004    Breakpoint(String),
1005    /// Execution finished
1006    Finished,
1007}
1008
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::GraphBuilder;

    /// Linear three-node graph: step1 -> step2 -> step3 -> END.
    ///
    /// `step1` sets `counter` to 1 and each later node increments it, so a full
    /// run ends with `counter == 3`.
    fn create_test_graph() -> Graph {
        GraphBuilder::new("test_graph")
            .add_node("step1", |mut state: GraphState| async move {
                state.set("step1_done", true);
                state.set("counter", 1);
                Ok(state)
            })
            .add_node("step2", |mut state: GraphState| async move {
                let counter: i32 = state.get("counter").unwrap_or(0);
                state.set("counter", counter + 1);
                state.set("step2_done", true);
                Ok(state)
            })
            .add_node("step3", |mut state: GraphState| async move {
                let counter: i32 = state.get("counter").unwrap_or(0);
                state.set("counter", counter + 1);
                state.set("step3_done", true);
                Ok(state)
            })
            .add_edge("step1", "step2")
            .add_edge("step2", "step3")
            .add_edge("step3", END)
            .set_entry("step1")
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_execute_with_history() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let history = tt.execute(GraphState::new()).await.unwrap();

        assert_eq!(history.steps.len(), 3);
        assert_eq!(history.result.status, GraphStatus::Success);
        assert_eq!(history.steps[0].node_id, "step1");
        assert_eq!(history.steps[1].node_id, "step2");
        assert_eq!(history.steps[2].node_id, "step3");
    }

    #[tokio::test]
    async fn test_history_browser() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let history = tt.execute(GraphState::new()).await.unwrap();
        let browser = history.browser();

        assert_eq!(browser.len(), 3);
        assert_eq!(browser.visited_nodes(), vec!["step1", "step2", "step3"]);

        let state_at_1 = browser.state_at(1).unwrap();
        let counter: i32 = state_at_1.get("counter").unwrap();
        assert_eq!(counter, 2);
    }

    #[tokio::test]
    async fn test_find_key_changes() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let history = tt.execute(GraphState::new()).await.unwrap();
        let browser = history.browser();

        // counter changes in all 3 steps
        let counter_changes = browser.find_key_changes("counter");
        assert_eq!(counter_changes.len(), 3);

        // step1_done only changes in step 0
        let step1_changes = browser.find_key_changes("step1_done");
        assert_eq!(step1_changes.len(), 1);
        assert_eq!(step1_changes[0], 0);
    }

    #[tokio::test]
    async fn test_replay_from_step() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let original = tt.execute(GraphState::new()).await.unwrap();

        // Replay from step 1 (after step2 executed)
        let replayed = tt.replay_from(&original, 1).await.unwrap();

        // Should only have step3
        assert_eq!(replayed.steps.len(), 1);
        assert_eq!(replayed.steps[0].node_id, "step3");
        assert!(replayed.parent_id.is_some());
        assert_eq!(replayed.fork_step, Some(1));
    }

    #[tokio::test]
    async fn test_fork_with_modified_state() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let original = tt.execute(GraphState::new()).await.unwrap();

        // Fork from step 0 with modified state
        let mut modified_state = original.steps[0].state_after.clone();
        modified_state.set("counter", 100); // Override counter

        let forked = tt.fork_from(&original, 0, modified_state).await.unwrap();

        // Forked execution should continue with modified counter
        let final_counter: i32 = forked.result.state.get("counter").unwrap();
        assert_eq!(final_counter, 102); // 100 + 1 + 1
    }

    // Pure, synchronous function: no async runtime needed.
    #[test]
    fn test_state_diff() {
        let mut before = GraphState::new();
        before.set("a", 1);
        before.set("b", 2);

        let mut after = GraphState::new();
        after.set("a", 1); // unchanged
        after.set("b", 3); // modified
        after.set("c", 4); // added

        let diff = diff_states(&before, &after);

        assert!(diff.added.contains(&"c".to_string()));
        assert!(diff.modified.contains_key("b"));
        // "a" is identical on both sides, so it must not be reported at all.
        assert!(!diff.modified.contains_key("a"));
        assert!(!diff.removed.contains(&"a".to_string()));
        // Every key in `before` still exists in `after`.
        assert!(diff.removed.is_empty());
    }

    #[tokio::test]
    async fn test_compare_executions() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let exec1 = tt.execute(GraphState::new()).await.unwrap();

        // Fork with different state
        let mut modified = exec1.steps[0].state_after.clone();
        modified.set("custom", "value");
        let exec2 = tt.fork_from(&exec1, 0, modified).await.unwrap();

        let comparison = tt.compare_executions(&exec1.id, &exec2.id).await.unwrap();

        assert!(comparison.is_fork_comparison);
        assert!(!comparison.final_state_diff.is_empty());
    }

    #[tokio::test]
    async fn test_debug_session() {
        let graph = Arc::new(create_test_graph());
        let mut session = DebugSession::new(graph, GraphState::new());

        // Step through manually
        let result1 = session.step().await.unwrap();
        assert!(matches!(result1, StepResult::Continue(ref s) if s.node_id == "step1"));
        assert_eq!(session.current_node(), "step2");

        let result2 = session.step().await.unwrap();
        assert!(matches!(result2, StepResult::Continue(ref s) if s.node_id == "step2"));
        assert_eq!(session.current_node(), "step3");

        let result3 = session.step().await.unwrap();
        assert!(matches!(result3, StepResult::Finished));
        assert!(session.is_finished());
        assert_eq!(session.history().len(), 3);
    }

    #[tokio::test]
    async fn test_debug_breakpoints() {
        let graph = Arc::new(create_test_graph());
        let mut session = DebugSession::new(graph, GraphState::new());

        session.add_breakpoint("step3");

        // Run until breakpoint: step1 and step2 ran, step3 has not.
        let result = session.run().await.unwrap();
        assert!(matches!(result, StepResult::Breakpoint(ref n) if n == "step3"));
        assert_eq!(session.current_node(), "step3");
        assert_eq!(session.history().len(), 2);

        // Continue to end
        let result = session.run().await.unwrap();
        assert!(matches!(result, StepResult::Finished));
        assert_eq!(session.history().len(), 3);
    }

    #[tokio::test]
    async fn test_debug_remove_breakpoint() {
        let graph = Arc::new(create_test_graph());
        let mut session = DebugSession::new(graph, GraphState::new());

        session.add_breakpoint("step2");
        session.remove_breakpoint("step2");

        // With the breakpoint removed, run goes straight to completion.
        let result = session.run().await.unwrap();
        assert!(matches!(result, StepResult::Finished));
        assert_eq!(session.history().len(), 3);
    }

    #[tokio::test]
    async fn test_debug_modify_state() {
        let graph = Arc::new(create_test_graph());
        let mut session = DebugSession::new(graph, GraphState::new());

        // Execute step1
        session.step().await.unwrap();

        // Modify state before step2
        session.modify_state(|state| {
            state.set("counter", 50);
        });

        // Continue execution
        session.run().await.unwrap();

        // Final counter should be 52 (50 + 1 + 1)
        let final_counter: i32 = session.current_state().get("counter").unwrap();
        assert_eq!(final_counter, 52);
    }

    #[tokio::test]
    async fn test_debug_into_history() {
        let graph = Arc::new(create_test_graph());
        let mut session = DebugSession::new(graph, GraphState::new());
        session.run().await.unwrap();

        let history = session.into_history(GraphStatus::Success, None);

        assert_eq!(history.steps.len(), 3);
        assert_eq!(history.steps[1].step, 1);
        assert_eq!(history.steps[2].node_id, "step3");
        assert_eq!(history.result.status, GraphStatus::Success);
        assert!(history.result.error.is_none());
        assert!(history.parent_id.is_none());
        assert_eq!(history.fork_step, None);

        let final_counter: i32 = history.result.state.get("counter").unwrap();
        assert_eq!(final_counter, 3);
    }

    #[tokio::test]
    async fn test_execution_summary() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let history = tt.execute(GraphState::new()).await.unwrap();
        let summary = history.browser().summary();

        assert_eq!(summary.total_steps, 3);
        assert_eq!(summary.unique_nodes, 3);
        assert_eq!(summary.status, GraphStatus::Success);
        assert!(!summary.is_fork);
    }

    #[tokio::test]
    async fn test_timeline_output() {
        let graph = create_test_graph();
        let tt = TimeTravelGraph::new(graph);

        let history = tt.execute(GraphState::new()).await.unwrap();
        let timeline = history.browser().timeline();

        assert!(timeline.contains("step1"));
        assert!(timeline.contains("step2"));
        assert!(timeline.contains("step3"));
        assert!(timeline.contains("Success"));
    }
}