telltale_runtime/effects/handler.rs
1//! Effect Handler Architecture for Choreographic Programming
2//!
3//! This module provides a clean effect boundary between pure choreographic logic
4//! and runtime transport implementations. It allows for testable, composable,
5//! and runtime-agnostic protocol implementations.
6//!
7//! # Architecture
8//!
9//! The effect handler system separates concerns:
10//! - **Choreographic Logic**: Pure protocol specification (what to do)
11//! - **Effect Handlers**: Runtime implementation (how to do it)
12//! - **Interpreters**: Execute choreographic programs using handlers
13//!
14//! # Example
15//!
16//! ```text
17//! use telltale_runtime::{ChoreoHandler, LabelId};
18//!
19//! #[async_trait]
20//! impl ChoreoHandler for MyHandler {
21//! type Role = MyRole;
22//! type Endpoint = MyEndpoint;
23//!
24//! async fn send<M>(&mut self, ep: &mut Self::Endpoint, to: Self::Role, msg: &M) -> Result<()> {
25//! // Implementation
26//! }
27//! // ... other methods
28//! }
29//! ```
30//!
31//! ## ProtocolMachine Boundary
32//!
33//! The bytecode ProtocolMachine in `telltale-machine` exposes a separate, synchronous
34//! `EffectHandler` trait for simulation/runtime integration. It is not
35//! interchangeable with `ChoreoHandler`: `ChoreoHandler` is async and typed
36//! over concrete message/role types, while the ProtocolMachine handler operates over
37//! bytecode values and must remain session-local for determinism.
38
39use async_trait::async_trait;
40use serde::{de::DeserializeOwned, Serialize};
41use std::any::TypeId;
42use std::fmt::Debug;
43use std::time::Duration;
44use thiserror::Error;
45
46use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
47use crate::identifiers::RoleName;
48
49#[path = "handler_context.rs"]
50mod context;
51pub use context::ContextExt;
52
53/// Trait for role identifiers in choreographies
54///
55/// Roles are typically generated as enums per choreography, but any type
56/// implementing the required traits can serve as a role identifier.
57pub trait RoleId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
58 /// Protocol-specific label type associated with this role type.
59 type Label: LabelId;
60
61 /// Get the canonical role name for this role identifier.
62 fn role_name(&self) -> RoleName;
63
64 /// Optional index for parameterized roles.
65 fn role_index(&self) -> Option<u32> {
66 None
67 }
68}
69
70/// Labels identify branches in internal/external choice.
71///
72/// Labels must be stable identifiers that can be sent across the wire
73/// and re-hydrated on the receiving side.
74pub trait LabelId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
75 /// Stable textual identifier for serialization/logging.
76 fn as_str(&self) -> &'static str;
77
78 /// Parse a label from its textual identifier.
79 fn from_str(label: &str) -> Option<Self>;
80}
81
82/// Typed message tag for receive effects.
83#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
84pub struct MessageTag {
85 type_id: TypeId,
86 type_name: &'static str,
87}
88
89impl MessageTag {
90 /// Create a tag for a concrete message type.
91 #[must_use]
92 pub fn of<T: 'static>() -> Self {
93 Self {
94 type_id: TypeId::of::<T>(),
95 type_name: std::any::type_name::<T>(),
96 }
97 }
98
99 /// Access the underlying `TypeId`.
100 #[must_use]
101 pub fn type_id(&self) -> TypeId {
102 self.type_id
103 }
104
105 /// Access the human-readable type name.
106 #[must_use]
107 pub fn type_name(&self) -> &'static str {
108 self.type_name
109 }
110}
111
112/// Session endpoint trait
113///
114/// Represents the runtime-specific connection state (e.g., Telltale channel bundle).
115/// The generated code will be generic over the endpoint type.
116pub trait Endpoint: Send {}
117impl<T: Send> Endpoint for T {}
118
119/// Errors that can occur during choreographic execution
120#[derive(Debug, Error)]
121pub enum ChoreographyError {
122 /// Transport-layer error (network, channel failure, etc.)
123 #[error("transport error: {0}")]
124 Transport(String),
125
126 /// Message serialization/deserialization error
127 #[error("serialization error: {0}")]
128 Serialization(String),
129
130 /// Session transport send operation failed.
131 #[error("{channel_type} send failed: {reason}")]
132 ChannelSendFailed {
133 /// Type of session transport (for example "SinkStream")
134 channel_type: &'static str,
135 /// Human-readable failure reason
136 reason: String,
137 },
138
139 /// Session transport was closed unexpectedly during operation.
140 #[error("{channel_type} closed during {operation}")]
141 ChannelClosed {
142 /// Type of session transport (for example "SinkStream")
143 channel_type: &'static str,
144 /// Operation being performed when channel closed
145 operation: &'static str,
146 },
147
148 /// No session registered for the specified peer.
149 #[error("no session registered for peer: {peer}")]
150 NoPeerChannel {
151 /// String representation of the peer role
152 peer: String,
153 },
154
155 /// Label serialization failed during choice/offer
156 #[error("label {operation} failed: {reason}")]
157 LabelSerializationFailed {
158 /// Operation: "serialization" or "deserialization"
159 operation: &'static str,
160 /// Human-readable failure reason
161 reason: String,
162 },
163
164 /// Message serialization failed with type context
165 #[error("{operation} of {type_name} failed: {reason}")]
166 MessageSerializationFailed {
167 /// Operation: "Serialization" or "Deserialization"
168 operation: &'static str,
169 /// Name of the type being serialized
170 type_name: &'static str,
171 /// Human-readable failure reason
172 reason: String,
173 },
174
175 /// Operation exceeded the specified timeout
176 #[error("timeout after {0:?}")]
177 Timeout(Duration),
178
179 /// Protocol specification was violated at runtime
180 #[error("protocol violation: {0}")]
181 ProtocolViolation(String),
182
183 /// Referenced role not found in the choreography
184 #[error("role {0:?} not found in this choreography")]
185 UnknownRole(String),
186
187 /// Error with protocol execution context
188 ///
189 /// Wraps an inner error with information about where in the protocol
190 /// the error occurred (protocol name, role, phase).
191 #[error("{protocol}::{role} at phase '{phase}': {inner}")]
192 ProtocolContext {
193 /// Name of the protocol being executed
194 protocol: &'static str,
195 /// Name of the role executing when error occurred
196 role: &'static str,
197 /// Current phase/step in the protocol
198 phase: &'static str,
199 /// The underlying error
200 #[source]
201 inner: Box<ChoreographyError>,
202 },
203
204 /// Error with role-specific context
205 #[error("[{role}] {inner}")]
206 RoleContext {
207 /// Name of the role where error occurred
208 role: &'static str,
209 /// Optional role index for parameterized roles
210 index: Option<u32>,
211 /// The underlying error
212 #[source]
213 inner: Box<ChoreographyError>,
214 },
215
216 /// Error during message exchange with another role
217 #[error("{operation} {message_type} {direction} {other_role}: {inner}")]
218 MessageContext {
219 /// The operation being performed (send/recv)
220 operation: &'static str,
221 /// The type of message involved
222 message_type: &'static str,
223 /// Direction (to/from)
224 direction: &'static str,
225 /// The other role involved in the exchange
226 other_role: &'static str,
227 /// The underlying error
228 #[source]
229 inner: Box<ChoreographyError>,
230 },
231
232 /// Error during choice/branch operation
233 #[error("choice error at {role}: {details}")]
234 ChoiceError {
235 /// The role making or receiving the choice
236 role: &'static str,
237 /// Details about the choice error
238 details: String,
239 },
240
241 /// Generic wrapped error with context string
242 #[error("{context}: {inner}")]
243 WithContext {
244 /// Additional context about the error
245 context: String,
246 /// The underlying error
247 #[source]
248 inner: Box<ChoreographyError>,
249 },
250
251 /// Invalid choice: the chosen branch was not among expected options
252 #[error("invalid choice: expected one of {expected:?}, got {actual}")]
253 InvalidChoice {
254 /// Expected branch labels
255 expected: Vec<String>,
256 /// Actual branch label provided
257 actual: String,
258 },
259
260 /// General execution error
261 #[error("execution error: {0}")]
262 ExecutionError(String),
263
264 /// Role family is empty after resolution
265 #[error("role family '{0}' resolved to empty set")]
266 EmptyRoleFamily(String),
267
268 /// Role family not found in adapter
269 #[error("role family '{0}' not found")]
270 RoleFamilyNotFound(String),
271
272 /// Role range is invalid
273 #[error("invalid role range for '{family}': [{start}, {end})")]
274 InvalidRoleRange {
275 /// The role family name
276 family: String,
277 /// Range start (inclusive)
278 start: u32,
279 /// Range end (exclusive)
280 end: u32,
281 },
282
283 /// Insufficient responses received from role family
284 #[error("insufficient responses: expected {expected}, received {received}")]
285 InsufficientResponses {
286 /// Expected minimum number of responses
287 expected: usize,
288 /// Actual number of responses received
289 received: usize,
290 },
291
292 /// Feature not implemented
293 #[error("not implemented: {0}")]
294 NotImplemented(String),
295}
296
297/// Result type for choreography operations.
298pub type ChoreoResult<T> = std::result::Result<T, ChoreographyError>;
299
300/// The core effect handler trait that abstracts all communication effects
301///
302/// This trait defines the primitive operations for choreographic protocols:
303/// sending, receiving, choosing, offering, and timeouts. Implement this trait
304/// to provide custom transport mechanisms (in-memory, network, etc.).
305///
306/// # Type Parameters
307///
308/// - `Role`: The type representing protocol participants
309/// - `Endpoint`: The connection state for this protocol execution
310///
311/// # Async implementation notes
312///
313/// We deliberately use the `async_trait` macro here so the trait stays object-safe,
314/// which lets middleware stacks (e.g. `Trace<Retry<H>>`) erase handlers behind trait
315/// objects. The macro also enforces `Send` on all returned futures, so the bounds on
316/// methods like [`with_timeout`](ChoreoHandler::with_timeout) apply equally to native
317/// multithreaded runtimes and single-threaded WASM builds.
318#[async_trait]
319pub trait ChoreoHandler: Send {
320 /// The role type for this choreography
321 type Role: RoleId;
322 /// The endpoint type maintaining connection state
323 type Endpoint: Endpoint;
324
325 /// Send a message to a specific role
326 ///
327 /// # Arguments
328 ///
329 /// * `ep` - The session endpoint
330 /// * `to` - The recipient role
331 /// * `msg` - The message to send (must be serializable)
332 async fn send<M: Serialize + Send + Sync>(
333 &mut self,
334 ep: &mut Self::Endpoint,
335 to: Self::Role,
336 msg: &M,
337 ) -> ChoreoResult<()>;
338
339 /// Receive a strongly-typed message from a specific role
340 ///
341 /// # Arguments
342 ///
343 /// * `ep` - The session endpoint
344 /// * `from` - The sender role
345 ///
346 /// # Returns
347 ///
348 /// The received message of type `M`
349 async fn recv<M: DeserializeOwned + Send>(
350 &mut self,
351 ep: &mut Self::Endpoint,
352 from: Self::Role,
353 ) -> ChoreoResult<M>;
354
355 /// Internal choice: broadcast a label selection
356 ///
357 /// Used by the choosing role to inform others of the selected branch.
358 ///
359 /// # Arguments
360 ///
361 /// * `ep` - The session endpoint
362 /// * `who` - The role making the choice (usually the current role)
363 /// * `label` - The selected branch label
364 async fn choose(
365 &mut self,
366 ep: &mut Self::Endpoint,
367 who: Self::Role,
368 label: <Self::Role as RoleId>::Label,
369 ) -> ChoreoResult<()>;
370
371 /// External choice: receive a label selection
372 ///
373 /// Used by non-choosing roles to receive the branch selection from another role.
374 ///
375 /// # Arguments
376 ///
377 /// * `ep` - The session endpoint
378 /// * `from` - The role that made the choice
379 ///
380 /// # Returns
381 ///
382 /// The label selected by the choosing role
383 async fn offer(
384 &mut self,
385 ep: &mut Self::Endpoint,
386 from: Self::Role,
387 ) -> ChoreoResult<<Self::Role as RoleId>::Label>;
388
389 /// Execute a future with a timeout
390 ///
391 /// # Arguments
392 ///
393 /// * `ep` - The session endpoint
394 /// * `at` - The role where timeout is enforced
395 /// * `dur` - Maximum duration to wait
396 /// * `body` - The future to execute
397 ///
398 /// # Returns
399 ///
400 /// Result of the future, or timeout error if duration exceeded
401 async fn with_timeout<F, T>(
402 &mut self,
403 ep: &mut Self::Endpoint,
404 at: Self::Role,
405 dur: Duration,
406 body: F,
407 ) -> ChoreoResult<T>
408 where
409 F: std::future::Future<Output = ChoreoResult<T>> + Send;
410
411 /// Broadcast a message to multiple recipients
412 ///
413 /// Default implementation sends sequentially. Override for optimized broadcasting.
414 async fn broadcast<M: Serialize + Send + Sync>(
415 &mut self,
416 ep: &mut Self::Endpoint,
417 recipients: &[Self::Role],
418 msg: &M,
419 ) -> ChoreoResult<()> {
420 for &recipient in recipients {
421 self.send(ep, recipient, msg).await?;
422 }
423 Ok(())
424 }
425
426 /// Send messages to multiple recipients in parallel
427 ///
428 /// Default implementation sends sequentially. Override for true parallelism.
429 async fn parallel_send<M: Serialize + Send + Sync>(
430 &mut self,
431 ep: &mut Self::Endpoint,
432 sends: &[(Self::Role, M)],
433 ) -> ChoreoResult<()> {
434 // Default implementation: sequential sends
435 for (recipient, msg) in sends {
436 self.send(ep, *recipient, msg).await?;
437 }
438 Ok(())
439 }
440}
441
442/// Extension trait for handler lifecycle management
443///
444/// Provides setup and teardown methods for managing handler state and connections.
445#[async_trait]
446pub trait ChoreoHandlerExt: ChoreoHandler {
447 /// Setup phase - establish connections, initialize state
448 ///
449 /// Called before protocol execution begins.
450 async fn setup(&mut self, role: Self::Role) -> ChoreoResult<Self::Endpoint>;
451
452 /// Teardown phase - close connections, cleanup
453 ///
454 /// Called after protocol execution completes.
455 async fn teardown(&mut self, ep: Self::Endpoint) -> ChoreoResult<()>;
456}
457
458/// A no-op handler for testing pure choreographic logic
459///
460/// This handler performs no actual communication, making it useful for
461/// testing protocol logic without network overhead.
462pub struct NoOpHandler<R: RoleId> {
463 _phantom: std::marker::PhantomData<R>,
464 registry: ExtensionRegistry<(), R>,
465}
466
467impl<R: RoleId> NoOpHandler<R> {
468 /// Create a new no-op handler
469 #[must_use]
470 pub fn new() -> Self {
471 Self {
472 _phantom: std::marker::PhantomData,
473 registry: ExtensionRegistry::new(),
474 }
475 }
476}
477
478impl<R: RoleId> Default for NoOpHandler<R> {
479 fn default() -> Self {
480 Self::new()
481 }
482}
483
484#[async_trait]
485impl<R: RoleId + 'static> ExtensibleHandler for NoOpHandler<R> {
486 fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
487 &self.registry
488 }
489}
490
491#[async_trait]
492impl<R: RoleId + 'static> ChoreoHandler for NoOpHandler<R> {
493 type Role = R;
494 type Endpoint = ();
495
496 async fn send<M: Serialize + Send + Sync>(
497 &mut self,
498 _ep: &mut Self::Endpoint,
499 _to: Self::Role,
500 _msg: &M,
501 ) -> ChoreoResult<()> {
502 Ok(())
503 }
504
505 async fn recv<M: DeserializeOwned + Send>(
506 &mut self,
507 _ep: &mut Self::Endpoint,
508 _from: Self::Role,
509 ) -> ChoreoResult<M> {
510 Err(ChoreographyError::Transport(
511 "NoOpHandler cannot receive".into(),
512 ))
513 }
514
515 async fn choose(
516 &mut self,
517 _ep: &mut Self::Endpoint,
518 _who: Self::Role,
519 _label: <Self::Role as RoleId>::Label,
520 ) -> ChoreoResult<()> {
521 Ok(())
522 }
523
524 async fn offer(
525 &mut self,
526 _ep: &mut Self::Endpoint,
527 _from: Self::Role,
528 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
529 Err(ChoreographyError::Transport(
530 "NoOpHandler cannot offer".into(),
531 ))
532 }
533
534 async fn with_timeout<F, T>(
535 &mut self,
536 _ep: &mut Self::Endpoint,
537 _at: Self::Role,
538 _dur: Duration,
539 body: F,
540 ) -> ChoreoResult<T>
541 where
542 F: std::future::Future<Output = ChoreoResult<T>> + Send,
543 {
544 body.await
545 }
546}