telltale_runtime/effects/handler.rs
1//! Effect Handler Architecture for Choreographic Programming
2//!
3//! This module provides a clean effect boundary between pure choreographic logic
4//! and runtime transport implementations. It allows for testable, composable,
5//! and runtime-agnostic protocol implementations.
6//!
7//! # Architecture
8//!
9//! The effect handler system separates concerns:
10//! - **Choreographic Logic**: Pure protocol specification (what to do)
11//! - **Effect Handlers**: Runtime implementation (how to do it)
12//! - **Interpreters**: Execute choreographic programs using handlers
13//! - **Contract Profiles**: Machine-checkable statements of semantic obligations
14//! versus transport-policy freedom, defined in `effects::contract`
15//!
16//! # Example
17//!
18//! ```text
19//! use telltale_runtime::{ChoreoHandler, LabelId};
20//!
21//! #[async_trait]
22//! impl ChoreoHandler for MyHandler {
23//! type Role = MyRole;
24//! type Endpoint = MyEndpoint;
25//!
26//! async fn send<M>(&mut self, ep: &mut Self::Endpoint, to: Self::Role, msg: &M) -> Result<()> {
27//! // Implementation
28//! }
29//! // ... other methods
30//! }
31//! ```
32//!
33//! ## ProtocolMachine Boundary
34//!
35//! The bytecode ProtocolMachine in `telltale-machine` exposes a separate, synchronous
36//! `EffectHandler` trait for simulation/runtime integration. It is not
37//! interchangeable with `ChoreoHandler`: `ChoreoHandler` is async and typed
38//! over concrete message/role types, while the ProtocolMachine handler operates over
39//! bytecode values and must remain session-local for determinism.
40
41use async_trait::async_trait;
42use serde::{de::DeserializeOwned, Serialize};
43use std::any::TypeId;
44use std::fmt::Debug;
45use std::time::Duration;
46use thiserror::Error;
47
48use crate::effects::contract::{
49 DeliveryModel, DocumentedHandlerContract, ExtensionDispatchContract, ExtensionDispatchMode,
50 HandlerContractProfile, HandlerContractTier, ProtocolSemanticContract, RetryPolicy,
51 TimeoutPolicy, TransportPolicyContract,
52};
53use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
54use crate::identifiers::RoleName;
55
56#[path = "handler_context.rs"]
57mod context;
58pub use context::ContextExt;
59
60/// Trait for role identifiers in choreographies
61///
62/// Roles are typically generated as enums per choreography, but any type
63/// implementing the required traits can serve as a role identifier.
64pub trait RoleId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
65 /// Protocol-specific label type associated with this role type.
66 type Label: LabelId;
67
68 /// Get the canonical role name for this role identifier.
69 fn role_name(&self) -> RoleName;
70
71 /// Optional index for parameterized roles.
72 fn role_index(&self) -> Option<u32> {
73 None
74 }
75}
76
77/// Labels identify branches in internal/external choice.
78///
79/// Labels must be stable identifiers that can be sent across the wire
80/// and re-hydrated on the receiving side.
81pub trait LabelId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
82 /// Stable textual identifier for serialization/logging.
83 fn as_str(&self) -> &'static str;
84
85 /// Parse a label from its textual identifier.
86 fn from_str(label: &str) -> Option<Self>;
87}
88
89/// Typed message tag for receive effects.
90#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
91pub struct MessageTag {
92 type_id: TypeId,
93 type_name: &'static str,
94}
95
96impl MessageTag {
97 /// Create a tag for a concrete message type.
98 #[must_use]
99 pub fn of<T: 'static>() -> Self {
100 Self {
101 type_id: TypeId::of::<T>(),
102 type_name: std::any::type_name::<T>(),
103 }
104 }
105
106 /// Access the underlying `TypeId`.
107 #[must_use]
108 pub fn type_id(&self) -> TypeId {
109 self.type_id
110 }
111
112 /// Access the human-readable type name.
113 #[must_use]
114 pub fn type_name(&self) -> &'static str {
115 self.type_name
116 }
117}
118
119/// Session endpoint trait
120///
121/// Represents the runtime-specific connection state (e.g., Telltale channel bundle).
122/// The generated code will be generic over the endpoint type.
123pub trait Endpoint: Send {}
124impl<T: Send> Endpoint for T {}
125
126/// Errors that can occur during choreographic execution
127#[derive(Debug, Error)]
128pub enum ChoreographyError {
129 /// Transport-layer error (network, channel failure, etc.)
130 #[error("transport error: {0}")]
131 Transport(String),
132
133 /// Message serialization/deserialization error
134 #[error("serialization error: {0}")]
135 Serialization(String),
136
137 /// Session transport send operation failed.
138 #[error("{channel_type} send failed: {reason}")]
139 ChannelSendFailed {
140 /// Type of session transport (for example "SinkStream")
141 channel_type: &'static str,
142 /// Human-readable failure reason
143 reason: String,
144 },
145
146 /// Session transport was closed unexpectedly during operation.
147 #[error("{channel_type} closed during {operation}")]
148 ChannelClosed {
149 /// Type of session transport (for example "SinkStream")
150 channel_type: &'static str,
151 /// Operation being performed when channel closed
152 operation: &'static str,
153 },
154
155 /// No session registered for the specified peer.
156 #[error("no session registered for peer: {peer}")]
157 NoPeerChannel {
158 /// String representation of the peer role
159 peer: String,
160 },
161
162 /// Label serialization failed during choice/offer
163 #[error("label {operation} failed: {reason}")]
164 LabelSerializationFailed {
165 /// Operation: "serialization" or "deserialization"
166 operation: &'static str,
167 /// Human-readable failure reason
168 reason: String,
169 },
170
171 /// Message serialization failed with type context
172 #[error("{operation} of {type_name} failed: {reason}")]
173 MessageSerializationFailed {
174 /// Operation: "Serialization" or "Deserialization"
175 operation: &'static str,
176 /// Name of the type being serialized
177 type_name: &'static str,
178 /// Human-readable failure reason
179 reason: String,
180 },
181
182 /// Operation exceeded the specified timeout
183 #[error("timeout after {0:?}")]
184 Timeout(Duration),
185
186 /// Protocol specification was violated at runtime
187 #[error("protocol violation: {0}")]
188 ProtocolViolation(String),
189
190 /// Referenced role not found in the choreography
191 #[error("role {0:?} not found in this choreography")]
192 UnknownRole(String),
193
194 /// Error with protocol execution context
195 ///
196 /// Wraps an inner error with information about where in the protocol
197 /// the error occurred (protocol name, role, phase).
198 #[error("{protocol}::{role} at phase '{phase}': {inner}")]
199 ProtocolContext {
200 /// Name of the protocol being executed
201 protocol: &'static str,
202 /// Name of the role executing when error occurred
203 role: &'static str,
204 /// Current phase/step in the protocol
205 phase: &'static str,
206 /// The underlying error
207 #[source]
208 inner: Box<ChoreographyError>,
209 },
210
211 /// Error with role-specific context
212 #[error("[{role}] {inner}")]
213 RoleContext {
214 /// Name of the role where error occurred
215 role: &'static str,
216 /// Optional role index for parameterized roles
217 index: Option<u32>,
218 /// The underlying error
219 #[source]
220 inner: Box<ChoreographyError>,
221 },
222
223 /// Error during message exchange with another role
224 #[error("{operation} {message_type} {direction} {other_role}: {inner}")]
225 MessageContext {
226 /// The operation being performed (send/recv)
227 operation: &'static str,
228 /// The type of message involved
229 message_type: &'static str,
230 /// Direction (to/from)
231 direction: &'static str,
232 /// The other role involved in the exchange
233 other_role: &'static str,
234 /// The underlying error
235 #[source]
236 inner: Box<ChoreographyError>,
237 },
238
239 /// Error during choice/branch operation
240 #[error("choice error at {role}: {details}")]
241 ChoiceError {
242 /// The role making or receiving the choice
243 role: &'static str,
244 /// Details about the choice error
245 details: String,
246 },
247
248 /// Generic wrapped error with context string
249 #[error("{context}: {inner}")]
250 WithContext {
251 /// Additional context about the error
252 context: String,
253 /// The underlying error
254 #[source]
255 inner: Box<ChoreographyError>,
256 },
257
258 /// Invalid choice: the chosen branch was not among expected options
259 #[error("invalid choice: expected one of {expected:?}, got {actual}")]
260 InvalidChoice {
261 /// Expected branch labels
262 expected: Vec<String>,
263 /// Actual branch label provided
264 actual: String,
265 },
266
267 /// General execution error
268 #[error("execution error: {0}")]
269 ExecutionError(String),
270
271 /// Role family is empty after resolution
272 #[error("role family '{0}' resolved to empty set")]
273 EmptyRoleFamily(String),
274
275 /// Role family not found in adapter
276 #[error("role family '{0}' not found")]
277 RoleFamilyNotFound(String),
278
279 /// Role range is invalid
280 #[error("invalid role range for '{family}': [{start}, {end})")]
281 InvalidRoleRange {
282 /// The role family name
283 family: String,
284 /// Range start (inclusive)
285 start: u32,
286 /// Range end (exclusive)
287 end: u32,
288 },
289
290 /// Insufficient responses received from role family
291 #[error("insufficient responses: expected {expected}, received {received}")]
292 InsufficientResponses {
293 /// Expected minimum number of responses
294 expected: usize,
295 /// Actual number of responses received
296 received: usize,
297 },
298
299 /// Feature not implemented
300 #[error("not implemented: {0}")]
301 NotImplemented(String),
302}
303
304/// Result type for choreography operations.
305pub type ChoreoResult<T> = std::result::Result<T, ChoreographyError>;
306
307/// The core effect handler trait that abstracts all communication effects
308///
309/// This trait defines the primitive operations for choreographic protocols:
310/// sending, receiving, choosing, offering, and timeouts. Implement this trait
311/// to provide custom transport mechanisms (in-memory, network, etc.).
312///
313/// # Type Parameters
314///
315/// - `Role`: The type representing protocol participants
316/// - `Endpoint`: The connection state for this protocol execution
317///
318/// # Async implementation notes
319///
320/// We deliberately use the `async_trait` macro here so the trait stays object-safe,
321/// which lets middleware stacks (e.g. `Trace<Retry<H>>`) erase handlers behind trait
322/// objects. The macro also enforces `Send` on all returned futures, so the bounds on
323/// methods like [`with_timeout`](ChoreoHandler::with_timeout) apply equally to native
324/// multithreaded runtimes and single-threaded WASM builds.
325#[async_trait]
326pub trait ChoreoHandler: Send {
327 /// The role type for this choreography
328 type Role: RoleId;
329 /// The endpoint type maintaining connection state
330 type Endpoint: Endpoint;
331
332 /// Send a message to a specific role
333 ///
334 /// # Arguments
335 ///
336 /// * `ep` - The session endpoint
337 /// * `to` - The recipient role
338 /// * `msg` - The message to send (must be serializable)
339 async fn send<M: Serialize + Send + Sync>(
340 &mut self,
341 ep: &mut Self::Endpoint,
342 to: Self::Role,
343 msg: &M,
344 ) -> ChoreoResult<()>;
345
346 /// Receive a strongly-typed message from a specific role
347 ///
348 /// # Arguments
349 ///
350 /// * `ep` - The session endpoint
351 /// * `from` - The sender role
352 ///
353 /// # Returns
354 ///
355 /// The received message of type `M`
356 async fn recv<M: DeserializeOwned + Send>(
357 &mut self,
358 ep: &mut Self::Endpoint,
359 from: Self::Role,
360 ) -> ChoreoResult<M>;
361
362 /// Internal choice: broadcast a label selection
363 ///
364 /// Used by the choosing role to inform others of the selected branch.
365 ///
366 /// # Arguments
367 ///
368 /// * `ep` - The session endpoint
369 /// * `who` - The role making the choice (usually the current role)
370 /// * `label` - The selected branch label
371 async fn choose(
372 &mut self,
373 ep: &mut Self::Endpoint,
374 who: Self::Role,
375 label: <Self::Role as RoleId>::Label,
376 ) -> ChoreoResult<()>;
377
378 /// External choice: receive a label selection
379 ///
380 /// Used by non-choosing roles to receive the branch selection from another role.
381 ///
382 /// # Arguments
383 ///
384 /// * `ep` - The session endpoint
385 /// * `from` - The role that made the choice
386 ///
387 /// # Returns
388 ///
389 /// The label selected by the choosing role
390 async fn offer(
391 &mut self,
392 ep: &mut Self::Endpoint,
393 from: Self::Role,
394 ) -> ChoreoResult<<Self::Role as RoleId>::Label>;
395
396 /// Execute a future with a timeout
397 ///
398 /// # Arguments
399 ///
400 /// * `ep` - The session endpoint
401 /// * `at` - The role where timeout is enforced
402 /// * `dur` - Maximum duration to wait
403 /// * `body` - The future to execute
404 ///
405 /// # Returns
406 ///
407 /// Result of the future, or timeout error if duration exceeded
408 async fn with_timeout<F, T>(
409 &mut self,
410 ep: &mut Self::Endpoint,
411 at: Self::Role,
412 dur: Duration,
413 body: F,
414 ) -> ChoreoResult<T>
415 where
416 F: std::future::Future<Output = ChoreoResult<T>> + Send;
417
418 /// Broadcast a message to multiple recipients
419 ///
420 /// Default implementation sends sequentially. Override for optimized broadcasting.
421 async fn broadcast<M: Serialize + Send + Sync>(
422 &mut self,
423 ep: &mut Self::Endpoint,
424 recipients: &[Self::Role],
425 msg: &M,
426 ) -> ChoreoResult<()> {
427 for &recipient in recipients {
428 self.send(ep, recipient, msg).await?;
429 }
430 Ok(())
431 }
432
433 /// Send messages to multiple recipients in parallel
434 ///
435 /// Default implementation sends sequentially. Override for true parallelism.
436 async fn parallel_send<M: Serialize + Send + Sync>(
437 &mut self,
438 ep: &mut Self::Endpoint,
439 sends: &[(Self::Role, M)],
440 ) -> ChoreoResult<()> {
441 // Default implementation: sequential sends
442 for (recipient, msg) in sends {
443 self.send(ep, *recipient, msg).await?;
444 }
445 Ok(())
446 }
447}
448
449/// Extension trait for handler lifecycle management
450///
451/// Provides setup and teardown methods for managing handler state and connections.
452#[async_trait]
453pub trait ChoreoHandlerExt: ChoreoHandler {
454 /// Setup phase - establish connections, initialize state
455 ///
456 /// Called before protocol execution begins.
457 async fn setup(&mut self, role: Self::Role) -> ChoreoResult<Self::Endpoint>;
458
459 /// Teardown phase - close connections, cleanup
460 ///
461 /// Called after protocol execution completes.
462 async fn teardown(&mut self, ep: Self::Endpoint) -> ChoreoResult<()>;
463}
464
465/// A no-op handler for testing pure choreographic logic
466///
467/// This handler performs no actual communication, making it useful for
468/// testing protocol logic without network overhead.
469pub struct NoOpHandler<R: RoleId> {
470 _phantom: std::marker::PhantomData<R>,
471 registry: ExtensionRegistry<(), R>,
472}
473
474impl<R: RoleId> NoOpHandler<R> {
475 /// Create a new no-op handler
476 #[must_use]
477 pub fn new() -> Self {
478 Self {
479 _phantom: std::marker::PhantomData,
480 registry: ExtensionRegistry::new(),
481 }
482 }
483}
484
485impl<R: RoleId> Default for NoOpHandler<R> {
486 fn default() -> Self {
487 Self::new()
488 }
489}
490
491impl<R: RoleId> DocumentedHandlerContract for NoOpHandler<R> {
492 fn contract_profile() -> HandlerContractProfile {
493 HandlerContractProfile {
494 handler_name: std::any::type_name::<Self>(),
495 tier: HandlerContractTier::ObservationalHarness,
496 semantics: ProtocolSemanticContract {
497 typed_send_recv_roundtrip: false,
498 exact_choice_label_preservation: false,
499 fail_closed_transport_errors: true,
500 timeouts_scoped_to_enforcing_role: true,
501 deterministic_for_regression: true,
502 can_materialize_values: false,
503 },
504 transport: TransportPolicyContract {
505 delivery_model: DeliveryModel::NoTransport,
506 retry_policy: RetryPolicy::None,
507 timeout_policy: TimeoutPolicy::EnforcingRoleOnly,
508 },
509 extension_dispatch: ExtensionDispatchContract {
510 mode: ExtensionDispatchMode::Unsupported,
511 fail_closed_when_unregistered: false,
512 type_exact_before_side_effects: false,
513 },
514 notes: vec![
515 "send/choose succeed as no-op observability aids",
516 "recv/offer intentionally fail closed instead of inventing values",
517 ],
518 }
519 }
520}
521
522#[async_trait]
523impl<R: RoleId + 'static> ExtensibleHandler for NoOpHandler<R> {
524 fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
525 &self.registry
526 }
527}
528
529#[async_trait]
530impl<R: RoleId + 'static> ChoreoHandler for NoOpHandler<R> {
531 type Role = R;
532 type Endpoint = ();
533
534 async fn send<M: Serialize + Send + Sync>(
535 &mut self,
536 _ep: &mut Self::Endpoint,
537 _to: Self::Role,
538 _msg: &M,
539 ) -> ChoreoResult<()> {
540 Ok(())
541 }
542
543 async fn recv<M: DeserializeOwned + Send>(
544 &mut self,
545 _ep: &mut Self::Endpoint,
546 _from: Self::Role,
547 ) -> ChoreoResult<M> {
548 Err(ChoreographyError::Transport(
549 "NoOpHandler cannot receive".into(),
550 ))
551 }
552
553 async fn choose(
554 &mut self,
555 _ep: &mut Self::Endpoint,
556 _who: Self::Role,
557 _label: <Self::Role as RoleId>::Label,
558 ) -> ChoreoResult<()> {
559 Ok(())
560 }
561
562 async fn offer(
563 &mut self,
564 _ep: &mut Self::Endpoint,
565 _from: Self::Role,
566 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
567 Err(ChoreographyError::Transport(
568 "NoOpHandler cannot offer".into(),
569 ))
570 }
571
572 async fn with_timeout<F, T>(
573 &mut self,
574 _ep: &mut Self::Endpoint,
575 _at: Self::Role,
576 _dur: Duration,
577 body: F,
578 ) -> ChoreoResult<T>
579 where
580 F: std::future::Future<Output = ChoreoResult<T>> + Send,
581 {
582 body.await
583 }
584}