1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
//! Timeline forking: fork execution timelines from a specific checkpoint.
//!
//! This module provides [TimelineFork] to create alternate execution timelines:
//! - **Fork from checkpoint N**: replay source run up to seq N.
//! - **Inject alternate decision**: apply an alternate event at the fork point.
//! - **Fork event stream**: continue under a new `branch_id` with forked events.
use serde::{Deserialize, Serialize};
use crate::kernel::event::{Event, SequencedEvent};
use crate::kernel::identity::{RunId, Seq};
use crate::kernel::reducer::Reducer;
use crate::kernel::state::KernelState;
use crate::kernel::EventStore;
use crate::kernel::KernelError;
/// A timeline fork: represents a forked execution from a checkpoint.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimelineFork {
/// Original run that was forked.
pub source_run_id: RunId,
/// New branch (forked) run id.
pub branch_id: RunId,
/// Sequence number of the fork point (events up to and including this seq were replayed).
pub fork_point_seq: Seq,
/// The alternate event injected at the fork point (if any).
pub alternate_event: Option<Event>,
}
/// Result of a timeline fork operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ForkResult<S: KernelState> {
/// The fork metadata.
pub fork: TimelineFork,
/// Final state after applying the alternate path.
pub final_state: S,
}
/// Timeline forker: creates alternate execution timelines from checkpoints.
pub struct TimelineForker<S: KernelState> {
/// Event store (source of truth).
pub events: Box<dyn EventStore>,
/// Reducer to apply events to state.
pub reducer: Box<dyn Reducer<S>>,
}
impl<S: KernelState> TimelineForker<S> {
/// Forks the timeline at checkpoint `fork_at_seq`.
///
/// 1. Replays source run up to `fork_at_seq`.
/// 2. Injects `alternate_event` at the fork point.
/// 3. Continues replaying remaining events under new `branch_id`.
/// 4. Returns the fork metadata and final state.
pub fn fork(
&self,
source_run_id: &RunId,
branch_id: RunId,
fork_at_seq: Seq,
alternate_event: Event,
initial_state: S,
) -> Result<ForkResult<S>, KernelError> {
// 1. Replay source run up to fork_at_seq
let (mut state, _) = self.replay_up_to(source_run_id, fork_at_seq, initial_state)?;
// 2. Inject alternate event at fork point
let alt_seq = fork_at_seq + 1;
let alt_se = SequencedEvent {
seq: alt_seq,
event: alternate_event.clone(),
};
self.reducer.apply(&mut state, &alt_se)?;
// 3. Continue replaying remaining events under branch_id
let remaining = self.events.scan(source_run_id, fork_at_seq + 1)?;
for se in remaining {
// Assign new seq under branch_id
let branched_se = SequencedEvent {
seq: se.seq,
event: se.event.clone(),
};
self.reducer.apply(&mut state, &branched_se)?;
}
// 4. Record forked events to the new branch (optional: write to store)
// For now, we return the result; actual persistence is optional.
let fork = TimelineFork {
source_run_id: source_run_id.clone(),
branch_id: branch_id.clone(),
fork_point_seq: fork_at_seq,
alternate_event: Some(alternate_event),
};
Ok(ForkResult {
fork,
final_state: state,
})
}
/// Replays the source run up to (and including) the given seq, returns state.
fn replay_up_to(
&self,
run_id: &RunId,
up_to_seq: Seq,
initial_state: S,
) -> Result<(S, Seq), KernelError> {
let mut state = initial_state;
let events = self.events.scan(run_id, 1)?;
for se in events {
if se.seq > up_to_seq {
break;
}
self.reducer.apply(&mut state, &se)?;
}
Ok((state, up_to_seq))
}
/// Creates a fork that starts fresh (no alternate event) - useful for simulation/audit.
/// This replays the entire run under a new branch_id.
pub fn clone_timeline(
&self,
source_run_id: &RunId,
branch_id: RunId,
initial_state: S,
) -> Result<ForkResult<S>, KernelError> {
let mut state = initial_state;
let events = self.events.scan(source_run_id, 1)?;
for se in events {
self.reducer.apply(&mut state, &se)?;
}
let fork = TimelineFork {
source_run_id: source_run_id.clone(),
branch_id,
fork_point_seq: 0,
alternate_event: None,
};
Ok(ForkResult {
fork,
final_state: state,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::event_store::InMemoryEventStore;
use crate::kernel::StateUpdatedOnlyReducer;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
struct TestState(u32);
impl crate::kernel::state::KernelState for TestState {
fn version(&self) -> u32 {
1
}
}
#[test]
fn fork_injects_alternate_event() {
let events = InMemoryEventStore::new();
let run_id: RunId = "source".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("a".into()),
payload: serde_json::to_value(&TestState(1)).unwrap(),
},
Event::StateUpdated {
step_id: Some("b".into()),
payload: serde_json::to_value(&TestState(2)).unwrap(),
},
Event::StateUpdated {
step_id: Some("c".into()),
payload: serde_json::to_value(&TestState(3)).unwrap(),
},
],
)
.unwrap();
let forker = TimelineForker::<TestState> {
events: Box::new(events),
reducer: Box::new(StateUpdatedOnlyReducer),
};
// Fork at seq 1, inject alternate that sets state to 99
let result = forker
.fork(
&run_id,
"branch-1".into(),
1,
Event::StateUpdated {
step_id: Some("alt-b".into()),
payload: serde_json::to_value(&TestState(99)).unwrap(),
},
TestState(0),
)
.unwrap();
assert_eq!(result.fork.source_run_id, "source");
assert_eq!(result.fork.branch_id, "branch-1");
assert_eq!(result.fork.fork_point_seq, 1);
assert!(result.fork.alternate_event.is_some());
// Final state should be from alternate (99) + original c (3) = 99 (last wins)
assert_eq!(result.final_state, TestState(3));
}
#[test]
fn clone_timeline_replays_entire_run() {
let events = InMemoryEventStore::new();
let run_id: RunId = "source2".into();
events
.append(
&run_id,
&[
Event::StateUpdated {
step_id: Some("x".into()),
payload: serde_json::to_value(&TestState(10)).unwrap(),
},
Event::Completed,
],
)
.unwrap();
let forker = TimelineForker::<TestState> {
events: Box::new(events),
reducer: Box::new(StateUpdatedOnlyReducer),
};
let result = forker
.clone_timeline(&run_id, "clone".into(), TestState(0))
.unwrap();
assert_eq!(result.fork.source_run_id, "source2");
assert_eq!(result.fork.branch_id, "clone");
assert_eq!(result.final_state, TestState(10));
}
}