1use crate::graph::{Graph, GraphResult, GraphState, GraphStatus, END};
38use chrono::{DateTime, Utc};
39use cortexai_core::errors::CrewError;
40use serde::{Deserialize, Serialize};
41use std::collections::HashMap;
42use std::sync::Arc;
43use tokio::sync::RwLock;
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct HistoryStep {
48 pub step: usize,
50 pub node_id: String,
52 pub state_before: GraphState,
54 pub state_after: GraphState,
56 pub duration_us: u64,
58 pub timestamp: DateTime<Utc>,
60 pub error: Option<String>,
62}
63
64impl HistoryStep {
65 pub fn changes(&self) -> StateChanges {
67 diff_states(&self.state_before, &self.state_after)
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct StateChanges {
74 pub added: Vec<String>,
76 pub removed: Vec<String>,
78 pub modified: HashMap<String, (serde_json::Value, serde_json::Value)>,
80}
81
82impl StateChanges {
83 pub fn is_empty(&self) -> bool {
85 self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty()
86 }
87
88 pub fn summary(&self) -> String {
90 let mut parts = Vec::new();
91 if !self.added.is_empty() {
92 parts.push(format!("+{} added", self.added.len()));
93 }
94 if !self.removed.is_empty() {
95 parts.push(format!("-{} removed", self.removed.len()));
96 }
97 if !self.modified.is_empty() {
98 parts.push(format!("~{} modified", self.modified.len()));
99 }
100 if parts.is_empty() {
101 "no changes".to_string()
102 } else {
103 parts.join(", ")
104 }
105 }
106}
107
108pub fn diff_states(before: &GraphState, after: &GraphState) -> StateChanges {
110 let mut changes = StateChanges {
111 added: Vec::new(),
112 removed: Vec::new(),
113 modified: HashMap::new(),
114 };
115
116 let before_obj = before.data.as_object();
117 let after_obj = after.data.as_object();
118
119 if let (Some(b), Some(a)) = (before_obj, after_obj) {
120 for (key, new_val) in a {
122 match b.get(key) {
123 None => changes.added.push(key.clone()),
124 Some(old_val) if old_val != new_val => {
125 changes
126 .modified
127 .insert(key.clone(), (old_val.clone(), new_val.clone()));
128 }
129 _ => {}
130 }
131 }
132
133 for key in b.keys() {
135 if !a.contains_key(key) {
136 changes.removed.push(key.clone());
137 }
138 }
139 }
140
141 changes
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct ExecutionHistory {
147 pub id: String,
149 pub graph_id: String,
151 pub steps: Vec<HistoryStep>,
153 pub result: GraphResult,
155 pub branch_id: Option<String>,
157 pub parent_id: Option<String>,
159 pub fork_step: Option<usize>,
161 pub started_at: DateTime<Utc>,
163 pub ended_at: DateTime<Utc>,
165}
166
167impl ExecutionHistory {
168 pub fn browser(&self) -> HistoryBrowser<'_> {
170 HistoryBrowser { history: self }
171 }
172
173 pub fn duration_ms(&self) -> i64 {
175 self.ended_at
176 .signed_duration_since(self.started_at)
177 .num_milliseconds()
178 }
179
180 pub fn is_fork(&self) -> bool {
182 self.parent_id.is_some()
183 }
184}
185
186#[derive(Debug)]
188pub struct HistoryBrowser<'a> {
189 history: &'a ExecutionHistory,
190}
191
192impl<'a> HistoryBrowser<'a> {
193 pub fn len(&self) -> usize {
195 self.history.steps.len()
196 }
197
198 pub fn is_empty(&self) -> bool {
200 self.history.steps.is_empty()
201 }
202
203 pub fn step_at(&self, index: usize) -> Option<&HistoryStep> {
205 self.history.steps.get(index)
206 }
207
208 pub fn state_at(&self, index: usize) -> Result<GraphState, CrewError> {
210 self.history
211 .steps
212 .get(index)
213 .map(|s| s.state_after.clone())
214 .ok_or_else(|| CrewError::TaskNotFound(format!("Step {} not found", index)))
215 }
216
217 pub fn state_before(&self, index: usize) -> Result<GraphState, CrewError> {
219 self.history
220 .steps
221 .get(index)
222 .map(|s| s.state_before.clone())
223 .ok_or_else(|| CrewError::TaskNotFound(format!("Step {} not found", index)))
224 }
225
226 pub fn visited_nodes(&self) -> Vec<&str> {
228 self.history
229 .steps
230 .iter()
231 .map(|s| s.node_id.as_str())
232 .collect()
233 }
234
235 pub fn find_key_changes(&self, key: &str) -> Vec<usize> {
237 self.history
238 .steps
239 .iter()
240 .enumerate()
241 .filter(|(_, step)| {
242 let changes = step.changes();
243 changes.added.contains(&key.to_string())
244 || changes.removed.contains(&key.to_string())
245 || changes.modified.contains_key(key)
246 })
247 .map(|(i, _)| i)
248 .collect()
249 }
250
251 pub fn find_node_executions(&self, node_id: &str) -> Vec<usize> {
253 self.history
254 .steps
255 .iter()
256 .enumerate()
257 .filter(|(_, step)| step.node_id == node_id)
258 .map(|(i, _)| i)
259 .collect()
260 }
261
262 pub fn summary(&self) -> ExecutionSummary {
264 let mut node_counts: HashMap<String, usize> = HashMap::new();
265 let mut total_duration_us = 0u64;
266
267 for step in &self.history.steps {
268 *node_counts.entry(step.node_id.clone()).or_insert(0) += 1;
269 total_duration_us += step.duration_us;
270 }
271
272 ExecutionSummary {
273 total_steps: self.history.steps.len(),
274 unique_nodes: node_counts.len(),
275 node_execution_counts: node_counts,
276 total_duration_us,
277 status: self.history.result.status,
278 is_fork: self.history.is_fork(),
279 }
280 }
281
282 pub fn diff(
284 &self,
285 self_step: usize,
286 other: &HistoryBrowser,
287 other_step: usize,
288 ) -> Result<StateChanges, CrewError> {
289 let self_state = self.state_at(self_step)?;
290 let other_state = other.state_at(other_step)?;
291 Ok(diff_states(&self_state, &other_state))
292 }
293
294 pub fn timeline(&self) -> String {
296 let mut lines = Vec::new();
297 lines.push(format!("Execution {} Timeline:", self.history.id));
298 lines.push("─".repeat(50));
299
300 for step in &self.history.steps {
301 let changes = step.changes();
302 let status = if step.error.is_some() { "✗" } else { "✓" };
303 lines.push(format!(
304 "{} Step {}: {} ({:.2}ms) - {}",
305 status,
306 step.step,
307 step.node_id,
308 step.duration_us as f64 / 1000.0,
309 changes.summary()
310 ));
311 }
312
313 lines.push("─".repeat(50));
314 lines.push(format!("Status: {:?}", self.history.result.status));
315 lines.join("\n")
316 }
317}
318
319#[derive(Debug, Clone, Serialize, Deserialize)]
321pub struct ExecutionSummary {
322 pub total_steps: usize,
324 pub unique_nodes: usize,
326 pub node_execution_counts: HashMap<String, usize>,
328 pub total_duration_us: u64,
330 pub status: GraphStatus,
332 pub is_fork: bool,
334}
335
336#[async_trait::async_trait]
338pub trait HistoryStore: Send + Sync {
339 async fn save(&self, history: ExecutionHistory) -> Result<(), CrewError>;
341 async fn load(&self, id: &str) -> Result<Option<ExecutionHistory>, CrewError>;
343 async fn list_for_graph(&self, graph_id: &str) -> Result<Vec<String>, CrewError>;
345 async fn list_forks(&self, parent_id: &str) -> Result<Vec<String>, CrewError>;
347 async fn delete(&self, id: &str) -> Result<(), CrewError>;
349}
350
351#[derive(Default)]
353pub struct InMemoryHistoryStore {
354 histories: RwLock<HashMap<String, ExecutionHistory>>,
355}
356
357impl InMemoryHistoryStore {
358 pub fn new() -> Self {
360 Self::default()
361 }
362}
363
364#[async_trait::async_trait]
365impl HistoryStore for InMemoryHistoryStore {
366 async fn save(&self, history: ExecutionHistory) -> Result<(), CrewError> {
367 self.histories
368 .write()
369 .await
370 .insert(history.id.clone(), history);
371 Ok(())
372 }
373
374 async fn load(&self, id: &str) -> Result<Option<ExecutionHistory>, CrewError> {
375 Ok(self.histories.read().await.get(id).cloned())
376 }
377
378 async fn list_for_graph(&self, graph_id: &str) -> Result<Vec<String>, CrewError> {
379 Ok(self
380 .histories
381 .read()
382 .await
383 .values()
384 .filter(|h| h.graph_id == graph_id)
385 .map(|h| h.id.clone())
386 .collect())
387 }
388
389 async fn list_forks(&self, parent_id: &str) -> Result<Vec<String>, CrewError> {
390 Ok(self
391 .histories
392 .read()
393 .await
394 .values()
395 .filter(|h| h.parent_id.as_deref() == Some(parent_id))
396 .map(|h| h.id.clone())
397 .collect())
398 }
399
400 async fn delete(&self, id: &str) -> Result<(), CrewError> {
401 self.histories.write().await.remove(id);
402 Ok(())
403 }
404}
405
406pub struct TimeTravelGraph {
408 graph: Arc<Graph>,
409 store: Arc<dyn HistoryStore>,
410}
411
412impl TimeTravelGraph {
413 pub fn new(graph: Graph) -> Self {
415 Self {
416 graph: Arc::new(graph),
417 store: Arc::new(InMemoryHistoryStore::new()),
418 }
419 }
420
421 pub fn with_store(graph: Graph, store: Arc<dyn HistoryStore>) -> Self {
423 Self {
424 graph: Arc::new(graph),
425 store,
426 }
427 }
428
429 pub fn graph(&self) -> &Graph {
431 &self.graph
432 }
433
434 pub async fn execute(&self, initial_state: GraphState) -> Result<ExecutionHistory, CrewError> {
436 let execution_id = uuid::Uuid::new_v4().to_string();
437 let started_at = Utc::now();
438 let mut steps = Vec::new();
439 let mut state = initial_state;
440 state.metadata.started_at = Some(started_at);
441 state.metadata.iterations = 0;
442
443 let mut current_node = self.graph.entry_node.clone();
444 let mut step_num = 0;
445
446 let result = loop {
447 if state.metadata.iterations >= self.graph.config.max_iterations {
449 break GraphResult {
450 state: state.clone(),
451 status: GraphStatus::MaxIterations,
452 error: Some(format!(
453 "Hit maximum iterations: {}",
454 self.graph.config.max_iterations
455 )),
456 };
457 }
458
459 if current_node == END {
461 state.metadata.execution_time_ms = started_at
462 .signed_duration_since(Utc::now())
463 .num_milliseconds()
464 .unsigned_abs();
465 break GraphResult {
466 state: state.clone(),
467 status: GraphStatus::Success,
468 error: None,
469 };
470 }
471
472 let node = match self.graph.nodes.get(¤t_node) {
474 Some(n) => n,
475 None => {
476 break GraphResult {
477 state: state.clone(),
478 status: GraphStatus::Failed,
479 error: Some(format!("Node not found: {}", current_node)),
480 };
481 }
482 };
483
484 let state_before = state.clone();
486 let step_start = std::time::Instant::now();
487
488 state.metadata.visited_nodes.push(current_node.clone());
490 state.metadata.iterations += 1;
491
492 let (new_state, error) = match node.executor.call(state).await {
493 Ok(s) => (s, None),
494 Err(e) => {
495 let err_msg = e.to_string();
496 steps.push(HistoryStep {
498 step: step_num,
499 node_id: current_node.clone(),
500 state_before: state_before.clone(),
501 state_after: state_before.clone(),
502 duration_us: step_start.elapsed().as_micros() as u64,
503 timestamp: Utc::now(),
504 error: Some(err_msg.clone()),
505 });
506
507 break GraphResult {
508 state: state_before,
509 status: GraphStatus::Failed,
510 error: Some(err_msg),
511 };
512 }
513 };
514
515 let duration_us = step_start.elapsed().as_micros() as u64;
516
517 steps.push(HistoryStep {
519 step: step_num,
520 node_id: current_node.clone(),
521 state_before,
522 state_after: new_state.clone(),
523 duration_us,
524 timestamp: Utc::now(),
525 error,
526 });
527
528 state = new_state;
529 step_num += 1;
530
531 current_node = self.graph.find_next_node(¤t_node, &state)?;
533 };
534
535 let history = ExecutionHistory {
536 id: execution_id,
537 graph_id: self.graph.id.clone(),
538 steps,
539 result,
540 branch_id: None,
541 parent_id: None,
542 fork_step: None,
543 started_at,
544 ended_at: Utc::now(),
545 };
546
547 self.store.save(history.clone()).await?;
549
550 Ok(history)
551 }
552
553 pub async fn replay_from(
555 &self,
556 history: &ExecutionHistory,
557 from_step: usize,
558 ) -> Result<ExecutionHistory, CrewError> {
559 let state = history
561 .steps
562 .get(from_step)
563 .map(|s| s.state_after.clone())
564 .ok_or_else(|| CrewError::TaskNotFound(format!("Step {} not found", from_step)))?;
565
566 let next_node = if from_step + 1 < history.steps.len() {
568 history.steps[from_step + 1].node_id.clone()
569 } else {
570 let current_node = &history.steps[from_step].node_id;
572 self.graph.find_next_node(current_node, &state)?
573 };
574
575 self.execute_from_state(state, &next_node, Some(history.id.clone()), Some(from_step))
576 .await
577 }
578
579 pub async fn fork_from(
581 &self,
582 history: &ExecutionHistory,
583 from_step: usize,
584 modified_state: GraphState,
585 ) -> Result<ExecutionHistory, CrewError> {
586 let next_node = if from_step + 1 < history.steps.len() {
588 history.steps[from_step + 1].node_id.clone()
589 } else {
590 let current_node = &history.steps[from_step].node_id;
591 self.graph.find_next_node(current_node, &modified_state)?
592 };
593
594 self.execute_from_state(
595 modified_state,
596 &next_node,
597 Some(history.id.clone()),
598 Some(from_step),
599 )
600 .await
601 }
602
603 async fn execute_from_state(
605 &self,
606 mut state: GraphState,
607 start_node: &str,
608 parent_id: Option<String>,
609 fork_step: Option<usize>,
610 ) -> Result<ExecutionHistory, CrewError> {
611 let execution_id = uuid::Uuid::new_v4().to_string();
612 let branch_id = if parent_id.is_some() {
613 Some(uuid::Uuid::new_v4().to_string())
614 } else {
615 None
616 };
617 let started_at = Utc::now();
618 let mut steps = Vec::new();
619 state.metadata.started_at = Some(started_at);
620
621 let mut current_node = start_node.to_string();
622 let mut step_num = 0;
623
624 let result = loop {
625 if state.metadata.iterations >= self.graph.config.max_iterations {
626 break GraphResult {
627 state: state.clone(),
628 status: GraphStatus::MaxIterations,
629 error: Some(format!(
630 "Hit maximum iterations: {}",
631 self.graph.config.max_iterations
632 )),
633 };
634 }
635
636 if current_node == END {
637 break GraphResult {
638 state: state.clone(),
639 status: GraphStatus::Success,
640 error: None,
641 };
642 }
643
644 let node = match self.graph.nodes.get(¤t_node) {
645 Some(n) => n,
646 None => {
647 break GraphResult {
648 state: state.clone(),
649 status: GraphStatus::Failed,
650 error: Some(format!("Node not found: {}", current_node)),
651 };
652 }
653 };
654
655 let state_before = state.clone();
656 let step_start = std::time::Instant::now();
657
658 state.metadata.visited_nodes.push(current_node.clone());
659 state.metadata.iterations += 1;
660
661 let (new_state, error) = match node.executor.call(state).await {
662 Ok(s) => (s, None),
663 Err(e) => {
664 let err_msg = e.to_string();
665 steps.push(HistoryStep {
666 step: step_num,
667 node_id: current_node.clone(),
668 state_before: state_before.clone(),
669 state_after: state_before.clone(),
670 duration_us: step_start.elapsed().as_micros() as u64,
671 timestamp: Utc::now(),
672 error: Some(err_msg.clone()),
673 });
674
675 break GraphResult {
676 state: state_before,
677 status: GraphStatus::Failed,
678 error: Some(err_msg),
679 };
680 }
681 };
682
683 let duration_us = step_start.elapsed().as_micros() as u64;
684
685 steps.push(HistoryStep {
686 step: step_num,
687 node_id: current_node.clone(),
688 state_before,
689 state_after: new_state.clone(),
690 duration_us,
691 timestamp: Utc::now(),
692 error,
693 });
694
695 state = new_state;
696 step_num += 1;
697 current_node = self.graph.find_next_node(¤t_node, &state)?;
698 };
699
700 let history = ExecutionHistory {
701 id: execution_id,
702 graph_id: self.graph.id.clone(),
703 steps,
704 result,
705 branch_id,
706 parent_id,
707 fork_step,
708 started_at,
709 ended_at: Utc::now(),
710 };
711
712 self.store.save(history.clone()).await?;
713
714 Ok(history)
715 }
716
717 pub async fn load_history(&self, id: &str) -> Result<Option<ExecutionHistory>, CrewError> {
719 self.store.load(id).await
720 }
721
722 pub async fn list_executions(&self) -> Result<Vec<String>, CrewError> {
724 self.store.list_for_graph(&self.graph.id).await
725 }
726
727 pub async fn list_forks(&self, parent_id: &str) -> Result<Vec<String>, CrewError> {
729 self.store.list_forks(parent_id).await
730 }
731
732 pub async fn compare_executions(
734 &self,
735 id1: &str,
736 id2: &str,
737 ) -> Result<ExecutionComparison, CrewError> {
738 let h1 = self
739 .store
740 .load(id1)
741 .await?
742 .ok_or_else(|| CrewError::TaskNotFound(format!("Execution {} not found", id1)))?;
743
744 let h2 = self
745 .store
746 .load(id2)
747 .await?
748 .ok_or_else(|| CrewError::TaskNotFound(format!("Execution {} not found", id2)))?;
749
750 Ok(ExecutionComparison::new(&h1, &h2))
751 }
752}
753
754#[derive(Debug, Clone, Serialize, Deserialize)]
756pub struct ExecutionComparison {
757 pub execution1_id: String,
759 pub execution2_id: String,
761 pub divergence_step: Option<usize>,
763 pub steps1: usize,
765 pub steps2: usize,
767 pub status1: GraphStatus,
769 pub status2: GraphStatus,
771 pub final_state_diff: StateChanges,
773 pub is_fork_comparison: bool,
775}
776
777impl ExecutionComparison {
778 pub fn new(h1: &ExecutionHistory, h2: &ExecutionHistory) -> Self {
780 let divergence_step = Self::find_divergence(h1, h2);
781 let final_state_diff = diff_states(&h1.result.state, &h2.result.state);
782 let is_fork_comparison =
783 h1.parent_id == Some(h2.id.clone()) || h2.parent_id == Some(h1.id.clone());
784
785 Self {
786 execution1_id: h1.id.clone(),
787 execution2_id: h2.id.clone(),
788 divergence_step,
789 steps1: h1.steps.len(),
790 steps2: h2.steps.len(),
791 status1: h1.result.status,
792 status2: h2.result.status,
793 final_state_diff,
794 is_fork_comparison,
795 }
796 }
797
798 fn find_divergence(h1: &ExecutionHistory, h2: &ExecutionHistory) -> Option<usize> {
799 let min_len = h1.steps.len().min(h2.steps.len());
800 for i in 0..min_len {
801 if h1.steps[i].node_id != h2.steps[i].node_id {
802 return Some(i);
803 }
804 let diff = diff_states(&h1.steps[i].state_after, &h2.steps[i].state_after);
806 if !diff.is_empty() {
807 return Some(i);
808 }
809 }
810 if h1.steps.len() != h2.steps.len() {
812 Some(min_len)
813 } else {
814 None }
816 }
817
818 pub fn summary(&self) -> String {
820 let mut lines = Vec::new();
821 lines.push(format!(
822 "Comparing {} vs {}",
823 &self.execution1_id[..8],
824 &self.execution2_id[..8]
825 ));
826
827 if self.is_fork_comparison {
828 lines.push(" (Fork relationship detected)".to_string());
829 }
830
831 lines.push(format!(" Steps: {} vs {}", self.steps1, self.steps2));
832 lines.push(format!(
833 " Status: {:?} vs {:?}",
834 self.status1, self.status2
835 ));
836
837 if let Some(step) = self.divergence_step {
838 lines.push(format!(" Diverged at step: {}", step));
839 } else {
840 lines.push(" No divergence (identical executions)".to_string());
841 }
842
843 lines.push(format!(
844 " Final state diff: {}",
845 self.final_state_diff.summary()
846 ));
847
848 lines.join("\n")
849 }
850}
851
852pub struct DebugSession {
854 graph: Arc<Graph>,
855 state: GraphState,
856 current_node: String,
857 step_count: usize,
858 history: Vec<HistoryStep>,
859 breakpoints: Vec<String>,
860 started_at: DateTime<Utc>,
861}
862
863impl DebugSession {
864 pub fn new(graph: Arc<Graph>, initial_state: GraphState) -> Self {
866 let entry_node = graph.entry_node.clone();
867 Self {
868 graph,
869 state: initial_state,
870 current_node: entry_node,
871 step_count: 0,
872 history: Vec::new(),
873 breakpoints: Vec::new(),
874 started_at: Utc::now(),
875 }
876 }
877
878 pub fn add_breakpoint(&mut self, node_id: impl Into<String>) {
880 self.breakpoints.push(node_id.into());
881 }
882
883 pub fn remove_breakpoint(&mut self, node_id: &str) {
885 self.breakpoints.retain(|n| n != node_id);
886 }
887
888 pub fn current_state(&self) -> &GraphState {
890 &self.state
891 }
892
893 pub fn current_node(&self) -> &str {
895 &self.current_node
896 }
897
898 pub fn is_finished(&self) -> bool {
900 self.current_node == END
901 || self.state.metadata.iterations >= self.graph.config.max_iterations
902 }
903
904 pub async fn step(&mut self) -> Result<StepResult, CrewError> {
906 if self.is_finished() {
907 return Ok(StepResult::Finished);
908 }
909
910 let node = self
911 .graph
912 .nodes
913 .get(&self.current_node)
914 .ok_or_else(|| CrewError::TaskNotFound(self.current_node.clone()))?;
915
916 let state_before = self.state.clone();
917 let step_start = std::time::Instant::now();
918
919 self.state
920 .metadata
921 .visited_nodes
922 .push(self.current_node.clone());
923 self.state.metadata.iterations += 1;
924
925 let new_state = node.executor.call(self.state.clone()).await?;
926 let duration_us = step_start.elapsed().as_micros() as u64;
927
928 let step = HistoryStep {
929 step: self.step_count,
930 node_id: self.current_node.clone(),
931 state_before,
932 state_after: new_state.clone(),
933 duration_us,
934 timestamp: Utc::now(),
935 error: None,
936 };
937
938 self.history.push(step.clone());
939 self.state = new_state;
940 self.step_count += 1;
941
942 let next_node = self.graph.find_next_node(&self.current_node, &self.state)?;
943 self.current_node = next_node.clone();
944
945 if next_node == END {
946 Ok(StepResult::Finished)
947 } else if self.breakpoints.contains(&next_node) {
948 Ok(StepResult::Breakpoint(next_node))
949 } else {
950 Ok(StepResult::Continue(step))
951 }
952 }
953
954 pub async fn run(&mut self) -> Result<StepResult, CrewError> {
956 loop {
957 let result = self.step().await?;
958 match &result {
959 StepResult::Continue(_) => continue,
960 _ => return Ok(result),
961 }
962 }
963 }
964
965 pub fn modify_state<F>(&mut self, f: F)
967 where
968 F: FnOnce(&mut GraphState),
969 {
970 f(&mut self.state);
971 }
972
973 pub fn history(&self) -> &[HistoryStep] {
975 &self.history
976 }
977
978 pub fn into_history(self, status: GraphStatus, error: Option<String>) -> ExecutionHistory {
980 ExecutionHistory {
981 id: uuid::Uuid::new_v4().to_string(),
982 graph_id: self.graph.id.clone(),
983 steps: self.history,
984 result: GraphResult {
985 state: self.state,
986 status,
987 error,
988 },
989 branch_id: None,
990 parent_id: None,
991 fork_step: None,
992 started_at: self.started_at,
993 ended_at: Utc::now(),
994 }
995 }
996}
997
998#[derive(Debug)]
1000pub enum StepResult {
1001 Continue(HistoryStep),
1003 Breakpoint(String),
1005 Finished,
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011 use super::*;
1012 use crate::graph::GraphBuilder;
1013
1014 fn create_test_graph() -> Graph {
1015 GraphBuilder::new("test_graph")
1016 .add_node("step1", |mut state: GraphState| async move {
1017 state.set("step1_done", true);
1018 state.set("counter", 1);
1019 Ok(state)
1020 })
1021 .add_node("step2", |mut state: GraphState| async move {
1022 let counter: i32 = state.get("counter").unwrap_or(0);
1023 state.set("counter", counter + 1);
1024 state.set("step2_done", true);
1025 Ok(state)
1026 })
1027 .add_node("step3", |mut state: GraphState| async move {
1028 let counter: i32 = state.get("counter").unwrap_or(0);
1029 state.set("counter", counter + 1);
1030 state.set("step3_done", true);
1031 Ok(state)
1032 })
1033 .add_edge("step1", "step2")
1034 .add_edge("step2", "step3")
1035 .add_edge("step3", END)
1036 .set_entry("step1")
1037 .build()
1038 .unwrap()
1039 }
1040
1041 #[tokio::test]
1042 async fn test_execute_with_history() {
1043 let graph = create_test_graph();
1044 let tt = TimeTravelGraph::new(graph);
1045
1046 let history = tt.execute(GraphState::new()).await.unwrap();
1047
1048 assert_eq!(history.steps.len(), 3);
1049 assert_eq!(history.result.status, GraphStatus::Success);
1050 assert_eq!(history.steps[0].node_id, "step1");
1051 assert_eq!(history.steps[1].node_id, "step2");
1052 assert_eq!(history.steps[2].node_id, "step3");
1053 }
1054
1055 #[tokio::test]
1056 async fn test_history_browser() {
1057 let graph = create_test_graph();
1058 let tt = TimeTravelGraph::new(graph);
1059
1060 let history = tt.execute(GraphState::new()).await.unwrap();
1061 let browser = history.browser();
1062
1063 assert_eq!(browser.len(), 3);
1064 assert_eq!(browser.visited_nodes(), vec!["step1", "step2", "step3"]);
1065
1066 let state_at_1 = browser.state_at(1).unwrap();
1067 let counter: i32 = state_at_1.get("counter").unwrap();
1068 assert_eq!(counter, 2);
1069 }
1070
1071 #[tokio::test]
1072 async fn test_find_key_changes() {
1073 let graph = create_test_graph();
1074 let tt = TimeTravelGraph::new(graph);
1075
1076 let history = tt.execute(GraphState::new()).await.unwrap();
1077 let browser = history.browser();
1078
1079 let counter_changes = browser.find_key_changes("counter");
1081 assert_eq!(counter_changes.len(), 3);
1082
1083 let step1_changes = browser.find_key_changes("step1_done");
1085 assert_eq!(step1_changes.len(), 1);
1086 assert_eq!(step1_changes[0], 0);
1087 }
1088
1089 #[tokio::test]
1090 async fn test_replay_from_step() {
1091 let graph = create_test_graph();
1092 let tt = TimeTravelGraph::new(graph);
1093
1094 let original = tt.execute(GraphState::new()).await.unwrap();
1095
1096 let replayed = tt.replay_from(&original, 1).await.unwrap();
1098
1099 assert_eq!(replayed.steps.len(), 1);
1101 assert_eq!(replayed.steps[0].node_id, "step3");
1102 assert!(replayed.parent_id.is_some());
1103 assert_eq!(replayed.fork_step, Some(1));
1104 }
1105
1106 #[tokio::test]
1107 async fn test_fork_with_modified_state() {
1108 let graph = create_test_graph();
1109 let tt = TimeTravelGraph::new(graph);
1110
1111 let original = tt.execute(GraphState::new()).await.unwrap();
1112
1113 let mut modified_state = original.steps[0].state_after.clone();
1115 modified_state.set("counter", 100); let forked = tt.fork_from(&original, 0, modified_state).await.unwrap();
1118
1119 let final_counter: i32 = forked.result.state.get("counter").unwrap();
1121 assert_eq!(final_counter, 102); }
1123
1124 #[tokio::test]
1125 async fn test_state_diff() {
1126 let mut before = GraphState::new();
1127 before.set("a", 1);
1128 before.set("b", 2);
1129
1130 let mut after = GraphState::new();
1131 after.set("a", 1); after.set("b", 3); after.set("c", 4); let diff = diff_states(&before, &after);
1136
1137 assert!(diff.added.contains(&"c".to_string()));
1138 assert!(diff.modified.contains_key("b"));
1139 assert!(!diff.removed.contains(&"a".to_string()));
1140 }
1141
1142 #[tokio::test]
1143 async fn test_compare_executions() {
1144 let graph = create_test_graph();
1145 let tt = TimeTravelGraph::new(graph);
1146
1147 let exec1 = tt.execute(GraphState::new()).await.unwrap();
1148
1149 let mut modified = exec1.steps[0].state_after.clone();
1151 modified.set("custom", "value");
1152 let exec2 = tt.fork_from(&exec1, 0, modified).await.unwrap();
1153
1154 let comparison = tt.compare_executions(&exec1.id, &exec2.id).await.unwrap();
1155
1156 assert!(comparison.is_fork_comparison);
1157 assert!(!comparison.final_state_diff.is_empty());
1158 }
1159
1160 #[tokio::test]
1161 async fn test_debug_session() {
1162 let graph = Arc::new(create_test_graph());
1163 let mut session = DebugSession::new(graph, GraphState::new());
1164
1165 let result1 = session.step().await.unwrap();
1167 assert!(matches!(result1, StepResult::Continue(_)));
1168 assert_eq!(session.current_node(), "step2");
1169
1170 let result2 = session.step().await.unwrap();
1171 assert!(matches!(result2, StepResult::Continue(_)));
1172 assert_eq!(session.current_node(), "step3");
1173
1174 let result3 = session.step().await.unwrap();
1175 assert!(matches!(result3, StepResult::Finished));
1176 }
1177
1178 #[tokio::test]
1179 async fn test_debug_breakpoints() {
1180 let graph = Arc::new(create_test_graph());
1181 let mut session = DebugSession::new(graph, GraphState::new());
1182
1183 session.add_breakpoint("step3");
1184
1185 let result = session.run().await.unwrap();
1187 assert!(matches!(result, StepResult::Breakpoint(ref n) if n == "step3"));
1188 assert_eq!(session.current_node(), "step3");
1189
1190 let result = session.run().await.unwrap();
1192 assert!(matches!(result, StepResult::Finished));
1193 }
1194
1195 #[tokio::test]
1196 async fn test_debug_modify_state() {
1197 let graph = Arc::new(create_test_graph());
1198 let mut session = DebugSession::new(graph, GraphState::new());
1199
1200 session.step().await.unwrap();
1202
1203 session.modify_state(|state| {
1205 state.set("counter", 50);
1206 });
1207
1208 session.run().await.unwrap();
1210
1211 let final_counter: i32 = session.current_state().get("counter").unwrap();
1213 assert_eq!(final_counter, 52);
1214 }
1215
1216 #[tokio::test]
1217 async fn test_execution_summary() {
1218 let graph = create_test_graph();
1219 let tt = TimeTravelGraph::new(graph);
1220
1221 let history = tt.execute(GraphState::new()).await.unwrap();
1222 let summary = history.browser().summary();
1223
1224 assert_eq!(summary.total_steps, 3);
1225 assert_eq!(summary.unique_nodes, 3);
1226 assert_eq!(summary.status, GraphStatus::Success);
1227 assert!(!summary.is_fork);
1228 }
1229
1230 #[tokio::test]
1231 async fn test_timeline_output() {
1232 let graph = create_test_graph();
1233 let tt = TimeTravelGraph::new(graph);
1234
1235 let history = tt.execute(GraphState::new()).await.unwrap();
1236 let timeline = history.browser().timeline();
1237
1238 assert!(timeline.contains("step1"));
1239 assert!(timeline.contains("step2"));
1240 assert!(timeline.contains("step3"));
1241 assert!(timeline.contains("Success"));
1242 }
1243}