1#[derive(Debug)]
3pub struct EffectTraceTape {
4 next_effect_id: AtomicU64,
5 entries: Mutex<Vec<EffectTraceEntry>>,
6 exchanges: Mutex<Vec<EffectExchangeRecord>>,
7}
8
9impl Default for EffectTraceTape {
10 fn default() -> Self {
11 Self {
12 next_effect_id: AtomicU64::new(0),
13 entries: Mutex::new(Vec::new()),
14 exchanges: Mutex::new(Vec::new()),
15 }
16 }
17}
18
19fn decode_effect_result<T>(outputs: &JsonValue) -> Option<EffectResult<T>>
20where
21 T: DeserializeOwned,
22{
23 match outputs.get("status").and_then(JsonValue::as_str)? {
24 "success" => {
25 let value = serde_json::from_value(outputs.get("value")?.clone()).ok()?;
26 Some(EffectResult::Success(value))
27 }
28 "blocked" => Some(EffectResult::Blocked),
29 "failure" => {
30 let failure = serde_json::from_value(outputs.get("failure")?.clone()).ok()?;
31 Some(EffectResult::Failure(failure))
32 }
33 _ => None,
34 }
35}
36
37impl EffectTraceTape {
38 #[must_use]
40 pub fn new() -> Self {
41 Self::default()
42 }
43
44 #[must_use]
46 pub fn from_entries(entries: Vec<EffectTraceEntry>) -> Self {
47 let next_effect_id = entries
48 .last()
49 .map_or(0, |entry| entry.effect_id.saturating_add(1));
50 Self {
51 next_effect_id: AtomicU64::new(next_effect_id),
52 entries: Mutex::new(entries),
53 exchanges: Mutex::new(Vec::new()),
54 }
55 }
56
57 pub fn record(
63 &self,
64 effect_kind: &str,
65 inputs: JsonValue,
66 outputs: JsonValue,
67 handler_identity: &str,
68 topology: Option<TopologyPerturbation>,
69 ) {
70 let effect_id = self.next_effect_id.fetch_add(1, Ordering::Relaxed);
71 let (effect_interface, effect_operation) =
72 infer_effect_interface_and_operation(effect_kind);
73 let entry = EffectTraceEntry {
74 effect_id,
75 effect_kind: effect_kind.to_string(),
76 inputs,
77 outputs,
78 handler_identity: handler_identity.to_string(),
79 effect_interface,
80 effect_operation,
81 ordering_key: effect_id,
82 topology,
83 };
84 self.entries
85 .lock()
86 .unwrap_or_else(|poisoned| poisoned.into_inner())
87 .push(entry);
88 }
89
90 pub fn record_exchange(
97 &self,
98 mut request: EffectRequest,
99 outcome: EffectOutcome,
100 handler_identity: &str,
101 topology: Option<TopologyPerturbation>,
102 ) {
103 let effect_id = self.next_effect_id.fetch_add(1, Ordering::Relaxed);
104 request.effect_id = Some(effect_id);
105 let effect_kind = match &request.body {
106 EffectRequestBody::SendDecision { .. } => "send_decision",
107 EffectRequestBody::Receive { .. } => "handle_recv",
108 EffectRequestBody::Choose { .. } => "handle_choose",
109 EffectRequestBody::InvokeStep { .. } => "invoke_step",
110 EffectRequestBody::Acquire { .. } => "handle_acquire",
111 EffectRequestBody::Release { .. } => "handle_release",
112 EffectRequestBody::TopologyEvents { .. } => "topology_events",
113 EffectRequestBody::WalSync { .. } => "wal_sync",
114 EffectRequestBody::OutputConditionHint { .. } => "output_condition_hint",
115 };
116 let request_json =
117 serde_json::to_value(&request).expect("effect request should serialize to json");
118 let outcome_json =
119 serde_json::to_value(&outcome).expect("effect outcome should serialize to json");
120 let exchange = EffectExchangeRecord {
121 effect_id,
122 handler_identity: handler_identity.to_string(),
123 ordering_key: effect_id,
124 request,
125 outcome,
126 };
127 let entry = EffectTraceEntry {
128 effect_id,
129 effect_kind: effect_kind.to_string(),
130 inputs: request_json,
131 outputs: outcome_json,
132 handler_identity: handler_identity.to_string(),
133 effect_interface: Some(exchange.request.metadata.interface_name.clone()),
134 effect_operation: Some(exchange.request.metadata.operation_name.clone()),
135 ordering_key: effect_id,
136 topology: topology.or_else(|| match &exchange.outcome.response {
137 Some(EffectResponse::TopologyEvents { events }) => events.first().cloned(),
138 _ => None,
139 }),
140 };
141 self.exchanges
142 .lock()
143 .unwrap_or_else(|poisoned| poisoned.into_inner())
144 .push(exchange);
145 self.entries
146 .lock()
147 .unwrap_or_else(|poisoned| poisoned.into_inner())
148 .push(entry);
149 }
150
151 #[must_use]
157 pub fn entries(&self) -> Vec<EffectTraceEntry> {
158 self.entries
159 .lock()
160 .unwrap_or_else(|poisoned| poisoned.into_inner())
161 .clone()
162 }
163
164 #[must_use]
170 pub fn exchanges(&self) -> Vec<EffectExchangeRecord> {
171 self.exchanges
172 .lock()
173 .unwrap_or_else(|poisoned| poisoned.into_inner())
174 .clone()
175 }
176}
177
178pub struct RecordingEffectHandler<'a> {
180 inner: &'a dyn EffectHandler,
181 tape: EffectTraceTape,
182}
183
184impl<'a> RecordingEffectHandler<'a> {
185 #[must_use]
187 pub fn new(inner: &'a dyn EffectHandler) -> Self {
188 Self {
189 inner,
190 tape: EffectTraceTape::new(),
191 }
192 }
193
194 #[must_use]
196 pub fn effect_trace(&self) -> Vec<EffectTraceEntry> {
197 self.tape.entries()
198 }
199
200 #[must_use]
202 pub fn effect_exchanges(&self) -> Vec<EffectExchangeRecord> {
203 self.tape.exchanges()
204 }
205}
206
207pub struct ReplayEffectHandler<'a> {
209 entries: Arc<[EffectTraceEntry]>,
210 cursor: Mutex<usize>,
211 fallback: Option<&'a dyn EffectHandler>,
212}
213
214impl<'a> ReplayEffectHandler<'a> {
215 #[must_use]
217 pub fn new<E>(entries: E) -> Self
218 where
219 E: Into<Arc<[EffectTraceEntry]>>,
220 {
221 Self {
222 entries: entries.into(),
223 cursor: Mutex::new(0),
224 fallback: None,
225 }
226 }
227
228 #[must_use]
230 pub fn with_fallback<E>(entries: E, fallback: &'a dyn EffectHandler) -> Self
231 where
232 E: Into<Arc<[EffectTraceEntry]>>,
233 {
234 Self {
235 entries: entries.into(),
236 cursor: Mutex::new(0),
237 fallback: Some(fallback),
238 }
239 }
240
241 #[must_use]
247 pub fn remaining(&self) -> usize {
248 let cursor = *self
249 .cursor
250 .lock()
251 .unwrap_or_else(|poisoned| poisoned.into_inner());
252 self.entries.len().saturating_sub(cursor)
253 }
254
255 fn trace_contains_kind(&self, kind: &str) -> bool {
256 self.entries.iter().any(|entry| entry.effect_kind == kind)
257 }
258
259 fn peek_handler_identity(&self) -> Option<String> {
260 let cursor = *self
261 .cursor
262 .lock()
263 .unwrap_or_else(|poisoned| poisoned.into_inner());
264 self.entries.get(cursor).map(|entry| entry.handler_identity.clone())
265 }
266
267 fn recorded_handler_identity(&self) -> Option<String> {
268 self.entries.first().map(|entry| entry.handler_identity.clone())
269 }
270
271 fn next_entry(&self, expected_kind: &str) -> Result<EffectTraceEntry, String> {
272 let mut cursor = self
273 .cursor
274 .lock()
275 .unwrap_or_else(|poisoned| poisoned.into_inner());
276 let idx = *cursor;
277 let Some(entry) = self.entries.get(idx) else {
278 return Err(format!(
279 "replay trace exhausted at index {idx}, expected {expected_kind}"
280 ));
281 };
282 if entry.effect_kind != expected_kind {
283 return Err(format!(
284 "replay trace kind mismatch at index {idx}: expected {expected_kind}, got {}",
285 entry.effect_kind
286 ));
287 }
288 *cursor = cursor.saturating_add(1);
289 Ok(entry.clone())
290 }
291
292 fn parse_send_decision(
293 outputs: &JsonValue,
294 _explicit_payload: Option<Value>,
295 ) -> Option<EffectResult<SendDecision>> {
296 let result = decode_effect_result::<SendDecision>(outputs)?;
297 match result {
298 EffectResult::Success(SendDecision::Deliver(payload)) => {
299 Some(EffectResult::Success(SendDecision::Deliver(payload)))
300 }
301 EffectResult::Success(SendDecision::Drop) => {
302 Some(EffectResult::Success(SendDecision::Drop))
303 }
304 EffectResult::Success(SendDecision::Defer) => {
305 Some(EffectResult::Success(SendDecision::Defer))
306 }
307 EffectResult::Blocked => Some(EffectResult::Blocked),
308 EffectResult::Failure(failure) => Some(EffectResult::Failure(failure)),
309 }
310 }
311}