Skip to main content

telltale_machine/effect/
handler_trait.rs

1/// ProtocolMachine-level effect handler.
2///
3/// This is the interface between a guest runtime and the surrounding host
4/// runtime. Each choreography can bind a different handler at session open
5/// time.
6///
7/// Host-contract rules:
8/// - Methods on this trait are synchronous. Async I/O, transport polling,
9///   storage flushes, and background retries must happen outside handler
10///   execution and feed their results back through canonical ingress.
11/// - Implementations must treat the provided `state` as session-local scratch
12///   for the current request only. They must not rely on unrelated session
13///   state or mutate ProtocolMachine session metadata through side channels.
14/// - Host-managed session-local mutation should flow through an explicit
15///   ownership capability such as `OwnedSession`, not through ad hoc access to
16///   the session store while handlers are executing.
17pub trait EffectHandler: Send + Sync {
18    /// Stable identifier for effect-trace attribution.
19    fn handler_identity(&self) -> String {
20        crate::session::DEFAULT_HANDLER_ID.to_string()
21    }
22
23    /// Canonical typed effect boundary for guest-runtime execution.
24    ///
25    /// Runtime code must route host-facing effect work through this method so
26    /// the request/outcome contract remains explicit and replay-visible.
27    ///
28    /// The default implementation preserves compatibility for existing
29    /// helper-method-based handlers by translating each typed request into the
30    /// corresponding helper method. New code should prefer overriding
31    /// `handle_effect` directly.
32    #[allow(clippy::too_many_lines)]
33    fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
34        if let Err(failure) = request.metadata.validate() {
35            return EffectOutcome::failure(failure);
36        }
37
38        match request.body {
39            EffectRequestBody::SendDecision {
40                role,
41                partner,
42                label,
43                state,
44                payload,
45            } => {
46                let Some(sid) = request.session else {
47                    return EffectOutcome::failure(EffectFailure::contract_violation(
48                        "send_decision request is missing session",
49                    ));
50                };
51                match self.send_decision(SendDecisionInput {
52                    sid,
53                    role: &role,
54                    partner: &partner,
55                    label: &label,
56                    state: &state,
57                    payload,
58                }) {
59                    EffectResult::Success(decision) => {
60                        EffectOutcome::success(EffectResponse::SendDecision { decision })
61                    }
62                    EffectResult::Blocked => EffectOutcome::blocked(),
63                    EffectResult::Failure(failure) => EffectOutcome::failure(failure),
64                }
65            }
66            EffectRequestBody::Receive {
67                role,
68                partner,
69                label,
70                state,
71                payload,
72            } => {
73                let mut state = state;
74                match self.handle_recv(&role, &partner, &label, &mut state, &payload) {
75                    EffectResult::Success(()) => {
76                        EffectOutcome::success(EffectResponse::Receive { state })
77                    }
78                    EffectResult::Blocked => EffectOutcome::blocked(),
79                    EffectResult::Failure(failure) => EffectOutcome::failure(failure),
80                }
81            }
82            EffectRequestBody::Choose {
83                role,
84                partner,
85                labels,
86                state,
87            } => match self.handle_choose(&role, &partner, &labels, &state) {
88                EffectResult::Success(label) => {
89                    EffectOutcome::success(EffectResponse::Choose { label })
90                }
91                EffectResult::Blocked => EffectOutcome::blocked(),
92                EffectResult::Failure(failure) => EffectOutcome::failure(failure),
93            },
94            EffectRequestBody::InvokeStep { role, state } => {
95                let mut state = state;
96                match self.step(&role, &mut state) {
97                    EffectResult::Success(()) => {
98                        EffectOutcome::success(EffectResponse::InvokeStep { state })
99                    }
100                    EffectResult::Blocked => EffectOutcome::blocked(),
101                    EffectResult::Failure(failure) => EffectOutcome::failure(failure),
102                }
103            }
104            EffectRequestBody::Acquire { role, layer, state } => {
105                let Some(sid) = request.session else {
106                    return EffectOutcome::failure(EffectFailure::contract_violation(
107                        "acquire request is missing session",
108                    ));
109                };
110                match self.handle_acquire(sid, &role, &layer, &state) {
111                    EffectResult::Success(evidence) => {
112                        EffectOutcome::success(EffectResponse::Acquire { evidence })
113                    }
114                    EffectResult::Blocked => EffectOutcome::blocked(),
115                    EffectResult::Failure(failure) => EffectOutcome::failure(failure),
116                }
117            }
118            EffectRequestBody::Release {
119                role,
120                layer,
121                evidence,
122                state,
123            } => {
124                let Some(sid) = request.session else {
125                    return EffectOutcome::failure(EffectFailure::contract_violation(
126                        "release request is missing session",
127                    ));
128                };
129                match self.handle_release(sid, &role, &layer, &evidence, &state) {
130                    EffectResult::Success(()) => EffectOutcome::success(EffectResponse::Release),
131                    EffectResult::Blocked => EffectOutcome::blocked(),
132                    EffectResult::Failure(failure) => EffectOutcome::failure(failure),
133                }
134            }
135            EffectRequestBody::TopologyEvents { tick } => match self.topology_events(tick) {
136                EffectResult::Success(events) => {
137                    EffectOutcome::success(EffectResponse::TopologyEvents { events })
138                }
139                EffectResult::Blocked => EffectOutcome::blocked(),
140                EffectResult::Failure(failure) => EffectOutcome::failure(failure),
141            },
142            EffectRequestBody::WalSync { sync } => match self.wal_sync(&sync) {
143                EffectResult::Success(()) => EffectOutcome::success(EffectResponse::WalSync),
144                EffectResult::Blocked => EffectOutcome::blocked(),
145                EffectResult::Failure(failure) => EffectOutcome::failure(failure),
146            },
147            EffectRequestBody::OutputConditionHint { role, state } => {
148                let Some(sid) = request.session else {
149                    return EffectOutcome::failure(EffectFailure::contract_violation(
150                        "output_condition_hint request is missing session",
151                    ));
152                };
153                let hint = self.output_condition_hint(sid, &role, &state);
154                EffectOutcome::success(EffectResponse::OutputConditionHint { hint })
155            }
156        }
157    }
158
159    /// Compute the payload for a send instruction.
160    ///
161    /// Helper hook used by the default `send_decision` implementation and by
162    /// custom runners that want direct payload computation.
163    ///
164    /// # Arguments
165    /// * `role` - The sending role
166    /// * `partner` - The receiving role
167    /// * `label` - The message label
168    /// * `state` - The coroutine's register file (for reading state)
169    ///
170    /// Returns a typed outcome for the request.
171    fn handle_send(
172        &self,
173        role: &str,
174        partner: &str,
175        label: &str,
176        state: &[Value],
177    ) -> EffectResult<Value>;
178
179    /// Decide how to handle a send, optionally with a precomputed payload.
180    ///
181    /// Middleware can override this to model loss/delay/corruption. The default
182    /// behavior computes a payload via `handle_send` unless one is provided.
183    ///
184    /// Returns a typed outcome for the request.
185    fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
186        if let Some(payload) = input.payload {
187            EffectResult::success(SendDecision::Deliver(payload))
188        } else {
189            self.handle_send(input.role, input.partner, input.label, input.state)
190                .map_success(SendDecision::Deliver)
191        }
192    }
193
194    /// Process a received value.
195    ///
196    /// # Arguments
197    /// * `role` - The receiving role
198    /// * `partner` - The sending role
199    /// * `label` - The message label
200    /// * `state` - The coroutine's register file (mutable for state updates)
201    /// * `payload` - The received value
202    ///
203    /// Returns a typed outcome for the request.
204    fn handle_recv(
205        &self,
206        role: &str,
207        partner: &str,
208        label: &str,
209        state: &mut Vec<Value>,
210        payload: &Value,
211    ) -> EffectResult<()>;
212
213    /// Choose which branch to take for internal choice (select).
214    ///
215    /// Branch-selection helper for custom runners.
216    ///
217    /// The canonical ProtocolMachine resolves branch labels from received payloads and does
218    /// not call this method in default dispatch paths.
219    ///
220    /// # Arguments
221    /// * `role` - The choosing role
222    /// * `partner` - The partner role
223    /// * `labels` - The available branch labels
224    /// * `state` - The coroutine's register file (for reading state)
225    ///
226    /// Returns a typed outcome for the request.
227    fn handle_choose(
228        &self,
229        role: &str,
230        partner: &str,
231        labels: &[String],
232        state: &[Value],
233    ) -> EffectResult<String>;
234
235    /// Perform an integration step after a protocol round.
236    ///
237    /// Called after all sends/receives for a tick are complete.
238    ///
239    /// Returns a typed outcome for the request.
240    fn step(&self, role: &str, state: &mut Vec<Value>) -> EffectResult<()>;
241
242    /// Attempt to acquire a guard layer.
243    ///
244    /// Returning `EffectResult::Blocked` causes the coroutine to block.
245    /// `Success(evidence)` grants the acquire and binds the evidence value.
246    fn handle_acquire(
247        &self,
248        _sid: SessionId,
249        _role: &str,
250        _layer: &str,
251        _state: &[Value],
252    ) -> EffectResult<Value> {
253        EffectResult::success(Value::Unit)
254    }
255
256    /// Release a guard layer using previously acquired evidence.
257    fn handle_release(
258        &self,
259        _sid: SessionId,
260        _role: &str,
261        _layer: &str,
262        _evidence: &Value,
263        _state: &[Value],
264    ) -> EffectResult<()> {
265        EffectResult::success(())
266    }
267
268    /// Whether this handler can service the internal `wal_sync` effect.
269    fn supports_wal_sync(&self) -> bool {
270        false
271    }
272
273    /// Confirm that the agreement WAL has been durably synchronized.
274    fn wal_sync(&self, _sync: &crate::durable::WalSyncRequest) -> EffectResult<()> {
275        EffectResult::failure(EffectFailure::contract_violation(
276            "wal_sync requires an AgreementWalHandler wrapper",
277        ))
278    }
279
280    /// Topology perturbations injected by the environment for this scheduler tick.
281    ///
282    /// The ProtocolMachine ingests these before selecting coroutines for the round. This is
283    /// a canonical ingress surface for external events; implementations should
284    /// stage async discoveries before this method is called rather than doing
285    /// async work from inside request handling.
286    ///
287    /// Returns a typed outcome for the request.
288    fn topology_events(&self, _tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
289        EffectResult::success(Vec::new())
290    }
291
292    /// Optional output-condition metadata for commit gating.
293    ///
294    /// The ProtocolMachine calls this only when a step emits observable events. Returning `None`
295    /// delegates to ProtocolMachine-default metadata.
296    fn output_condition_hint(
297        &self,
298        _sid: SessionId,
299        _role: &str,
300        _state: &[Value],
301    ) -> Option<OutputConditionHint> {
302        None
303    }
304}
305
306impl<T: EffectHandler + ?Sized> EffectHandler for &T {
307    fn handler_identity(&self) -> String {
308        (**self).handler_identity()
309    }
310
311    fn handle_effect(&self, request: EffectRequest) -> EffectOutcome {
312        (**self).handle_effect(request)
313    }
314
315    fn handle_send(
316        &self,
317        role: &str,
318        partner: &str,
319        label: &str,
320        state: &[Value],
321    ) -> EffectResult<Value> {
322        (**self).handle_send(role, partner, label, state)
323    }
324
325    fn send_decision(&self, input: SendDecisionInput<'_>) -> EffectResult<SendDecision> {
326        (**self).send_decision(input)
327    }
328
329    fn handle_recv(
330        &self,
331        role: &str,
332        partner: &str,
333        label: &str,
334        state: &mut Vec<Value>,
335        payload: &Value,
336    ) -> EffectResult<()> {
337        (**self).handle_recv(role, partner, label, state, payload)
338    }
339
340    fn handle_choose(
341        &self,
342        role: &str,
343        partner: &str,
344        labels: &[String],
345        state: &[Value],
346    ) -> EffectResult<String> {
347        (**self).handle_choose(role, partner, labels, state)
348    }
349
350    fn step(&self, role: &str, state: &mut Vec<Value>) -> EffectResult<()> {
351        (**self).step(role, state)
352    }
353
354    fn handle_acquire(
355        &self,
356        sid: SessionId,
357        role: &str,
358        layer: &str,
359        state: &[Value],
360    ) -> EffectResult<Value> {
361        (**self).handle_acquire(sid, role, layer, state)
362    }
363
364    fn handle_release(
365        &self,
366        sid: SessionId,
367        role: &str,
368        layer: &str,
369        evidence: &Value,
370        state: &[Value],
371    ) -> EffectResult<()> {
372        (**self).handle_release(sid, role, layer, evidence, state)
373    }
374
375    fn topology_events(&self, tick: u64) -> EffectResult<Vec<TopologyPerturbation>> {
376        (**self).topology_events(tick)
377    }
378
379    fn output_condition_hint(
380        &self,
381        sid: SessionId,
382        role: &str,
383        state: &[Value],
384    ) -> Option<OutputConditionHint> {
385        (**self).output_condition_hint(sid, role, state)
386    }
387
388    fn supports_wal_sync(&self) -> bool {
389        (**self).supports_wal_sync()
390    }
391
392    fn wal_sync(&self, sync: &crate::durable::WalSyncRequest) -> EffectResult<()> {
393        (**self).wal_sync(sync)
394    }
395}