rumpsteak_aura/
lib.rs

1//! Core Rumpsteak library for session-typed communication.
2//!
3//! This crate provides session types (Send, Receive, Select, Branch, End) and
4//! channel abstractions for safe multiparty communication.
5//!
6//! # Modules
7//!
8//! - [`types`] - Core session type definitions (GlobalType, LocalTypeR, Label)
9//! - [`channel`] - Channel abstractions for communication
10//!
11//! # Feature Flags
12//!
13//! ## Core Features
14//!
15//! | Feature | Description |
16//! |---------|-------------|
17//! | `serialize` | Enable serialization support for session types |
18//! | `test-utils` | Testing utilities (random generation, etc.) |
19//! | `wasm` | WebAssembly support |
20//!
21//! ## Theory Features
22//!
23//! | Feature | Description |
24//! |---------|-------------|
25//! | `theory` | Session type algorithms (projection, merge, duality, etc.) |
26//! | `theory-async-subtyping` | POPL 2021 asynchronous subtyping algorithm |
27//! | `theory-bounded` | Bounded recursion strategies |
28//!
29//! ## Meta Features
30//!
31//! | Feature | Description |
32//! |---------|-------------|
33//! | `full` | Enable all optional features |
34
35/// Channel abstractions for asynchronous communication.
36pub mod channel;
37
38// Re-export core types (always available)
39pub use rumpsteak_types as types;
40pub use rumpsteak_types::{GlobalType, Label, LocalTypeR, PayloadSort};
41
42// Re-export optional crates
43#[cfg(feature = "theory")]
44pub use rumpsteak_theory as theory;
45
46pub use rumpsteak_aura_macros::{session, Message, Role, Roles};
47
48/// Prelude module for convenient imports.
49pub mod prelude {
50    pub use super::{session, try_session};
51    pub use super::{
52        Branch, Choice, Choices, End, FromState, IntoSession, Message, Receive, ReceiveError, Role,
53        Route, Select, Send, Session, SessionError,
54    };
55    pub use rumpsteak_types::{GlobalType, Label, LocalTypeR, PayloadSort};
56}
57
58use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
59use std::{
60    any::Any,
61    convert::Infallible,
62    future::Future,
63    marker::{self, PhantomData},
64};
65use thiserror::Error;
66
/// Something that can be sealed so that it is no longer used afterwards.
pub trait Sealable {
    /// Marks this value as sealed; later operations are expected to be refused.
    fn seal(&mut self);

    /// Reports whether [`Sealable::seal`] has already been applied.
    fn is_sealed(&self) -> bool;
}
75
76/// Error that can occur during a session send operation.
77#[derive(Debug, Error)]
78pub enum SessionError<E> {
79    /// The session was used after being sealed.
80    #[error("session was used after being sealed")]
81    Sealed,
82    /// An error from the underlying channel.
83    #[error(transparent)]
84    Channel(E),
85}
86
87/// Error type for send operations, specialized to the channel's error type.
88pub type SendError<Q, R> =
89    SessionError<<<Q as Route<R>>::Route as Sink<<Q as Role>::Message>>::Error>;
90
91/// Error that can occur during a session receive operation.
92#[derive(Debug, Error)]
93pub enum ReceiveError {
94    /// The receiver stream is empty (no messages available).
95    #[error("receiver stream is empty")]
96    EmptyStream,
97    /// Received a message with an unexpected type.
98    #[error("received message with an unexpected type")]
99    UnexpectedType,
100    /// The session was used after being sealed.
101    #[error("session was used after being sealed")]
102    Sealed,
103}
104
/// A message exchanged between two participants.
///
/// `L` is the label type, i.e. the concrete content that the message wraps.
/// A message type implements `Message<L>` once per label it can carry.
pub trait Message<L>: Sized {
    /// Wraps `label` into a message.
    fn upcast(label: L) -> Self;

    /// Attempts to extract a label of type `L` from the message.
    ///
    /// The message is handed back on failure, so it is never lost when the
    /// caller guessed the wrong label type.
    ///
    /// # Errors
    ///
    /// Returns `Err(self)` if the message does not contain a label of type `L`.
    fn downcast(self) -> Result<L, Self>;
}
121
122impl<L: 'static> Message<L> for Box<dyn Any> {
123    fn upcast(label: L) -> Self {
124        Box::new(label)
125    }
126
127    fn downcast(self) -> Result<L, Self> {
128        self.downcast().map(|label| *label)
129    }
130}
131
132impl<L: marker::Send + 'static> Message<L> for Box<dyn Any + marker::Send> {
133    fn upcast(label: L) -> Self {
134        Box::new(label)
135    }
136
137    fn downcast(self) -> Result<L, Self> {
138        self.downcast().map(|label| *label)
139    }
140}
141
142impl<L: marker::Send + Sync + 'static> Message<L> for Box<dyn Any + marker::Send + Sync> {
143    fn upcast(label: L) -> Self {
144        Box::new(label)
145    }
146
147    fn downcast(self) -> Result<L, Self> {
148        self.downcast().map(|label| *label)
149    }
150}
151
/// One participant of a session-typed protocol.
///
/// A role fixes the message type it exchanges and can be sealed; the
/// session operations in this module refuse to run on a sealed role.
pub trait Role {
    /// Message type carried on this role's routes.
    type Message;

    /// Seals the role so that no further communication takes place on it.
    fn seal(&mut self);

    /// Returns `true` once the role has been sealed.
    fn is_sealed(&self) -> bool;
}
165
166/// Provides a route to another role for communication.
167///
168/// A role implements `Route<R>` for each peer role R it can communicate with.
169pub trait Route<R>: Role + Sized {
170    /// The channel type used to communicate with role R.
171    type Route;
172
173    /// Get a mutable reference to the route for sending/receiving messages.
174    fn route(&mut self) -> &mut Self::Route;
175}
176
177/// This structure is mainly a placeholder for a `Role` and for types.
178/// Typically, each each state (in the sense of automata state) of the protocol,
179/// e.g. a `Send`, a `Receive`, etc, contains a `State`, as well as some type
180/// bounds. When an action is taken (e.g. when `send` is called on a `Send`),
181/// the `Send` will take it state and convert it into the continuation.
182pub struct State<'r, R: Role> {
183    role: &'r mut R,
184}
185
186impl<'r, R: Role> State<'r, R> {
187    #[inline]
188    fn new(role: &'r mut R) -> Self {
189        Self { role }
190    }
191}
192
193/// Trait for session types that can be constructed from a state.
194///
195/// All session types (Send, Receive, Select, Branch, End) implement this.
196pub trait FromState<'r> {
197    /// The role type this session state is for.
198    type Role: Role;
199
200    /// Construct this session state from the given state.
201    fn from_state(state: State<'r, Self::Role>) -> Self;
202}
203
204/// Marker trait for session types in a protocol.
205pub trait Session<'r>: FromState<'r> + private::Session {}
206
207/// Trait for types that can be converted into a session.
208pub trait IntoSession<'r>: FromState<'r> {
209    /// The session type to convert into.
210    type Session: Session<'r, Role = Self::Role>;
211
212    /// Convert this value into a session.
213    fn into_session(self) -> Self::Session;
214}
215
216/// This structure represents a terminated protocol.
217pub struct End<'r, R: Role> {
218    state: State<'r, R>,
219}
220
221impl<'r, R: Role> FromState<'r> for End<'r, R> {
222    type Role = R;
223
224    #[inline]
225    fn from_state(state: State<'r, Self::Role>) -> Self {
226        Self { state }
227    }
228}
229
230impl<R: Role> End<'_, R> {
231    /// Consume the End state and seal the role
232    pub fn seal(self) {
233        self.state.role.seal();
234    }
235}
236
237impl<R: Role> Drop for End<'_, R> {
238    fn drop(&mut self) {
239        // Seal the role when End is dropped
240        self.state.role.seal();
241    }
242}
243
244impl<R: Role> private::Session for End<'_, R> {}
245
246impl<'r, R: Role> Session<'r> for End<'r, R> {}
247
248/// This structure represents a protocol which next action is to send.
249pub struct Send<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> {
250    state: State<'q, Q>,
251    phantom: PhantomData<(R, L, S)>,
252}
253
254impl<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> FromState<'q> for Send<'q, Q, R, L, S> {
255    type Role = Q;
256
257    #[inline]
258    fn from_state(state: State<'q, Self::Role>) -> Self {
259        Self {
260            state,
261            phantom: PhantomData,
262        }
263    }
264}
265
266impl<'q, Q: Route<R>, R, L, S: FromState<'q, Role = Q>> Send<'q, Q, R, L, S>
267where
268    Q::Message: Message<L>,
269    Q::Route: Sink<Q::Message> + Unpin,
270{
271    /// Sends a message to the peer role.
272    ///
273    /// # Errors
274    ///
275    /// Returns `Err(SessionError::Sealed)` if the session has been sealed.
276    /// Returns `Err(SessionError::Channel(_))` if the underlying channel fails.
277    #[inline]
278    pub async fn send(self, label: L) -> Result<S, SendError<Q, R>> {
279        if self.state.role.is_sealed() {
280            return Err(SessionError::Sealed);
281        }
282        self.state
283            .role
284            .route()
285            .send(Message::upcast(label))
286            .await
287            .map_err(SessionError::Channel)?;
288        Ok(FromState::from_state(self.state))
289    }
290}
291
292impl<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> private::Session for Send<'q, Q, R, L, S> {}
293
294impl<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> Session<'q> for Send<'q, Q, R, L, S> {}
295
296/// This structure represents a protocol which next action is to receive .
297pub struct Receive<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> {
298    state: State<'q, Q>,
299    phantom: PhantomData<(R, L, S)>,
300}
301
302impl<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> FromState<'q> for Receive<'q, Q, R, L, S> {
303    type Role = Q;
304
305    #[inline]
306    fn from_state(state: State<'q, Self::Role>) -> Self {
307        Self {
308            state,
309            phantom: PhantomData,
310        }
311    }
312}
313
314impl<'q, Q: Route<R>, R, L, S: FromState<'q, Role = Q>> Receive<'q, Q, R, L, S>
315where
316    Q::Message: Message<L>,
317    Q::Route: Stream<Item = Q::Message> + Unpin,
318{
319    /// Receives a message from the peer role.
320    ///
321    /// # Errors
322    ///
323    /// Returns `Err(ReceiveError::Sealed)` if the session has been sealed.
324    /// Returns `Err(ReceiveError::EmptyStream)` if the channel has no messages.
325    /// Returns `Err(ReceiveError::UnexpectedType)` if the message type is wrong.
326    #[inline]
327    pub async fn receive(self) -> Result<(L, S), ReceiveError> {
328        if self.state.role.is_sealed() {
329            return Err(ReceiveError::Sealed);
330        }
331        let message = self.state.role.route().next().await;
332        let message = message.ok_or(ReceiveError::EmptyStream)?;
333        let label = message.downcast().or(Err(ReceiveError::UnexpectedType))?;
334        Ok((label, FromState::from_state(self.state)))
335    }
336}
337
338impl<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> private::Session for Receive<'q, Q, R, L, S> {}
339
340impl<'q, Q: Role, R, L, S: FromState<'q, Role = Q>> Session<'q> for Receive<'q, Q, R, L, S> {}
341
342/// Maps a choice label to its continuation session type.
343pub trait Choice<'r, L> {
344    /// The session type to continue with after selecting this label.
345    type Session: FromState<'r>;
346}
347
348/// A protocol state where this role selects one of several branches.
349///
350/// The choice is sent to role R, and C determines the continuation for each label.
351pub struct Select<'q, Q: Role, R, C> {
352    state: State<'q, Q>,
353    phantom: PhantomData<(R, C)>,
354}
355
356impl<'q, Q: Role, R, C> FromState<'q> for Select<'q, Q, R, C> {
357    type Role = Q;
358
359    #[inline]
360    fn from_state(state: State<'q, Self::Role>) -> Self {
361        Self {
362            state,
363            phantom: PhantomData,
364        }
365    }
366}
367
368impl<'q, Q: Route<R>, R, C> Select<'q, Q, R, C>
369where
370    Q::Route: Sink<Q::Message> + Unpin,
371{
372    /// Selects a branch to take in a choice, sending the label to the peer.
373    ///
374    /// # Errors
375    ///
376    /// Returns `Err(SessionError::Sealed)` if the session has been sealed.
377    /// Returns `Err(SessionError::Channel(_))` if the underlying channel fails.
378    #[inline]
379    pub async fn select<L>(self, label: L) -> Result<<C as Choice<'q, L>>::Session, SendError<Q, R>>
380    where
381        Q::Message: Message<L>,
382        C: Choice<'q, L>,
383        C::Session: FromState<'q, Role = Q>,
384    {
385        if self.state.role.is_sealed() {
386            return Err(SessionError::Sealed);
387        }
388        self.state
389            .role
390            .route()
391            .send(Message::upcast(label))
392            .await
393            .map_err(SessionError::Channel)?;
394        Ok(FromState::from_state(self.state))
395    }
396}
397
398impl<Q: Role, R, C> private::Session for Select<'_, Q, R, C> {}
399
400impl<'q, Q: Role, R, C> Session<'q> for Select<'q, Q, R, C> {}
401
402/// Trait for an enum of possible branch choices.
403///
404/// Implemented by generated choice enums to dispatch incoming messages.
405pub trait Choices<'r>: Sized {
406    /// The role type this choice set is for.
407    type Role: Role;
408
409    /// Attempts to downcast a message into one of the available choices.
410    ///
411    /// # Errors
412    ///
413    /// Returns `Err(message)` if the message does not match any choice variant.
414    fn downcast(
415        state: State<'r, Self::Role>,
416        message: <Self::Role as Role>::Message,
417    ) -> Result<Self, <Self::Role as Role>::Message>;
418}
419
420/// A protocol state where this role receives one of several branch choices.
421///
422/// The choice is received from role R, and C is the enum of possible continuations.
423pub struct Branch<'q, Q: Role, R, C> {
424    state: State<'q, Q>,
425    phantom: PhantomData<(R, C)>,
426}
427
428impl<'q, Q: Role, R, C> FromState<'q> for Branch<'q, Q, R, C> {
429    type Role = Q;
430
431    #[inline]
432    fn from_state(state: State<'q, Self::Role>) -> Self {
433        Self {
434            state,
435            phantom: PhantomData,
436        }
437    }
438}
439
440impl<'q, Q: Route<R>, R, C: Choices<'q, Role = Q>> Branch<'q, Q, R, C>
441where
442    Q::Route: Stream<Item = Q::Message> + Unpin,
443{
444    /// Waits for a message and branches based on which choice is received.
445    ///
446    /// # Errors
447    ///
448    /// Returns `Err(ReceiveError::Sealed)` if the session has been sealed.
449    /// Returns `Err(ReceiveError::EmptyStream)` if the channel has no messages.
450    /// Returns `Err(ReceiveError::UnexpectedType)` if the message type is wrong.
451    #[inline]
452    pub async fn branch(self) -> Result<C, ReceiveError> {
453        if self.state.role.is_sealed() {
454            return Err(ReceiveError::Sealed);
455        }
456        let message = self.state.role.route().next().await;
457        let message = message.ok_or(ReceiveError::EmptyStream)?;
458        let choice = C::downcast(self.state, message);
459        choice.or(Err(ReceiveError::UnexpectedType))
460    }
461}
462
463impl<Q: Role, R, C> private::Session for Branch<'_, Q, R, C> {}
464
465impl<'q, Q: Role, R, C> Session<'q> for Branch<'q, Q, R, C> {}
466
/// Drop guard that flags a session which went away without being marked
/// as completed.
struct SessionGuard {
    completed: bool,
}

impl SessionGuard {
    /// Creates a guard in the "not yet completed" state.
    fn new() -> Self {
        SessionGuard { completed: false }
    }

    /// Records that the session finished, disarming the check in `drop`.
    fn mark_completed(&mut self) {
        self.completed = true;
    }
}

impl Drop for SessionGuard {
    /// In builds with debug assertions, panics if the guard is dropped while
    /// still incomplete, unless the thread is already panicking. Builds
    /// without debug assertions perform no check.
    fn drop(&mut self) {
        debug_assert!(
            self.completed || std::thread::panicking(),
            "Session dropped without completing! This indicates a protocol violation."
        );
    }
}
496
497/// Run a session-typed protocol that cannot fail.
498///
499/// This is a convenience wrapper around [`try_session`] for infallible protocols.
500#[inline]
501pub async fn session<'r, R: Role, S: FromState<'r, Role = R>, T, F>(
502    role: &'r mut R,
503    f: impl FnOnce(S) -> F,
504) -> T
505where
506    F: Future<Output = (T, End<'r, R>)>,
507{
508    let output = try_session(role, |s| f(s).map(Ok)).await;
509    output.unwrap_or_else(|infallible: Infallible| match infallible {})
510}
511
512/// Runs a session that may fail, returning a result.
513///
514/// # Errors
515///
516/// Returns the error from the session function `f` if it fails.
517#[inline]
518pub async fn try_session<'r, R: Role, S: FromState<'r, Role = R>, T, E, F>(
519    role: &'r mut R,
520    f: impl FnOnce(S) -> F,
521) -> Result<T, E>
522where
523    F: Future<Output = Result<(T, End<'r, R>), E>>,
524{
525    let mut guard = SessionGuard::new();
526    let session = FromState::from_state(State::new(role));
527    let result = f(session).await;
528
529    if result.is_ok() {
530        guard.mark_completed();
531    }
532
533    // End will seal the role when dropped
534    result.map(|(output, _)| output)
535}
536
// Non-public module holding the supertrait of the public `Session` marker,
// so that `Session` can only be implemented inside this crate.
mod private {
    /// Crate-internal marker required by the public `Session` trait.
    pub trait Session {}
}
540
541#[cfg(test)]
542mod channel_test;