Skip to main content

telltale_machine/effect/
runtime_types.rs

1/// Thread-safe effect-trace tape used by recording/replay handlers.
2#[derive(Debug)]
3pub struct EffectTraceTape {
4    next_effect_id: AtomicU64,
5    entries: Mutex<Vec<EffectTraceEntry>>,
6    exchanges: Mutex<Vec<EffectExchangeRecord>>,
7}
8
9impl Default for EffectTraceTape {
10    fn default() -> Self {
11        Self {
12            next_effect_id: AtomicU64::new(0),
13            entries: Mutex::new(Vec::new()),
14            exchanges: Mutex::new(Vec::new()),
15        }
16    }
17}
18
19fn decode_effect_result<T>(outputs: &JsonValue) -> Option<EffectResult<T>>
20where
21    T: DeserializeOwned,
22{
23    match outputs.get("status").and_then(JsonValue::as_str)? {
24        "success" => {
25            let value = serde_json::from_value(outputs.get("value")?.clone()).ok()?;
26            Some(EffectResult::Success(value))
27        }
28        "blocked" => Some(EffectResult::Blocked),
29        "failure" => {
30            let failure = serde_json::from_value(outputs.get("failure")?.clone()).ok()?;
31            Some(EffectResult::Failure(failure))
32        }
33        _ => None,
34    }
35}
36
37impl EffectTraceTape {
38    /// Create an empty tape.
39    #[must_use]
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Create a tape from pre-recorded entries.
45    #[must_use]
46    pub fn from_entries(entries: Vec<EffectTraceEntry>) -> Self {
47        let next_effect_id = entries
48            .last()
49            .map_or(0, |entry| entry.effect_id.saturating_add(1));
50        Self {
51            next_effect_id: AtomicU64::new(next_effect_id),
52            entries: Mutex::new(entries),
53            exchanges: Mutex::new(Vec::new()),
54        }
55    }
56
57    /// Record one effect entry.
58    ///
59    /// # Panics
60    ///
61    /// Panics if the internal mutex is poisoned.
62    pub fn record(
63        &self,
64        effect_kind: &str,
65        inputs: JsonValue,
66        outputs: JsonValue,
67        handler_identity: &str,
68        topology: Option<TopologyPerturbation>,
69    ) {
70        let effect_id = self.next_effect_id.fetch_add(1, Ordering::Relaxed);
71        let (effect_interface, effect_operation) =
72            infer_effect_interface_and_operation(effect_kind);
73        let entry = EffectTraceEntry {
74            effect_id,
75            effect_kind: effect_kind.to_string(),
76            inputs,
77            outputs,
78            handler_identity: handler_identity.to_string(),
79            effect_interface,
80            effect_operation,
81            ordering_key: effect_id,
82            topology,
83        };
84        self.entries
85            .lock()
86            .unwrap_or_else(|poisoned| poisoned.into_inner())
87            .push(entry);
88    }
89
90    /// Record one canonical typed request/outcome exchange and derive the
91    /// legacy effect-trace entry from it.
92    ///
93    /// # Panics
94    ///
95    /// Panics if the internal mutex is poisoned.
96    pub fn record_exchange(
97        &self,
98        mut request: EffectRequest,
99        outcome: EffectOutcome,
100        handler_identity: &str,
101        topology: Option<TopologyPerturbation>,
102    ) {
103        let effect_id = self.next_effect_id.fetch_add(1, Ordering::Relaxed);
104        request.effect_id = Some(effect_id);
105        let effect_kind = match &request.body {
106            EffectRequestBody::SendDecision { .. } => "send_decision",
107            EffectRequestBody::Receive { .. } => "handle_recv",
108            EffectRequestBody::Choose { .. } => "handle_choose",
109            EffectRequestBody::InvokeStep { .. } => "invoke_step",
110            EffectRequestBody::Acquire { .. } => "handle_acquire",
111            EffectRequestBody::Release { .. } => "handle_release",
112            EffectRequestBody::TopologyEvents { .. } => "topology_events",
113            EffectRequestBody::WalSync { .. } => "wal_sync",
114            EffectRequestBody::OutputConditionHint { .. } => "output_condition_hint",
115        };
116        let request_json =
117            serde_json::to_value(&request).expect("effect request should serialize to json");
118        let outcome_json =
119            serde_json::to_value(&outcome).expect("effect outcome should serialize to json");
120        let exchange = EffectExchangeRecord {
121            effect_id,
122            handler_identity: handler_identity.to_string(),
123            ordering_key: effect_id,
124            request,
125            outcome,
126        };
127        let entry = EffectTraceEntry {
128            effect_id,
129            effect_kind: effect_kind.to_string(),
130            inputs: request_json,
131            outputs: outcome_json,
132            handler_identity: handler_identity.to_string(),
133            effect_interface: Some(exchange.request.metadata.interface_name.clone()),
134            effect_operation: Some(exchange.request.metadata.operation_name.clone()),
135            ordering_key: effect_id,
136            topology: topology.or_else(|| match &exchange.outcome.response {
137                Some(EffectResponse::TopologyEvents { events }) => events.first().cloned(),
138                _ => None,
139            }),
140        };
141        self.exchanges
142            .lock()
143            .unwrap_or_else(|poisoned| poisoned.into_inner())
144            .push(exchange);
145        self.entries
146            .lock()
147            .unwrap_or_else(|poisoned| poisoned.into_inner())
148            .push(entry);
149    }
150
151    /// Clone all recorded entries.
152    ///
153    /// # Panics
154    ///
155    /// Panics if the internal mutex is poisoned.
156    #[must_use]
157    pub fn entries(&self) -> Vec<EffectTraceEntry> {
158        self.entries
159            .lock()
160            .unwrap_or_else(|poisoned| poisoned.into_inner())
161            .clone()
162    }
163
164    /// Clone all recorded typed exchanges.
165    ///
166    /// # Panics
167    ///
168    /// Panics if the internal mutex is poisoned.
169    #[must_use]
170    pub fn exchanges(&self) -> Vec<EffectExchangeRecord> {
171        self.exchanges
172            .lock()
173            .unwrap_or_else(|poisoned| poisoned.into_inner())
174            .clone()
175    }
176}
177
178/// A handler wrapper that records effect outcomes for replay.
179pub struct RecordingEffectHandler<'a> {
180    inner: &'a dyn EffectHandler,
181    tape: EffectTraceTape,
182}
183
184impl<'a> RecordingEffectHandler<'a> {
185    /// Wrap a base handler and begin recording effect outcomes.
186    #[must_use]
187    pub fn new(inner: &'a dyn EffectHandler) -> Self {
188        Self {
189            inner,
190            tape: EffectTraceTape::new(),
191        }
192    }
193
194    /// Clone the recorded effect trace.
195    #[must_use]
196    pub fn effect_trace(&self) -> Vec<EffectTraceEntry> {
197        self.tape.entries()
198    }
199
200    /// Clone the recorded typed effect exchanges.
201    #[must_use]
202    pub fn effect_exchanges(&self) -> Vec<EffectExchangeRecord> {
203        self.tape.exchanges()
204    }
205}
206
207/// A replay-mode handler that serves recorded effect outcomes in order.
208pub struct ReplayEffectHandler<'a> {
209    entries: Arc<[EffectTraceEntry]>,
210    cursor: Mutex<usize>,
211    fallback: Option<&'a dyn EffectHandler>,
212}
213
214impl<'a> ReplayEffectHandler<'a> {
215    /// Build a replay handler without fallback behavior.
216    #[must_use]
217    pub fn new<E>(entries: E) -> Self
218    where
219        E: Into<Arc<[EffectTraceEntry]>>,
220    {
221        Self {
222            entries: entries.into(),
223            cursor: Mutex::new(0),
224            fallback: None,
225        }
226    }
227
228    /// Build a replay handler with fallback behavior for unsupported entries.
229    #[must_use]
230    pub fn with_fallback<E>(entries: E, fallback: &'a dyn EffectHandler) -> Self
231    where
232        E: Into<Arc<[EffectTraceEntry]>>,
233    {
234        Self {
235            entries: entries.into(),
236            cursor: Mutex::new(0),
237            fallback: Some(fallback),
238        }
239    }
240
241    /// Number of unconsumed entries.
242    ///
243    /// # Panics
244    ///
245    /// Panics if the internal mutex is poisoned.
246    #[must_use]
247    pub fn remaining(&self) -> usize {
248        let cursor = *self
249            .cursor
250            .lock()
251            .unwrap_or_else(|poisoned| poisoned.into_inner());
252        self.entries.len().saturating_sub(cursor)
253    }
254
255    fn trace_contains_kind(&self, kind: &str) -> bool {
256        self.entries.iter().any(|entry| entry.effect_kind == kind)
257    }
258
259    fn peek_handler_identity(&self) -> Option<String> {
260        let cursor = *self
261            .cursor
262            .lock()
263            .unwrap_or_else(|poisoned| poisoned.into_inner());
264        self.entries.get(cursor).map(|entry| entry.handler_identity.clone())
265    }
266
267    fn recorded_handler_identity(&self) -> Option<String> {
268        self.entries.first().map(|entry| entry.handler_identity.clone())
269    }
270
271    fn next_entry(&self, expected_kind: &str) -> Result<EffectTraceEntry, String> {
272        let mut cursor = self
273            .cursor
274            .lock()
275            .unwrap_or_else(|poisoned| poisoned.into_inner());
276        let idx = *cursor;
277        let Some(entry) = self.entries.get(idx) else {
278            return Err(format!(
279                "replay trace exhausted at index {idx}, expected {expected_kind}"
280            ));
281        };
282        if entry.effect_kind != expected_kind {
283            return Err(format!(
284                "replay trace kind mismatch at index {idx}: expected {expected_kind}, got {}",
285                entry.effect_kind
286            ));
287        }
288        *cursor = cursor.saturating_add(1);
289        Ok(entry.clone())
290    }
291
292    fn parse_send_decision(
293        outputs: &JsonValue,
294        _explicit_payload: Option<Value>,
295    ) -> Option<EffectResult<SendDecision>> {
296        let result = decode_effect_result::<SendDecision>(outputs)?;
297        match result {
298            EffectResult::Success(SendDecision::Deliver(payload)) => {
299                Some(EffectResult::Success(SendDecision::Deliver(payload)))
300            }
301            EffectResult::Success(SendDecision::Drop) => {
302                Some(EffectResult::Success(SendDecision::Drop))
303            }
304            EffectResult::Success(SendDecision::Defer) => {
305                Some(EffectResult::Success(SendDecision::Defer))
306            }
307            EffectResult::Blocked => Some(EffectResult::Blocked),
308            EffectResult::Failure(failure) => Some(EffectResult::Failure(failure)),
309        }
310    }
311}