Skip to main content

oris_kernel/kernel/
timeline_fork.rs

1//! Timeline forking: fork execution timelines from a specific checkpoint.
2//!
3//! This module provides [TimelineFork] to create alternate execution timelines:
4//! - **Fork from checkpoint N**: replay source run up to seq N.
5//! - **Inject alternate decision**: apply an alternate event at the fork point.
6//! - **Fork event stream**: continue under a new `branch_id` with forked events.
7
8use serde::{Deserialize, Serialize};
9
10use crate::kernel::event::{Event, SequencedEvent};
11use crate::kernel::identity::{RunId, Seq};
12use crate::kernel::reducer::Reducer;
13use crate::kernel::state::KernelState;
14use crate::kernel::EventStore;
15use crate::kernel::KernelError;
16
17/// A timeline fork: represents a forked execution from a checkpoint.
18#[derive(Clone, Debug, Serialize, Deserialize)]
19pub struct TimelineFork {
20    /// Original run that was forked.
21    pub source_run_id: RunId,
22    /// New branch (forked) run id.
23    pub branch_id: RunId,
24    /// Sequence number of the fork point (events up to and including this seq were replayed).
25    pub fork_point_seq: Seq,
26    /// The alternate event injected at the fork point (if any).
27    pub alternate_event: Option<Event>,
28}
29
30/// Result of a timeline fork operation.
31#[derive(Clone, Debug, Serialize, Deserialize)]
32pub struct ForkResult<S: KernelState> {
33    /// The fork metadata.
34    pub fork: TimelineFork,
35    /// Final state after applying the alternate path.
36    pub final_state: S,
37}
38
39/// Timeline forker: creates alternate execution timelines from checkpoints.
40pub struct TimelineForker<S: KernelState> {
41    /// Event store (source of truth).
42    pub events: Box<dyn EventStore>,
43    /// Reducer to apply events to state.
44    pub reducer: Box<dyn Reducer<S>>,
45}
46
47impl<S: KernelState> TimelineForker<S> {
48    /// Forks the timeline at checkpoint `fork_at_seq`.
49    ///
50    /// 1. Replays source run up to `fork_at_seq`.
51    /// 2. Injects `alternate_event` at the fork point.
52    /// 3. Continues replaying remaining events under new `branch_id`.
53    /// 4. Returns the fork metadata and final state.
54    pub fn fork(
55        &self,
56        source_run_id: &RunId,
57        branch_id: RunId,
58        fork_at_seq: Seq,
59        alternate_event: Event,
60        initial_state: S,
61    ) -> Result<ForkResult<S>, KernelError> {
62        // 1. Replay source run up to fork_at_seq
63        let (mut state, _) = self.replay_up_to(source_run_id, fork_at_seq, initial_state)?;
64
65        // 2. Inject alternate event at fork point
66        let alt_seq = fork_at_seq + 1;
67        let alt_se = SequencedEvent {
68            seq: alt_seq,
69            event: alternate_event.clone(),
70        };
71        self.reducer.apply(&mut state, &alt_se)?;
72
73        // 3. Continue replaying remaining events under branch_id
74        let remaining = self.events.scan(source_run_id, fork_at_seq + 1)?;
75        for se in remaining {
76            // Assign new seq under branch_id
77            let branched_se = SequencedEvent {
78                seq: se.seq,
79                event: se.event.clone(),
80            };
81            self.reducer.apply(&mut state, &branched_se)?;
82        }
83
84        // 4. Record forked events to the new branch (optional: write to store)
85        // For now, we return the result; actual persistence is optional.
86
87        let fork = TimelineFork {
88            source_run_id: source_run_id.clone(),
89            branch_id: branch_id.clone(),
90            fork_point_seq: fork_at_seq,
91            alternate_event: Some(alternate_event),
92        };
93
94        Ok(ForkResult {
95            fork,
96            final_state: state,
97        })
98    }
99
100    /// Replays the source run up to (and including) the given seq, returns state.
101    fn replay_up_to(
102        &self,
103        run_id: &RunId,
104        up_to_seq: Seq,
105        initial_state: S,
106    ) -> Result<(S, Seq), KernelError> {
107        let mut state = initial_state;
108        let events = self.events.scan(run_id, 1)?;
109        for se in events {
110            if se.seq > up_to_seq {
111                break;
112            }
113            self.reducer.apply(&mut state, &se)?;
114        }
115        Ok((state, up_to_seq))
116    }
117
118    /// Creates a fork that starts fresh (no alternate event) - useful for simulation/audit.
119    /// This replays the entire run under a new branch_id.
120    pub fn clone_timeline(
121        &self,
122        source_run_id: &RunId,
123        branch_id: RunId,
124        initial_state: S,
125    ) -> Result<ForkResult<S>, KernelError> {
126        let mut state = initial_state;
127        let events = self.events.scan(source_run_id, 1)?;
128        for se in events {
129            self.reducer.apply(&mut state, &se)?;
130        }
131
132        let fork = TimelineFork {
133            source_run_id: source_run_id.clone(),
134            branch_id,
135            fork_point_seq: 0,
136            alternate_event: None,
137        };
138
139        Ok(ForkResult {
140            fork,
141            final_state: state,
142        })
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149    use crate::kernel::event_store::InMemoryEventStore;
150    use crate::kernel::StateUpdatedOnlyReducer;
151    use serde::{Deserialize, Serialize};
152
153    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
154    struct TestState(u32);
155
156    impl crate::kernel::state::KernelState for TestState {
157        fn version(&self) -> u32 {
158            1
159        }
160    }
161
162    #[test]
163    fn fork_injects_alternate_event() {
164        let events = InMemoryEventStore::new();
165        let run_id: RunId = "source".into();
166        events
167            .append(
168                &run_id,
169                &[
170                    Event::StateUpdated {
171                        step_id: Some("a".into()),
172                        payload: serde_json::to_value(&TestState(1)).unwrap(),
173                    },
174                    Event::StateUpdated {
175                        step_id: Some("b".into()),
176                        payload: serde_json::to_value(&TestState(2)).unwrap(),
177                    },
178                    Event::StateUpdated {
179                        step_id: Some("c".into()),
180                        payload: serde_json::to_value(&TestState(3)).unwrap(),
181                    },
182                ],
183            )
184            .unwrap();
185
186        let forker = TimelineForker::<TestState> {
187            events: Box::new(events),
188            reducer: Box::new(StateUpdatedOnlyReducer),
189        };
190
191        // Fork at seq 1, inject alternate that sets state to 99
192        let result = forker
193            .fork(
194                &run_id,
195                "branch-1".into(),
196                1,
197                Event::StateUpdated {
198                    step_id: Some("alt-b".into()),
199                    payload: serde_json::to_value(&TestState(99)).unwrap(),
200                },
201                TestState(0),
202            )
203            .unwrap();
204
205        assert_eq!(result.fork.source_run_id, "source");
206        assert_eq!(result.fork.branch_id, "branch-1");
207        assert_eq!(result.fork.fork_point_seq, 1);
208        assert!(result.fork.alternate_event.is_some());
209
210        // Final state should be from alternate (99) + original c (3) = 99 (last wins)
211        assert_eq!(result.final_state, TestState(3));
212    }
213
214    #[test]
215    fn clone_timeline_replays_entire_run() {
216        let events = InMemoryEventStore::new();
217        let run_id: RunId = "source2".into();
218        events
219            .append(
220                &run_id,
221                &[
222                    Event::StateUpdated {
223                        step_id: Some("x".into()),
224                        payload: serde_json::to_value(&TestState(10)).unwrap(),
225                    },
226                    Event::Completed,
227                ],
228            )
229            .unwrap();
230
231        let forker = TimelineForker::<TestState> {
232            events: Box::new(events),
233            reducer: Box::new(StateUpdatedOnlyReducer),
234        };
235
236        let result = forker
237            .clone_timeline(&run_id, "clone".into(), TestState(0))
238            .unwrap();
239
240        assert_eq!(result.fork.source_run_id, "source2");
241        assert_eq!(result.fork.branch_id, "clone");
242        assert_eq!(result.final_state, TestState(10));
243    }
244}