Skip to main content

axon_frontend/
session.rs

1//! Session types — the algebra of typed bidirectional dialogue.
2//!
3//! §Fase 41.a — *WebSocket as a Cognitive Primitive*. This module is the pure
4//! mathematical core of the paper (`docs/paper_websocket_cognitive_primitive.md`):
5//! the session-type grammar (§3.1), the **duality** involution `(·)⊥` (§3.2),
6//! the **regular-coinductive equality** for recursive (`μ`) types, and the
7//! **connection law** — a connection with endpoints typed `S` and `T` is
8//! well-formed iff `T ≡ S⊥`. Grounded in Caires & Pfenning's Curry–Howard
9//! correspondence (session types ARE intuitionistic linear-logic propositions),
10//! it is the static guarantee RFC 6455 lacks: *what one end sends, the other
11//! expects* — making a dialogue **deadlock-free and protocol-conformant by
12//! construction**, not by per-message runtime validation.
13//!
14//! This is the pure algebra only: no parser/AST (Fase 41.b), no runtime (41.d),
15//! no multiparty projection (41.h). The payload carried by `send`/`recv` is an
16//! opaque [`Payload`] (a canonical type name); 41.b binds it to the real AST
17//! value types — the duality + equality algebra here depends only on payload
18//! *equality*, never on payload structure, so it is decoupled by construction.
19//!
20//! §Fase 41.c — **credit-refined backpressure** (D2 of the plan vivo, §4.2 of
21//! the paper). `Send` / `Recv` now carry an optional credit index `n: u64`
22//! (`!ⁿA.S` / `?ⁿA.S`); `None` is the unbounded fragment (`!∞A.S`, the algebra
23//! before 41.c). The "send at n = 0 has no typing rule" axiom is implemented by
24//! [`SessionType::has_send_at_zero`] (an explicit `!⁰A.S` in the type is
25//! unprovable) and by the **Presburger-decidable** flow analysis
26//! [`SessionType::credit_analyse`], which — given a socket budget `k` — checks
27//! that every send fires at an available credit `> 0` (no rule at n=0) and that
28//! every recursive body is **sustainable** (per-iteration net send count
29//! `Δ = #send − #recv ≤ 0`, the loop-fixpoint inequality). All constraints are
30//! linear over the naturals → decidable in the theory of Presburger arithmetic.
31
32use std::collections::BTreeMap;
33use std::fmt;
34
35use serde::{Deserialize, Serialize};
36
37/// The value type carried by a `send`/`recv`. Opaque at this layer (a canonical
38/// type name); Fase 41.b replaces it with the real AST value type. Duality and
39/// equality treat it nominally — only `Payload == Payload` matters.
40///
41/// `#[serde(transparent)]` — the JSON encoding is the bare type-name string
42/// (the wire shape the §Fase 41.g sealed-snapshot serialiser depends on);
43/// `Payload("Msg")` ↔ `"Msg"` on the wire.
44#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45#[serde(transparent)]
46pub struct Payload(pub String);
47
48impl Payload {
49    pub fn new(name: impl Into<String>) -> Self {
50        Payload(name.into())
51    }
52}
53
54impl fmt::Display for Payload {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        f.write_str(&self.0)
57    }
58}
59
60/// A session type — the protocol of one endpoint of a connection (§3.1 of the
61/// paper). `Select`/`Branch` carry their labelled continuations in a `BTreeMap`
62/// so the label set is canonically ordered (deterministic duality + equality).
63///
64/// `Serialize` + `Deserialize` — §Fase 41.g sealed-snapshot resume needs the
65/// residual cursor + the protocol schema serialisable. The encoding is
66/// stable across the algebra layer + the enterprise persistence layer: the
67/// same JSON shape goes into the AAD-bound `cognitive_states` ciphertext
68/// and comes back out via [`SessionRuntime::resume`].
69#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
70pub enum SessionType {
71    /// `end` — the dialogue is complete.
72    End,
73    /// `!ⁿA.S` — send a value of type `A`, then behave as `S`. The optional
74    /// `credit` is the Fase 41.c index `n` (paper §4.2): `Some(n)` types a send
75    /// that *requires* `n > 0` available credit (the "no rule at n = 0" axiom
76    /// makes `Some(0)` unprovable); `None` is the unbounded fragment `!∞A.S`.
77    Send {
78        payload: Payload,
79        credit: Option<u64>,
80        cont: Box<SessionType>,
81    },
82    /// `?ⁿA.S` — receive a value of type `A`, then behave as `S`. Symmetric to
83    /// [`SessionType::Send`]: the index `n` bounds the receiver-side window.
84    Recv {
85        payload: Payload,
86        credit: Option<u64>,
87        cont: Box<SessionType>,
88    },
89    /// `⊕{ℓᵢ:Sᵢ}` — internal choice: this endpoint *selects* a label.
90    Select(BTreeMap<String, SessionType>),
91    /// `&{ℓᵢ:Sᵢ}` — external choice: this endpoint *offers* the branches.
92    Branch(BTreeMap<String, SessionType>),
93    /// `μX.S` — recursive session (equirecursive: `μX.S ≡ S[μX.S/X]`).
94    Rec(String, Box<SessionType>),
95    /// `X` — a recursion variable (bound by an enclosing `Rec`).
96    Var(String),
97}
98
99impl SessionType {
100    // ── Smart constructors (ergonomic + keep call sites readable) ──────────
101
102    /// `!A.S` — unbounded send (`credit = None`, the pre-41.c fragment).
103    pub fn send(payload: impl Into<String>, then: SessionType) -> Self {
104        SessionType::Send {
105            payload: Payload::new(payload),
106            credit: None,
107            cont: Box::new(then),
108        }
109    }
110    /// `?A.S` — unbounded receive (`credit = None`).
111    pub fn recv(payload: impl Into<String>, then: SessionType) -> Self {
112        SessionType::Recv {
113            payload: Payload::new(payload),
114            credit: None,
115            cont: Box::new(then),
116        }
117    }
118    /// `!ⁿA.S` — credit-refined send (Fase 41.c, paper §4.2). The continuation
119    /// `then` runs in the same window — the budget is global to the socket; the
120    /// `n` here is the *snapshot* of available credit demanded at this step.
121    pub fn send_credit(payload: impl Into<String>, n: u64, then: SessionType) -> Self {
122        SessionType::Send {
123            payload: Payload::new(payload),
124            credit: Some(n),
125            cont: Box::new(then),
126        }
127    }
128    /// `?ⁿA.S` — credit-refined receive (Fase 41.c).
129    pub fn recv_credit(payload: impl Into<String>, n: u64, then: SessionType) -> Self {
130        SessionType::Recv {
131            payload: Payload::new(payload),
132            credit: Some(n),
133            cont: Box::new(then),
134        }
135    }
136    pub fn select(branches: impl IntoIterator<Item = (String, SessionType)>) -> Self {
137        SessionType::Select(branches.into_iter().collect())
138    }
139    pub fn branch(branches: impl IntoIterator<Item = (String, SessionType)>) -> Self {
140        SessionType::Branch(branches.into_iter().collect())
141    }
142    pub fn rec(var: impl Into<String>, body: SessionType) -> Self {
143        SessionType::Rec(var.into(), Box::new(body))
144    }
145    pub fn var(name: impl Into<String>) -> Self {
146        SessionType::Var(name.into())
147    }
148
149    // ── Duality (§3.2): the involution that swaps the two sides ────────────
150
151    /// The dual `S⊥`: swaps `send`↔`recv` and `select`↔`branch`, recursing into
152    /// continuations; `end`, `Rec` binders and `Var`s are preserved. Payloads
153    /// **and** the credit index `n` are unchanged — `(!ⁿA.S)⊥ = ?ⁿA.S⊥` (same
154    /// `A`, same `n`, opposite direction). Symmetric credit is the standard
155    /// credit-flow semantics (Rast lineage): the sender's window-of-n is
156    /// exactly what the receiver-side is sized to absorb.
157    pub fn dual(&self) -> SessionType {
158        match self {
159            SessionType::End => SessionType::End,
160            SessionType::Send { payload, credit, cont } => SessionType::Recv {
161                payload: payload.clone(),
162                credit: *credit,
163                cont: Box::new(cont.dual()),
164            },
165            SessionType::Recv { payload, credit, cont } => SessionType::Send {
166                payload: payload.clone(),
167                credit: *credit,
168                cont: Box::new(cont.dual()),
169            },
170            SessionType::Select(m) => SessionType::Branch(dual_map(m)),
171            SessionType::Branch(m) => SessionType::Select(dual_map(m)),
172            SessionType::Rec(x, b) => SessionType::Rec(x.clone(), Box::new(b.dual())),
173            SessionType::Var(x) => SessionType::Var(x.clone()),
174        }
175    }
176
177    // ── Equirecursive unfolding + capture-stopping substitution ────────────
178
179    /// Substitute the free variable `var` by `repl`. Stops at a shadowing
180    /// `Rec(var, …)` (the inner binder re-captures the name).
181    fn subst(&self, var: &str, repl: &SessionType) -> SessionType {
182        match self {
183            SessionType::End => SessionType::End,
184            SessionType::Send { payload, credit, cont } => SessionType::Send {
185                payload: payload.clone(),
186                credit: *credit,
187                cont: Box::new(cont.subst(var, repl)),
188            },
189            SessionType::Recv { payload, credit, cont } => SessionType::Recv {
190                payload: payload.clone(),
191                credit: *credit,
192                cont: Box::new(cont.subst(var, repl)),
193            },
194            SessionType::Select(m) => SessionType::Select(subst_map(m, var, repl)),
195            SessionType::Branch(m) => SessionType::Branch(subst_map(m, var, repl)),
196            SessionType::Rec(x, b) => {
197                if x == var {
198                    self.clone() // shadowed — leave the inner Rec untouched
199                } else {
200                    SessionType::Rec(x.clone(), Box::new(b.subst(var, repl)))
201                }
202            }
203            SessionType::Var(x) => {
204                if x == var {
205                    repl.clone()
206                } else {
207                    self.clone()
208                }
209            }
210        }
211    }
212
213    /// Unfold every *leading* `Rec` so the head constructor is exposed:
214    /// `μX.S ↦ S[μX.S/X]`, repeated. Terminates for **contractive** types
215    /// (a guard appears under each `Rec` before the variable recurs).
216    ///
217    /// Public so the 41.d runtime can drive the session-type cursor over a
218    /// live connection: after every operational step the continuation is
219    /// re-unfolded so the cursor never carries a leading `Rec` for the
220    /// state machine to interpret.
221    pub fn unfold_head(&self) -> SessionType {
222        let mut t = self.clone();
223        while let SessionType::Rec(x, b) = t {
224            let whole = SessionType::Rec(x.clone(), b.clone());
225            t = b.subst(&x, &whole);
226        }
227        t
228    }
229
230    // ── Regular-coinductive equality ──────────────────────────────────────
231
232    /// Equirecursive equality: `S ≡ T` iff their infinite unfoldings coincide.
233    /// Decided by the standard coinductive algorithm — assume the pair equal,
234    /// unfold leading `Rec`s, compare heads, recurse; a re-encountered pair is
235    /// discharged by the assumption (the greatest fixed point). Terminates
236    /// because a regular type has finitely many distinct sub-pairs.
237    pub fn equiv(&self, other: &SessionType) -> bool {
238        let mut assumed: Vec<(SessionType, SessionType)> = Vec::new();
239        equiv_inner(self, other, &mut assumed)
240    }
241
242    /// The **connection law** (§3.2): a connection whose two endpoints are typed
243    /// `self` and `peer` is well-formed iff `peer ≡ self⊥`. Symmetric up to
244    /// involutivity (`(S⊥)⊥ ≡ S`).
245    pub fn is_dual_to(&self, peer: &SessionType) -> bool {
246        peer.equiv(&self.dual())
247    }
248
249    // ── Fase 41.c — credit-refined backpressure (D2, paper §4.2) ────────────
250
251    /// Stamp every (recursively-reachable) `Send` and `Recv` with the credit
252    /// index `n`. Idempotent on already-stamped types. Used by the type
253    /// checker to lift the socket's `backpressure: credit(k)` annotation onto
254    /// the bare session protocol so the algebra-level analysis can discharge
255    /// the constraint.
256    pub fn with_credit(&self, n: u64) -> SessionType {
257        match self {
258            SessionType::End => SessionType::End,
259            SessionType::Send { payload, cont, .. } => SessionType::Send {
260                payload: payload.clone(),
261                credit: Some(n),
262                cont: Box::new(cont.with_credit(n)),
263            },
264            SessionType::Recv { payload, cont, .. } => SessionType::Recv {
265                payload: payload.clone(),
266                credit: Some(n),
267                cont: Box::new(cont.with_credit(n)),
268            },
269            SessionType::Select(m) => SessionType::Select(
270                m.iter().map(|(l, s)| (l.clone(), s.with_credit(n))).collect(),
271            ),
272            SessionType::Branch(m) => SessionType::Branch(
273                m.iter().map(|(l, s)| (l.clone(), s.with_credit(n))).collect(),
274            ),
275            SessionType::Rec(x, b) => SessionType::Rec(x.clone(), Box::new(b.with_credit(n))),
276            SessionType::Var(x) => SessionType::Var(x.clone()),
277        }
278    }
279
280    /// The "no rule at n = 0" axiom (paper §4.2): an explicit `!⁰A.S` in the
281    /// type is **unprovable** — there is no typing rule for a send at zero
282    /// available credit. Returns the offending payload of the first such send
283    /// (in a deterministic left-to-right walk) if any.
284    ///
285    /// Decidable in linear time over the type structure.
286    pub fn has_send_at_zero(&self) -> Option<Payload> {
287        match self {
288            SessionType::End => None,
289            SessionType::Send { payload, credit: Some(0), .. } => Some(payload.clone()),
290            SessionType::Send { cont, .. } | SessionType::Recv { cont, .. } => cont.has_send_at_zero(),
291            SessionType::Select(m) | SessionType::Branch(m) => {
292                m.values().find_map(|s| s.has_send_at_zero())
293            }
294            SessionType::Rec(_, b) => b.has_send_at_zero(),
295            SessionType::Var(_) => None,
296        }
297    }
298
299    /// Decide the **credit conformance** of `self` against a budget `k`
300    /// (the socket's `backpressure: credit(k)` window). This is the
301    /// Presburger discharge — the constraints are linear arithmetic over the
302    /// naturals, so satisfiability is decidable; the algorithm here is the
303    /// direct fixpoint formulation specialised to closed, contractive session
304    /// types (Rast lineage, §4.2 of the paper).
305    ///
306    /// The check fires three kinds of error:
307    ///
308    /// 1. **Send at zero** — an explicit `!⁰A.S` in the type. Unprovable by
309    ///    construction (no typing rule applies).
310    /// 2. **Burst overflow** — a straight-line send burst exceeding the
311    ///    available window. With initial budget `k`, the abstract trace must
312    ///    never reach `available_credit < 0` at a send.
313    /// 3. **Loop unsustainability** — a recursive body whose per-iteration net
314    ///    send count `Δ = #send − #recv` is strictly positive: each iteration
315    ///    drains the window, so unbounded iteration is unsound under *any*
316    ///    finite budget. (`Δ ≤ 0` is the Presburger fixpoint inequality.)
317    ///
318    /// Returns `Ok(())` if the protocol is conformant, or [`CreditError`] with
319    /// the offending witness. Total over closed, contractive session types.
320    pub fn credit_analyse(&self, budget: u64) -> Result<(), CreditError> {
321        if let Some(p) = self.has_send_at_zero() {
322            return Err(CreditError::SendAtZero { payload: p });
323        }
324        // Initial window = full budget. The walker tracks the minimum
325        // available credit reachable along any execution path; if at any send
326        // it would fall below 0 → BurstOverflow. Recursive bodies are
327        // discharged by the Δ ≤ 0 fixpoint inequality.
328        let _final = credit_walk(self, budget as i64, budget as i64)?;
329        Ok(())
330    }
331
332    /// Enumerate the **recurring paths** of `self` w.r.t. recursion variable
333    /// `x` — every trace from the root that reaches `Var(x)`. Each path is
334    /// reported as `(#send, #recv)`; terminating paths (reaching `End` or a
335    /// different free variable) are dropped (they don't iterate, so they
336    /// don't constrain unbounded sustainability). Shadowing `Rec(x, …)` cuts
337    /// the descent — references inside refer to the inner binder.
338    ///
339    /// Total in time linear in the size of `self`; the path count is bounded
340    /// by the number of leaves of the choice tree.
341    pub fn recurring_paths(&self, x: &str) -> Vec<(u64, u64)> {
342        let mut out = Vec::new();
343        recurring_paths_into(self, x, 0, 0, &mut out);
344        out
345    }
346
347    /// Worst-case (maximum-Δ) recurring path of `self` w.r.t. `x`. Used by
348    /// the type checker to report the offending iteration count. Returns
349    /// `(0, 0)` if there are no recurring paths.
350    pub fn credit_delta(&self, x: &str) -> (u64, u64) {
351        self.recurring_paths(x)
352            .into_iter()
353            .max_by_key(|(s, r)| *s as i64 - *r as i64)
354            .unwrap_or((0, 0))
355    }
356
357    // ── Fase 41.e — SSE-as-fragment unification (D3, paper §4.4) ─────────
358
359    /// True iff `self` lies in the **SSE producer fragment**: the
360    /// connection only sends to its peer. Concretely the type contains
361    /// only `End`, `Send`, internal-`Select`, `Rec`, and `Var` — no
362    /// `Recv` (would mean the producer expects client input) and no
363    /// `Branch` (would mean the producer offers a choice the client
364    /// picks). For such a type the §4.4 identity `S_SSE = Π↓(S_WS)`
365    /// holds with `Π↓ = id`: the protocol *is already* the SSE fragment,
366    /// runnable over W3C SSE without WebSocket bidirectionality.
367    ///
368    /// Total over closed, contractive session types; linear in the size
369    /// of `self`.
370    pub fn projects_to_sse(&self) -> bool {
371        self.has_polarity(Polarity::Producer)
372    }
373
374    /// Dual of [`projects_to_sse`] — the **SSE consumer fragment**: the
375    /// connection only receives from its peer (`End`, `Recv`,
376    /// external-`Branch`, `Rec`, `Var`). The §4.4 theorem
377    /// `Π↓(S)⊥ = Π↑(S⊥)` ties this to `projects_to_sse` via duality:
378    /// `S.projects_to_sse() ⇔ S.dual().projects_to_sse_consumer()`.
379    pub fn projects_to_sse_consumer(&self) -> bool {
380        self.has_polarity(Polarity::Consumer)
381    }
382
383    /// Unified polarity test. The two SSE fragments are exactly the two
384    /// inhabitants of [`Polarity`]: `Producer = !/⊕/end/μ/var-only` and
385    /// `Consumer = ?/&/end/μ/var-only`.
386    pub fn has_polarity(&self, p: Polarity) -> bool {
387        match (self, p) {
388            (SessionType::End, _) => true,
389            (SessionType::Var(_), _) => true,
390            (SessionType::Send { cont, .. }, Polarity::Producer) => cont.has_polarity(p),
391            (SessionType::Recv { cont, .. }, Polarity::Consumer) => cont.has_polarity(p),
392            (SessionType::Select(arms), Polarity::Producer) => {
393                arms.values().all(|s| s.has_polarity(p))
394            }
395            (SessionType::Branch(arms), Polarity::Consumer) => {
396                arms.values().all(|s| s.has_polarity(p))
397            }
398            (SessionType::Rec(_, body), _) => body.has_polarity(p),
399            // Wrong-polarity head: `Send` in Consumer fragment, `Recv` in
400            // Producer fragment, `Branch` in Producer fragment, `Select`
401            // in Consumer fragment. Each one immediately disqualifies the
402            // type from the single-polarity SSE projection.
403            _ => false,
404        }
405    }
406}
407
/// The role a session type plays on an SSE-projectable connection: either
/// the **producer** (server-side, only sends/selects) or the **consumer**
/// (client-side, only receives/branches). Consumed by
/// [`SessionType::has_polarity`] when deciding the §4.4 SSE-fragment
/// predicate `Π↓(S_WS) = S_SSE`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Polarity {
    /// Server-side role — outbound actions only (`Send`, `Select`).
    Producer,
    /// Client-side role — inbound actions only (`Recv`, `Branch`).
    Consumer,
}

impl Polarity {
    /// The opposite role. Producer and Consumer are each other's dual, so
    /// flipping twice returns the starting polarity.
    pub fn flip(self) -> Self {
        if self == Polarity::Producer {
            Polarity::Consumer
        } else {
            Polarity::Producer
        }
    }
}
432
433/// The Presburger discharge's negative verdict — the witness of an
434/// unconformant credit constraint. Surfaced verbatim by the type checker.
435#[derive(Debug, Clone, PartialEq, Eq)]
436pub enum CreditError {
437    /// An explicit `!⁰A.S` — the "no rule at n=0" axiom rejects it.
438    SendAtZero { payload: Payload },
439    /// A straight-line send burst exceeds the budget `k`: at the offending
440    /// send the abstract credit window would fall below 0.
441    BurstOverflow { payload: Payload, budget: u64, burst: u64 },
442    /// A recursive body has Δ > 0 (per iteration drains the window): no finite
443    /// budget makes unbounded iteration sound.
444    LoopUnsustainable { sends_per_iter: u64, recvs_per_iter: u64 },
445}
446
447impl fmt::Display for CreditError {
448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449        match self {
450            CreditError::SendAtZero { payload } => {
451                write!(f, "send `{payload}` at credit n=0 has no typing rule (D2, §4.2)")
452            }
453            CreditError::BurstOverflow { payload, budget, burst } => write!(
454                f,
455                "credit-window overflow at send `{payload}`: the protocol requires a \
456                 send-burst of {burst} but the socket's `credit({budget})` cannot absorb it"
457            ),
458            CreditError::LoopUnsustainable { sends_per_iter, recvs_per_iter } => write!(
459                f,
460                "recursive body is unsustainable: Δ = {sends_per_iter} - {recvs_per_iter} > 0 \
461                 (no finite credit window keeps unbounded iteration in flight)"
462            ),
463        }
464    }
465}
466
467/// Abstract-interpretation walker for the credit constraint. `available` is the
468/// current window snapshot; `budget` is the maximum (recv refills are capped at
469/// budget, the standard credit-flow semantics). Returns the available credit
470/// at the end of the executed branch (the *minimum* across choice arms so the
471/// caller sees the worst-case continuation).
472fn credit_walk(t: &SessionType, available: i64, budget: i64) -> Result<i64, CreditError> {
473    match t {
474        SessionType::End => Ok(available),
475        SessionType::Send { payload, cont, .. } => {
476            let next = available - 1;
477            if next < 0 {
478                return Err(CreditError::BurstOverflow {
479                    payload: payload.clone(),
480                    budget: budget as u64,
481                    burst: (budget - available + 1) as u64,
482                });
483            }
484            credit_walk(cont, next, budget)
485        }
486        SessionType::Recv { cont, .. } => {
487            // A recv refills one credit, capped at the budget (TCP-window
488            // semantics: the receiver never accumulates more than `k`).
489            let next = (available + 1).min(budget);
490            credit_walk(cont, next, budget)
491        }
492        SessionType::Select(m) | SessionType::Branch(m) => {
493            // Each arm must be conformant on its own; the conservative
494            // post-state is the minimum (worst case) across arms.
495            let mut worst = available;
496            for arm in m.values() {
497                let post = credit_walk(arm, available, budget)?;
498                if post < worst {
499                    worst = post;
500                }
501            }
502            Ok(worst)
503        }
504        SessionType::Rec(x, body) => {
505            // Loop sustainability (Presburger fixpoint): for every recurring
506            // path back to `Var(x)`, the per-iteration net send count must
507            // satisfy `Δ = #send − #recv ≤ 0`. A non-recurring arm (one that
508            // terminates in `end`) is exempt — it executes at most once.
509            // If *any* recurring path has Δ > 0, the window strictly drains
510            // on that iteration and no finite `k` is sufficient → reject.
511            for (s, r) in body.recurring_paths(x) {
512                if s > r {
513                    return Err(CreditError::LoopUnsustainable {
514                        sends_per_iter: s,
515                        recvs_per_iter: r,
516                    });
517                }
518            }
519            // Walk one iteration so a burst inside the body is surfaced even
520            // when the loop is sustainable on net (Δ ≤ 0 doesn't bound peak).
521            credit_walk(body, available, budget)
522        }
523        SessionType::Var(_) => {
524            // Recursion re-entry: nothing further to walk on this iteration;
525            // the fixpoint check above already vetted sustainability.
526            Ok(available)
527        }
528    }
529}
530
531/// Enumerate `(#send, #recv)` for every path from `t` that reaches `Var(x)`
532/// (the loop-recurring traces). Paths that hit `End` or a free `Var(y≠x)` are
533/// dropped — they exit the loop, not iterate. A shadowing `Rec(x, _)` cuts the
534/// descent (the inner binder re-captures the name). Total in linear time.
535fn recurring_paths_into(t: &SessionType, x: &str, s: u64, r: u64, out: &mut Vec<(u64, u64)>) {
536    match t {
537        SessionType::End => {} // terminates — not a recurring path
538        SessionType::Var(y) if y == x => out.push((s, r)),
539        SessionType::Var(_) => {} // a free var that isn't our loop's
540        SessionType::Send { cont, .. } => recurring_paths_into(cont, x, s + 1, r, out),
541        SessionType::Recv { cont, .. } => recurring_paths_into(cont, x, s, r + 1, out),
542        SessionType::Select(m) | SessionType::Branch(m) => {
543            // Each arm is its own trace — descend into all of them.
544            for arm in m.values() {
545                recurring_paths_into(arm, x, s, r, out);
546            }
547        }
548        SessionType::Rec(y, body) if y != x => recurring_paths_into(body, x, s, r, out),
549        SessionType::Rec(_, _) => {} // shadows x — its inner Var refers to itself
550    }
551}
552
553fn dual_map(m: &BTreeMap<String, SessionType>) -> BTreeMap<String, SessionType> {
554    m.iter().map(|(l, s)| (l.clone(), s.dual())).collect()
555}
556
557fn subst_map(m: &BTreeMap<String, SessionType>, var: &str, repl: &SessionType) -> BTreeMap<String, SessionType> {
558    m.iter().map(|(l, s)| (l.clone(), s.subst(var, repl))).collect()
559}
560
561fn equiv_inner(s: &SessionType, t: &SessionType, assumed: &mut Vec<(SessionType, SessionType)>) -> bool {
562    // Coinduction: a pair we are already proving equal is taken as equal.
563    if assumed.iter().any(|(x, y)| x == s && y == t) {
564        return true;
565    }
566    assumed.push((s.clone(), t.clone()));
567
568    match (s.unfold_head(), t.unfold_head()) {
569        (SessionType::End, SessionType::End) => true,
570        (
571            SessionType::Send { payload: a, credit: ca, cont: sk },
572            SessionType::Send { payload: b, credit: cb, cont: tk },
573        ) => a == b && ca == cb && equiv_inner(&sk, &tk, assumed),
574        (
575            SessionType::Recv { payload: a, credit: ca, cont: sk },
576            SessionType::Recv { payload: b, credit: cb, cont: tk },
577        ) => a == b && ca == cb && equiv_inner(&sk, &tk, assumed),
578        (SessionType::Select(m1), SessionType::Select(m2)) => equiv_maps(&m1, &m2, assumed),
579        (SessionType::Branch(m1), SessionType::Branch(m2)) => equiv_maps(&m1, &m2, assumed),
580        // A bare `Var` survives unfolding only if it is free (open type); compare
581        // nominally. Closed, contractive types never reach this with a head Var.
582        (SessionType::Var(x), SessionType::Var(y)) => x == y,
583        _ => false,
584    }
585}
586
587fn equiv_maps(
588    m1: &BTreeMap<String, SessionType>,
589    m2: &BTreeMap<String, SessionType>,
590    assumed: &mut Vec<(SessionType, SessionType)>,
591) -> bool {
592    // Same label set, and equal continuations label-by-label.
593    if m1.len() != m2.len() || !m1.keys().all(|l| m2.contains_key(l)) {
594        return false;
595    }
596    m1.iter().all(|(l, s1)| equiv_inner(s1, &m2[l], assumed))
597}
598
599impl fmt::Display for SessionType {
600    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
601        match self {
602            SessionType::End => f.write_str("end"),
603            SessionType::Send { payload, credit, cont } => match credit {
604                Some(n) => write!(f, "!^{n}{payload}.{cont}"),
605                None => write!(f, "!{payload}.{cont}"),
606            },
607            SessionType::Recv { payload, credit, cont } => match credit {
608                Some(n) => write!(f, "?^{n}{payload}.{cont}"),
609                None => write!(f, "?{payload}.{cont}"),
610            },
611            SessionType::Select(m) => write_choice(f, "+", m),
612            SessionType::Branch(m) => write_choice(f, "&", m),
613            SessionType::Rec(x, b) => write!(f, "rec {x}.{b}"),
614            SessionType::Var(x) => f.write_str(x),
615        }
616    }
617}
618
619fn write_choice(f: &mut fmt::Formatter<'_>, sym: &str, m: &BTreeMap<String, SessionType>) -> fmt::Result {
620    write!(f, "{sym}{{")?;
621    for (i, (l, s)) in m.iter().enumerate() {
622        if i > 0 {
623            f.write_str(", ")?;
624        }
625        write!(f, "{l}: {s}")?;
626    }
627    f.write_str("}")
628}
629
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixtures: helpers for readable session-type literals ───────────────

    /// Builds a `select` choice from `(label, continuation)` pairs.
    fn sel(pairs: &[(&str, SessionType)]) -> SessionType {
        SessionType::select(pairs.iter().map(|(l, s)| ((*l).to_string(), s.clone())))
    }

    /// Builds a `branch` choice from `(label, continuation)` pairs.
    fn brn(pairs: &[(&str, SessionType)]) -> SessionType {
        SessionType::branch(pairs.iter().map(|(l, s)| ((*l).to_string(), s.clone())))
    }

    /// The 41.a chat sample, client side:
    /// `rec X. +{ ask: !Utterance. &{ token: ?Token.X, done: end }, cancel: end }`.
    ///
    /// Shared by the connection-law and credit-analysis tests so both exercise
    /// exactly the same recursive dialogue.
    fn chat_client() -> SessionType {
        let reply = brn(&[
            ("token", SessionType::recv("Token", SessionType::var("X"))),
            ("done", SessionType::End),
        ]);
        SessionType::rec(
            "X",
            sel(&[
                ("ask", SessionType::send("Utterance", reply)),
                ("cancel", SessionType::End),
            ]),
        )
    }

    // ── Duality basics ─────────────────────────────────────────────────────

    #[test]
    fn dual_swaps_send_recv_and_keeps_payload() {
        let s = SessionType::send("Int", SessionType::End);
        assert_eq!(s.dual(), SessionType::recv("Int", SessionType::End));
        // The payload (`Int`) is unchanged — only the direction flips.
        assert_eq!(SessionType::recv("Int", SessionType::End).dual(), s);
    }

    #[test]
    fn dual_swaps_select_branch() {
        let s = sel(&[
            ("a", SessionType::End),
            ("b", SessionType::send("T", SessionType::End)),
        ]);
        let d = s.dual();
        assert!(matches!(d, SessionType::Branch(_)));
        // …and the continuations are dualised too.
        let expected = brn(&[
            ("a", SessionType::End),
            ("b", SessionType::recv("T", SessionType::End)),
        ]);
        assert_eq!(d, expected);
    }

    // ── Involutivity: (S⊥)⊥ ≡ S — the cornerstone of the connection law ─────

    #[test]
    fn duality_is_an_involution() {
        let samples = vec![
            SessionType::End,
            SessionType::send("A", SessionType::recv("B", SessionType::End)),
            sel(&[
                ("x", SessionType::End),
                ("y", SessionType::recv("Q", SessionType::End)),
            ]),
            // recursive: rec X. !Msg. &{ more: X, done: end }
            SessionType::rec(
                "X",
                SessionType::send(
                    "Msg",
                    brn(&[("more", SessionType::var("X")), ("done", SessionType::End)]),
                ),
            ),
        ];
        for s in samples {
            assert!(s.dual().dual().equiv(&s), "(S⊥)⊥ ≢ S for {s}");
        }
    }

    // ── The connection law: S is dual to S⊥, and only to S⊥ ─────────────────

    #[test]
    fn connection_law_holds_for_dual_and_fails_otherwise() {
        let s = SessionType::send("Q", SessionType::recv("R", SessionType::End));
        assert!(s.is_dual_to(&s.dual()), "a session must be dual to its own dual");
        // Not dual to itself (it sends where the peer must receive).
        assert!(!s.is_dual_to(&s));
        // Not dual to a peer with a mismatched payload.
        let wrong = SessionType::recv("Q", SessionType::send("WRONG", SessionType::End));
        assert!(!s.is_dual_to(&wrong));
    }

    // ── Regular-coinductive equality: fold/unfold + α-renaming ──────────────

    #[test]
    fn equirecursive_fold_unfold_equality() {
        // μX. !A.X  ≡  !A.(μX. !A.X)   — one unfolding is equal.
        let folded = SessionType::rec("X", SessionType::send("A", SessionType::var("X")));
        let unfolded = SessionType::send("A", folded.clone());
        assert!(folded.equiv(&unfolded));
        assert!(unfolded.equiv(&folded));
    }

    #[test]
    fn equality_is_insensitive_to_bound_variable_name() {
        let x = SessionType::rec("X", SessionType::send("A", SessionType::var("X")));
        let y = SessionType::rec("Y", SessionType::send("A", SessionType::var("Y")));
        assert!(x.equiv(&y), "α-equivalent recursive sessions must be equal");
    }

    #[test]
    fn equality_reflexive_and_rejects_real_differences() {
        let s = sel(&[
            ("a", SessionType::send("T", SessionType::End)),
            ("b", SessionType::End),
        ]);
        assert!(s.equiv(&s));
        // Direction differs.
        let send_t = SessionType::send("T", SessionType::End);
        let recv_t = SessionType::recv("T", SessionType::End);
        assert!(!send_t.equiv(&recv_t));
        // Payload differs.
        let send_a = SessionType::send("A", SessionType::End);
        let send_b = SessionType::send("B", SessionType::End);
        assert!(!send_a.equiv(&send_b));
        // Label set differs.
        let s2 = sel(&[
            ("a", SessionType::send("T", SessionType::End)),
            ("c", SessionType::End),
        ]);
        assert!(!s.equiv(&s2));
        // Choice kind differs (select vs branch).
        assert!(!sel(&[("a", SessionType::End)]).equiv(&brn(&[("a", SessionType::End)])));
    }

    #[test]
    fn connection_law_holds_for_recursive_dialogue() {
        // A realistic chat dialogue (see `chat_client`).
        let client = chat_client();
        // The server endpoint is the structural dual; the law must accept it,
        // and reject the (non-dual) identical copy.
        assert!(client.is_dual_to(&client.dual()));
        assert!(!client.is_dual_to(&client));
        // equiv terminates on this recursive type (no stack blow-up).
        assert!(client.equiv(&client));
    }

    #[test]
    fn display_is_readable() {
        let s = SessionType::send("Int", SessionType::recv("Bool", SessionType::End));
        assert_eq!(s.to_string(), "!Int.?Bool.end");
        assert_eq!(SessionType::rec("X", SessionType::var("X")).to_string(), "rec X.X");
        // Choices render as `+{…}` (select) / `&{…}` (branch), arms joined by ", ".
        let choice = sel(&[
            ("a", SessionType::End),
            ("b", SessionType::recv("T", SessionType::End)),
        ]);
        assert_eq!(choice.to_string(), "+{a: end, b: ?T.end}");
        assert_eq!(brn(&[("x", SessionType::End)]).to_string(), "&{x: end}");
        // A stamped credit is rendered as `^n` right after the direction sigil.
        let credited = SessionType::recv_credit("Int", 3, SessionType::End);
        assert_eq!(credited.to_string(), "?^3Int.end");
    }

    // ── Fase 41.c — credit-refined backpressure (D2) ─────────────────────────

    #[test]
    fn dual_preserves_credit_index() {
        // (!ⁿA.S)⊥ = ?ⁿA.S⊥ — same credit, opposite direction.
        let s = SessionType::send_credit("Msg", 7, SessionType::End);
        assert_eq!(s.dual(), SessionType::recv_credit("Msg", 7, SessionType::End));
        // Round-trip preserves the credit through both polarities.
        assert!(s.dual().dual().equiv(&s));
    }

    #[test]
    fn equality_distinguishes_credit_index() {
        // Different numeric credit ⇒ structurally distinct types.
        let a = SessionType::send_credit("T", 1, SessionType::End);
        let b = SessionType::send_credit("T", 2, SessionType::End);
        assert!(!a.equiv(&b));
        // Unbounded (credit=None) is distinct from any stamped credit.
        let unbounded = SessionType::send("T", SessionType::End);
        assert!(!a.equiv(&unbounded));
    }

    #[test]
    fn with_credit_stamps_every_send_and_recv() {
        let bare = SessionType::send(
            "A",
            SessionType::recv("B", SessionType::send("C", SessionType::End)),
        );
        let stamped = bare.with_credit(4);
        let expected = SessionType::send_credit(
            "A",
            4,
            SessionType::recv_credit("B", 4, SessionType::send_credit("C", 4, SessionType::End)),
        );
        assert_eq!(stamped, expected);
        // Idempotent on already-stamped.
        assert_eq!(stamped.with_credit(4), stamped);
    }

    #[test]
    fn has_send_at_zero_finds_the_unprovable_send() {
        let bad = SessionType::recv(
            "Q",
            SessionType::send_credit("Boom", 0, SessionType::End),
        );
        assert_eq!(bad.has_send_at_zero(), Some(Payload::new("Boom")));
        // A protocol with no `!⁰…` is clean.
        let ok = SessionType::send_credit("A", 3, SessionType::End);
        assert_eq!(ok.has_send_at_zero(), None);
        // Sends inside choice arms are reached.
        let choice = sel(&[("ask", SessionType::send_credit("X", 0, SessionType::End))]);
        assert_eq!(choice.has_send_at_zero(), Some(Payload::new("X")));
    }

    // ── The Presburger discharge: credit_analyse(budget) ────────────────────

    #[test]
    fn credit_analyse_accepts_a_straight_line_protocol_within_budget() {
        // Two consecutive sends; budget = 2 ⇒ enough window.
        let s = SessionType::send("A", SessionType::send("B", SessionType::End));
        assert!(s.credit_analyse(2).is_ok());
    }

    #[test]
    fn credit_analyse_rejects_burst_overflow() {
        // Three sends in a row; budget = 2 ⇒ the third send hits available = 0.
        let s = SessionType::send(
            "A",
            SessionType::send("B", SessionType::send("C", SessionType::End)),
        );
        match s.credit_analyse(2) {
            Err(CreditError::BurstOverflow { payload, budget: 2, .. }) => {
                assert_eq!(payload, Payload::new("C"));
            }
            other => panic!("expected BurstOverflow, got {other:?}"),
        }
    }

    #[test]
    fn credit_analyse_rejects_explicit_send_at_zero() {
        let s = SessionType::send_credit("X", 0, SessionType::End);
        match s.credit_analyse(8) {
            Err(CreditError::SendAtZero { payload }) => assert_eq!(payload, Payload::new("X")),
            other => panic!("expected SendAtZero, got {other:?}"),
        }
    }

    #[test]
    fn credit_analyse_rejects_unsustainable_loop() {
        // rec X. !A.!B.?Ack.X — Δ = 2 - 1 = 1 > 0; no finite budget keeps
        // unbounded iteration in flight.
        let s = SessionType::rec(
            "X",
            SessionType::send(
                "A",
                SessionType::send("B", SessionType::recv("Ack", SessionType::var("X"))),
            ),
        );
        match s.credit_analyse(100) {
            Err(CreditError::LoopUnsustainable { sends_per_iter: 2, recvs_per_iter: 1 }) => {}
            other => panic!("expected LoopUnsustainable(2,1), got {other:?}"),
        }
    }

    #[test]
    fn credit_analyse_accepts_a_balanced_loop() {
        // rec X. !A.?Ack.X — Δ = 1 - 1 = 0; sustainable under budget ≥ 1.
        let s = SessionType::rec(
            "X",
            SessionType::send("A", SessionType::recv("Ack", SessionType::var("X"))),
        );
        assert!(s.credit_analyse(1).is_ok());
        assert!(s.credit_analyse(8).is_ok());
    }

    #[test]
    fn credit_analyse_walks_choice_arms_worst_case() {
        // +{ ask: !A.!B.end, quit: end } — the ask arm needs window 2.
        let s = sel(&[
            ("ask", SessionType::send("A", SessionType::send("B", SessionType::End))),
            ("quit", SessionType::End),
        ]);
        assert!(s.credit_analyse(2).is_ok()); // both arms fit
        assert!(matches!(
            s.credit_analyse(1),
            Err(CreditError::BurstOverflow { .. })
        ));
    }

    #[test]
    fn credit_analyse_is_total_on_realistic_chat_dialogue() {
        // The 41.a chat sample (see `chat_client`). Worst-case arm has
        // Δ = 1 - 1 = 0.
        let client = chat_client();
        assert!(client.credit_analyse(4).is_ok());
        // The dual receiver also conforms — symmetric credit.
        assert!(client.dual().credit_analyse(4).is_ok());
    }

    #[test]
    fn credit_delta_counts_per_iteration() {
        // rec X. !A.!B.?Ack.X — the body's single recurring path has Δ = (2, 1).
        let body = SessionType::send(
            "A",
            SessionType::send("B", SessionType::recv("Ack", SessionType::var("X"))),
        );
        assert_eq!(body.credit_delta("X"), (2, 1));
        // Non-recurring tail (no Var(X)) yields no recurring paths → (0, 0).
        let non_recurring = SessionType::send("A", SessionType::End);
        assert_eq!(non_recurring.credit_delta("X"), (0, 0));
        // Choice: only the recurring arm contributes; `cancel: end` is exempt.
        let body_chat = sel(&[
            (
                "ask",
                SessionType::send("U", SessionType::recv("Tok", SessionType::var("X"))),
            ),
            ("cancel", SessionType::End),
        ]);
        assert_eq!(body_chat.credit_delta("X"), (1, 1));
    }

    // ── Fase 41.e — SSE-as-fragment unification (D3) ─────────────────────

    #[test]
    fn pure_send_chain_is_in_the_sse_producer_fragment() {
        // !A.!B.end — the canonical SSE shape: a server emits a sequence
        // of events, no client input.
        let s = SessionType::send("A", SessionType::send("B", SessionType::End));
        assert!(s.projects_to_sse());
        // Its dual is the consumer-side, in the dual SSE fragment.
        assert!(s.dual().projects_to_sse_consumer());
    }

    #[test]
    fn any_recv_disqualifies_the_producer_fragment() {
        // Even one `Recv` makes the type two-polarity.
        let s = SessionType::send("Q", SessionType::recv("Ack", SessionType::End));
        assert!(!s.projects_to_sse());
        // …and the dual still has the offending direction, just flipped.
        assert!(!s.dual().projects_to_sse_consumer());
    }

    #[test]
    fn branch_disqualifies_the_producer_fragment() {
        // Branch = client picks. The producer (server) cannot offer one.
        let s = SessionType::branch([("ack".into(), SessionType::End)]);
        assert!(!s.projects_to_sse());
        // The dual is a Select, which IS in the producer fragment.
        assert!(s.dual().projects_to_sse());
    }

    #[test]
    fn select_is_in_the_producer_fragment_iff_all_arms_are() {
        // +{a: !A.end, b: end} — pure server-side internal choice.
        let ok = sel(&[
            ("a", SessionType::send("A", SessionType::End)),
            ("b", SessionType::End),
        ]);
        assert!(ok.projects_to_sse());
        // +{a: !A.end, b: ?Q.end} — one arm asks for client input,
        // disqualifies the entire choice.
        let bad = sel(&[
            ("a", SessionType::send("A", SessionType::End)),
            ("b", SessionType::recv("Q", SessionType::End)),
        ]);
        assert!(!bad.projects_to_sse());
    }

    #[test]
    fn recursive_sse_token_stream_is_in_the_producer_fragment() {
        // rec X. !Token.X — an unbounded SSE token stream. The canonical
        // example: every Fase 33 server-token stream is exactly this
        // type (modulo the closing `end` we typically wrap it in).
        let s = SessionType::rec(
            "X",
            SessionType::send("Token", SessionType::var("X")),
        );
        assert!(s.projects_to_sse());
        // …and the dual SSE-consumer view is `rec X. ?Token.X`.
        assert!(s.dual().projects_to_sse_consumer());
    }

    #[test]
    fn the_two_polarities_partition_the_sse_projectable_space() {
        // For every S, S.projects_to_sse() ⇔ S.dual().projects_to_sse_consumer().
        // We sample a handful of producer-side shapes + the negative cases,
        // and check BOTH sides of the biconditional on each sample.
        let samples_producer: Vec<SessionType> = vec![
            SessionType::End,
            SessionType::send("A", SessionType::End),
            sel(&[
                ("x", SessionType::End),
                ("y", SessionType::send("T", SessionType::End)),
            ]),
            SessionType::rec("X", SessionType::send("T", SessionType::var("X"))),
        ];
        for s in samples_producer {
            assert!(s.projects_to_sse(), "{s} should project to SSE producer");
            assert!(
                s.dual().projects_to_sse_consumer(),
                "{s}⊥ should project to SSE consumer"
            );
        }
        let samples_non_sse: Vec<SessionType> = vec![
            SessionType::recv("A", SessionType::send("B", SessionType::End)),
            SessionType::send("A", SessionType::recv("B", SessionType::End)),
            brn(&[("x", SessionType::End)]),
        ];
        for s in samples_non_sse {
            assert!(!s.projects_to_sse(), "{s} should NOT project to SSE producer");
            // The ⇐ direction of the law: a non-producer's dual is a non-consumer.
            assert!(
                !s.dual().projects_to_sse_consumer(),
                "{s}⊥ should NOT project to SSE consumer"
            );
        }
    }

    #[test]
    fn polarity_flip_is_an_involution() {
        assert_eq!(Polarity::Producer.flip(), Polarity::Consumer);
        assert_eq!(Polarity::Consumer.flip(), Polarity::Producer);
        for p in [Polarity::Producer, Polarity::Consumer] {
            assert_eq!(p.flip().flip(), p);
        }
    }
}