Skip to main content

telltale_runtime/effects/
handler.rs

1//! Effect Handler Architecture for Choreographic Programming
2//!
3//! This module provides a clean effect boundary between pure choreographic logic
4//! and runtime transport implementations. It allows for testable, composable,
5//! and runtime-agnostic protocol implementations.
6//!
7//! # Architecture
8//!
9//! The effect handler system separates concerns:
10//! - **Choreographic Logic**: Pure protocol specification (what to do)
11//! - **Effect Handlers**: Runtime implementation (how to do it)
12//! - **Interpreters**: Execute choreographic programs using handlers
13//! - **Contract Profiles**: Machine-checkable statements of semantic obligations
14//!   versus transport-policy freedom, defined in `effects::contract`
15//!
16//! # Example
17//!
18//! ```text
19//! use telltale_runtime::{ChoreoHandler, LabelId};
20//!
21//! #[async_trait]
22//! impl ChoreoHandler for MyHandler {
23//!     type Role = MyRole;
24//!     type Endpoint = MyEndpoint;
25//!
26//!     async fn send<M>(&mut self, ep: &mut Self::Endpoint, to: Self::Role, msg: &M) -> Result<()> {
27//!         // Implementation
28//!     }
29//!     // ... other methods
30//! }
31//! ```
32//!
33//! ## ProtocolMachine Boundary
34//!
35//! The bytecode ProtocolMachine in `telltale-machine` exposes a separate, synchronous
36//! `EffectHandler` trait for simulation/runtime integration. It is not
37//! interchangeable with `ChoreoHandler`: `ChoreoHandler` is async and typed
38//! over concrete message/role types, while the ProtocolMachine handler operates over
39//! bytecode values and must remain session-local for determinism.
40
41use async_trait::async_trait;
42use serde::{de::DeserializeOwned, Serialize};
43use std::any::TypeId;
44use std::fmt::Debug;
45use std::time::Duration;
46use thiserror::Error;
47
48use crate::effects::contract::{
49    DeliveryModel, DocumentedHandlerContract, ExtensionDispatchContract, ExtensionDispatchMode,
50    HandlerContractProfile, HandlerContractTier, ProtocolSemanticContract, RetryPolicy,
51    TimeoutPolicy, TransportPolicyContract,
52};
53use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
54use crate::identifiers::RoleName;
55
56#[path = "handler_context.rs"]
57mod context;
58pub use context::ContextExt;
59
60/// Trait for role identifiers in choreographies
61///
62/// Roles are typically generated as enums per choreography, but any type
63/// implementing the required traits can serve as a role identifier.
64pub trait RoleId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
65    /// Protocol-specific label type associated with this role type.
66    type Label: LabelId;
67
68    /// Get the canonical role name for this role identifier.
69    fn role_name(&self) -> RoleName;
70
71    /// Optional index for parameterized roles.
72    fn role_index(&self) -> Option<u32> {
73        None
74    }
75}
76
77/// Labels identify branches in internal/external choice.
78///
79/// Labels must be stable identifiers that can be sent across the wire
80/// and re-hydrated on the receiving side.
81pub trait LabelId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
82    /// Stable textual identifier for serialization/logging.
83    fn as_str(&self) -> &'static str;
84
85    /// Parse a label from its textual identifier.
86    fn from_str(label: &str) -> Option<Self>;
87}
88
89/// Typed message tag for receive effects.
90#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
91pub struct MessageTag {
92    type_id: TypeId,
93    type_name: &'static str,
94}
95
96impl MessageTag {
97    /// Create a tag for a concrete message type.
98    #[must_use]
99    pub fn of<T: 'static>() -> Self {
100        Self {
101            type_id: TypeId::of::<T>(),
102            type_name: std::any::type_name::<T>(),
103        }
104    }
105
106    /// Access the underlying `TypeId`.
107    #[must_use]
108    pub fn type_id(&self) -> TypeId {
109        self.type_id
110    }
111
112    /// Access the human-readable type name.
113    #[must_use]
114    pub fn type_name(&self) -> &'static str {
115        self.type_name
116    }
117}
118
119/// Session endpoint trait
120///
121/// Represents the runtime-specific connection state (e.g., Telltale channel bundle).
122/// The generated code will be generic over the endpoint type.
123pub trait Endpoint: Send {}
124impl<T: Send> Endpoint for T {}
125
126/// Errors that can occur during choreographic execution
127#[derive(Debug, Error)]
128pub enum ChoreographyError {
129    /// Transport-layer error (network, channel failure, etc.)
130    #[error("transport error: {0}")]
131    Transport(String),
132
133    /// Message serialization/deserialization error
134    #[error("serialization error: {0}")]
135    Serialization(String),
136
137    /// Session transport send operation failed.
138    #[error("{channel_type} send failed: {reason}")]
139    ChannelSendFailed {
140        /// Type of session transport (for example "SinkStream")
141        channel_type: &'static str,
142        /// Human-readable failure reason
143        reason: String,
144    },
145
146    /// Session transport was closed unexpectedly during operation.
147    #[error("{channel_type} closed during {operation}")]
148    ChannelClosed {
149        /// Type of session transport (for example "SinkStream")
150        channel_type: &'static str,
151        /// Operation being performed when channel closed
152        operation: &'static str,
153    },
154
155    /// No session registered for the specified peer.
156    #[error("no session registered for peer: {peer}")]
157    NoPeerChannel {
158        /// String representation of the peer role
159        peer: String,
160    },
161
162    /// Label serialization failed during choice/offer
163    #[error("label {operation} failed: {reason}")]
164    LabelSerializationFailed {
165        /// Operation: "serialization" or "deserialization"
166        operation: &'static str,
167        /// Human-readable failure reason
168        reason: String,
169    },
170
171    /// Message serialization failed with type context
172    #[error("{operation} of {type_name} failed: {reason}")]
173    MessageSerializationFailed {
174        /// Operation: "Serialization" or "Deserialization"
175        operation: &'static str,
176        /// Name of the type being serialized
177        type_name: &'static str,
178        /// Human-readable failure reason
179        reason: String,
180    },
181
182    /// Operation exceeded the specified timeout
183    #[error("timeout after {0:?}")]
184    Timeout(Duration),
185
186    /// Protocol specification was violated at runtime
187    #[error("protocol violation: {0}")]
188    ProtocolViolation(String),
189
190    /// Referenced role not found in the choreography
191    #[error("role {0:?} not found in this choreography")]
192    UnknownRole(String),
193
194    /// Error with protocol execution context
195    ///
196    /// Wraps an inner error with information about where in the protocol
197    /// the error occurred (protocol name, role, phase).
198    #[error("{protocol}::{role} at phase '{phase}': {inner}")]
199    ProtocolContext {
200        /// Name of the protocol being executed
201        protocol: &'static str,
202        /// Name of the role executing when error occurred
203        role: &'static str,
204        /// Current phase/step in the protocol
205        phase: &'static str,
206        /// The underlying error
207        #[source]
208        inner: Box<ChoreographyError>,
209    },
210
211    /// Error with role-specific context
212    #[error("[{role}] {inner}")]
213    RoleContext {
214        /// Name of the role where error occurred
215        role: &'static str,
216        /// Optional role index for parameterized roles
217        index: Option<u32>,
218        /// The underlying error
219        #[source]
220        inner: Box<ChoreographyError>,
221    },
222
223    /// Error during message exchange with another role
224    #[error("{operation} {message_type} {direction} {other_role}: {inner}")]
225    MessageContext {
226        /// The operation being performed (send/recv)
227        operation: &'static str,
228        /// The type of message involved
229        message_type: &'static str,
230        /// Direction (to/from)
231        direction: &'static str,
232        /// The other role involved in the exchange
233        other_role: &'static str,
234        /// The underlying error
235        #[source]
236        inner: Box<ChoreographyError>,
237    },
238
239    /// Error during choice/branch operation
240    #[error("choice error at {role}: {details}")]
241    ChoiceError {
242        /// The role making or receiving the choice
243        role: &'static str,
244        /// Details about the choice error
245        details: String,
246    },
247
248    /// Generic wrapped error with context string
249    #[error("{context}: {inner}")]
250    WithContext {
251        /// Additional context about the error
252        context: String,
253        /// The underlying error
254        #[source]
255        inner: Box<ChoreographyError>,
256    },
257
258    /// Invalid choice: the chosen branch was not among expected options
259    #[error("invalid choice: expected one of {expected:?}, got {actual}")]
260    InvalidChoice {
261        /// Expected branch labels
262        expected: Vec<String>,
263        /// Actual branch label provided
264        actual: String,
265    },
266
267    /// General execution error
268    #[error("execution error: {0}")]
269    ExecutionError(String),
270
271    /// Role family is empty after resolution
272    #[error("role family '{0}' resolved to empty set")]
273    EmptyRoleFamily(String),
274
275    /// Role family not found in adapter
276    #[error("role family '{0}' not found")]
277    RoleFamilyNotFound(String),
278
279    /// Role range is invalid
280    #[error("invalid role range for '{family}': [{start}, {end})")]
281    InvalidRoleRange {
282        /// The role family name
283        family: String,
284        /// Range start (inclusive)
285        start: u32,
286        /// Range end (exclusive)
287        end: u32,
288    },
289
290    /// Insufficient responses received from role family
291    #[error("insufficient responses: expected {expected}, received {received}")]
292    InsufficientResponses {
293        /// Expected minimum number of responses
294        expected: usize,
295        /// Actual number of responses received
296        received: usize,
297    },
298
299    /// Feature not implemented
300    #[error("not implemented: {0}")]
301    NotImplemented(String),
302}
303
304/// Result type for choreography operations.
305pub type ChoreoResult<T> = std::result::Result<T, ChoreographyError>;
306
307/// The core effect handler trait that abstracts all communication effects
308///
309/// This trait defines the primitive operations for choreographic protocols:
310/// sending, receiving, choosing, offering, and timeouts. Implement this trait
311/// to provide custom transport mechanisms (in-memory, network, etc.).
312///
313/// # Type Parameters
314///
315/// - `Role`: The type representing protocol participants
316/// - `Endpoint`: The connection state for this protocol execution
317///
318/// # Async implementation notes
319///
320/// We deliberately use the `async_trait` macro here so the trait stays object-safe,
321/// which lets middleware stacks (e.g. `Trace<Retry<H>>`) erase handlers behind trait
322/// objects. The macro also enforces `Send` on all returned futures, so the bounds on
323/// methods like [`with_timeout`](ChoreoHandler::with_timeout) apply equally to native
324/// multithreaded runtimes and single-threaded WASM builds.
325#[async_trait]
326pub trait ChoreoHandler: Send {
327    /// The role type for this choreography
328    type Role: RoleId;
329    /// The endpoint type maintaining connection state
330    type Endpoint: Endpoint;
331
332    /// Send a message to a specific role
333    ///
334    /// # Arguments
335    ///
336    /// * `ep` - The session endpoint
337    /// * `to` - The recipient role
338    /// * `msg` - The message to send (must be serializable)
339    async fn send<M: Serialize + Send + Sync>(
340        &mut self,
341        ep: &mut Self::Endpoint,
342        to: Self::Role,
343        msg: &M,
344    ) -> ChoreoResult<()>;
345
346    /// Receive a strongly-typed message from a specific role
347    ///
348    /// # Arguments
349    ///
350    /// * `ep` - The session endpoint
351    /// * `from` - The sender role
352    ///
353    /// # Returns
354    ///
355    /// The received message of type `M`
356    async fn recv<M: DeserializeOwned + Send>(
357        &mut self,
358        ep: &mut Self::Endpoint,
359        from: Self::Role,
360    ) -> ChoreoResult<M>;
361
362    /// Internal choice: broadcast a label selection
363    ///
364    /// Used by the choosing role to inform others of the selected branch.
365    ///
366    /// # Arguments
367    ///
368    /// * `ep` - The session endpoint
369    /// * `who` - The role making the choice (usually the current role)
370    /// * `label` - The selected branch label
371    async fn choose(
372        &mut self,
373        ep: &mut Self::Endpoint,
374        who: Self::Role,
375        label: <Self::Role as RoleId>::Label,
376    ) -> ChoreoResult<()>;
377
378    /// External choice: receive a label selection
379    ///
380    /// Used by non-choosing roles to receive the branch selection from another role.
381    ///
382    /// # Arguments
383    ///
384    /// * `ep` - The session endpoint
385    /// * `from` - The role that made the choice
386    ///
387    /// # Returns
388    ///
389    /// The label selected by the choosing role
390    async fn offer(
391        &mut self,
392        ep: &mut Self::Endpoint,
393        from: Self::Role,
394    ) -> ChoreoResult<<Self::Role as RoleId>::Label>;
395
396    /// Execute a future with a timeout
397    ///
398    /// # Arguments
399    ///
400    /// * `ep` - The session endpoint
401    /// * `at` - The role where timeout is enforced
402    /// * `dur` - Maximum duration to wait
403    /// * `body` - The future to execute
404    ///
405    /// # Returns
406    ///
407    /// Result of the future, or timeout error if duration exceeded
408    async fn with_timeout<F, T>(
409        &mut self,
410        ep: &mut Self::Endpoint,
411        at: Self::Role,
412        dur: Duration,
413        body: F,
414    ) -> ChoreoResult<T>
415    where
416        F: std::future::Future<Output = ChoreoResult<T>> + Send;
417
418    /// Broadcast a message to multiple recipients
419    ///
420    /// Default implementation sends sequentially. Override for optimized broadcasting.
421    async fn broadcast<M: Serialize + Send + Sync>(
422        &mut self,
423        ep: &mut Self::Endpoint,
424        recipients: &[Self::Role],
425        msg: &M,
426    ) -> ChoreoResult<()> {
427        for &recipient in recipients {
428            self.send(ep, recipient, msg).await?;
429        }
430        Ok(())
431    }
432
433    /// Send messages to multiple recipients in parallel
434    ///
435    /// Default implementation sends sequentially. Override for true parallelism.
436    async fn parallel_send<M: Serialize + Send + Sync>(
437        &mut self,
438        ep: &mut Self::Endpoint,
439        sends: &[(Self::Role, M)],
440    ) -> ChoreoResult<()> {
441        // Default implementation: sequential sends
442        for (recipient, msg) in sends {
443            self.send(ep, *recipient, msg).await?;
444        }
445        Ok(())
446    }
447}
448
449/// Extension trait for handler lifecycle management
450///
451/// Provides setup and teardown methods for managing handler state and connections.
452#[async_trait]
453pub trait ChoreoHandlerExt: ChoreoHandler {
454    /// Setup phase - establish connections, initialize state
455    ///
456    /// Called before protocol execution begins.
457    async fn setup(&mut self, role: Self::Role) -> ChoreoResult<Self::Endpoint>;
458
459    /// Teardown phase - close connections, cleanup
460    ///
461    /// Called after protocol execution completes.
462    async fn teardown(&mut self, ep: Self::Endpoint) -> ChoreoResult<()>;
463}
464
465/// A no-op handler for testing pure choreographic logic
466///
467/// This handler performs no actual communication, making it useful for
468/// testing protocol logic without network overhead.
469pub struct NoOpHandler<R: RoleId> {
470    _phantom: std::marker::PhantomData<R>,
471    registry: ExtensionRegistry<(), R>,
472}
473
474impl<R: RoleId> NoOpHandler<R> {
475    /// Create a new no-op handler
476    #[must_use]
477    pub fn new() -> Self {
478        Self {
479            _phantom: std::marker::PhantomData,
480            registry: ExtensionRegistry::new(),
481        }
482    }
483}
484
485impl<R: RoleId> Default for NoOpHandler<R> {
486    fn default() -> Self {
487        Self::new()
488    }
489}
490
491impl<R: RoleId> DocumentedHandlerContract for NoOpHandler<R> {
492    fn contract_profile() -> HandlerContractProfile {
493        HandlerContractProfile {
494            handler_name: std::any::type_name::<Self>(),
495            tier: HandlerContractTier::ObservationalHarness,
496            semantics: ProtocolSemanticContract {
497                typed_send_recv_roundtrip: false,
498                exact_choice_label_preservation: false,
499                fail_closed_transport_errors: true,
500                timeouts_scoped_to_enforcing_role: true,
501                deterministic_for_regression: true,
502                can_materialize_values: false,
503            },
504            transport: TransportPolicyContract {
505                delivery_model: DeliveryModel::NoTransport,
506                retry_policy: RetryPolicy::None,
507                timeout_policy: TimeoutPolicy::EnforcingRoleOnly,
508            },
509            extension_dispatch: ExtensionDispatchContract {
510                mode: ExtensionDispatchMode::Unsupported,
511                fail_closed_when_unregistered: false,
512                type_exact_before_side_effects: false,
513            },
514            notes: vec![
515                "send/choose succeed as no-op observability aids",
516                "recv/offer intentionally fail closed instead of inventing values",
517            ],
518        }
519    }
520}
521
522#[async_trait]
523impl<R: RoleId + 'static> ExtensibleHandler for NoOpHandler<R> {
524    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
525        &self.registry
526    }
527}
528
529#[async_trait]
530impl<R: RoleId + 'static> ChoreoHandler for NoOpHandler<R> {
531    type Role = R;
532    type Endpoint = ();
533
534    async fn send<M: Serialize + Send + Sync>(
535        &mut self,
536        _ep: &mut Self::Endpoint,
537        _to: Self::Role,
538        _msg: &M,
539    ) -> ChoreoResult<()> {
540        Ok(())
541    }
542
543    async fn recv<M: DeserializeOwned + Send>(
544        &mut self,
545        _ep: &mut Self::Endpoint,
546        _from: Self::Role,
547    ) -> ChoreoResult<M> {
548        Err(ChoreographyError::Transport(
549            "NoOpHandler cannot receive".into(),
550        ))
551    }
552
553    async fn choose(
554        &mut self,
555        _ep: &mut Self::Endpoint,
556        _who: Self::Role,
557        _label: <Self::Role as RoleId>::Label,
558    ) -> ChoreoResult<()> {
559        Ok(())
560    }
561
562    async fn offer(
563        &mut self,
564        _ep: &mut Self::Endpoint,
565        _from: Self::Role,
566    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
567        Err(ChoreographyError::Transport(
568            "NoOpHandler cannot offer".into(),
569        ))
570    }
571
572    async fn with_timeout<F, T>(
573        &mut self,
574        _ep: &mut Self::Endpoint,
575        _at: Self::Role,
576        _dur: Duration,
577        body: F,
578    ) -> ChoreoResult<T>
579    where
580        F: std::future::Future<Output = ChoreoResult<T>> + Send,
581    {
582        body.await
583    }
584}