Skip to main content

telltale_runtime/effects/
handler.rs

1//! Effect Handler Architecture for Choreographic Programming
2//!
3//! This module provides a clean effect boundary between pure choreographic logic
4//! and runtime transport implementations. It allows for testable, composable,
5//! and runtime-agnostic protocol implementations.
6//!
7//! # Architecture
8//!
9//! The effect handler system separates concerns:
10//! - **Choreographic Logic**: Pure protocol specification (what to do)
11//! - **Effect Handlers**: Runtime implementation (how to do it)
12//! - **Interpreters**: Execute choreographic programs using handlers
13//!
14//! # Example
15//!
16//! ```text
17//! use telltale_runtime::{ChoreoHandler, LabelId};
18//!
19//! #[async_trait]
20//! impl ChoreoHandler for MyHandler {
21//!     type Role = MyRole;
22//!     type Endpoint = MyEndpoint;
23//!
24//!     async fn send<M>(&mut self, ep: &mut Self::Endpoint, to: Self::Role, msg: &M) -> Result<()> {
25//!         // Implementation
26//!     }
27//!     // ... other methods
28//! }
29//! ```
30//!
31//! ## ProtocolMachine Boundary
32//!
33//! The bytecode ProtocolMachine in `telltale-machine` exposes a separate, synchronous
34//! `EffectHandler` trait for simulation/runtime integration. It is not
35//! interchangeable with `ChoreoHandler`: `ChoreoHandler` is async and typed
36//! over concrete message/role types, while the ProtocolMachine handler operates over
37//! bytecode values and must remain session-local for determinism.
38
39use async_trait::async_trait;
40use serde::{de::DeserializeOwned, Serialize};
41use std::any::TypeId;
42use std::fmt::Debug;
43use std::time::Duration;
44use thiserror::Error;
45
46use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
47use crate::identifiers::RoleName;
48
49#[path = "handler_context.rs"]
50mod context;
51pub use context::ContextExt;
52
53/// Trait for role identifiers in choreographies
54///
55/// Roles are typically generated as enums per choreography, but any type
56/// implementing the required traits can serve as a role identifier.
57pub trait RoleId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
58    /// Protocol-specific label type associated with this role type.
59    type Label: LabelId;
60
61    /// Get the canonical role name for this role identifier.
62    fn role_name(&self) -> RoleName;
63
64    /// Optional index for parameterized roles.
65    fn role_index(&self) -> Option<u32> {
66        None
67    }
68}
69
/// Identifier for a branch in an internal or external choice.
///
/// A label has to be a stable identifier: it is sent across the wire as text
/// and re-hydrated on the receiving side.
pub trait LabelId: 'static + Copy + Debug + Eq + std::hash::Hash + Send + Sync {
    /// Returns the stable textual form used for serialization and logging.
    fn as_str(&self) -> &'static str;

    /// Parses a label from its textual identifier.
    ///
    /// Returns `Some` with the parsed label, or `None` when the implementation
    /// cannot map `label` to one of its labels.
    fn from_str(label: &str) -> Option<Self>;
}
81
/// Tag describing the concrete Rust type of a message in receive effects.
///
/// A tag pairs the type's [`TypeId`] with the name reported by
/// [`std::any::type_name`]; both fields take part in the derived equality and
/// hashing.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct MessageTag {
    type_id: TypeId,
    type_name: &'static str,
}

impl MessageTag {
    /// Builds the tag for the concrete message type `T`.
    #[must_use]
    pub fn of<T>() -> Self
    where
        T: 'static,
    {
        let type_name = std::any::type_name::<T>();
        let type_id = TypeId::of::<T>();
        Self { type_id, type_name }
    }

    /// Returns the `TypeId` captured when the tag was created.
    #[must_use]
    pub fn type_id(&self) -> TypeId {
        let Self { type_id, .. } = *self;
        type_id
    }

    /// Returns the human-readable name of the tagged type.
    #[must_use]
    pub fn type_name(&self) -> &'static str {
        let Self { type_name, .. } = *self;
        type_name
    }
}
111
/// Marker trait for session endpoints.
///
/// An endpoint holds the runtime-specific connection state (e.g., a Telltale
/// channel bundle); generated code is generic over the endpoint type.
pub trait Endpoint: Send {}

/// Blanket implementation: every `Send` type is usable as an endpoint.
impl<T> Endpoint for T where T: Send {}
118
/// Errors that can occur during choreographic execution
///
/// The `Display` text of every variant comes from its `#[error(...)]`
/// attribute. The wrapper variants (`ProtocolContext`, `RoleContext`,
/// `MessageContext`, `WithContext`) box another `ChoreographyError` and mark
/// it with `#[source]`, so the wrapped error is reported as the error source.
#[derive(Debug, Error)]
pub enum ChoreographyError {
    /// Transport-layer error (network, channel failure, etc.)
    #[error("transport error: {0}")]
    Transport(String),

    /// Message serialization/deserialization error
    #[error("serialization error: {0}")]
    Serialization(String),

    /// Session transport send operation failed.
    #[error("{channel_type} send failed: {reason}")]
    ChannelSendFailed {
        /// Type of session transport (for example "SinkStream")
        channel_type: &'static str,
        /// Human-readable failure reason
        reason: String,
    },

    /// Session transport was closed unexpectedly during operation.
    #[error("{channel_type} closed during {operation}")]
    ChannelClosed {
        /// Type of session transport (for example "SinkStream")
        channel_type: &'static str,
        /// Operation being performed when channel closed
        operation: &'static str,
    },

    /// No session registered for the specified peer.
    #[error("no session registered for peer: {peer}")]
    NoPeerChannel {
        /// String representation of the peer role
        peer: String,
    },

    /// Label serialization failed during choice/offer
    #[error("label {operation} failed: {reason}")]
    LabelSerializationFailed {
        /// Operation: "serialization" or "deserialization"
        operation: &'static str,
        /// Human-readable failure reason
        reason: String,
    },

    /// Message serialization failed with type context
    #[error("{operation} of {type_name} failed: {reason}")]
    MessageSerializationFailed {
        /// Operation: "Serialization" or "Deserialization"
        operation: &'static str,
        /// Name of the type being serialized
        type_name: &'static str,
        /// Human-readable failure reason
        reason: String,
    },

    /// Operation exceeded the specified timeout
    ///
    /// The duration is rendered with its `Debug` formatting.
    #[error("timeout after {0:?}")]
    Timeout(Duration),

    /// Protocol specification was violated at runtime
    #[error("protocol violation: {0}")]
    ProtocolViolation(String),

    /// Referenced role not found in the choreography
    ///
    /// The role string is rendered with `Debug` formatting, i.e. quoted.
    #[error("role {0:?} not found in this choreography")]
    UnknownRole(String),

    /// Error with protocol execution context
    ///
    /// Wraps an inner error with information about where in the protocol
    /// the error occurred (protocol name, role, phase).
    #[error("{protocol}::{role} at phase '{phase}': {inner}")]
    ProtocolContext {
        /// Name of the protocol being executed
        protocol: &'static str,
        /// Name of the role executing when error occurred
        role: &'static str,
        /// Current phase/step in the protocol
        phase: &'static str,
        /// The underlying error
        #[source]
        inner: Box<ChoreographyError>,
    },

    /// Error with role-specific context
    ///
    /// Only `role` and `inner` appear in the formatted message; `index` is
    /// carried as data and is not rendered.
    #[error("[{role}] {inner}")]
    RoleContext {
        /// Name of the role where error occurred
        role: &'static str,
        /// Optional role index for parameterized roles
        index: Option<u32>,
        /// The underlying error
        #[source]
        inner: Box<ChoreographyError>,
    },

    /// Error during message exchange with another role
    #[error("{operation} {message_type} {direction} {other_role}: {inner}")]
    MessageContext {
        /// The operation being performed (send/recv)
        operation: &'static str,
        /// The type of message involved
        message_type: &'static str,
        /// Direction (to/from)
        direction: &'static str,
        /// The other role involved in the exchange
        other_role: &'static str,
        /// The underlying error
        #[source]
        inner: Box<ChoreographyError>,
    },

    /// Error during choice/branch operation
    #[error("choice error at {role}: {details}")]
    ChoiceError {
        /// The role making or receiving the choice
        role: &'static str,
        /// Details about the choice error
        details: String,
    },

    /// Generic wrapped error with context string
    #[error("{context}: {inner}")]
    WithContext {
        /// Additional context about the error
        context: String,
        /// The underlying error
        #[source]
        inner: Box<ChoreographyError>,
    },

    /// Invalid choice: the chosen branch was not among expected options
    ///
    /// The expected labels are rendered with `Debug` formatting (a bracketed,
    /// quoted list); the actual label is rendered as plain text.
    #[error("invalid choice: expected one of {expected:?}, got {actual}")]
    InvalidChoice {
        /// Expected branch labels
        expected: Vec<String>,
        /// Actual branch label provided
        actual: String,
    },

    /// General execution error
    #[error("execution error: {0}")]
    ExecutionError(String),

    /// Role family is empty after resolution
    #[error("role family '{0}' resolved to empty set")]
    EmptyRoleFamily(String),

    /// Role family not found in adapter
    #[error("role family '{0}' not found")]
    RoleFamilyNotFound(String),

    /// Role range is invalid
    ///
    /// The message prints the range in half-open notation, `[start, end)`.
    #[error("invalid role range for '{family}': [{start}, {end})")]
    InvalidRoleRange {
        /// The role family name
        family: String,
        /// Range start (inclusive)
        start: u32,
        /// Range end (exclusive)
        end: u32,
    },

    /// Insufficient responses received from role family
    #[error("insufficient responses: expected {expected}, received {received}")]
    InsufficientResponses {
        /// Expected minimum number of responses
        expected: usize,
        /// Actual number of responses received
        received: usize,
    },

    /// Feature not implemented
    #[error("not implemented: {0}")]
    NotImplemented(String),
}
296
/// Result type for choreography operations.
///
/// Shorthand for `Result<T, ChoreographyError>`; the handler traits in this
/// module return it from every fallible operation.
pub type ChoreoResult<T> = std::result::Result<T, ChoreographyError>;
299
300/// The core effect handler trait that abstracts all communication effects
301///
302/// This trait defines the primitive operations for choreographic protocols:
303/// sending, receiving, choosing, offering, and timeouts. Implement this trait
304/// to provide custom transport mechanisms (in-memory, network, etc.).
305///
306/// # Type Parameters
307///
308/// - `Role`: The type representing protocol participants
309/// - `Endpoint`: The connection state for this protocol execution
310///
311/// # Async implementation notes
312///
313/// We deliberately use the `async_trait` macro here so the trait stays object-safe,
314/// which lets middleware stacks (e.g. `Trace<Retry<H>>`) erase handlers behind trait
315/// objects. The macro also enforces `Send` on all returned futures, so the bounds on
316/// methods like [`with_timeout`](ChoreoHandler::with_timeout) apply equally to native
317/// multithreaded runtimes and single-threaded WASM builds.
318#[async_trait]
319pub trait ChoreoHandler: Send {
320    /// The role type for this choreography
321    type Role: RoleId;
322    /// The endpoint type maintaining connection state
323    type Endpoint: Endpoint;
324
325    /// Send a message to a specific role
326    ///
327    /// # Arguments
328    ///
329    /// * `ep` - The session endpoint
330    /// * `to` - The recipient role
331    /// * `msg` - The message to send (must be serializable)
332    async fn send<M: Serialize + Send + Sync>(
333        &mut self,
334        ep: &mut Self::Endpoint,
335        to: Self::Role,
336        msg: &M,
337    ) -> ChoreoResult<()>;
338
339    /// Receive a strongly-typed message from a specific role
340    ///
341    /// # Arguments
342    ///
343    /// * `ep` - The session endpoint
344    /// * `from` - The sender role
345    ///
346    /// # Returns
347    ///
348    /// The received message of type `M`
349    async fn recv<M: DeserializeOwned + Send>(
350        &mut self,
351        ep: &mut Self::Endpoint,
352        from: Self::Role,
353    ) -> ChoreoResult<M>;
354
355    /// Internal choice: broadcast a label selection
356    ///
357    /// Used by the choosing role to inform others of the selected branch.
358    ///
359    /// # Arguments
360    ///
361    /// * `ep` - The session endpoint
362    /// * `who` - The role making the choice (usually the current role)
363    /// * `label` - The selected branch label
364    async fn choose(
365        &mut self,
366        ep: &mut Self::Endpoint,
367        who: Self::Role,
368        label: <Self::Role as RoleId>::Label,
369    ) -> ChoreoResult<()>;
370
371    /// External choice: receive a label selection
372    ///
373    /// Used by non-choosing roles to receive the branch selection from another role.
374    ///
375    /// # Arguments
376    ///
377    /// * `ep` - The session endpoint
378    /// * `from` - The role that made the choice
379    ///
380    /// # Returns
381    ///
382    /// The label selected by the choosing role
383    async fn offer(
384        &mut self,
385        ep: &mut Self::Endpoint,
386        from: Self::Role,
387    ) -> ChoreoResult<<Self::Role as RoleId>::Label>;
388
389    /// Execute a future with a timeout
390    ///
391    /// # Arguments
392    ///
393    /// * `ep` - The session endpoint
394    /// * `at` - The role where timeout is enforced
395    /// * `dur` - Maximum duration to wait
396    /// * `body` - The future to execute
397    ///
398    /// # Returns
399    ///
400    /// Result of the future, or timeout error if duration exceeded
401    async fn with_timeout<F, T>(
402        &mut self,
403        ep: &mut Self::Endpoint,
404        at: Self::Role,
405        dur: Duration,
406        body: F,
407    ) -> ChoreoResult<T>
408    where
409        F: std::future::Future<Output = ChoreoResult<T>> + Send;
410
411    /// Broadcast a message to multiple recipients
412    ///
413    /// Default implementation sends sequentially. Override for optimized broadcasting.
414    async fn broadcast<M: Serialize + Send + Sync>(
415        &mut self,
416        ep: &mut Self::Endpoint,
417        recipients: &[Self::Role],
418        msg: &M,
419    ) -> ChoreoResult<()> {
420        for &recipient in recipients {
421            self.send(ep, recipient, msg).await?;
422        }
423        Ok(())
424    }
425
426    /// Send messages to multiple recipients in parallel
427    ///
428    /// Default implementation sends sequentially. Override for true parallelism.
429    async fn parallel_send<M: Serialize + Send + Sync>(
430        &mut self,
431        ep: &mut Self::Endpoint,
432        sends: &[(Self::Role, M)],
433    ) -> ChoreoResult<()> {
434        // Default implementation: sequential sends
435        for (recipient, msg) in sends {
436            self.send(ep, *recipient, msg).await?;
437        }
438        Ok(())
439    }
440}
441
/// Extension trait for handler lifecycle management
///
/// Provides setup and teardown methods for managing handler state and connections.
/// `setup` produces the `Endpoint` value and `teardown` consumes it by value.
#[async_trait]
pub trait ChoreoHandlerExt: ChoreoHandler {
    /// Setup phase - establish connections, initialize state
    ///
    /// Called before protocol execution begins.
    ///
    /// # Arguments
    ///
    /// * `role` - The role to set up the handler for
    ///
    /// # Returns
    ///
    /// The endpoint to use for the protocol run, or an error if setup fails.
    async fn setup(&mut self, role: Self::Role) -> ChoreoResult<Self::Endpoint>;

    /// Teardown phase - close connections, cleanup
    ///
    /// Called after protocol execution completes. Takes ownership of `ep`, so
    /// the endpoint cannot be used afterwards.
    async fn teardown(&mut self, ep: Self::Endpoint) -> ChoreoResult<()>;
}
457
458/// A no-op handler for testing pure choreographic logic
459///
460/// This handler performs no actual communication, making it useful for
461/// testing protocol logic without network overhead.
462pub struct NoOpHandler<R: RoleId> {
463    _phantom: std::marker::PhantomData<R>,
464    registry: ExtensionRegistry<(), R>,
465}
466
467impl<R: RoleId> NoOpHandler<R> {
468    /// Create a new no-op handler
469    #[must_use]
470    pub fn new() -> Self {
471        Self {
472            _phantom: std::marker::PhantomData,
473            registry: ExtensionRegistry::new(),
474        }
475    }
476}
477
478impl<R: RoleId> Default for NoOpHandler<R> {
479    fn default() -> Self {
480        Self::new()
481    }
482}
483
484#[async_trait]
485impl<R: RoleId + 'static> ExtensibleHandler for NoOpHandler<R> {
486    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
487        &self.registry
488    }
489}
490
491#[async_trait]
492impl<R: RoleId + 'static> ChoreoHandler for NoOpHandler<R> {
493    type Role = R;
494    type Endpoint = ();
495
496    async fn send<M: Serialize + Send + Sync>(
497        &mut self,
498        _ep: &mut Self::Endpoint,
499        _to: Self::Role,
500        _msg: &M,
501    ) -> ChoreoResult<()> {
502        Ok(())
503    }
504
505    async fn recv<M: DeserializeOwned + Send>(
506        &mut self,
507        _ep: &mut Self::Endpoint,
508        _from: Self::Role,
509    ) -> ChoreoResult<M> {
510        Err(ChoreographyError::Transport(
511            "NoOpHandler cannot receive".into(),
512        ))
513    }
514
515    async fn choose(
516        &mut self,
517        _ep: &mut Self::Endpoint,
518        _who: Self::Role,
519        _label: <Self::Role as RoleId>::Label,
520    ) -> ChoreoResult<()> {
521        Ok(())
522    }
523
524    async fn offer(
525        &mut self,
526        _ep: &mut Self::Endpoint,
527        _from: Self::Role,
528    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
529        Err(ChoreographyError::Transport(
530            "NoOpHandler cannot offer".into(),
531        ))
532    }
533
534    async fn with_timeout<F, T>(
535        &mut self,
536        _ep: &mut Self::Endpoint,
537        _at: Self::Role,
538        _dur: Duration,
539        body: F,
540    ) -> ChoreoResult<T>
541    where
542        F: std::future::Future<Output = ChoreoResult<T>> + Send,
543    {
544        body.await
545    }
546}