use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::any::TypeId;
use std::fmt::Debug;
use std::time::Duration;
use thiserror::Error;
use crate::effects::contract::{
DeliveryModel, DocumentedHandlerContract, ExtensionDispatchContract, ExtensionDispatchMode,
HandlerContractProfile, HandlerContractTier, ProtocolSemanticContract, RetryPolicy,
TimeoutPolicy, TransportPolicyContract,
};
use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
use crate::identifiers::RoleName;
#[path = "handler_context.rs"]
mod context;
pub use context::ContextExt;
pub trait RoleId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
type Label: LabelId;
fn role_name(&self) -> RoleName;
fn role_index(&self) -> Option<u32> {
None
}
}
pub trait LabelId: Copy + Eq + std::hash::Hash + Debug + Send + Sync + 'static {
fn as_str(&self) -> &'static str;
fn from_str(label: &str) -> Option<Self>;
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct MessageTag {
type_id: TypeId,
type_name: &'static str,
}
impl MessageTag {
#[must_use]
pub fn of<T: 'static>() -> Self {
Self {
type_id: TypeId::of::<T>(),
type_name: std::any::type_name::<T>(),
}
}
#[must_use]
pub fn type_id(&self) -> TypeId {
self.type_id
}
#[must_use]
pub fn type_name(&self) -> &'static str {
self.type_name
}
}
pub trait Endpoint: Send {}
impl<T: Send> Endpoint for T {}
#[derive(Debug, Error)]
pub enum ChoreographyError {
#[error("transport error: {0}")]
Transport(String),
#[error("serialization error: {0}")]
Serialization(String),
#[error("{channel_type} send failed: {reason}")]
ChannelSendFailed {
channel_type: &'static str,
reason: String,
},
#[error("{channel_type} closed during {operation}")]
ChannelClosed {
channel_type: &'static str,
operation: &'static str,
},
#[error("no session registered for peer: {peer}")]
NoPeerChannel {
peer: String,
},
#[error("label {operation} failed: {reason}")]
LabelSerializationFailed {
operation: &'static str,
reason: String,
},
#[error("{operation} of {type_name} failed: {reason}")]
MessageSerializationFailed {
operation: &'static str,
type_name: &'static str,
reason: String,
},
#[error("timeout after {0:?}")]
Timeout(Duration),
#[error("protocol violation: {0}")]
ProtocolViolation(String),
#[error("role {0:?} not found in this choreography")]
UnknownRole(String),
#[error("{protocol}::{role} at phase '{phase}': {inner}")]
ProtocolContext {
protocol: &'static str,
role: &'static str,
phase: &'static str,
#[source]
inner: Box<ChoreographyError>,
},
#[error("[{role}] {inner}")]
RoleContext {
role: &'static str,
index: Option<u32>,
#[source]
inner: Box<ChoreographyError>,
},
#[error("{operation} {message_type} {direction} {other_role}: {inner}")]
MessageContext {
operation: &'static str,
message_type: &'static str,
direction: &'static str,
other_role: &'static str,
#[source]
inner: Box<ChoreographyError>,
},
#[error("choice error at {role}: {details}")]
ChoiceError {
role: &'static str,
details: String,
},
#[error("{context}: {inner}")]
WithContext {
context: String,
#[source]
inner: Box<ChoreographyError>,
},
#[error("invalid choice: expected one of {expected:?}, got {actual}")]
InvalidChoice {
expected: Vec<String>,
actual: String,
},
#[error("execution error: {0}")]
ExecutionError(String),
#[error("role family '{0}' resolved to empty set")]
EmptyRoleFamily(String),
#[error("role family '{0}' not found")]
RoleFamilyNotFound(String),
#[error("invalid role range for '{family}': [{start}, {end})")]
InvalidRoleRange {
family: String,
start: u32,
end: u32,
},
#[error("insufficient responses: expected {expected}, received {received}")]
InsufficientResponses {
expected: usize,
received: usize,
},
#[error("not implemented: {0}")]
NotImplemented(String),
}
pub type ChoreoResult<T> = std::result::Result<T, ChoreographyError>;
#[async_trait]
pub trait ChoreoHandler: Send {
type Role: RoleId;
type Endpoint: Endpoint;
async fn send<M: Serialize + Send + Sync>(
&mut self,
ep: &mut Self::Endpoint,
to: Self::Role,
msg: &M,
) -> ChoreoResult<()>;
async fn recv<M: DeserializeOwned + Send>(
&mut self,
ep: &mut Self::Endpoint,
from: Self::Role,
) -> ChoreoResult<M>;
async fn choose(
&mut self,
ep: &mut Self::Endpoint,
who: Self::Role,
label: <Self::Role as RoleId>::Label,
) -> ChoreoResult<()>;
async fn offer(
&mut self,
ep: &mut Self::Endpoint,
from: Self::Role,
) -> ChoreoResult<<Self::Role as RoleId>::Label>;
async fn with_timeout<F, T>(
&mut self,
ep: &mut Self::Endpoint,
at: Self::Role,
dur: Duration,
body: F,
) -> ChoreoResult<T>
where
F: std::future::Future<Output = ChoreoResult<T>> + Send;
async fn broadcast<M: Serialize + Send + Sync>(
&mut self,
ep: &mut Self::Endpoint,
recipients: &[Self::Role],
msg: &M,
) -> ChoreoResult<()> {
for &recipient in recipients {
self.send(ep, recipient, msg).await?;
}
Ok(())
}
async fn parallel_send<M: Serialize + Send + Sync>(
&mut self,
ep: &mut Self::Endpoint,
sends: &[(Self::Role, M)],
) -> ChoreoResult<()> {
for (recipient, msg) in sends {
self.send(ep, *recipient, msg).await?;
}
Ok(())
}
}
#[async_trait]
pub trait ChoreoHandlerExt: ChoreoHandler {
async fn setup(&mut self, role: Self::Role) -> ChoreoResult<Self::Endpoint>;
async fn teardown(&mut self, ep: Self::Endpoint) -> ChoreoResult<()>;
}
pub struct NoOpHandler<R: RoleId> {
_phantom: std::marker::PhantomData<R>,
registry: ExtensionRegistry<(), R>,
}
impl<R: RoleId> NoOpHandler<R> {
#[must_use]
pub fn new() -> Self {
Self {
_phantom: std::marker::PhantomData,
registry: ExtensionRegistry::new(),
}
}
}
impl<R: RoleId> Default for NoOpHandler<R> {
fn default() -> Self {
Self::new()
}
}
impl<R: RoleId> DocumentedHandlerContract for NoOpHandler<R> {
fn contract_profile() -> HandlerContractProfile {
HandlerContractProfile {
handler_name: std::any::type_name::<Self>(),
tier: HandlerContractTier::ObservationalHarness,
semantics: ProtocolSemanticContract {
typed_send_recv_roundtrip: false,
exact_choice_label_preservation: false,
fail_closed_transport_errors: true,
timeouts_scoped_to_enforcing_role: true,
deterministic_for_regression: true,
can_materialize_values: false,
},
transport: TransportPolicyContract {
delivery_model: DeliveryModel::NoTransport,
retry_policy: RetryPolicy::None,
timeout_policy: TimeoutPolicy::EnforcingRoleOnly,
},
extension_dispatch: ExtensionDispatchContract {
mode: ExtensionDispatchMode::Unsupported,
fail_closed_when_unregistered: false,
type_exact_before_side_effects: false,
},
notes: vec![
"send/choose succeed as no-op observability aids",
"recv/offer intentionally fail closed instead of inventing values",
],
}
}
}
#[async_trait]
impl<R: RoleId + 'static> ExtensibleHandler for NoOpHandler<R> {
fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
&self.registry
}
}
#[async_trait]
impl<R: RoleId + 'static> ChoreoHandler for NoOpHandler<R> {
type Role = R;
type Endpoint = ();
async fn send<M: Serialize + Send + Sync>(
&mut self,
_ep: &mut Self::Endpoint,
_to: Self::Role,
_msg: &M,
) -> ChoreoResult<()> {
Ok(())
}
async fn recv<M: DeserializeOwned + Send>(
&mut self,
_ep: &mut Self::Endpoint,
_from: Self::Role,
) -> ChoreoResult<M> {
Err(ChoreographyError::Transport(
"NoOpHandler cannot receive".into(),
))
}
async fn choose(
&mut self,
_ep: &mut Self::Endpoint,
_who: Self::Role,
_label: <Self::Role as RoleId>::Label,
) -> ChoreoResult<()> {
Ok(())
}
async fn offer(
&mut self,
_ep: &mut Self::Endpoint,
_from: Self::Role,
) -> ChoreoResult<<Self::Role as RoleId>::Label> {
Err(ChoreographyError::Transport(
"NoOpHandler cannot offer".into(),
))
}
async fn with_timeout<F, T>(
&mut self,
_ep: &mut Self::Endpoint,
_at: Self::Role,
_dur: Duration,
body: F,
) -> ChoreoResult<T>
where
F: std::future::Future<Output = ChoreoResult<T>> + Send,
{
body.await
}
}