1use serde::{Deserialize, Serialize};
9
10use crate::kernel::event::{Event, SequencedEvent};
11use crate::kernel::identity::{RunId, Seq};
12use crate::kernel::reducer::Reducer;
13use crate::kernel::state::KernelState;
14use crate::kernel::EventStore;
15use crate::kernel::KernelError;
16
/// Metadata describing a branch derived from an existing run's event timeline.
///
/// Produced by [`TimelineForker::fork`] and [`TimelineForker::clone_timeline`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimelineFork {
    /// Run whose events were replayed to build the branch.
    pub source_run_id: RunId,
    /// Identifier supplied by the caller for the new branch.
    pub branch_id: RunId,
    /// Sequence number (inclusive) up to which the source run was replayed before
    /// branching. `clone_timeline` records `0` here.
    pub fork_point_seq: Seq,
    /// Event injected at the fork point by `fork`; `None` for `clone_timeline`.
    pub alternate_event: Option<Event>,
}
29
/// Outcome of forking or cloning a timeline: the branch description plus the
/// state reached on that branch.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ForkResult<S: KernelState> {
    /// Description of the branch that was produced.
    pub fork: TimelineFork,
    /// State after the reducer has applied every event replayed for the branch.
    pub final_state: S,
}
38
/// Builds alternate timelines by replaying a source run's events through a reducer.
pub struct TimelineForker<S: KernelState> {
    /// Event store the source run's events are read from (via `scan`).
    pub events: Box<dyn EventStore>,
    /// Reducer used to apply each replayed event to the state.
    pub reducer: Box<dyn Reducer<S>>,
}
46
47impl<S: KernelState> TimelineForker<S> {
48 pub fn fork(
55 &self,
56 source_run_id: &RunId,
57 branch_id: RunId,
58 fork_at_seq: Seq,
59 alternate_event: Event,
60 initial_state: S,
61 ) -> Result<ForkResult<S>, KernelError> {
62 let (mut state, _) = self.replay_up_to(source_run_id, fork_at_seq, initial_state)?;
64
65 let alt_seq = fork_at_seq + 1;
67 let alt_se = SequencedEvent {
68 seq: alt_seq,
69 event: alternate_event.clone(),
70 };
71 self.reducer.apply(&mut state, &alt_se)?;
72
73 let remaining = self.events.scan(source_run_id, fork_at_seq + 1)?;
75 for se in remaining {
76 let branched_se = SequencedEvent {
78 seq: se.seq,
79 event: se.event.clone(),
80 };
81 self.reducer.apply(&mut state, &branched_se)?;
82 }
83
84 let fork = TimelineFork {
88 source_run_id: source_run_id.clone(),
89 branch_id: branch_id.clone(),
90 fork_point_seq: fork_at_seq,
91 alternate_event: Some(alternate_event),
92 };
93
94 Ok(ForkResult {
95 fork,
96 final_state: state,
97 })
98 }
99
100 fn replay_up_to(
102 &self,
103 run_id: &RunId,
104 up_to_seq: Seq,
105 initial_state: S,
106 ) -> Result<(S, Seq), KernelError> {
107 let mut state = initial_state;
108 let events = self.events.scan(run_id, 1)?;
109 for se in events {
110 if se.seq > up_to_seq {
111 break;
112 }
113 self.reducer.apply(&mut state, &se)?;
114 }
115 Ok((state, up_to_seq))
116 }
117
118 pub fn clone_timeline(
121 &self,
122 source_run_id: &RunId,
123 branch_id: RunId,
124 initial_state: S,
125 ) -> Result<ForkResult<S>, KernelError> {
126 let mut state = initial_state;
127 let events = self.events.scan(source_run_id, 1)?;
128 for se in events {
129 self.reducer.apply(&mut state, &se)?;
130 }
131
132 let fork = TimelineFork {
133 source_run_id: source_run_id.clone(),
134 branch_id,
135 fork_point_seq: 0,
136 alternate_event: None,
137 };
138
139 Ok(ForkResult {
140 fork,
141 final_state: state,
142 })
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::event_store::InMemoryEventStore;
    use crate::kernel::StateUpdatedOnlyReducer;
    use serde::{Deserialize, Serialize};

    /// Minimal state used by the tests: a single counter value.
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct TestState(u32);

    impl crate::kernel::state::KernelState for TestState {
        fn version(&self) -> u32 {
            1
        }
    }

    /// Builds a `StateUpdated` event for `step` whose payload is `TestState(value)`.
    fn state_updated(step: &str, value: u32) -> Event {
        Event::StateUpdated {
            step_id: Some(step.into()),
            payload: serde_json::to_value(&TestState(value)).unwrap(),
        }
    }

    /// Wraps `store` in a forker that uses `StateUpdatedOnlyReducer`.
    fn forker_over(store: InMemoryEventStore) -> TimelineForker<TestState> {
        TimelineForker {
            events: Box::new(store),
            reducer: Box::new(StateUpdatedOnlyReducer),
        }
    }

    #[test]
    fn fork_injects_alternate_event() {
        let run_id: RunId = "source".into();
        let store = InMemoryEventStore::new();
        store
            .append(
                &run_id,
                &[
                    state_updated("a", 1),
                    state_updated("b", 2),
                    state_updated("c", 3),
                ],
            )
            .unwrap();

        let ForkResult { fork, final_state } = forker_over(store)
            .fork(
                &run_id,
                "branch-1".into(),
                1,
                state_updated("alt-b", 99),
                TestState(0),
            )
            .unwrap();

        assert_eq!(fork.source_run_id, "source");
        assert_eq!(fork.branch_id, "branch-1");
        assert_eq!(fork.fork_point_seq, 1);
        assert!(fork.alternate_event.is_some());

        // The last source event ("c") is replayed after the alternate, so it
        // determines the final state.
        assert_eq!(final_state, TestState(3));
    }

    #[test]
    fn clone_timeline_replays_entire_run() {
        let run_id: RunId = "source2".into();
        let store = InMemoryEventStore::new();
        store
            .append(&run_id, &[state_updated("x", 10), Event::Completed])
            .unwrap();

        let ForkResult { fork, final_state } = forker_over(store)
            .clone_timeline(&run_id, "clone".into(), TestState(0))
            .unwrap();

        assert_eq!(fork.source_run_id, "source2");
        assert_eq!(fork.branch_id, "clone");
        assert_eq!(final_state, TestState(10));
    }
}