#![allow(clippy::must_use_candidate)]
#![allow(clippy::manual_assert)]
#![allow(clippy::wildcard_imports)]
#![allow(clippy::type_complexity)]
#![allow(clippy::used_underscore_binding)]
use crate::channel::mpsc;
use crate::cx::Cx;
use crate::record::ObligationKind;
use std::future::Future;
use std::marker::PhantomData;
/// Terminal protocol state: a `Chan<R, End>` is the only endpoint that exposes `close()`.
pub struct End;
/// Protocol state "send a `T`, then continue as `S`".
///
/// Zero-sized marker; it only carries its type parameters via `PhantomData`.
pub struct Send<T, S> {
_t: PhantomData<T>,
_s: PhantomData<S>,
}
/// Protocol state "receive a `T`, then continue as `S`".
///
/// Zero-sized marker; it only carries its type parameters via `PhantomData`.
pub struct Recv<T, S> {
_t: PhantomData<T>,
_s: PhantomData<S>,
}
/// Protocol state in which this endpoint chooses the continuation:
/// `select_left` continues as `A`, `select_right` continues as `B`.
pub struct Select<A, B> {
_a: PhantomData<A>,
_b: PhantomData<B>,
}
/// Protocol state in which the continuation is decided by a [`Branch`] value:
/// `Branch::Left` continues as `A`, `Branch::Right` continues as `B`.
pub struct Offer<A, B> {
_a: PhantomData<A>,
_b: PhantomData<B>,
}
/// Marker type parameterized by `F`.
///
/// NOTE(review): no operation in this part of the file uses `Rec`; presumably it marks a
/// recursive protocol body — confirm against the rest of the crate.
pub struct Rec<F> {
_f: PhantomData<F>,
}
/// Unit marker type.
///
/// NOTE(review): unused in this part of the file; presumably the back-reference paired with
/// `Rec` — confirm against the rest of the crate.
pub struct Var;
/// Role marker for the first endpoint returned by the session constructors in this file.
pub struct Initiator;
/// Role marker for the second endpoint returned by the session constructors in this file.
pub struct Responder;
/// One endpoint's half of a bidirectional transport built from two mpsc channels.
///
/// Payloads travel type-erased as `Box<dyn Any + Send>`; the receiving side downcasts them
/// back to the type its protocol state expects.
pub(super) struct SessionTransport {
// Outgoing direction: messages this endpoint sends to its peer.
tx: mpsc::Sender<Box<dyn std::any::Any + std::marker::Send>>,
// Incoming direction: messages the peer sent to this endpoint.
rx: mpsc::Receiver<Box<dyn std::any::Any + std::marker::Send>>,
}
/// A session endpoint for role `R` currently in protocol state `S`.
///
/// Every protocol step consumes the endpoint and returns a new one in the next state.
/// Dropping an endpoint whose `closed` flag is still `false` is reported by the `Drop` impl
/// (panic with debug assertions, error log otherwise).
#[must_use = "session channel must be driven to End; dropping mid-protocol leaks the obligation"]
pub struct Chan<R, S> {
// Identifier reported by `channel_id()` and copied into the final `SessionProof`.
channel_id: u64,
// Obligation kind reported by `obligation_kind()` and copied into the final `SessionProof`.
obligation_kind: ObligationKind,
// Disarm flag for the `Drop` leak check; set when the endpoint is consumed legitimately.
closed: bool,
// `Some` only for transport-backed endpoints; the `*_async` operations require it.
transport: Option<SessionTransport>,
// Ties the role and state type parameters to the struct without storing values of them.
_marker: PhantomData<(R, S)>,
}
impl<R, S> Chan<R, S> {
    /// Creates an armed endpoint without a transport (pure typestate channel).
    fn new_raw(channel_id: u64, obligation_kind: ObligationKind) -> Self {
        Self {
            transport: None,
            closed: false,
            _marker: PhantomData,
            obligation_kind,
            channel_id,
        }
    }

    /// Creates an armed endpoint that owns `transport`, enabling the `*_async` operations.
    #[allow(dead_code)]
    fn new_with_transport(
        channel_id: u64,
        obligation_kind: ObligationKind,
        transport: SessionTransport,
    ) -> Self {
        let mut endpoint = Self::new_raw(channel_id, obligation_kind);
        endpoint.transport = Some(transport);
        endpoint
    }

    /// Returns `true` when this endpoint currently owns a transport.
    #[must_use]
    pub fn is_transport_backed(&self) -> bool {
        self.transport.is_some()
    }

    /// Identifier shared by both endpoints of the session.
    pub fn channel_id(&self) -> u64 {
        self.channel_id
    }

    /// Obligation kind this session tracks.
    pub fn obligation_kind(&self) -> ObligationKind {
        self.obligation_kind
    }

    /// Moves the transport out of the endpoint.
    ///
    /// # Errors
    /// Returns [`SessionError::NoTransport`] when the endpoint has no transport.
    fn take_transport_or_fail_closed(&mut self) -> Result<SessionTransport, SessionError> {
        match self.transport.take() {
            Some(transport) => Ok(transport),
            None => Err(SessionError::NoTransport),
        }
    }

    /// Consumes the endpoint and re-creates it in state `S2`, carrying over the identity
    /// and the transport.
    fn transition<S2>(mut self) -> Chan<R, S2> {
        // Disarm the old endpoint so its `Drop` does not report a leak when `self` goes
        // out of scope at the end of this function.
        self.closed = true;
        Chan {
            channel_id: self.channel_id,
            obligation_kind: self.obligation_kind,
            closed: false,
            transport: self.transport.take(),
            _marker: PhantomData,
        }
    }

    /// Test helper: consumes the endpoint without triggering the leak check.
    #[cfg(any(test, feature = "test-internals"))]
    pub fn disarm_for_test(mut self) {
        self.closed = true;
    }
}
impl<R, T, S> Chan<R, Send<T, S>> {
    /// Typestate-only send: records that a `T` was sent and advances to state `S`.
    ///
    /// The value itself is discarded; nothing is transmitted on this path.
    pub fn send(self, _value: T) -> Chan<R, S> {
        self.transition::<S>()
    }
}
fn map_transport_send_error<T>(error: &mpsc::SendError<T>) -> SessionError {
match error {
mpsc::SendError::Disconnected(_) => SessionError::Closed,
mpsc::SendError::Cancelled(_) => SessionError::Cancelled,
mpsc::SendError::Full(_) => {
debug_assert!(
false,
"transport-backed session send unexpectedly returned SendError::Full"
);
SessionError::Closed
}
}
}
impl<R, T: std::marker::Send + 'static, S> Chan<R, Send<T, S>> {
    /// Sends `value` to the peer over the transport and advances to state `S`.
    ///
    /// The endpoint is disarmed as soon as this method is called, so dropping the returned
    /// future (or getting an `Err` back) never triggers the leak check.
    ///
    /// # Errors
    /// * [`SessionError::NoTransport`] if the endpoint is not transport-backed.
    /// * [`SessionError::Closed`] / [`SessionError::Cancelled`] if the transport send fails.
    pub fn send_async<'a>(
        mut self,
        cx: &'a Cx,
        value: T,
    ) -> impl Future<Output = Result<Chan<R, S>, SessionError>> + 'a
    where
        R: 'a,
        S: 'a,
    {
        self.closed = true;
        async move {
            let transport = self.take_transport_or_fail_closed()?;
            let payload: Box<dyn std::any::Any + std::marker::Send> = Box::new(value);
            transport
                .tx
                .send(cx, payload)
                .await
                .map_err(|failure| map_transport_send_error(&failure))?;
            // Success: hand the transport back and re-arm before moving to the next state.
            self.transport = Some(transport);
            self.closed = false;
            Ok(self.transition())
        }
    }
}
impl<R, T, S> Chan<R, Recv<T, S>> {
    /// Typestate-only receive: the caller supplies the `value` that is considered
    /// received; it is handed back together with the endpoint advanced to state `S`.
    pub fn recv(self, value: T) -> (T, Chan<R, S>) {
        let next = self.transition::<S>();
        (value, next)
    }
}
impl<R, T: std::marker::Send + 'static, S> Chan<R, Recv<T, S>> {
pub fn recv_async<'a>(
mut self,
cx: &'a Cx,
) -> impl Future<Output = Result<(T, Chan<R, S>), SessionError>> + 'a
where
R: 'a,
S: 'a,
T: 'a,
{
self.closed = true;
async move {
let mut transport = self.take_transport_or_fail_closed()?;
let boxed = match transport.rx.recv(cx).await {
Ok(boxed) => boxed,
Err(error) => {
return Err(map_transport_recv_error(error));
}
};
let Ok(value) = boxed.downcast::<T>() else {
return Err(SessionError::ProtocolViolation {
expected: std::any::type_name::<T>(),
actual: "unknown (downcast failed)",
});
};
self.transport = Some(transport);
self.closed = false;
Ok((*value, self.transition()))
}
}
}
fn map_transport_recv_error(error: mpsc::RecvError) -> SessionError {
match error {
mpsc::RecvError::Disconnected => SessionError::Closed,
mpsc::RecvError::Cancelled => SessionError::Cancelled,
mpsc::RecvError::Empty => {
debug_assert!(
false,
"transport-backed session recv unexpectedly returned RecvError::Empty"
);
SessionError::Closed
}
}
}
/// Outcome of an offer: holds the continuation for whichever branch was taken.
pub enum Selected<A, B> {
// Continuation for `Branch::Left`.
Left(A),
// Continuation for `Branch::Right`.
Right(B),
}
impl<R, A, B> Chan<R, Select<A, B>> {
    /// Typestate-only choice of the left branch; continues as `A`.
    pub fn select_left(self) -> Chan<R, A> {
        self.transition::<A>()
    }

    /// Typestate-only choice of the right branch; continues as `B`.
    pub fn select_right(self) -> Chan<R, B> {
        self.transition::<B>()
    }

    /// Announces [`Branch::Left`] to the peer over the transport and continues as `A`.
    ///
    /// The endpoint is disarmed as soon as this method is called, so dropping the returned
    /// future (or getting an `Err` back) never triggers the leak check.
    ///
    /// # Errors
    /// * [`SessionError::NoTransport`] if the endpoint is not transport-backed.
    /// * [`SessionError::Closed`] / [`SessionError::Cancelled`] if the transport send fails.
    pub fn select_left_async<'a>(
        mut self,
        cx: &'a Cx,
    ) -> impl Future<Output = Result<Chan<R, A>, SessionError>> + 'a
    where
        R: 'a,
        A: 'a,
        B: 'a,
    {
        self.closed = true;
        async move {
            let transport = self.take_transport_or_fail_closed()?;
            let choice: Box<dyn std::any::Any + std::marker::Send> = Box::new(Branch::Left);
            transport
                .tx
                .send(cx, choice)
                .await
                .map_err(|failure| map_transport_send_error(&failure))?;
            // Success: hand the transport back and re-arm before moving to the next state.
            self.transport = Some(transport);
            self.closed = false;
            Ok(self.transition())
        }
    }

    /// Announces [`Branch::Right`] to the peer over the transport and continues as `B`.
    ///
    /// The endpoint is disarmed as soon as this method is called, so dropping the returned
    /// future (or getting an `Err` back) never triggers the leak check.
    ///
    /// # Errors
    /// * [`SessionError::NoTransport`] if the endpoint is not transport-backed.
    /// * [`SessionError::Closed`] / [`SessionError::Cancelled`] if the transport send fails.
    pub fn select_right_async<'a>(
        mut self,
        cx: &'a Cx,
    ) -> impl Future<Output = Result<Chan<R, B>, SessionError>> + 'a
    where
        R: 'a,
        A: 'a,
        B: 'a,
    {
        self.closed = true;
        async move {
            let transport = self.take_transport_or_fail_closed()?;
            let choice: Box<dyn std::any::Any + std::marker::Send> = Box::new(Branch::Right);
            transport
                .tx
                .send(cx, choice)
                .await
                .map_err(|failure| map_transport_send_error(&failure))?;
            // Success: hand the transport back and re-arm before moving to the next state.
            self.transport = Some(transport);
            self.closed = false;
            Ok(self.transition())
        }
    }
}
impl<R, A, B> Chan<R, Offer<A, B>> {
pub fn offer(self, choice: Branch) -> Selected<Chan<R, A>, Chan<R, B>> {
match choice {
Branch::Left => Selected::Left(self.transition()),
Branch::Right => Selected::Right(self.transition()),
}
}
pub fn offer_async<'a>(
mut self,
cx: &'a Cx,
) -> impl Future<Output = Result<Selected<Chan<R, A>, Chan<R, B>>, SessionError>> + 'a
where
R: 'a,
A: 'a,
B: 'a,
{
self.closed = true;
async move {
let mut transport = self.take_transport_or_fail_closed()?;
let boxed = match transport.rx.recv(cx).await {
Ok(boxed) => boxed,
Err(error) => {
return Err(map_transport_recv_error(error));
}
};
let Ok(branch) = boxed.downcast::<Branch>() else {
return Err(SessionError::ProtocolViolation {
expected: "Branch (Left/Right)",
actual: "unknown (downcast failed)",
});
};
self.transport = Some(transport);
self.closed = false;
let branch = *branch;
match branch {
Branch::Left => Ok(Selected::Left(self.transition())),
Branch::Right => Ok(Selected::Right(self.transition())),
}
}
}
}
/// Which side of a `Select`/`Offer` pair was chosen.
///
/// This is also the value the `select_*_async` methods box and send over the transport,
/// and the value `offer_async` downcasts the received payload to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Branch {
// First continuation (`A`).
Left,
// Second continuation (`B`).
Right,
}
/// Value returned by `Chan::<R, End>::close`, recording which session reached `End`.
#[derive(Debug)]
pub struct SessionProof {
// Identifier of the closed channel.
pub channel_id: u64,
// Obligation kind the closed channel was tracking.
pub obligation_kind: ObligationKind,
}
impl<R> Chan<R, End> {
    /// Finishes the session: disarms the leak check and returns a [`SessionProof`]
    /// carrying the channel's identity.
    pub fn close(mut self) -> SessionProof {
        let proof = SessionProof {
            channel_id: self.channel_id,
            obligation_kind: self.obligation_kind,
        };
        self.closed = true;
        proof
    }
}
impl<R, S> Drop for Chan<R, S> {
    /// Leak check: an endpoint dropped while still armed is reported.
    ///
    /// With debug assertions the report is a panic; otherwise it is an error-level log.
    /// Nothing is reported while the thread is already unwinding.
    fn drop(&mut self) {
        if self.closed || std::thread::panicking() {
            return;
        }
        #[cfg(debug_assertions)]
        panic!(
            "SESSION LEAKED: channel {} ({}) dropped without reaching End state",
            self.channel_id, self.obligation_kind,
        );
        #[cfg(not(debug_assertions))]
        crate::tracing_compat::error!(
            channel_id = %self.channel_id,
            obligation_kind = %self.obligation_kind,
            "SESSION LEAKED: dropped without reaching End state"
        );
    }
}
/// Creates a connected initiator/responder pair backed by two mpsc channels, one per
/// direction, each created with capacity argument `buffer`.
///
/// `IS` and `RS` are the protocol states of the two endpoints; this function does not
/// check that they are duals of each other.
pub fn new_transport_pair<IS, RS>(
    channel_id: u64,
    obligation_kind: ObligationKind,
    buffer: usize,
) -> (Chan<Initiator, IS>, Chan<Responder, RS>) {
    type Envelope = Box<dyn std::any::Any + std::marker::Send>;

    let (to_responder, from_initiator) = mpsc::channel::<Envelope>(buffer);
    let (to_initiator, from_responder) = mpsc::channel::<Envelope>(buffer);

    let initiator_side = SessionTransport {
        tx: to_responder,
        rx: from_responder,
    };
    let responder_side = SessionTransport {
        tx: to_initiator,
        rx: from_initiator,
    };

    (
        Chan::new_with_transport(channel_id, obligation_kind, initiator_side),
        Chan::new_with_transport(channel_id, obligation_kind, responder_side),
    )
}
// With the `proc-macros` feature enabled, the `send_permit` protocol is declared through
// the `asupersync_macros::session_protocol!` DSL rather than written out by hand.
// NOTE(review): the macro expansion is not visible from this file; the `send_permit_compat`
// module relies on it producing `send_permit::InitiatorSession` and
// `send_permit::ResponderSession`.
#[cfg(feature = "proc-macros")]
asupersync_macros::session_protocol! {
send_permit<T> for SendPermit {
msg ReserveMsg;
msg AbortMsg;
send ReserveMsg => select {
send T => end,
send AbortMsg => end,
}
}
}
// Aliases for the `proc-macros` build: exposes the `send_permit` session types under the
// `SenderSession` / `ReceiverSession` names.
#[cfg(feature = "proc-macros")]
pub mod send_permit_compat {
pub use super::send_permit::InitiatorSession as SenderSession;
pub use super::send_permit::ResponderSession as ReceiverSession;
}
#[cfg(not(feature = "proc-macros"))]
pub mod send_permit {
use super::{Chan, End, Initiator, Offer, Recv, Responder, Select, Send, SessionTransport};
use crate::channel::mpsc;
use crate::record::ObligationKind;
pub struct ReserveMsg;
pub struct AbortMsg;
pub type SenderSession<T> = Send<ReserveMsg, Select<Send<T, End>, Send<AbortMsg, End>>>;
pub type InitiatorSession<T> = SenderSession<T>;
pub type ReceiverSession<T> = Recv<ReserveMsg, Offer<Recv<T, End>, Recv<AbortMsg, End>>>;
pub type ResponderSession<T> = ReceiverSession<T>;
pub fn new_session<T>(
channel_id: u64,
) -> (
Chan<Initiator, SenderSession<T>>,
Chan<Responder, ReceiverSession<T>>,
) {
(
Chan::new_raw(channel_id, ObligationKind::SendPermit),
Chan::new_raw(channel_id, ObligationKind::SendPermit),
)
}
pub fn new_session_with_transport<T>(
channel_id: u64,
buffer: usize,
) -> (
Chan<Initiator, SenderSession<T>>,
Chan<Responder, ReceiverSession<T>>,
) {
let (tx_i2r, rx_i2r) = mpsc::channel::<Box<dyn std::any::Any + std::marker::Send>>(buffer);
let (tx_r2i, rx_r2i) = mpsc::channel::<Box<dyn std::any::Any + std::marker::Send>>(buffer);
(
Chan::new_with_transport(
channel_id,
ObligationKind::SendPermit,
SessionTransport {
tx: tx_i2r,
rx: rx_r2i,
},
),
Chan::new_with_transport(
channel_id,
ObligationKind::SendPermit,
SessionTransport {
tx: tx_r2i,
rx: rx_i2r,
},
),
)
}
}
// Counterpart of the `proc-macros` alias module for the hand-written build: re-exports
// `ReceiverSession` and `SenderSession` under the same module path.
#[cfg(not(feature = "proc-macros"))]
pub mod send_permit_compat {
pub use super::send_permit::ReceiverSession;
pub use super::send_permit::SenderSession;
}
// With the `proc-macros` feature enabled, the `lease` protocol is declared through the
// `asupersync_macros::session_protocol!` DSL rather than written out by hand.
// NOTE(review): the macro expansion is not visible from this file; the `lease_compat`
// module relies on it producing `lease::{InitiatorLoop, InitiatorSession, ResponderLoop,
// ResponderSession}`.
#[cfg(feature = "proc-macros")]
asupersync_macros::session_protocol! {
lease for Lease {
msg AcquireMsg;
msg RenewMsg;
msg ReleaseMsg;
send AcquireMsg => loop {
select {
send RenewMsg => continue,
send ReleaseMsg => end,
}
}
}
}
// Aliases for the `proc-macros` build: exposes the `lease` session types under the
// `Holder*` / `Resource*` names.
#[cfg(feature = "proc-macros")]
pub mod lease_compat {
pub use super::lease::InitiatorLoop as HolderLoop;
pub use super::lease::InitiatorSession as HolderSession;
pub use super::lease::ResponderLoop as ResourceLoop;
pub use super::lease::ResponderSession as ResourceSession;
}
#[cfg(not(feature = "proc-macros"))]
pub mod lease {
use super::{Chan, End, Initiator, Offer, Recv, Responder, Select, Send, SessionTransport};
use crate::record::ObligationKind;
pub struct AcquireMsg;
pub struct RenewMsg;
pub struct ReleaseMsg;
pub type HolderLoop = Select<Send<RenewMsg, End>, Send<ReleaseMsg, End>>;
pub type InitiatorLoop = HolderLoop;
pub type HolderSession = Send<AcquireMsg, HolderLoop>;
pub type InitiatorSession = HolderSession;
pub type ResourceLoop = Offer<Recv<RenewMsg, End>, Recv<ReleaseMsg, End>>;
pub type ResponderLoop = ResourceLoop;
pub type ResourceSession = Recv<AcquireMsg, ResourceLoop>;
pub type ResponderSession = ResourceSession;
pub fn new_session(
channel_id: u64,
) -> (
Chan<Initiator, HolderSession>,
Chan<Responder, ResourceSession>,
) {
(
Chan::new_raw(channel_id, ObligationKind::Lease),
Chan::new_raw(channel_id, ObligationKind::Lease),
)
}
pub fn new_session_with_transport(
channel_id: u64,
buffer: usize,
) -> (
Chan<Initiator, HolderSession>,
Chan<Responder, ResourceSession>,
) {
let (tx_i2r, rx_i2r) =
crate::channel::mpsc::channel::<Box<dyn std::any::Any + std::marker::Send>>(buffer);
let (tx_r2i, rx_r2i) =
crate::channel::mpsc::channel::<Box<dyn std::any::Any + std::marker::Send>>(buffer);
(
Chan::new_with_transport(
channel_id,
ObligationKind::Lease,
SessionTransport {
tx: tx_i2r,
rx: rx_r2i,
},
),
Chan::new_with_transport(
channel_id,
ObligationKind::Lease,
SessionTransport {
tx: tx_r2i,
rx: rx_i2r,
},
),
)
}
pub fn renew_loop(
channel_id: u64,
) -> (Chan<Initiator, HolderLoop>, Chan<Responder, ResourceLoop>) {
(
Chan::new_raw(channel_id, ObligationKind::Lease),
Chan::new_raw(channel_id, ObligationKind::Lease),
)
}
}
// Counterpart of the `proc-macros` alias module for the hand-written build: re-exports the
// `Holder*` / `Resource*` types under the same module path.
#[cfg(not(feature = "proc-macros"))]
pub mod lease_compat {
pub use super::lease::HolderLoop;
pub use super::lease::HolderSession;
pub use super::lease::ResourceLoop;
pub use super::lease::ResourceSession;
}
// With the `proc-macros` feature enabled, the `two_phase` protocol is declared through the
// `asupersync_macros::session_protocol!` DSL rather than written out by hand.
// NOTE(review): the macro expansion is not visible from this file; the `two_phase_compat`
// module relies on it producing `two_phase::ResponderSession`.
#[cfg(feature = "proc-macros")]
asupersync_macros::session_protocol! {
two_phase(kind: ObligationKind) {
msg ReserveMsg { kind: ObligationKind };
msg CommitMsg;
msg AbortMsg { reason: String };
send ReserveMsg => select {
send CommitMsg => end,
send AbortMsg => end,
}
}
}
// Alias for the `proc-macros` build: exposes `two_phase::ResponderSession` under the
// `ExecutorSession` name.
#[cfg(feature = "proc-macros")]
pub mod two_phase_compat {
pub use super::two_phase::ResponderSession as ExecutorSession;
}
/// Hand-written `two_phase` protocol, used when the `proc-macros` feature is off.
///
/// Initiator: send `ReserveMsg`, then choose between sending `CommitMsg` or `AbortMsg`.
/// The executor types mirror this with `Recv`/`Offer`.
#[cfg(not(feature = "proc-macros"))]
pub mod two_phase {
    use super::{Chan, End, Initiator, Offer, Recv, Responder, Select, Send};
    use crate::record::ObligationKind;

    /// First message of the protocol; carries the obligation kind being reserved.
    #[derive(Debug, Clone)]
    pub struct ReserveMsg {
        pub kind: ObligationKind,
    }

    /// Message sent on the left-hand (commit) branch.
    pub struct CommitMsg;

    /// Message sent on the right-hand (abort) branch, with a free-form reason.
    #[derive(Debug, Clone)]
    pub struct AbortMsg {
        pub reason: String,
    }

    /// Initiator-side protocol.
    pub type InitiatorSession = Send<ReserveMsg, Select<Send<CommitMsg, End>, Send<AbortMsg, End>>>;
    /// Executor-side protocol.
    pub type ExecutorSession = Recv<ReserveMsg, Offer<Recv<CommitMsg, End>, Recv<AbortMsg, End>>>;
    /// Alias of [`ExecutorSession`].
    pub type ResponderSession = ExecutorSession;

    /// Creates a typestate-only (no transport) endpoint pair tagged with `kind`.
    pub fn new_session(
        channel_id: u64,
        kind: ObligationKind,
    ) -> (
        Chan<Initiator, InitiatorSession>,
        Chan<Responder, ExecutorSession>,
    ) {
        let initiator = Chan::new_raw(channel_id, kind);
        let executor = Chan::new_raw(channel_id, kind);
        (initiator, executor)
    }
}
// Counterpart of the `proc-macros` alias module for the hand-written build: re-exports
// `ExecutorSession` under the same module path.
#[cfg(not(feature = "proc-macros"))]
pub mod two_phase_compat {
pub use super::two_phase::ExecutorSession;
}
/// Session types for handing a whole `Chan<R, S>` endpoint to another party.
pub mod delegation {
    use super::{Chan, End, Initiator, Recv, Responder, Send};
    use crate::record::ObligationKind;

    /// Delegator side: send the endpoint, then end.
    pub type DelegatorSession<R, S> = Send<Chan<R, S>, End>;
    /// Delegatee side: receive the endpoint, then end.
    pub type DelegateeSession<R, S> = Recv<Chan<R, S>, End>;
    /// The two endpoints returned by [`new_delegation`].
    pub type DelegationPair<R, S> = (
        Chan<Initiator, DelegatorSession<R, S>>,
        Chan<Responder, DelegateeSession<R, S>>,
    );

    /// Creates a typestate-only (no transport) delegation pair; both endpoints carry the
    /// given `channel_id` and `obligation_kind`.
    #[allow(clippy::type_complexity)]
    pub fn new_delegation<R, S>(
        channel_id: u64,
        obligation_kind: ObligationKind,
    ) -> DelegationPair<R, S> {
        let delegator = Chan::new_raw(channel_id, obligation_kind);
        let delegatee = Chan::new_raw(channel_id, obligation_kind);
        (delegator, delegatee)
    }
}
/// Unit marker type.
///
/// NOTE(review): nothing in this part of the file uses it; presumably it names the tracing
/// contract for session channels — confirm against callers/docs.
pub struct TracingContract;
/// Unit marker type.
///
/// NOTE(review): nothing in this part of the file uses it (hence the `dead_code` allowance);
/// presumably it names the transport-bridge contract — confirm against callers/docs.
#[allow(dead_code)] pub struct TransportBridgeContract;
/// Failures reported by the transport-backed (`*_async`) session operations.
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)]
pub enum SessionError {
    /// The underlying transport operation was cancelled.
    Cancelled,
    /// The peer's side of the transport is gone.
    Closed,
    /// A received payload could not be downcast to the type the protocol state expects.
    ProtocolViolation {
        expected: &'static str,
        actual: &'static str,
    },
    /// An async operation was attempted on an endpoint that has no transport.
    NoTransport,
}

impl std::fmt::Display for SessionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the protocol-violation message is parameterized; the rest are fixed text.
        let fixed_text = match self {
            Self::ProtocolViolation { expected, actual } => {
                return write!(f, "protocol violation: expected {expected}, got {actual}");
            }
            Self::NoTransport => "async operation on non-transport-backed channel",
            Self::Closed => "session peer closed",
            Self::Cancelled => "session cancelled",
        };
        f.write_str(fixed_text)
    }
}

impl std::error::Error for SessionError {}
/// Label of the compile-fail doctest surface listed in every adoption spec.
const DOC_COMPILE_FAIL_SURFACE: &str = "compile-fail doctests: src/obligation/session_types.rs";
/// Label of the typed/dynamic migration test surface listed in every adoption spec.
const MIGRATION_INTEGRATION_SURFACE: &str =
    "typed/dynamic migration surface: tests/session_type_obligations.rs";
/// Label of the migration guide listed in every adoption spec.
const MIGRATION_GUIDE_SURFACE: &str = "migration guide: docs/integration.md";

/// Static, descriptive record of how one session protocol is meant to be adopted.
///
/// All fields are `'static` text; the record is data only and drives no behavior here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionProtocolAdoptionSpec {
    /// Short protocol identifier (`"send_permit"`, `"lease"`, `"two_phase"`).
    pub protocol_id: &'static str,
    /// Path of the typed constructor for the protocol.
    pub typed_entrypoint: &'static str,
    /// Description of the dynamic surface the typed protocol coexists with.
    pub dynamic_surface: &'static str,
    /// Names of the protocol states.
    pub states: &'static [&'static str],
    /// Names of the transitions between states.
    pub transitions: &'static [&'static str],
    /// Guarantees stated to hold at compile time.
    pub compile_time_constraints: &'static [&'static str],
    /// Source files listed as runtime oracles.
    pub runtime_oracles: &'static [&'static str],
    /// Test/documentation surfaces covering the migration.
    pub migration_test_surfaces: &'static [&'static str],
    /// Field names expected in diagnostics.
    pub diagnostics_fields: &'static [&'static str],
    /// Where the protocol should be rolled out first.
    pub initial_rollout_scope: &'static str,
    /// Surfaces to leave alone for now.
    pub avoid_for_now: &'static [&'static str],
}

impl SessionProtocolAdoptionSpec {
    /// Migration surfaces; identical for all three specs.
    const SHARED_MIGRATION_SURFACES: &'static [&'static str] = &[
        DOC_COMPILE_FAIL_SURFACE,
        MIGRATION_INTEGRATION_SURFACE,
        MIGRATION_GUIDE_SURFACE,
    ];

    /// Diagnostics field names; identical for all three specs.
    const SHARED_DIAGNOSTICS_FIELDS: &'static [&'static str] = &[
        "channel_id",
        "from_state",
        "to_state",
        "trace_id",
        "obligation_kind",
        "protocol",
        "transition",
    ];

    /// Adoption spec for the `send_permit` protocol.
    pub const fn send_permit() -> Self {
        Self {
            protocol_id: "send_permit",
            typed_entrypoint: "asupersync::obligation::session_types::send_permit::new_session",
            dynamic_surface: "channel reserve/send-or-abort flows plus asupersync::obligation::ledger::ObligationLedger::{acquire, commit, abort}",
            states: &["Reserve", "Select<Send,Abort>", "End"],
            transitions: &[
                "send(ReserveMsg)",
                "select_left() + send(T)",
                "select_right() + send(AbortMsg)",
                "close()",
            ],
            compile_time_constraints: &[
                "payload send is impossible before Reserve",
                "exactly one terminal branch (Send or Abort) is consumed",
                "the endpoint is linearly moved on every transition",
                "delegation transfers ownership of the protocol endpoint instead of cloning it",
            ],
            runtime_oracles: &[
                "src/obligation/ledger.rs",
                "src/obligation/marking.rs",
                "src/obligation/no_leak_proof.rs",
                "src/obligation/separation_logic.rs",
            ],
            migration_test_surfaces: Self::SHARED_MIGRATION_SURFACES,
            diagnostics_fields: Self::SHARED_DIAGNOSTICS_FIELDS,
            initial_rollout_scope: "two-phase send/reserve paths that already resolve a SendPermit explicitly",
            avoid_for_now: &[
                "ambient channel wrappers that hide reserve/abort boundaries",
                "surfaces that depend on implicit Drop-based cleanup instead of explicit resolution",
            ],
        }
    }

    /// Adoption spec for the `lease` protocol.
    pub const fn lease() -> Self {
        Self {
            protocol_id: "lease",
            typed_entrypoint: "asupersync::obligation::session_types::lease::new_session",
            dynamic_surface: "lease-backed registry/resource flows such as asupersync::cx::NameLease plus ledger-backed Lease obligations",
            states: &["Acquire", "HolderLoop<Renew|Release>", "End"],
            transitions: &[
                "send(AcquireMsg)",
                "select_left() + send(RenewMsg)",
                "select_right() + send(ReleaseMsg)",
                "close()",
            ],
            compile_time_constraints: &[
                "Acquire must happen before Renew or Release",
                "Renew and Release are mutually exclusive per loop iteration",
                "Release is terminal and cannot be followed by another Renew",
                "delegated lease endpoints preserve a single holder at the type level",
            ],
            runtime_oracles: &[
                "src/cx/registry.rs",
                "src/obligation/ledger.rs",
                "src/obligation/marking.rs",
                "src/obligation/separation_logic.rs",
            ],
            migration_test_surfaces: Self::SHARED_MIGRATION_SURFACES,
            diagnostics_fields: Self::SHARED_DIAGNOSTICS_FIELDS,
            initial_rollout_scope: "lease-backed naming/resource lifecycles with a single obvious holder and explicit release path",
            avoid_for_now: &[
                "multi-party renewal protocols without a single delegation owner",
                "surfaces that currently encode renewal via ad hoc timers or hidden retries",
            ],
        }
    }

    /// Adoption spec for the `two_phase` protocol.
    pub const fn two_phase() -> Self {
        Self {
            protocol_id: "two_phase",
            typed_entrypoint: "asupersync::obligation::session_types::two_phase::new_session",
            dynamic_surface: "two-phase reserve/commit-or-abort effects backed by asupersync::obligation::ledger::ObligationLedger::{acquire, commit, abort}",
            states: &["Reserve(K)", "Select<Commit,Abort>", "End"],
            transitions: &[
                "send(ReserveMsg)",
                "select_left() + send(CommitMsg)",
                "select_right() + send(AbortMsg)",
                "close()",
            ],
            compile_time_constraints: &[
                "Commit and Abort are mutually exclusive after Reserve",
                "kind-specific reserve state cannot be skipped",
                "terminal Commit or Abort consumes the endpoint",
                "delegation keeps the reserved effect linear across task handoff",
            ],
            runtime_oracles: &[
                "src/obligation/ledger.rs",
                "src/obligation/dialectica.rs",
                "src/obligation/no_aliasing_proof.rs",
                "src/obligation/separation_logic.rs",
            ],
            migration_test_surfaces: Self::SHARED_MIGRATION_SURFACES,
            diagnostics_fields: Self::SHARED_DIAGNOSTICS_FIELDS,
            initial_rollout_scope: "small reserve/commit APIs where the effect boundary is already explicit and the fallback remains the ledger",
            avoid_for_now: &[
                "open-ended effect pipelines that cross opaque adapter boundaries",
                "surfaces that require polymorphic branching beyond Commit or Abort in the first rollout",
            ],
        }
    }
}
/// Returns the adoption specs in fixed order: `send_permit`, `lease`, `two_phase`.
#[must_use]
pub fn session_protocol_adoption_specs() -> Vec<SessionProtocolAdoptionSpec> {
    Vec::from([
        SessionProtocolAdoptionSpec::send_permit(),
        SessionProtocolAdoptionSpec::lease(),
        SessionProtocolAdoptionSpec::two_phase(),
    ])
}
#[cfg(test)]
mod tests {
use super::*;
use crate::record::ObligationKind;
#[test]
fn send_permit_commit_path() {
    let (tx_end, rx_end) = send_permit::new_session::<String>(1);

    // Sender side: reserve, pick the payload branch, send, close.
    let proof = tx_end
        .send(send_permit::ReserveMsg)
        .select_left()
        .send("hello".to_string())
        .close();
    assert_eq!(proof.channel_id, 1);
    assert_eq!(proof.obligation_kind, ObligationKind::SendPermit);

    // Receiver side mirrors the same path.
    let (_, rx_end) = rx_end.recv(send_permit::ReserveMsg);
    match rx_end.offer(Branch::Left) {
        Selected::Right(_) => panic!("expected Left branch"),
        Selected::Left(payload_state) => {
            let (msg, done) = payload_state.recv("hello".to_string());
            assert_eq!(msg, "hello");
            let _proof = done.close();
        }
    }
}
#[test]
fn send_permit_abort_path() {
    let (tx_end, rx_end) = send_permit::new_session::<String>(2);

    // Sender side: reserve, pick the abort branch, send the abort, close.
    let proof = tx_end
        .send(send_permit::ReserveMsg)
        .select_right()
        .send(send_permit::AbortMsg)
        .close();
    assert_eq!(proof.channel_id, 2);

    // Receiver side mirrors the same path.
    let (_, rx_end) = rx_end.recv(send_permit::ReserveMsg);
    match rx_end.offer(Branch::Right) {
        Selected::Left(_) => panic!("expected Right branch"),
        Selected::Right(abort_state) => {
            let (_, done) = abort_state.recv(send_permit::AbortMsg);
            let _proof = done.close();
        }
    }
}
#[test]
fn two_phase_commit_path() {
    let (initiator, executor) = two_phase::new_session(3, ObligationKind::SendPermit);
    let reserve = two_phase::ReserveMsg {
        kind: ObligationKind::SendPermit,
    };

    // Initiator side: reserve, commit, close.
    let proof = initiator
        .send(reserve.clone())
        .select_left()
        .send(two_phase::CommitMsg)
        .close();
    assert_eq!(proof.obligation_kind, ObligationKind::SendPermit);

    // Executor side mirrors the same path.
    let (received, executor) = executor.recv(reserve);
    assert_eq!(received.kind, ObligationKind::SendPermit);
    match executor.offer(Branch::Left) {
        Selected::Right(_) => panic!("expected Commit"),
        Selected::Left(commit_state) => {
            let (_, done) = commit_state.recv(two_phase::CommitMsg);
            let _proof = done.close();
        }
    }
}
#[test]
fn two_phase_abort_path() {
    let (initiator, executor) = two_phase::new_session(4, ObligationKind::Lease);
    let reserve = two_phase::ReserveMsg {
        kind: ObligationKind::Lease,
    };

    // Initiator side: reserve, abort with a reason, close.
    let proof = initiator
        .send(reserve.clone())
        .select_right()
        .send(two_phase::AbortMsg {
            reason: "timeout".to_string(),
        })
        .close();
    assert_eq!(proof.obligation_kind, ObligationKind::Lease);

    // Executor side mirrors the same path.
    let (_, executor) = executor.recv(reserve);
    match executor.offer(Branch::Right) {
        Selected::Left(_) => panic!("expected Abort"),
        Selected::Right(abort_state) => {
            let expected_abort = two_phase::AbortMsg {
                reason: "timeout".to_string(),
            };
            let (received, done) = abort_state.recv(expected_abort);
            assert_eq!(received.reason, "timeout");
            let _proof = done.close();
        }
    }
}
#[test]
fn lease_acquire_and_release() {
    let (holder, resource) = lease::new_session(5);

    // Holder side: acquire, release, close.
    let proof = holder
        .send(lease::AcquireMsg)
        .select_right()
        .send(lease::ReleaseMsg)
        .close();
    assert_eq!(proof.obligation_kind, ObligationKind::Lease);

    // Resource side mirrors the same path.
    let (_, resource) = resource.recv(lease::AcquireMsg);
    match resource.offer(Branch::Right) {
        Selected::Left(_) => panic!("expected Release"),
        Selected::Right(release_state) => {
            let (_, done) = release_state.recv(lease::ReleaseMsg);
            let _proof = done.close();
        }
    }
}
#[test]
fn lease_acquire_renew_release() {
    // Round 1 (holder): acquire then renew.
    let (holder, resource) = lease::new_session(6);
    let _proof_renew = holder
        .send(lease::AcquireMsg)
        .select_left()
        .send(lease::RenewMsg)
        .close();

    // Round 2 (holder): a fresh loop pair, this time releasing.
    let (next_holder, next_resource) = lease::renew_loop(6);
    let proof = next_holder
        .select_right()
        .send(lease::ReleaseMsg)
        .close();
    assert_eq!(proof.obligation_kind, ObligationKind::Lease);

    // Resource side of round 1.
    let (_, resource) = resource.recv(lease::AcquireMsg);
    match resource.offer(Branch::Left) {
        Selected::Right(_) => panic!("expected Renew"),
        Selected::Left(renew_state) => {
            let (_, done) = renew_state.recv(lease::RenewMsg);
            let _proof = done.close();
        }
    }

    // Resource side of round 2.
    match next_resource.offer(Branch::Right) {
        Selected::Left(_) => panic!("expected Release"),
        Selected::Right(release_state) => {
            let (_, done) = release_state.recv(lease::ReleaseMsg);
            let _proof = done.close();
        }
    }
}
#[test]
fn session_protocol_adoption_specs_cover_priority_families() {
    let specs = session_protocol_adoption_specs();

    let ids: Vec<_> = specs.iter().map(|spec| spec.protocol_id).collect();
    assert_eq!(ids, vec!["send_permit", "lease", "two_phase"]);

    let missing_entrypoint = specs.iter().any(|spec| spec.typed_entrypoint.is_empty());
    assert!(!missing_entrypoint, "typed entrypoints must be explicit");

    let missing_dynamic_surface = specs.iter().any(|spec| spec.dynamic_surface.is_empty());
    assert!(
        !missing_dynamic_surface,
        "dynamic coexistence surfaces must be explicit"
    );
}
#[test]
fn session_protocol_adoption_specs_document_oracles_and_migration_surfaces() {
    for spec in session_protocol_adoption_specs() {
        let id = spec.protocol_id;

        assert!(
            !spec.runtime_oracles.is_empty(),
            "runtime oracles must remain explicit for {}",
            id
        );
        let oracle_outside_src = spec
            .runtime_oracles
            .iter()
            .any(|surface| !surface.starts_with("src/"));
        assert!(
            !oracle_outside_src,
            "runtime oracles must point at concrete source files for {}",
            id
        );
        assert!(
            spec.migration_test_surfaces.len() >= 2,
            "migration surfaces must include existing and planned coverage for {}",
            id
        );
        assert!(
            !spec.initial_rollout_scope.is_empty(),
            "initial rollout scope must be documented for {}",
            id
        );
        assert!(
            !spec.avoid_for_now.is_empty(),
            "deferred surfaces must be documented for {}",
            id
        );
        let mentions_planned = spec
            .migration_test_surfaces
            .iter()
            .any(|surface| surface.contains("planned"));
        assert!(
            !mentions_planned,
            "migration surfaces must point at concrete live paths for {}",
            id
        );
    }
}
#[test]
fn session_protocol_adoption_specs_keep_diagnostics_fields_stable() {
    for spec in session_protocol_adoption_specs() {
        let id = spec.protocol_id;
        let fields = spec.diagnostics_fields;

        assert!(
            fields.contains(&"channel_id"),
            "channel_id must remain stable for {}",
            id
        );
        assert!(
            fields.contains(&"trace_id"),
            "trace_id must remain stable for {}",
            id
        );
        assert!(
            fields.contains(&"protocol"),
            "protocol field must remain stable for {}",
            id
        );
        assert!(
            spec.compile_time_constraints.len() >= 3,
            "compile-time guarantees must stay substantive for {}",
            id
        );
        assert!(
            spec.transitions.len() >= 3,
            "state transitions must stay explicit for {}",
            id
        );
    }
}
#[test]
fn session_protocol_adoption_specs_reference_current_validation_surfaces() {
    for spec in session_protocol_adoption_specs() {
        let id = spec.protocol_id;
        let surfaces = spec.migration_test_surfaces;

        assert!(
            surfaces.contains(&DOC_COMPILE_FAIL_SURFACE),
            "compile-fail doctest surface must stay wired for {}",
            id
        );
        assert!(
            surfaces.contains(&MIGRATION_INTEGRATION_SURFACE),
            "typed/dynamic migration surface must stay wired for {}",
            id
        );
        assert!(
            surfaces.contains(&MIGRATION_GUIDE_SURFACE),
            "migration guide surface must stay wired for {}",
            id
        );
    }
}
#[test]
fn session_proof_fields() {
    let (sender, receiver) = send_permit::new_session::<u32>(42);

    let proof = sender
        .send(send_permit::ReserveMsg)
        .select_left()
        .send(100_u32)
        .close();
    assert_eq!(proof.channel_id, 42);
    assert_eq!(proof.obligation_kind, ObligationKind::SendPermit);

    // The receiver is not driven in this test; disarm it so its drop is silent.
    receiver.disarm_for_test();
}
// The leak check in `Chan`'s `Drop` only panics when debug assertions are enabled;
// otherwise it logs. Skip the test in builds without them (e.g. `cargo test --release`)
// instead of letting `should_panic` fail there.
#[test]
#[cfg_attr(
    not(debug_assertions),
    ignore = "leak detection only panics when debug assertions are enabled"
)]
#[should_panic(expected = "SESSION LEAKED")]
fn drop_mid_protocol_panics() {
    let (sender, receiver) = send_permit::new_session::<u32>(99);
    // Only the sender's drop is under test; keep the receiver's drop silent.
    receiver.disarm_for_test();
    let sender = sender.send(send_permit::ReserveMsg);
    // Dropped in the `Select` state, i.e. before reaching `End`.
    drop(sender);
}
#[test]
fn transition_preserves_channel_id() {
    let (initiator, executor) = two_phase::new_session(77, ObligationKind::IoOp);
    assert_eq!(initiator.channel_id(), 77);
    assert_eq!(initiator.obligation_kind(), ObligationKind::IoOp);

    // Drive the initiator through every state; the id must survive each transition.
    let proof = initiator
        .send(two_phase::ReserveMsg {
            kind: ObligationKind::IoOp,
        })
        .select_left()
        .send(two_phase::CommitMsg)
        .close();
    assert_eq!(proof.channel_id, 77);

    // The executor is not driven in this test; disarm it so its drop is silent.
    executor.disarm_for_test();
}
#[test]
fn send_permit_dual_channels_share_identity() {
    let (tx_end, rx_end) = send_permit::new_session::<u32>(100);

    assert!(
        tx_end.channel_id() == rx_end.channel_id(),
        "channel_id must match across endpoints"
    );
    assert!(
        tx_end.obligation_kind() == rx_end.obligation_kind(),
        "obligation_kind must match across endpoints"
    );
    assert_eq!(tx_end.obligation_kind(), ObligationKind::SendPermit);

    // Finish both sides so neither endpoint is dropped mid-protocol.
    let _proof = tx_end
        .send(send_permit::ReserveMsg)
        .select_left()
        .send(42_u32)
        .close();
    let (_, rx_end) = rx_end.recv(send_permit::ReserveMsg);
    match rx_end.offer(Branch::Left) {
        Selected::Right(_) => panic!("expected Left"),
        Selected::Left(payload_state) => {
            let (_, done) = payload_state.recv(42_u32);
            let _proof = done.close();
        }
    }
}
#[test]
fn delegation_pair_preserves_metadata() {
    let (delegator, delegatee) = delegation::new_delegation::<
        Initiator,
        two_phase::InitiatorSession,
    >(201, ObligationKind::SendPermit);

    assert_eq!(delegator.channel_id(), 201);
    assert_eq!(delegator.obligation_kind(), ObligationKind::SendPermit);
    assert_eq!(delegatee.channel_id(), 201);
    assert_eq!(delegatee.obligation_kind(), ObligationKind::SendPermit);

    // Neither endpoint is driven in this test; disarm both so their drops are silent.
    delegator.disarm_for_test();
    delegatee.disarm_for_test();
}
#[test]
fn branch_debug_copy_eq() {
    let (left, right) = (Branch::Left, Branch::Right);

    // Debug output names the variant.
    let rendered = format!("{left:?}");
    assert!(rendered.contains("Left"));

    // Copy: the originals stay usable after being assigned elsewhere.
    let left_copy = left;
    assert_eq!(left, left_copy);
    assert_ne!(left, right);
    let right_copy = right;
    assert_eq!(right, right_copy);
}
#[test]
fn session_proof_debug() {
    let rendered = format!(
        "{:?}",
        SessionProof {
            channel_id: 42,
            obligation_kind: ObligationKind::SendPermit,
        }
    );
    assert!(rendered.contains("42"));
    assert!(rendered.contains("SendPermit"));
}
#[test]
fn two_phase_reserve_msg_debug_clone() {
    let msg = two_phase::ReserveMsg {
        kind: ObligationKind::Lease,
    };
    let dbg = format!("{msg:?}");
    assert!(dbg.contains("Lease"));

    // Exercise `Clone` for real (a plain `let cloned = msg;` would only move the value)
    // and check that the copy agrees with the original.
    let cloned = msg.clone();
    assert_eq!(cloned.kind, ObligationKind::Lease);
    assert_eq!(msg.kind, cloned.kind);
}
#[test]
fn two_phase_abort_msg_debug_clone() {
    let msg = two_phase::AbortMsg {
        reason: "budget_exhausted".to_string(),
    };
    let dbg = format!("{msg:?}");
    assert!(dbg.contains("budget_exhausted"));

    // Exercise `Clone` for real (a plain `let cloned = msg;` would only move the value)
    // and check that the copy agrees with the original.
    let cloned = msg.clone();
    assert_eq!(cloned.reason, "budget_exhausted");
    assert_eq!(msg.reason, cloned.reason);
}
#[test]
fn selected_left_variant() {
    let picked: Selected<u32, &str> = Selected::Left(42);
    let Selected::Left(value) = picked else {
        panic!("expected Left");
    };
    assert_eq!(value, 42);
}
#[test]
fn selected_right_variant() {
    let picked: Selected<u32, &str> = Selected::Right("hello");
    let Selected::Right(value) = picked else {
        panic!("expected Right");
    };
    assert_eq!(value, "hello");
}
#[test]
fn chan_accessors() {
    let (tx_end, rx_end) = send_permit::new_session::<u32>(55);

    assert_eq!(tx_end.channel_id(), 55);
    assert_eq!(tx_end.obligation_kind(), ObligationKind::SendPermit);
    assert_eq!(rx_end.channel_id(), 55);
    assert_eq!(rx_end.obligation_kind(), ObligationKind::SendPermit);

    // Finish both sides so neither endpoint is dropped mid-protocol.
    let _ = tx_end
        .send(send_permit::ReserveMsg)
        .select_left()
        .send(0_u32)
        .close();
    let (_, rx_end) = rx_end.recv(send_permit::ReserveMsg);
    match rx_end.offer(Branch::Left) {
        Selected::Right(_) => panic!("expected Left"),
        Selected::Left(payload_state) => {
            let (_, done) = payload_state.recv(0_u32);
            let _ = done.close();
        }
    }
}
#[test]
fn lease_new_session_obligation_kind() {
    let (holder, resource) = lease::new_session(99);

    assert_eq!(holder.obligation_kind(), ObligationKind::Lease);
    assert_eq!(resource.obligation_kind(), ObligationKind::Lease);

    // Finish both sides so neither endpoint is dropped mid-protocol.
    let _ = holder
        .send(lease::AcquireMsg)
        .select_right()
        .send(lease::ReleaseMsg)
        .close();
    let (_, resource) = resource.recv(lease::AcquireMsg);
    match resource.offer(Branch::Right) {
        Selected::Left(_) => panic!("expected Right"),
        Selected::Right(release_state) => {
            let (_, done) = release_state.recv(lease::ReleaseMsg);
            let _ = done.close();
        }
    }
}
/// Runs three lease rounds on channel 300: an acquire followed by a renew, a
/// second renew obtained through `lease::renew_loop`, and a final release.
/// Every round closes the holder side first and then drives the resource side
/// down the matching branch.
#[test]
fn lease_multiple_renew_cycles() {
    // Round 1: acquire, then the Left (Renew) branch.
    let (holder_a, resource_a) = lease::new_session(300);
    let _first_proof = holder_a
        .send(lease::AcquireMsg)
        .select_left()
        .send(lease::RenewMsg)
        .close();
    let (_, resource_a) = resource_a.recv(lease::AcquireMsg);
    match resource_a.offer(Branch::Left) {
        Selected::Right(_) => panic!("expected Renew"),
        Selected::Left(renewing) => {
            let (_, renewing) = renewing.recv(lease::RenewMsg);
            let _proof = renewing.close();
        }
    }

    // Round 2: fresh loop endpoints, Left (Renew) again.
    let (holder_b, resource_b) = lease::renew_loop(300);
    let _second_proof = holder_b.select_left().send(lease::RenewMsg).close();
    match resource_b.offer(Branch::Left) {
        Selected::Right(_) => panic!("expected Renew 2"),
        Selected::Left(renewing) => {
            let (_, renewing) = renewing.recv(lease::RenewMsg);
            let _proof = renewing.close();
        }
    }

    // Round 3: fresh loop endpoints, Right (Release) ends the lease.
    let (holder_c, resource_c) = lease::renew_loop(300);
    let release_proof = holder_c.select_right().send(lease::ReleaseMsg).close();
    assert_eq!(release_proof.obligation_kind, ObligationKind::Lease);
    match resource_c.offer(Branch::Right) {
        Selected::Left(_) => panic!("expected Release"),
        Selected::Right(releasing) => {
            let (_, releasing) = releasing.recv(lease::ReleaseMsg);
            let _proof = releasing.close();
        }
    }
}
/// Drives a transport-backed `send_permit` session through the Left (Send)
/// branch end to end: reserve, select/offer Left, transfer a `u64` payload,
/// then close both endpoints and check the resulting proofs.
///
/// Both proofs are checked for channel id and obligation kind, so a regression
/// on either endpoint is caught.
#[test]
fn transport_backed_send_permit_happy_path() {
    let (sender, receiver) = new_transport_pair::<
        send_permit::InitiatorSession<u64>,
        send_permit::ResponderSession<u64>,
    >(100, ObligationKind::SendPermit, 4);
    assert!(sender.is_transport_backed());
    assert!(receiver.is_transport_backed());
    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        // Reserve step.
        let sender = sender
            .send_async(&cx, send_permit::ReserveMsg)
            .await
            .unwrap();
        let (_, receiver) = receiver.recv_async(&cx).await.unwrap();
        // Branch choice: the initiator picks Left and the responder must see it.
        let sender = sender.select_left_async(&cx).await.unwrap();
        let receiver = match receiver.offer_async(&cx).await.unwrap() {
            Selected::Left(ch) => ch,
            Selected::Right(_) => panic!("expected Left (Send) branch"),
        };
        // Payload transfer.
        let sender = sender.send_async(&cx, 42_u64).await.unwrap();
        let (value, receiver) = receiver.recv_async(&cx).await.unwrap();
        assert_eq!(value, 42_u64);
        // Both sides have reached the end of the protocol and can be closed.
        let proof_s = sender.close();
        let proof_r = receiver.close();
        assert_eq!(proof_s.channel_id, 100);
        assert_eq!(proof_r.channel_id, 100);
        assert_eq!(proof_s.obligation_kind, ObligationKind::SendPermit);
        assert_eq!(proof_r.obligation_kind, ObligationKind::SendPermit);
    });
}
/// Drives a transport-backed `send_permit` session through the Right (Abort)
/// branch end to end: reserve, select/offer Right, transfer the abort
/// message, then close both endpoints and check the resulting proofs.
///
/// Both proofs are checked for channel id and obligation kind, matching the
/// checks made on the happy path.
#[test]
fn transport_backed_send_permit_abort_path() {
    let (sender, receiver) = new_transport_pair::<
        send_permit::InitiatorSession<u64>,
        send_permit::ResponderSession<u64>,
    >(200, ObligationKind::SendPermit, 4);
    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        // Reserve step.
        let sender = sender
            .send_async(&cx, send_permit::ReserveMsg)
            .await
            .unwrap();
        let (_, receiver) = receiver.recv_async(&cx).await.unwrap();
        // Branch choice: the initiator picks Right and the responder must see it.
        let sender = sender.select_right_async(&cx).await.unwrap();
        let receiver = match receiver.offer_async(&cx).await.unwrap() {
            Selected::Right(ch) => ch,
            Selected::Left(_) => panic!("expected Right (Abort) branch"),
        };
        // Abort message transfer.
        let sender = sender.send_async(&cx, send_permit::AbortMsg).await.unwrap();
        let (_, receiver) = receiver.recv_async(&cx).await.unwrap();
        // Both sides have reached the end of the protocol and can be closed.
        let proof_s = sender.close();
        let proof_r = receiver.close();
        assert_eq!(proof_s.channel_id, 200);
        assert_eq!(proof_r.channel_id, 200);
        assert_eq!(proof_s.obligation_kind, ObligationKind::SendPermit);
        assert_eq!(proof_r.obligation_kind, ObligationKind::SendPermit);
    });
}
/// `send_async` on a transport-backed initiator yields `SessionError::Closed`
/// once the responder endpoint has been disarmed.
#[test]
fn transport_backed_peer_drop_returns_closed() {
    let (initiator, responder) = new_transport_pair::<
        send_permit::InitiatorSession<u64>,
        send_permit::ResponderSession<u64>,
    >(300, ObligationKind::SendPermit, 4);
    // Disarm the responder up front; per the test name this stands in for the
    // peer being dropped.
    responder.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        let outcome = initiator.send_async(&cx, send_permit::ReserveMsg).await;
        match outcome {
            Ok(next) => {
                // Disarm the returned endpoint before reporting the failure.
                next.disarm_for_test();
                panic!("expected error, got Ok");
            }
            Err(SessionError::Closed) => {}
            Err(other) => panic!("expected Closed, got {other}"),
        }
    });
}
/// `send_async` reports `SessionError::Cancelled` when the `Cx` it is given
/// already has a cancel reason set.
#[test]
fn transport_backed_send_async_cancelled_cx_returns_cancelled() {
    let (initiator, responder) = new_transport_pair::<
        send_permit::InitiatorSession<u64>,
        send_permit::ResponderSession<u64>,
    >(301, ObligationKind::SendPermit, 4);
    responder.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        let reason = crate::types::CancelReason::user("transport-backed send cancelled");
        cx.set_cancel_reason(reason);

        let outcome = initiator.send_async(&cx, send_permit::ReserveMsg).await;
        assert!(matches!(outcome, Err(SessionError::Cancelled)));
    });
}
/// Creating a `send_async` future and dropping it without ever polling it
/// must not panic.
///
/// The create-and-drop step runs inside `catch_unwind` so that a panic is
/// captured as an `Err` and reported through the assertion message instead of
/// unwinding out of the test directly.
#[test]
fn transport_backed_send_async_unpolled_future_drop_does_not_panic() {
let (sender, receiver) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(3011, ObligationKind::SendPermit, 4);
let cx = Cx::for_testing();
// `sender` is consumed by `send_async`; the closure borrows `cx`, hence the
// `AssertUnwindSafe` wrapper.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let future = sender.send_async(&cx, send_permit::ReserveMsg);
// Dropped without a single poll — this is the behavior under test.
drop(future);
}));
// The untouched peer endpoint is disarmed explicitly (presumably so its own
// drop check does not fire — confirm against `disarm_for_test`).
receiver.disarm_for_test();
assert!(
result.is_ok(),
"dropping an unpolled send_async future must not trip the session leak drop bomb"
);
}
/// `recv_async` reports `SessionError::Cancelled` when the `Cx` it is given
/// already has a cancel reason set.
#[test]
fn transport_backed_recv_async_cancelled_cx_returns_cancelled() {
    let (initiator, responder) = new_transport_pair::<
        send_permit::InitiatorSession<u64>,
        send_permit::ResponderSession<u64>,
    >(302, ObligationKind::SendPermit, 4);
    initiator.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        let reason = crate::types::CancelReason::user("transport-backed recv cancelled");
        cx.set_cancel_reason(reason);

        let outcome = responder.recv_async(&cx).await;
        assert!(matches!(outcome, Err(SessionError::Cancelled)));
    });
}
/// Creating a `recv_async` future and dropping it without ever polling it
/// must not panic.
///
/// The create-and-drop step runs inside `catch_unwind` so that a panic is
/// captured as an `Err` and reported through the assertion message instead of
/// unwinding out of the test directly.
#[test]
fn transport_backed_recv_async_unpolled_future_drop_does_not_panic() {
let (sender, receiver) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(3021, ObligationKind::SendPermit, 4);
let cx = Cx::for_testing();
// `receiver` is consumed by `recv_async`; the closure borrows `cx`, hence the
// `AssertUnwindSafe` wrapper.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let future = receiver.recv_async(&cx);
// Dropped without a single poll — this is the behavior under test.
drop(future);
}));
// The untouched peer endpoint is disarmed explicitly (presumably so its own
// drop check does not fire — confirm against `disarm_for_test`).
sender.disarm_for_test();
assert!(
result.is_ok(),
"dropping an unpolled recv_async future must not trip the session leak drop bomb"
);
}
/// `select_left_async` and `select_right_async` each report
/// `SessionError::Cancelled` when called with a `Cx` that already has a cancel
/// reason set.
///
/// Two contexts are used: `cx` performs the reserve step that brings each pair
/// to the selection point, and `cancelled_cx` is passed only to the selection
/// call under test. Each direction gets its own transport pair.
#[test]
fn transport_backed_select_async_cancelled_cx_returns_cancelled() {
futures_lite::future::block_on(async {
let cx = Cx::for_testing();
let cancelled_cx = Cx::for_testing();
cancelled_cx.set_cancel_reason(crate::types::CancelReason::user(
"transport-backed select cancelled",
));
// --- Left selection ---
let (sender_left, receiver_left) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(303, ObligationKind::SendPermit, 4);
// Reserve step with the live context so the sender reaches the selection point.
let sender_left = sender_left
.send_async(&cx, send_permit::ReserveMsg)
.await
.unwrap();
let (_, receiver_left) = receiver_left.recv_async(&cx).await.unwrap();
let left_result = sender_left.select_left_async(&cancelled_cx).await;
// The peer is disarmed before the assertion runs, so it is already handled if
// the assertion fails.
receiver_left.disarm_for_test();
assert!(matches!(left_result, Err(SessionError::Cancelled)));
// --- Right selection: same steps on a fresh pair ---
let (sender_right, receiver_right) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(304, ObligationKind::SendPermit, 4);
let sender_right = sender_right
.send_async(&cx, send_permit::ReserveMsg)
.await
.unwrap();
let (_, receiver_right) = receiver_right.recv_async(&cx).await.unwrap();
let right_result = sender_right.select_right_async(&cancelled_cx).await;
receiver_right.disarm_for_test();
assert!(matches!(right_result, Err(SessionError::Cancelled)));
});
}
/// Creating a `select_left_async` or `select_right_async` future and dropping
/// it without ever polling it must not panic.
///
/// Each direction gets its own transport pair, is advanced past the reserve
/// step, and then has its selection future created and dropped inside
/// `catch_unwind` so that a panic is captured as an `Err`.
#[test]
fn transport_backed_select_async_unpolled_future_drop_does_not_panic() {
futures_lite::future::block_on(async {
let cx = Cx::for_testing();
// --- Left selection ---
let (sender_left, receiver_left) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(3031, ObligationKind::SendPermit, 4);
// Reserve step so the sender reaches the selection point.
let sender_left = sender_left
.send_async(&cx, send_permit::ReserveMsg)
.await
.unwrap();
let (_, receiver_left) = receiver_left.recv_async(&cx).await.unwrap();
let left_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let future = sender_left.select_left_async(&cx);
// Dropped without a single poll — this is the behavior under test.
drop(future);
}));
// The peer is disarmed before the assertion runs, so it is already handled if
// the assertion fails.
receiver_left.disarm_for_test();
assert!(
left_result.is_ok(),
"dropping an unpolled select_left_async future must not panic"
);
// --- Right selection: same steps on a fresh pair ---
let (sender_right, receiver_right) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(3032, ObligationKind::SendPermit, 4);
let sender_right = sender_right
.send_async(&cx, send_permit::ReserveMsg)
.await
.unwrap();
let (_, receiver_right) = receiver_right.recv_async(&cx).await.unwrap();
let right_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let future = sender_right.select_right_async(&cx);
drop(future);
}));
receiver_right.disarm_for_test();
assert!(
right_result.is_ok(),
"dropping an unpolled select_right_async future must not panic"
);
});
}
/// `offer_async` reports `SessionError::Cancelled` when called with a `Cx`
/// that already has a cancel reason set. A separate live context performs the
/// reserve step that brings the responder to the offer point.
#[test]
fn transport_backed_offer_async_cancelled_cx_returns_cancelled() {
    let (initiator, responder) = new_transport_pair::<
        send_permit::InitiatorSession<u64>,
        send_permit::ResponderSession<u64>,
    >(305, ObligationKind::SendPermit, 4);

    futures_lite::future::block_on(async {
        let live_cx = Cx::for_testing();
        let initiator = initiator
            .send_async(&live_cx, send_permit::ReserveMsg)
            .await
            .unwrap();
        let (_, responder) = responder.recv_async(&live_cx).await.unwrap();
        // The initiator takes no further part in this test.
        initiator.disarm_for_test();

        let cancelled_cx = Cx::for_testing();
        let reason = crate::types::CancelReason::user("transport-backed offer cancelled");
        cancelled_cx.set_cancel_reason(reason);

        let outcome = responder.offer_async(&cancelled_cx).await;
        assert!(matches!(outcome, Err(SessionError::Cancelled)));
    });
}
/// Creating an `offer_async` future and dropping it without ever polling it
/// must not panic.
///
/// The pair is first advanced past the reserve step so the responder is at the
/// offer point; the create-and-drop step then runs inside `catch_unwind` so
/// that a panic is captured as an `Err`.
#[test]
fn transport_backed_offer_async_unpolled_future_drop_does_not_panic() {
futures_lite::future::block_on(async {
let cx = Cx::for_testing();
let (sender, receiver) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(3051, ObligationKind::SendPermit, 4);
// Reserve step so the receiver reaches the offer point.
let sender = sender
.send_async(&cx, send_permit::ReserveMsg)
.await
.unwrap();
let (_, receiver) = receiver.recv_async(&cx).await.unwrap();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let future = receiver.offer_async(&cx);
// Dropped without a single poll — this is the behavior under test.
drop(future);
}));
// The peer is disarmed before the assertion runs, so it is already handled if
// the assertion fails.
sender.disarm_for_test();
assert!(
result.is_ok(),
"dropping an unpolled offer_async future must not trip the session leak drop bomb"
);
});
}
/// `recv_async`, `offer_async` and `select_left_async` each report
/// `SessionError::Closed` once the opposite endpoint has been disarmed.
///
/// Three independent transport pairs are used, one per operation. In every
/// case the peer is disarmed immediately before the call under test (per the
/// test name this stands in for the peer being dropped — confirm against
/// `disarm_for_test`).
#[test]
fn transport_backed_recv_offer_and_select_peer_drop_return_closed() {
futures_lite::future::block_on(async {
let cx = Cx::for_testing();
// --- recv: the initiator is disarmed before anything has been sent ---
let (sender_recv, receiver_recv) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(306, ObligationKind::SendPermit, 4);
sender_recv.disarm_for_test();
let recv_result = receiver_recv.recv_async(&cx).await;
assert!(matches!(recv_result, Err(SessionError::Closed)));
// --- offer: reserve step completes, then the initiator is disarmed ---
let (sender_offer, receiver_offer) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(307, ObligationKind::SendPermit, 4);
let sender_offer = sender_offer
.send_async(&cx, send_permit::ReserveMsg)
.await
.unwrap();
let (_, receiver_offer) = receiver_offer.recv_async(&cx).await.unwrap();
sender_offer.disarm_for_test();
let offer_result = receiver_offer.offer_async(&cx).await;
assert!(matches!(offer_result, Err(SessionError::Closed)));
// --- select: reserve step completes, then the responder is disarmed ---
let (sender_select, receiver_select) = new_transport_pair::<
send_permit::InitiatorSession<u64>,
send_permit::ResponderSession<u64>,
>(308, ObligationKind::SendPermit, 4);
let sender_select = sender_select
.send_async(&cx, send_permit::ReserveMsg)
.await
.unwrap();
let (_, receiver_select) = receiver_select.recv_async(&cx).await.unwrap();
receiver_select.disarm_for_test();
let select_result = sender_select.select_left_async(&cx).await;
assert!(matches!(select_result, Err(SessionError::Closed)));
});
}
/// `select_left_async` on a session created by `send_permit::new_session`
/// (no transport attached) fails with `SessionError::NoTransport`.
#[test]
fn select_left_async_fails_without_transport_backing() {
    let (initiator, responder) = send_permit::new_session::<u64>(309);
    responder.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        // Synchronous reserve step brings the initiator to the selection point.
        let initiator = initiator.send(send_permit::ReserveMsg);
        let outcome = initiator.select_left_async(&cx).await;
        // Discard the success payload so only the error side is compared.
        let stripped = outcome.map(|_chan| ());
        assert_eq!(stripped, Err(SessionError::NoTransport));
    });
}
/// `select_right_async` on a session created by `send_permit::new_session`
/// (no transport attached) fails with `SessionError::NoTransport`.
#[test]
fn select_right_async_fails_without_transport_backing() {
    let (initiator, responder) = send_permit::new_session::<u64>(310);
    responder.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        // Synchronous reserve step brings the initiator to the selection point.
        let initiator = initiator.send(send_permit::ReserveMsg);
        let outcome = initiator.select_right_async(&cx).await;
        // Discard the success payload so only the error side is compared.
        let stripped = outcome.map(|_chan| ());
        assert_eq!(stripped, Err(SessionError::NoTransport));
    });
}
/// `recv_async` on a session created by `send_permit::new_session`
/// (no transport attached) fails with `SessionError::NoTransport`.
#[test]
fn recv_async_fails_without_transport_backing() {
    let (initiator, responder) = send_permit::new_session::<u64>(311);
    initiator.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        let outcome = responder.recv_async(&cx).await;
        // Discard the success payload so only the error side is compared.
        let stripped = outcome.map(|_received| ());
        assert_eq!(stripped, Err(SessionError::NoTransport));
    });
}
/// `send_async` on a session created by `send_permit::new_session`
/// (no transport attached) fails with `SessionError::NoTransport`.
#[test]
fn send_async_fails_without_transport_backing() {
    let (initiator, responder) = send_permit::new_session::<u64>(312);
    responder.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        // Synchronous reserve and Left selection bring the initiator to the
        // payload send.
        let initiator = initiator.send(send_permit::ReserveMsg).select_left();
        let outcome = initiator.send_async(&cx, 7).await;
        // Discard the success payload so only the error side is compared.
        let stripped = outcome.map(|_chan| ());
        assert_eq!(stripped, Err(SessionError::NoTransport));
    });
}
/// `offer_async` on a session created by `send_permit::new_session`
/// (no transport attached) fails with `SessionError::NoTransport`.
#[test]
fn offer_async_fails_without_transport_backing() {
    let (initiator, responder) = send_permit::new_session::<u64>(313);
    initiator.disarm_for_test();

    futures_lite::future::block_on(async {
        let cx = Cx::for_testing();
        // Synchronous reserve step brings the responder to the offer point.
        let (_, responder) = responder.recv(send_permit::ReserveMsg);
        let outcome = responder.offer_async(&cx).await;
        // Discard the success payload so only the error side is compared.
        let stripped = outcome.map(|_selected| ());
        assert_eq!(stripped, Err(SessionError::NoTransport));
    });
}
/// `is_transport_backed()` is `false` for endpoints from
/// `send_permit::new_session` and `true` for endpoints from
/// `new_transport_pair`.
#[test]
fn transport_backed_is_transport_backed_flag() {
    // Session without a transport.
    let (plain_initiator, plain_responder) = send_permit::new_session::<u32>(1);
    assert!(!plain_initiator.is_transport_backed());
    assert!(!plain_responder.is_transport_backed());
    plain_initiator.disarm_for_test();
    plain_responder.disarm_for_test();

    // Session with a transport.
    let (wired_initiator, wired_responder) = new_transport_pair::<
        send_permit::InitiatorSession<u32>,
        send_permit::ResponderSession<u32>,
    >(2, ObligationKind::SendPermit, 4);
    assert!(wired_initiator.is_transport_backed());
    assert!(wired_responder.is_transport_backed());
    wired_initiator.disarm_for_test();
    wired_responder.disarm_for_test();
}
/// Pins the `Display` text of the `Cancelled`, `Closed` and
/// `ProtocolViolation` variants of `SessionError`.
#[test]
fn session_error_display() {
    let cases = [
        (SessionError::Cancelled, "session cancelled"),
        (SessionError::Closed, "session peer closed"),
        (
            SessionError::ProtocolViolation {
                expected: "u64",
                actual: "String",
            },
            "protocol violation: expected u64, got String",
        ),
    ];
    for (error, rendered) in cases {
        assert_eq!(error.to_string(), rendered);
    }
}
}