use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use serde::{Deserialize, Serialize};
use crate::session::{Payload, SessionType};
/// Name of a participant in a global protocol.
///
/// A newtype over `String`. `#[serde(transparent)]` makes it (de)serialize as
/// the bare inner string. The derived `Ord` is what lets `Role` be used as a
/// `BTreeSet` element / `BTreeMap` key elsewhere in this file.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Role(pub String);
impl Role {
pub fn new(name: impl Into<String>) -> Self {
Role(name.into())
}
}
impl fmt::Display for Role {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
/// A global (multi-party) protocol description, written from a bird's-eye
/// view of all roles at once.
///
/// Use [`GlobalType::project`] to obtain the local `SessionType` that a
/// single role has to follow.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum GlobalType {
/// The protocol is finished.
End,
/// `from` sends `payload` to `to`, then the protocol continues as `cont`.
Message {
from: Role,
to: Role,
payload: Payload,
cont: Box<GlobalType>,
},
/// `from` picks one of the labelled `arms` and `to` is told which one.
/// Arms are keyed by label in a `BTreeMap`, so they are visited in sorted
/// label order.
Choice {
from: Role,
to: Role,
arms: BTreeMap<String, GlobalType>,
},
/// Recursion binder: the `String` names a variable that may occur inside
/// the boxed body.
Rec(String, Box<GlobalType>),
/// Occurrence of a recursion variable, by name.
Var(String),
}
impl GlobalType {
    /// Builds `from → to : payload`, followed by `cont`.
    pub fn message(
        from: impl Into<String>,
        to: impl Into<String>,
        payload: impl Into<String>,
        cont: GlobalType,
    ) -> Self {
        let from = Role::new(from);
        let to = Role::new(to);
        let payload = Payload(payload.into());
        let cont = Box::new(cont);
        Self::Message { from, to, payload, cont }
    }

    /// Builds a choice made by `from` and communicated to `to`.
    ///
    /// `arms` is collected into a `BTreeMap`, so arms sharing a label
    /// collapse into a single entry.
    pub fn choice(
        from: impl Into<String>,
        to: impl Into<String>,
        arms: impl IntoIterator<Item = (String, GlobalType)>,
    ) -> Self {
        let from = Role::new(from);
        let to = Role::new(to);
        let arms: BTreeMap<String, GlobalType> = arms.into_iter().collect();
        Self::Choice { from, to, arms }
    }

    /// Builds the recursive type that binds `var` in `body`.
    pub fn rec(var: impl Into<String>, body: GlobalType) -> Self {
        let body = Box::new(body);
        Self::Rec(var.into(), body)
    }

    /// Builds an occurrence of the recursion variable `name`.
    pub fn var(name: impl Into<String>) -> Self {
        Self::Var(name.into())
    }

    /// Returns every role that appears as a sender or receiver anywhere in
    /// this type, in sorted order.
    pub fn roles(&self) -> BTreeSet<Role> {
        let mut participants = BTreeSet::new();
        collect_roles(self, &mut participants);
        participants
    }

    /// Projects this global type onto the single role `r`.
    ///
    /// # Errors
    ///
    /// Returns the [`ProjectionError`] reported by the projection.
    pub fn project(&self, r: &Role) -> Result<SessionType, ProjectionError> {
        project_inner(self, r)
    }

    /// Projects this global type onto every role returned by
    /// [`GlobalType::roles`].
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first projection error encountered while
    /// walking the roles in sorted order.
    pub fn project_all(&self) -> Result<BTreeMap<Role, SessionType>, ProjectionError> {
        self.roles()
            .into_iter()
            .map(|role| {
                let local = self.project(&role)?;
                Ok((role, local))
            })
            .collect()
    }

    /// `true` exactly when [`GlobalType::project_all`] succeeds.
    pub fn is_safely_realizable(&self) -> bool {
        let projections = self.project_all();
        projections.is_ok()
    }
}
/// Reasons why projecting a `GlobalType` onto a role can fail.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProjectionError {
/// A message or choice names the same role as both sender and receiver.
SelfMessage { role: Role },
/// A choice between `from` and `to` has no arms at all.
EmptyChoice { from: Role, to: Role },
/// A role that is neither the chooser nor the offerer of a choice sees
/// two arms whose projections are not equivalent.
MergeFailed {
/// The role being projected.
role: Role,
/// Labels of the two disagreeing arms, smaller label first.
labels: (String, String),
/// Projection of the arm the others are compared against.
left: Box<SessionType>,
/// Projection of the arm that disagreed with `left`.
right: Box<SessionType>,
},
/// A recursion variable with no enclosing binder.
/// NOTE(review): no code visible in this file constructs this variant;
/// confirm whether it is produced elsewhere or still to be wired up.
UnboundVariable(String),
}
impl fmt::Display for ProjectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProjectionError::SelfMessage { role } => write!(
f,
"global protocol has a self-message at role `{role}` — \
a global type's projection has no rule for `p → p`"
),
ProjectionError::EmptyChoice { from, to } => write!(
f,
"global choice `{from} → {to}` has no arms — \
the projection for `{from}` is the empty internal-choice ⊕{{}}"
),
ProjectionError::MergeFailed { role, labels, left, right } => write!(
f,
"global type is not safely realizable: role `{role}` cannot \
observe the choice — arm `{}` projects to `{left}` and arm `{}` \
projects to `{right}` (they must be ≡ for {role} to stay in step)",
labels.0, labels.1
),
ProjectionError::UnboundVariable(var) => write!(
f,
"global type has a free recursion variable `{var}` — \
no enclosing `μ{var}. _` binds it"
),
}
}
}
// Marker impl: every `Error` method keeps its default, so `source()` is
// always `None`; the message comes from the `Display` impl.
impl std::error::Error for ProjectionError {}
/// Adds every sender and receiver mentioned anywhere inside `g` to `out`.
///
/// `End` and `Var` contribute nothing; `Rec` contributes whatever its body
/// does.
fn collect_roles(g: &GlobalType, out: &mut BTreeSet<Role>) {
    // Explicit worklist instead of recursion; the visiting order does not
    // matter because the result is a set.
    let mut pending: Vec<&GlobalType> = vec![g];
    while let Some(node) = pending.pop() {
        match node {
            GlobalType::End | GlobalType::Var(_) => {}
            GlobalType::Message { from, to, cont, .. } => {
                out.insert(from.clone());
                out.insert(to.clone());
                pending.push(cont);
            }
            GlobalType::Choice { from, to, arms } => {
                out.insert(from.clone());
                out.insert(to.clone());
                pending.extend(arms.values());
            }
            GlobalType::Rec(_, body) => pending.push(body),
        }
    }
}
/// Projects the global type `g` onto the single role `r`, producing that
/// role's local view of the protocol.
///
/// * `Message` becomes a `Send` for the sender, a `Recv` for the receiver,
///   and is skipped for every other role.
/// * `Choice` becomes a `Select` for the chooser and a `Branch` for the
///   offerer. Any other role must see equivalent behaviour in every arm and
///   gets the projection of the first arm (in sorted label order).
/// * `Rec` collapses to `End` when `r` does not occur in the body; otherwise
///   the binder is kept only if the projected body still mentions the
///   variable.
/// * `Var` is passed through unchanged.
///
/// # Errors
///
/// * [`ProjectionError::SelfMessage`] if a message or choice has the same
///   role on both ends.
/// * [`ProjectionError::EmptyChoice`] if a choice has no arms.
/// * [`ProjectionError::MergeFailed`] if `r` is not a party to a choice and
///   two arms project to non-equivalent local types.
///
/// NOTE(review): free recursion variables are not rejected here, so
/// `ProjectionError::UnboundVariable` is never produced by this function.
fn project_inner(g: &GlobalType, r: &Role) -> Result<SessionType, ProjectionError> {
    match g {
        GlobalType::End => Ok(SessionType::End),
        GlobalType::Var(var) => Ok(SessionType::Var(var.clone())),
        GlobalType::Message { from, to, payload, cont } => {
            if from == to {
                return Err(ProjectionError::SelfMessage { role: from.clone() });
            }
            // Project the continuation first so that errors further down the
            // protocol surface no matter which role is being projected.
            let rest = project_inner(cont, r)?;
            let local = if r == from {
                SessionType::Send {
                    payload: payload.clone(),
                    credit: None,
                    cont: Box::new(rest),
                }
            } else if r == to {
                SessionType::Recv {
                    payload: payload.clone(),
                    credit: None,
                    cont: Box::new(rest),
                }
            } else {
                // Not a party to this message: it is invisible to `r`.
                rest
            };
            Ok(local)
        }
        GlobalType::Choice { from, to, arms } => {
            if from == to {
                return Err(ProjectionError::SelfMessage { role: from.clone() });
            }
            if arms.is_empty() {
                return Err(ProjectionError::EmptyChoice {
                    from: from.clone(),
                    to: to.clone(),
                });
            }
            // Project every arm up front (in label order); the first failing
            // arm aborts the whole projection.
            let projected = arms
                .iter()
                .map(|(label, arm)| project_inner(arm, r).map(|local| (label, local)))
                .collect::<Result<Vec<(&String, SessionType)>, ProjectionError>>()?;

            if r == from || r == to {
                let by_label: BTreeMap<String, SessionType> = projected
                    .into_iter()
                    .map(|(label, local)| (label.clone(), local))
                    .collect();
                return Ok(if r == from {
                    SessionType::Select(by_label)
                } else {
                    SessionType::Branch(by_label)
                });
            }

            // `r` cannot observe which arm was taken, so all arms have to
            // look the same from its point of view.
            let mut remaining = projected.into_iter();
            let (first_label, canonical) = remaining
                .next()
                .expect("non-empty arms (checked above)");
            for (label, local) in remaining {
                if !canonical.equiv(&local) {
                    return Err(ProjectionError::MergeFailed {
                        role: r.clone(),
                        labels: ordered_pair(first_label.clone(), label.clone()),
                        left: Box::new(canonical),
                        right: Box::new(local),
                    });
                }
            }
            Ok(canonical)
        }
        GlobalType::Rec(var, body) => {
            let mut participants = BTreeSet::new();
            collect_roles(body, &mut participants);
            if !participants.contains(r) {
                // A role that never appears in the loop body has nothing to do.
                return Ok(SessionType::End);
            }
            let inner = project_inner(body, r)?;
            // Keep the binder only when the projected body still refers to it.
            Ok(if contains_var(&inner, var) {
                SessionType::Rec(var.clone(), Box::new(inner))
            } else {
                inner
            })
        }
    }
}
/// Reports whether the recursion variable `var` occurs free in the local
/// type `t`.
///
/// An occurrence underneath a `Rec` that binds the same name is shadowed and
/// does not count.
fn contains_var(t: &SessionType, var: &str) -> bool {
    match t {
        SessionType::End => false,
        SessionType::Var(name) => name == var,
        SessionType::Send { cont, .. } | SessionType::Recv { cont, .. } => {
            contains_var(cont, var)
        }
        SessionType::Select(arms) | SessionType::Branch(arms) => {
            arms.values().any(|arm| contains_var(arm, var))
        }
        // A binder for the same name shadows `var`, so nothing below it can
        // be a free occurrence; any other binder is transparent.
        SessionType::Rec(bound, body) => bound != var && contains_var(body, var),
    }
}
/// Returns the two strings as a tuple with the lexicographically smaller one
/// first; equal inputs keep their argument order.
fn ordered_pair(a: String, b: String) -> (String, String) {
    if b < a {
        return (b, a);
    }
    (a, b)
}
// Unit tests for projection of global types onto individual roles.
#[cfg(test)]
mod tests {
use super::*;
// Shorthand for building a `Role` from a string literal.
fn r(s: &str) -> Role {
Role::new(s)
}
// The sender of a single message projects to a `Send`.
#[test]
fn project_sender_yields_send() {
let g = GlobalType::message("A", "B", "Msg", GlobalType::End);
assert_eq!(
g.project(&r("A")).unwrap(),
SessionType::send("Msg", SessionType::End)
);
}
// The receiver of a single message projects to a `Recv`.
#[test]
fn project_receiver_yields_recv() {
let g = GlobalType::message("A", "B", "Msg", GlobalType::End);
assert_eq!(
g.project(&r("B")).unwrap(),
SessionType::recv("Msg", SessionType::End)
);
}
// A role that is neither sender nor receiver sees nothing of the exchange.
#[test]
fn projection_for_uninvolved_role_skips_the_message() {
let g = GlobalType::message("A", "B", "Msg", GlobalType::End);
assert_eq!(g.project(&r("C")).unwrap(), SessionType::End);
let g2 = GlobalType::message(
"A",
"B",
"Msg",
GlobalType::message("B", "A", "Ack", GlobalType::End),
);
assert_eq!(g2.project(&r("C")).unwrap(), SessionType::End);
}
// The role making the choice projects to a `Select`.
#[test]
fn project_chooser_yields_internal_choice() {
let g = GlobalType::choice(
"A",
"B",
[("yes".into(), GlobalType::End), ("no".into(), GlobalType::End)],
);
let p = g.project(&r("A")).unwrap();
assert!(matches!(p, SessionType::Select(_)));
}
// The role being told about the choice projects to a `Branch`.
#[test]
fn project_offerer_yields_external_choice() {
let g = GlobalType::choice(
"A",
"B",
[("yes".into(), GlobalType::End), ("no".into(), GlobalType::End)],
);
let p = g.project(&r("B")).unwrap();
assert!(matches!(p, SessionType::Branch(_)));
}
// Third parties project fine when every arm looks the same to them.
#[test]
fn merge_condition_passes_when_uninvolved_role_sees_equivalent_arms() {
let arm = GlobalType::message("C", "D", "T", GlobalType::End);
let g = GlobalType::choice(
"A",
"B",
[("yes".into(), arm.clone()), ("no".into(), arm)],
);
let pc = g.project(&r("C")).unwrap();
assert_eq!(pc, SessionType::send("T", SessionType::End));
let pd = g.project(&r("D")).unwrap();
assert_eq!(pd, SessionType::recv("T", SessionType::End));
}
// Third parties fail with `MergeFailed` when the arms differ for them; the
// reported labels come back smaller-first.
#[test]
fn merge_condition_fails_when_uninvolved_role_sees_divergent_arms() {
let g = GlobalType::choice(
"A",
"B",
[
("no".into(), GlobalType::End),
(
"yes".into(),
GlobalType::message("C", "D", "T", GlobalType::End),
),
],
);
match g.project(&r("C")) {
Err(ProjectionError::MergeFailed { role, labels, .. }) => {
assert_eq!(role, r("C"));
assert_eq!(labels, ("no".into(), "yes".into()));
}
other => panic!("expected MergeFailed for C, got {other:?}"),
}
}
// A role that takes part in the loop body keeps the `Rec` binder.
#[test]
fn recursion_round_trips_through_projection_for_an_iterating_role() {
let g = GlobalType::rec(
"X",
GlobalType::message("A", "B", "T", GlobalType::var("X")),
);
let pa = g.project(&r("A")).unwrap();
assert_eq!(
pa,
SessionType::rec("X", SessionType::send("T", SessionType::var("X")))
);
let pb = g.project(&r("B")).unwrap();
assert_eq!(
pb,
SessionType::rec("X", SessionType::recv("T", SessionType::var("X")))
);
}
// A role absent from the loop body does not get a `Rec` at all.
#[test]
fn projection_elides_vacuous_rec_for_a_non_iterating_role() {
let g = GlobalType::rec(
"X",
GlobalType::message("A", "B", "T", GlobalType::var("X")),
);
let pc = g.project(&r("C")).unwrap();
assert!(!matches!(pc, SessionType::Rec(_, _)));
}
// `A -> A` is rejected even when projecting onto an unrelated role.
#[test]
fn self_message_is_rejected() {
let g = GlobalType::message("A", "A", "T", GlobalType::End);
match g.project(&r("B")) {
Err(ProjectionError::SelfMessage { role }) => assert_eq!(role, r("A")),
other => panic!("expected SelfMessage, got {other:?}"),
}
}
// A choice with zero arms is rejected and reports both endpoints.
#[test]
fn empty_choice_is_rejected() {
let g = GlobalType::choice("A", "B", []);
match g.project(&r("A")) {
Err(ProjectionError::EmptyChoice { from, to }) => {
assert_eq!(from, r("A"));
assert_eq!(to, r("B"));
}
other => panic!("expected EmptyChoice, got {other:?}"),
}
}
// `roles()` gathers senders and receivers from every nesting level.
#[test]
fn roles_collects_every_participant() {
let g = GlobalType::message(
"User",
"Agent",
"Query",
GlobalType::message(
"Agent",
"Tool",
"Sub",
GlobalType::message(
"Tool",
"Agent",
"Resp",
GlobalType::message("Agent", "User", "Reply", GlobalType::End),
),
),
);
let roles = g.roles();
assert_eq!(roles, [r("Agent"), r("Tool"), r("User")].into_iter().collect());
}
// Straight-line User/Agent/Tool exchange: every role gets the expected
// sequence of sends and receives.
#[test]
fn project_all_succeeds_on_a_safely_realizable_three_role_protocol() {
let g = GlobalType::message(
"User",
"Agent",
"Query",
GlobalType::message(
"Agent",
"Tool",
"Sub",
GlobalType::message(
"Tool",
"Agent",
"Resp",
GlobalType::message("Agent", "User", "Reply", GlobalType::End),
),
),
);
let all = g.project_all().expect("safely realizable");
assert_eq!(all.len(), 3);
assert!(g.is_safely_realizable());
let pu = &all[&r("User")];
assert_eq!(
pu,
&SessionType::send("Query", SessionType::recv("Reply", SessionType::End))
);
let pt = &all[&r("Tool")];
assert_eq!(
pt,
&SessionType::recv("Sub", SessionType::send("Resp", SessionType::End))
);
let pa = &all[&r("Agent")];
assert_eq!(
pa,
&SessionType::recv(
"Query",
SessionType::send(
"Sub",
SessionType::recv("Resp", SessionType::send("Reply", SessionType::End))
)
)
);
}
// `is_safely_realizable` is false when some role fails to project.
#[test]
fn is_safely_realizable_rejects_the_diverging_choice() {
let g = GlobalType::choice(
"A",
"B",
[
("yes".into(), GlobalType::message("C", "D", "T", GlobalType::End)),
("no".into(), GlobalType::End),
],
);
assert!(!g.is_safely_realizable());
}
// Serializing to JSON and back yields an equal `GlobalType`.
#[test]
fn global_type_round_trips_through_json() {
let g = GlobalType::rec(
"X",
GlobalType::choice(
"Client",
"Server",
[
(
"ask".into(),
GlobalType::message(
"Server",
"Client",
"Token",
GlobalType::var("X"),
),
),
("cancel".into(), GlobalType::End),
],
),
);
let json = serde_json::to_string(&g).unwrap();
let back: GlobalType = serde_json::from_str(&json).unwrap();
assert_eq!(back, g);
}
// Recursive three-role chat: every role projects to a `Rec`, and the
// Skill's loop body is checked exactly.
#[test]
fn kivi_three_role_recursive_chat_projects_per_role() {
let g = GlobalType::rec(
"X",
GlobalType::message(
"User",
"Agent",
"Utterance",
GlobalType::message(
"Agent",
"Skill",
"Sub",
GlobalType::message(
"Skill",
"Agent",
"Resp",
GlobalType::message("Agent", "User", "Reply", GlobalType::var("X")),
),
),
),
);
let all = g.project_all().expect("3-role chat is safely realizable");
assert_eq!(all.len(), 3);
for role in [r("User"), r("Agent"), r("Skill")] {
assert!(
matches!(all[&role], SessionType::Rec(_, _)),
"{role} should iterate (got {})",
all[&role]
);
}
let skill = &all[&r("Skill")];
// The loop above already asserted that this is a `Rec`.
if let SessionType::Rec(_, body) = skill {
assert_eq!(
**body,
SessionType::recv("Sub", SessionType::send("Resp", SessionType::var("X")))
);
}
}
// Both arms continue identically, so the third party (Skill) projects too.
#[test]
fn safely_realizable_choice_projects_for_every_role() {
let g = GlobalType::choice(
"Agent",
"User",
[
(
"ack".into(),
GlobalType::message("Agent", "Skill", "T", GlobalType::End),
),
(
"nak".into(),
GlobalType::message("Agent", "Skill", "T", GlobalType::End),
),
],
);
let all = g.project_all().expect("uniform-continuation choice is realizable");
assert_eq!(all.len(), 3);
assert_eq!(
all[&r("Skill")],
SessionType::recv("T", SessionType::End)
);
assert!(matches!(all[&r("Agent")], SessionType::Select(_)));
assert!(matches!(all[&r("User")], SessionType::Branch(_)));
}
// A differently named binder is transparent; a binder for the same name
// shadows the variable.
#[test]
fn contains_var_handles_shadowing_correctly() {
let t = SessionType::rec(
"Y",
SessionType::send("T", SessionType::var("X")),
);
assert!(contains_var(&t, "X"));
let t2 = SessionType::rec(
"X",
SessionType::send("T", SessionType::var("X")),
);
assert!(!contains_var(&t2, "X"));
}
}