1use std::collections::{HashMap, HashSet};
28use std::fmt;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::time::{SystemTime, UNIX_EPOCH};
32
33use tokio::sync::{mpsc, Mutex as AsyncMutex};
34
35use axon_frontend::ir_nodes::{IRChannel, IRProgram};
36
/// Errors reported by the typed channel registry and the event bus.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TypedChannelError {
    /// No channel is registered under `name`; `registered` carries the names
    /// that were known when the lookup failed.
    ChannelNotFound {
        name: String,
        registered: Vec<String>,
    },
    /// A payload or operation did not fit the channel definition. The string
    /// is the complete, ready-to-display message.
    SchemaMismatch(String),
    /// A publish/discover request was refused. The string is the complete,
    /// ready-to-display message.
    CapabilityGate(String),
    /// A `linear` channel was consumed `count` times.
    LifetimeViolation { name: String, count: u32 },
    /// The transport refused the operation; displayed with a `transport: ` prefix.
    Transport(String),
}

impl fmt::Display for TypedChannelError {
    /// Renders the human-readable message for each variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // These two variants already carry the whole message.
            Self::SchemaMismatch(text) | Self::CapabilityGate(text) => f.write_str(text),
            Self::Transport(detail) => write!(f, "transport: {detail}"),
            Self::ChannelNotFound { name, registered } => write!(
                f,
                "channel '{name}' not in TypedChannelRegistry (registered: {registered:?})"
            ),
            Self::LifetimeViolation { name, count } => write!(
                f,
                "channel '{name}' is linear but has been consumed {count} times (linear ⇒ exactly once)"
            ),
        }
    }
}

impl std::error::Error for TypedChannelError {}

/// Result alias whose error type is [`TypedChannelError`].
pub type Result<T> = std::result::Result<T, TypedChannelError>;
95
/// Record describing a grant on a channel.
///
/// All fields are public plain data; the type derives `Debug`, `Clone` and
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq)]
pub struct Capability {
    /// Identifier of this grant.
    pub capability_id: String,
    /// Name of the channel the grant refers to.
    pub channel_name: String,
    /// Name of the shield associated with the grant.
    pub shield_ref: String,
    /// NOTE(review): numeric parameter attached to the grant; its meaning is
    /// not established by this definition — confirm against the design notes.
    pub delta_pub: f64,
    /// Issue time. NOTE(review): presumably seconds since the Unix epoch —
    /// confirm against the code that fills it in.
    pub issued_at: f64,
}
124
/// Description of one typed channel plus a usage counter.
///
/// The policy fields are free-form strings; see the individual field notes.
#[derive(Debug, Clone, PartialEq)]
pub struct TypedChannelHandle {
    /// Channel name.
    pub name: String,
    /// Message type name carried by the channel, e.g. `Order` or `Channel<Order>`.
    pub message: String,
    /// Delivery policy name.
    pub qos: String,
    /// Lifetime discipline name.
    pub lifetime: String,
    /// Persistence mode name.
    pub persistence: String,
    /// Name of the shield declared on the channel; may be empty.
    pub shield_ref: String,
    /// Counter of how many times the channel has been consumed.
    pub consumed_count: u32,
}
154
155impl TypedChannelHandle {
156 pub fn new(name: impl Into<String>, message: impl Into<String>) -> Self {
160 Self {
161 name: name.into(),
162 message: message.into(),
163 qos: "at_least_once".to_string(),
164 lifetime: "affine".to_string(),
165 persistence: "ephemeral".to_string(),
166 shield_ref: String::new(),
167 consumed_count: 0,
168 }
169 }
170
171 pub fn is_publishable(&self) -> bool {
173 !self.shield_ref.is_empty()
174 }
175
176 pub fn carries_channel(&self) -> bool {
178 self.message.starts_with("Channel<") && self.message.ends_with('>')
179 }
180
181 pub fn inner_message_type(&self) -> &str {
187 if !self.carries_channel() {
188 return &self.message;
189 }
190 &self.message["Channel<".len()..self.message.len() - 1]
191 }
192
193 pub fn from_ir(ir: &IRChannel) -> Self {
195 Self {
196 name: ir.name.clone(),
197 message: ir.message.clone(),
198 qos: ir.qos.clone(),
199 lifetime: ir.lifetime.clone(),
200 persistence: ir.persistence.clone(),
201 shield_ref: ir.shield_ref.clone(),
202 consumed_count: 0,
203 }
204 }
205}
206
207#[derive(Debug, Clone)]
216pub enum TypedPayload {
217 Scalar(serde_json::Value),
218 Handle(TypedChannelHandle),
219}
220
221impl TypedPayload {
222 pub fn scalar<V: Into<serde_json::Value>>(v: V) -> Self {
224 TypedPayload::Scalar(v.into())
225 }
226
227 pub fn handle(h: TypedChannelHandle) -> Self {
229 TypedPayload::Handle(h)
230 }
231
232 pub fn is_handle(&self) -> bool {
233 matches!(self, TypedPayload::Handle(_))
234 }
235}
236
237#[derive(Debug, Clone)]
239pub struct TypedEvent {
240 pub channel: String,
241 pub payload: TypedPayload,
242 pub event_id: String,
243 pub timestamp_secs: f64,
244}
245
246#[derive(Debug, Default)]
256pub struct TypedChannelRegistry {
257 handles: HashMap<String, TypedChannelHandle>,
258}
259
260impl TypedChannelRegistry {
261 pub fn new() -> Self {
262 Self::default()
263 }
264
265 pub fn register(&mut self, handle: TypedChannelHandle) {
268 self.handles.insert(handle.name.clone(), handle);
269 }
270
271 pub fn register_from_ir(&mut self, ir: &IRChannel) -> TypedChannelHandle {
273 let handle = TypedChannelHandle::from_ir(ir);
274 self.handles.insert(handle.name.clone(), handle.clone());
275 handle
276 }
277
278 pub fn get(&self, name: &str) -> Result<&TypedChannelHandle> {
279 self.handles
280 .get(name)
281 .ok_or_else(|| TypedChannelError::ChannelNotFound {
282 name: name.to_string(),
283 registered: self.names(),
284 })
285 }
286
287 fn get_mut(&mut self, name: &str) -> Result<&mut TypedChannelHandle> {
288 let registered = self.names();
289 self.handles
290 .get_mut(name)
291 .ok_or_else(|| TypedChannelError::ChannelNotFound {
292 name: name.to_string(),
293 registered,
294 })
295 }
296
297 pub fn has(&self, name: &str) -> bool {
298 self.handles.contains_key(name)
299 }
300
301 pub fn names(&self) -> Vec<String> {
302 let mut v: Vec<String> = self.handles.keys().cloned().collect();
303 v.sort();
304 v
305 }
306
307 pub fn len(&self) -> usize {
308 self.handles.len()
309 }
310
311 pub fn is_empty(&self) -> bool {
312 self.handles.is_empty()
313 }
314}
315
316pub type ShieldComplianceFn = Arc<dyn Fn(&str, &TypedChannelHandle) -> bool + Send + Sync>;
329
330fn default_compliance_check() -> ShieldComplianceFn {
331 Arc::new(|_, _| true)
332}
333
334struct ChannelTransport {
347 tx: mpsc::UnboundedSender<TypedEvent>,
348 rx: AsyncMutex<mpsc::UnboundedReceiver<TypedEvent>>,
349 closed: AtomicBool,
350}
351
352impl ChannelTransport {
353 fn new() -> Self {
354 let (tx, rx) = mpsc::unbounded_channel();
355 Self {
356 tx,
357 rx: AsyncMutex::new(rx),
358 closed: AtomicBool::new(false),
359 }
360 }
361
362 fn send(&self, event: TypedEvent) -> Result<()> {
363 if self.closed.load(Ordering::Acquire) {
364 return Err(TypedChannelError::Transport(
365 "channel is closed".to_string(),
366 ));
367 }
368 self.tx
369 .send(event)
370 .map_err(|e| TypedChannelError::Transport(format!("send failed: {e}")))
371 }
372
373 async fn recv(&self) -> Result<TypedEvent> {
374 let mut rx = self.rx.lock().await;
375 rx.recv().await.ok_or_else(|| {
376 TypedChannelError::Transport("channel sender dropped".to_string())
377 })
378 }
379
380 fn close(&self) {
381 self.closed.store(true, Ordering::Release);
382 }
383}
384
385pub struct TypedEventBus {
415 registry: Mutex<TypedChannelRegistry>,
416 transports: Mutex<HashMap<String, Arc<ChannelTransport>>>,
417 broadcast_subs: Mutex<HashMap<String, Vec<mpsc::UnboundedSender<TypedEvent>>>>,
418 capabilities: Mutex<HashMap<String, Capability>>,
419 delivered_ids: Mutex<HashMap<String, HashSet<String>>>,
420 compliance_check: ShieldComplianceFn,
421}
422
423impl Default for TypedEventBus {
424 fn default() -> Self {
425 Self::new()
426 }
427}
428
429impl TypedEventBus {
430 pub fn new() -> Self {
432 Self::with_compliance_check(default_compliance_check())
433 }
434
435 pub fn with_compliance_check(check: ShieldComplianceFn) -> Self {
437 Self {
438 registry: Mutex::new(TypedChannelRegistry::new()),
439 transports: Mutex::new(HashMap::new()),
440 broadcast_subs: Mutex::new(HashMap::new()),
441 capabilities: Mutex::new(HashMap::new()),
442 delivered_ids: Mutex::new(HashMap::new()),
443 compliance_check: check,
444 }
445 }
446
447 pub fn from_ir_program(ir: &IRProgram) -> Self {
450 Self::from_ir_program_with(ir, default_compliance_check())
451 }
452
453 pub fn from_ir_program_with(ir: &IRProgram, check: ShieldComplianceFn) -> Self {
456 let bus = Self::with_compliance_check(check);
457 {
458 let mut reg = bus.registry.lock().unwrap();
459 for ch in &ir.channels {
460 reg.register_from_ir(ch);
461 }
462 }
463 bus
464 }
465
466 pub fn register(&self, handle: TypedChannelHandle) {
467 self.registry.lock().unwrap().register(handle);
468 }
469
470 pub fn register_from_ir(&self, ir: &IRChannel) -> TypedChannelHandle {
471 self.registry.lock().unwrap().register_from_ir(ir)
472 }
473
474 pub fn get_handle(&self, name: &str) -> Result<TypedChannelHandle> {
476 self.registry.lock().unwrap().get(name).cloned()
477 }
478
479 pub fn channel_names(&self) -> Vec<String> {
480 self.registry.lock().unwrap().names()
481 }
482
483 pub async fn emit(&self, channel: &str, payload: TypedPayload) -> Result<()> {
493 let handle = self.get_handle(channel)?;
494 Self::check_emit_schema(&handle, &payload)?;
495
496 let event = TypedEvent {
497 channel: channel.to_string(),
498 payload,
499 event_id: gen_uuid(),
500 timestamp_secs: now_secs(),
501 };
502
503 self.dispatch(&handle, event)?;
504 self.consume(channel)?;
505 Ok(())
506 }
507
508 fn check_emit_schema(handle: &TypedChannelHandle, payload: &TypedPayload) -> Result<()> {
509 match payload {
510 TypedPayload::Handle(inner) => {
511 if !handle.carries_channel() {
512 return Err(TypedChannelError::SchemaMismatch(format!(
513 "emit on '{}' (message: {}) received a channel handle, but the channel is not second-order — expected scalar payload",
514 handle.name, handle.message,
515 )));
516 }
517 let expected_inner = handle.inner_message_type();
518 if inner.message != expected_inner {
519 return Err(TypedChannelError::SchemaMismatch(format!(
520 "emit on '{}' expects Channel<{}> but received handle for '{}' (second-order schema mismatch, paper §3.2)",
521 handle.name, expected_inner, inner.message,
522 )));
523 }
524 Ok(())
525 }
526 TypedPayload::Scalar(_) => {
527 if handle.carries_channel() {
528 return Err(TypedChannelError::SchemaMismatch(format!(
529 "emit on '{}' (message: {}) requires a channel handle but received scalar — pass TypedPayload::Handle(handle) for mobility",
530 handle.name, handle.message,
531 )));
532 }
533 Ok(())
534 }
535 }
536 }
537
538 fn dispatch(&self, handle: &TypedChannelHandle, event: TypedEvent) -> Result<()> {
539 match handle.qos.as_str() {
540 "broadcast" => {
541 let subs = self.broadcast_subs.lock().unwrap();
542 if let Some(queues) = subs.get(&handle.name) {
543 for queue in queues {
544 let _ = queue.send(event.clone());
547 }
548 }
549 Ok(())
550 }
551 "at_most_once" => {
552 let transport = self.transport_for(&handle.name);
553 let _ = transport.send(event);
555 Ok(())
556 }
557 "exactly_once" => {
558 {
559 let mut delivered = self.delivered_ids.lock().unwrap();
560 let seen = delivered.entry(handle.name.clone()).or_default();
561 if seen.contains(&event.event_id) {
562 return Ok(());
563 }
564 seen.insert(event.event_id.clone());
565 }
566 let transport = self.transport_for(&handle.name);
567 transport.send(event)
568 }
569 _ => {
570 let transport = self.transport_for(&handle.name);
575 transport.send(event)
576 }
577 }
578 }
579
580 fn transport_for(&self, channel: &str) -> Arc<ChannelTransport> {
581 let mut transports = self.transports.lock().unwrap();
582 transports
583 .entry(channel.to_string())
584 .or_insert_with(|| Arc::new(ChannelTransport::new()))
585 .clone()
586 }
587
588 fn consume(&self, channel: &str) -> Result<()> {
589 let mut reg = self.registry.lock().unwrap();
590 let handle = reg.get_mut(channel)?;
591 handle.consumed_count += 1;
592 if handle.lifetime == "linear" && handle.consumed_count > 1 {
593 return Err(TypedChannelError::LifetimeViolation {
594 name: handle.name.clone(),
595 count: handle.consumed_count,
596 });
597 }
598 Ok(())
602 }
603
604 pub async fn publish(&self, channel: &str, shield: &str) -> Result<Capability> {
617 if shield.is_empty() {
618 return Err(TypedChannelError::CapabilityGate(format!(
619 "publish '{channel}' requires a non-empty shield (D8 — capability extrusion is shield-mediated)"
620 )));
621 }
622 let handle = self.get_handle(channel)?;
623 if !handle.is_publishable() {
624 return Err(TypedChannelError::CapabilityGate(format!(
625 "channel '{channel}' is not publishable: its definition declares no shield_ref (D8)"
626 )));
627 }
628 if shield != handle.shield_ref {
629 return Err(TypedChannelError::CapabilityGate(format!(
630 "publish '{channel}' requires shield '{}' (declared on the channel) but received '{shield}'",
631 handle.shield_ref
632 )));
633 }
634 if !(self.compliance_check)(shield, &handle) {
635 return Err(TypedChannelError::CapabilityGate(format!(
636 "shield '{shield}' does not cover compliance required by channel '{channel}'"
637 )));
638 }
639
640 let cap = Capability {
641 capability_id: gen_uuid(),
642 channel_name: channel.to_string(),
643 shield_ref: shield.to_string(),
644 delta_pub: 0.05,
645 issued_at: now_secs(),
646 };
647 self.capabilities
648 .lock()
649 .unwrap()
650 .insert(cap.capability_id.clone(), cap.clone());
651 Ok(cap)
652 }
653
654 pub async fn discover(&self, capability: &Capability) -> Result<TypedChannelHandle> {
660 let removed = {
661 let mut caps = self.capabilities.lock().unwrap();
662 caps.remove(&capability.capability_id)
663 };
664 if removed.is_none() {
665 return Err(TypedChannelError::CapabilityGate(format!(
666 "capability '{}' has been revoked or was never issued by this bus",
667 capability.capability_id,
668 )));
669 }
670 self.get_handle(&capability.channel_name)
671 }
672
673 pub fn subscribe_broadcast(
679 &self,
680 channel: &str,
681 ) -> Result<mpsc::UnboundedReceiver<TypedEvent>> {
682 let handle = self.get_handle(channel)?;
683 if handle.qos != "broadcast" {
684 return Err(TypedChannelError::SchemaMismatch(format!(
685 "subscribe_broadcast called on '{channel}' but its qos is {}, not broadcast",
686 handle.qos,
687 )));
688 }
689 let (tx, rx) = mpsc::unbounded_channel();
690 self.broadcast_subs
691 .lock()
692 .unwrap()
693 .entry(channel.to_string())
694 .or_default()
695 .push(tx);
696 Ok(rx)
697 }
698
699 pub async fn receive(&self, channel: &str) -> Result<TypedEvent> {
705 let handle = self.get_handle(channel)?;
706 if handle.qos == "broadcast" {
707 return Err(TypedChannelError::SchemaMismatch(format!(
708 "channel '{channel}' has qos=broadcast — call subscribe_broadcast() to get a per-subscriber queue"
709 )));
710 }
711 let transport = self.transport_for(channel);
712 transport.recv().await
713 }
714
715 pub fn issued_capabilities(&self) -> usize {
719 self.capabilities.lock().unwrap().len()
720 }
721
722 pub fn close_all(&self) {
725 self.capabilities.lock().unwrap().clear();
726 self.broadcast_subs.lock().unwrap().clear();
727 self.delivered_ids.lock().unwrap().clear();
728 let transports = self.transports.lock().unwrap();
729 for t in transports.values() {
730 t.close();
731 }
732 }
733}
734
735fn gen_uuid() -> String {
740 uuid::Uuid::new_v4().to_string()
741}
742
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Yields `0.0` when the system clock reads earlier than the epoch.
fn now_secs() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
749
#[cfg(test)]
mod tests {
    use super::*;
    use axon_frontend::ir_nodes::{IRChannel, IRProgram};
    use serde_json::json;

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Builds an `IRChannel` at source position 0:0 from plain strings.
    fn ir_channel(
        name: &str,
        message: &str,
        qos: &str,
        lifetime: &str,
        persistence: &str,
        shield: &str,
    ) -> IRChannel {
        IRChannel {
            node_type: "IRChannel",
            source_line: 0,
            source_column: 0,
            name: name.into(),
            message: message.into(),
            qos: qos.into(),
            lifetime: lifetime.into(),
            persistence: persistence.into(),
            shield_ref: shield.into(),
        }
    }

    /// Handle with default policy fields.
    fn handle(name: &str, message: &str) -> TypedChannelHandle {
        TypedChannelHandle::new(name, message)
    }

    /// Default handle with its `qos` overridden.
    fn handle_with_qos(name: &str, message: &str, qos: &str) -> TypedChannelHandle {
        let mut h = handle(name, message);
        h.qos = qos.into();
        h
    }

    /// Default handle with its `lifetime` overridden.
    fn handle_with_lifetime(name: &str, message: &str, lifetime: &str) -> TypedChannelHandle {
        let mut h = handle(name, message);
        h.lifetime = lifetime.into();
        h
    }

    /// Default handle with a shield declared on it.
    fn publishable_handle(name: &str, message: &str, shield: &str) -> TypedChannelHandle {
        let mut h = handle(name, message);
        h.shield_ref = shield.into();
        h
    }

    fn empty_ir_program() -> IRProgram {
        IRProgram::new()
    }

    /// Drives a future to completion on a throwaway current-thread runtime.
    fn futures_executor_block_on<F: std::future::Future>(f: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(f)
    }

    // ------------------------------------------------------------------
    // TypedChannelHandle
    // ------------------------------------------------------------------

    #[test]
    fn handle_defaults_match_d1() {
        let h = handle("Orders", "Order");
        assert_eq!(h.qos, "at_least_once");
        assert_eq!(h.lifetime, "affine");
        assert_eq!(h.persistence, "ephemeral");
        assert_eq!(h.shield_ref, "");
        assert_eq!(h.consumed_count, 0);
    }

    #[test]
    fn handle_is_publishable_iff_shield() {
        let bare = handle("Orders", "Order");
        assert!(!bare.is_publishable());
        let mut shielded = bare;
        shielded.shield_ref = "PublicBroker".into();
        assert!(shielded.is_publishable());
    }

    #[test]
    fn handle_carries_channel_second_order() {
        assert!(handle("Broker", "Channel<Order>").carries_channel());
        assert!(!handle("Orders", "Order").carries_channel());
    }

    #[test]
    fn handle_inner_message_type_unwrap() {
        let second_order = handle("Broker", "Channel<Order>");
        assert_eq!(second_order.inner_message_type(), "Order");
        let first_order = handle("Orders", "Order");
        assert_eq!(first_order.inner_message_type(), "Order");
        let third_order = handle("Outer", "Channel<Channel<Order>>");
        assert_eq!(third_order.inner_message_type(), "Channel<Order>");
    }

    #[test]
    fn handle_from_ir_round_trip() {
        let ir = ir_channel(
            "Orders",
            "Order",
            "exactly_once",
            "linear",
            "persistent",
            "PublicBroker",
        );
        let h = TypedChannelHandle::from_ir(&ir);
        assert_eq!(h.name, "Orders");
        assert_eq!(h.message, "Order");
        assert_eq!(h.qos, "exactly_once");
        assert_eq!(h.lifetime, "linear");
        assert_eq!(h.persistence, "persistent");
        assert_eq!(h.shield_ref, "PublicBroker");
        assert_eq!(h.consumed_count, 0);
    }

    // ------------------------------------------------------------------
    // TypedChannelRegistry
    // ------------------------------------------------------------------

    #[test]
    fn registry_register_and_get() {
        let mut registry = TypedChannelRegistry::new();
        registry.register(handle("Orders", "Order"));
        assert!(registry.has("Orders"));
        assert_eq!(registry.get("Orders").unwrap().message, "Order");
    }

    #[test]
    fn registry_unknown_returns_error_with_registered() {
        let mut registry = TypedChannelRegistry::new();
        registry.register(handle("Orders", "Order"));
        match registry.get("Missing").unwrap_err() {
            TypedChannelError::ChannelNotFound { name, registered } => {
                assert_eq!(name, "Missing");
                assert_eq!(registered, vec!["Orders".to_string()]);
            }
            other => panic!("expected ChannelNotFound, got {other:?}"),
        }
    }

    #[test]
    fn registry_overwrite_replaces() {
        let mut registry = TypedChannelRegistry::new();
        registry.register(handle("Orders", "Order"));
        registry.register(handle_with_qos("Orders", "OrderV2", "exactly_once"));
        let stored = registry.get("Orders").unwrap();
        assert_eq!(stored.message, "OrderV2");
        assert_eq!(stored.qos, "exactly_once");
    }

    #[test]
    fn registry_names_sorted() {
        let mut registry = TypedChannelRegistry::new();
        for (name, message) in [("ZetaOrders", "Order"), ("Alpha", "Alpha"), ("Mu", "Mu")] {
            registry.register(handle(name, message));
        }
        assert_eq!(
            registry.names(),
            vec!["Alpha".to_string(), "Mu".to_string(), "ZetaOrders".to_string()]
        );
    }

    #[test]
    fn registry_register_from_ir_returns_handle() {
        let mut registry = TypedChannelRegistry::new();
        let ir = ir_channel(
            "Orders",
            "Order",
            "at_least_once",
            "affine",
            "ephemeral",
            "Σ",
        );
        let returned = registry.register_from_ir(&ir);
        assert_eq!(returned.shield_ref, "Σ");
        assert_eq!(registry.get("Orders").unwrap().shield_ref, "Σ");
    }

    // ------------------------------------------------------------------
    // Bus construction
    // ------------------------------------------------------------------

    #[test]
    fn bus_from_ir_program_registers_channels() {
        let mut ir = empty_ir_program();
        ir.channels.push(ir_channel(
            "Orders",
            "Order",
            "at_least_once",
            "affine",
            "ephemeral",
            "",
        ));
        ir.channels.push(ir_channel(
            "Broker",
            "Channel<Order>",
            "exactly_once",
            "affine",
            "ephemeral",
            "PublicBroker",
        ));
        let bus = TypedEventBus::from_ir_program(&ir);
        assert_eq!(
            bus.channel_names(),
            vec!["Broker".to_string(), "Orders".to_string()]
        );
        assert!(bus.get_handle("Broker").unwrap().is_publishable());
    }

    #[test]
    fn bus_default_compliance_is_permissive() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        let cap = futures_executor_block_on(bus.publish("Orders", "Σ")).unwrap();
        assert_eq!(cap.channel_name, "Orders");
    }

    // ------------------------------------------------------------------
    // emit
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn emit_scalar_round_trips() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        bus.emit("Orders", TypedPayload::scalar(json!({"id": 1})))
            .await
            .unwrap();
        let event = bus.receive("Orders").await.unwrap();
        if let TypedPayload::Scalar(value) = event.payload {
            assert_eq!(value["id"], 1);
        } else {
            panic!("expected scalar");
        }
    }

    #[tokio::test]
    async fn emit_unknown_channel_errors() {
        let bus = TypedEventBus::new();
        let outcome = bus.emit("Nope", TypedPayload::scalar(json!(null))).await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::ChannelNotFound { .. }
        ));
    }

    #[tokio::test]
    async fn emit_event_has_id_and_timestamp() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        bus.emit("Orders", TypedPayload::scalar(json!(0)))
            .await
            .unwrap();
        let event = bus.receive("Orders").await.unwrap();
        assert!(!event.event_id.is_empty());
        assert!(event.timestamp_secs > 0.0);
    }

    #[tokio::test]
    async fn emit_handle_through_second_order() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        bus.register(handle("Broker", "Channel<Order>"));
        let carried = bus.get_handle("Orders").unwrap();
        bus.emit("Broker", TypedPayload::handle(carried))
            .await
            .unwrap();
        let event = bus.receive("Broker").await.unwrap();
        if let TypedPayload::Handle(received) = event.payload {
            assert_eq!(received.name, "Orders");
        } else {
            panic!("expected handle");
        }
    }

    #[tokio::test]
    async fn emit_mobility_schema_mismatch_inner() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        bus.register(handle("Wrong", "Different"));
        bus.register(handle("Broker", "Channel<Order>"));
        let mismatched = bus.get_handle("Wrong").unwrap();
        let outcome = bus.emit("Broker", TypedPayload::handle(mismatched)).await;
        match outcome.unwrap_err() {
            TypedChannelError::SchemaMismatch(msg) => {
                assert!(msg.contains("Channel<Order>"));
                assert!(msg.contains("Different"));
            }
            other => panic!("expected SchemaMismatch, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn emit_scalar_to_second_order_rejected() {
        let bus = TypedEventBus::new();
        bus.register(handle("Broker", "Channel<Order>"));
        let outcome = bus.emit("Broker", TypedPayload::scalar(json!("oops"))).await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::SchemaMismatch(_)
        ));
    }

    #[tokio::test]
    async fn emit_handle_to_first_order_rejected() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        bus.register(handle("FirstOrder", "Order"));
        let carried = bus.get_handle("Orders").unwrap();
        let outcome = bus.emit("FirstOrder", TypedPayload::handle(carried)).await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::SchemaMismatch(_)
        ));
    }

    // ------------------------------------------------------------------
    // publish
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn publish_returns_capability() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        let cap = bus.publish("Orders", "Σ").await.unwrap();
        assert_eq!(cap.channel_name, "Orders");
        assert_eq!(cap.shield_ref, "Σ");
        assert!(!cap.capability_id.is_empty());
        assert_eq!(bus.issued_capabilities(), 1);
    }

    #[tokio::test]
    async fn publish_empty_shield_rejected() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        let outcome = bus.publish("Orders", "").await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::CapabilityGate(_)
        ));
    }

    #[tokio::test]
    async fn publish_unpublishable_rejected() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        match bus.publish("Orders", "Σ").await.unwrap_err() {
            TypedChannelError::CapabilityGate(msg) => {
                assert!(msg.contains("not publishable"));
            }
            other => panic!("expected CapabilityGate, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn publish_wrong_shield_rejected() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        match bus.publish("Orders", "Other").await.unwrap_err() {
            TypedChannelError::CapabilityGate(msg) => {
                assert!(msg.contains("Σ"));
                assert!(msg.contains("Other"));
            }
            other => panic!("expected CapabilityGate, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn publish_unknown_channel_errors() {
        let bus = TypedEventBus::new();
        let outcome = bus.publish("Missing", "Σ").await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::ChannelNotFound { .. }
        ));
    }

    #[tokio::test]
    async fn publish_default_delta_pub_is_paper_lower_bound() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        let cap = bus.publish("Orders", "Σ").await.unwrap();
        assert!((cap.delta_pub - 0.05).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn publish_compliance_predicate_can_veto() {
        let veto: ShieldComplianceFn = Arc::new(|_, _| false);
        let bus = TypedEventBus::with_compliance_check(veto);
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        match bus.publish("Orders", "Σ").await.unwrap_err() {
            TypedChannelError::CapabilityGate(msg) => {
                assert!(msg.contains("does not cover compliance"));
            }
            other => panic!("expected CapabilityGate, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn publish_compliance_predicate_inspects_handle() {
        let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = log.clone();
        let check: ShieldComplianceFn = Arc::new(move |shield, h| {
            sink.lock().unwrap().push(format!("{shield}/{}", h.name));
            true
        });
        let bus = TypedEventBus::with_compliance_check(check);
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        bus.publish("Orders", "Σ").await.unwrap();
        let calls = log.lock().unwrap();
        assert_eq!(*calls, vec!["Σ/Orders".to_string()]);
    }

    // ------------------------------------------------------------------
    // discover
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn discover_returns_handle_and_consumes_capability() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        let cap = bus.publish("Orders", "Σ").await.unwrap();
        assert_eq!(bus.issued_capabilities(), 1);

        let found = bus.discover(&cap).await.unwrap();
        assert_eq!(found.name, "Orders");
        assert_eq!(bus.issued_capabilities(), 0);

        // Redeeming the same capability again must fail.
        let second = bus.discover(&cap).await;
        assert!(matches!(
            second.unwrap_err(),
            TypedChannelError::CapabilityGate(_)
        ));
    }

    #[tokio::test]
    async fn discover_forged_capability_rejected() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        let forged = Capability {
            capability_id: "forged".to_string(),
            channel_name: "Orders".to_string(),
            shield_ref: "Σ".to_string(),
            delta_pub: 0.05,
            issued_at: 0.0,
        };
        let outcome = bus.discover(&forged).await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::CapabilityGate(_)
        ));
    }

    #[tokio::test]
    async fn capability_from_other_bus_rejected() {
        let issuer = TypedEventBus::new();
        let stranger = TypedEventBus::new();
        issuer.register(publishable_handle("Orders", "Order", "Σ"));
        stranger.register(publishable_handle("Orders", "Order", "Σ"));
        let cap = issuer.publish("Orders", "Σ").await.unwrap();
        let outcome = stranger.discover(&cap).await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::CapabilityGate(_)
        ));
    }

    // ------------------------------------------------------------------
    // QoS
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn qos_at_least_once_default_delivers() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        for id in [1, 2] {
            bus.emit("Orders", TypedPayload::scalar(json!({"id": id})))
                .await
                .unwrap();
        }
        let first = bus.receive("Orders").await.unwrap();
        let second = bus.receive("Orders").await.unwrap();
        match (&first.payload, &second.payload) {
            (TypedPayload::Scalar(v1), TypedPayload::Scalar(v2)) => {
                assert_eq!(v1["id"], 1);
                assert_eq!(v2["id"], 2);
            }
            _ => panic!("expected scalars"),
        }
    }

    #[tokio::test]
    async fn qos_at_most_once_delivers_once_then_drops_silently() {
        let bus = TypedEventBus::new();
        bus.register(handle_with_qos("Telemetry", "Tick", "at_most_once"));
        bus.emit("Telemetry", TypedPayload::scalar(json!(1)))
            .await
            .unwrap();
        // Close the underlying transport; the next emit must still succeed.
        bus.transport_for("Telemetry").close();
        bus.emit("Telemetry", TypedPayload::scalar(json!(2)))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn qos_exactly_once_dedups_event_ids() {
        let bus = TypedEventBus::new();
        let eo = handle_with_qos("EO", "Tick", "exactly_once");
        bus.register(eo.clone());

        bus.emit("EO", TypedPayload::scalar(json!(1)))
            .await
            .unwrap();
        let _first = bus.receive("EO").await.unwrap();

        // Dispatch the same hand-built event twice; only one copy may arrive.
        let duplicate = TypedEvent {
            channel: "EO".to_string(),
            payload: TypedPayload::scalar(json!("dup")),
            event_id: "fixed-id".to_string(),
            timestamp_secs: now_secs(),
        };
        bus.dispatch(&eo, duplicate.clone()).unwrap();
        bus.dispatch(&eo, duplicate).unwrap();

        let received = bus.receive("EO").await.unwrap();
        assert_eq!(received.event_id, "fixed-id");

        let wait = std::time::Duration::from_millis(20);
        let try_more = tokio::time::timeout(wait, bus.receive("EO")).await;
        assert!(try_more.is_err(), "expected dedup to block second event");
    }

    #[tokio::test]
    async fn qos_broadcast_fan_out_to_subscribers() {
        let bus = TypedEventBus::new();
        bus.register(handle_with_qos("Bus", "Tick", "broadcast"));
        let mut first_sub = bus.subscribe_broadcast("Bus").unwrap();
        let mut second_sub = bus.subscribe_broadcast("Bus").unwrap();
        bus.emit("Bus", TypedPayload::scalar(json!("hi")))
            .await
            .unwrap();
        let seen_by_first = first_sub.recv().await.unwrap();
        let seen_by_second = second_sub.recv().await.unwrap();
        assert_eq!(seen_by_first.event_id, seen_by_second.event_id);
    }

    #[tokio::test]
    async fn qos_broadcast_subscribe_check_rejects_non_broadcast() {
        let bus = TypedEventBus::new();
        bus.register(handle("Plain", "X"));
        let outcome = bus.subscribe_broadcast("Plain");
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::SchemaMismatch(_)
        ));
    }

    #[tokio::test]
    async fn qos_broadcast_receive_rejection() {
        let bus = TypedEventBus::new();
        bus.register(handle_with_qos("Bus", "Tick", "broadcast"));
        let outcome = bus.receive("Bus").await;
        assert!(matches!(
            outcome.unwrap_err(),
            TypedChannelError::SchemaMismatch(_)
        ));
    }

    #[tokio::test]
    async fn qos_queue_fifo_ordering() {
        let bus = TypedEventBus::new();
        bus.register(handle_with_qos("Q", "Job", "queue"));
        for job in [1, 2, 3] {
            bus.emit("Q", TypedPayload::scalar(json!(job))).await.unwrap();
        }
        let mut seen = Vec::new();
        for _ in 0..3 {
            let event = bus.receive("Q").await.unwrap();
            if let TypedPayload::Scalar(value) = event.payload {
                seen.push(value.as_i64().unwrap());
            }
        }
        assert_eq!(seen, vec![1, 2, 3]);
    }

    // ------------------------------------------------------------------
    // Lifetimes
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn lifetime_affine_allows_multi_emit() {
        let bus = TypedEventBus::new();
        bus.register(handle("Orders", "Order"));
        for i in 0..3 {
            bus.emit("Orders", TypedPayload::scalar(json!(i)))
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn lifetime_linear_second_emit_violates() {
        let bus = TypedEventBus::new();
        bus.register(handle_with_lifetime("Once", "Order", "linear"));
        bus.emit("Once", TypedPayload::scalar(json!(0)))
            .await
            .unwrap();
        let second = bus.emit("Once", TypedPayload::scalar(json!(1))).await;
        match second.unwrap_err() {
            TypedChannelError::LifetimeViolation { name, count } => {
                assert_eq!(name, "Once");
                assert_eq!(count, 2);
            }
            other => panic!("expected LifetimeViolation, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn lifetime_persistent_unrestricted() {
        let bus = TypedEventBus::new();
        bus.register(handle_with_lifetime("Ledger", "Entry", "persistent"));
        for i in 0..16 {
            bus.emit("Ledger", TypedPayload::scalar(json!(i)))
                .await
                .unwrap();
        }
    }

    // ------------------------------------------------------------------
    // End to end
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn paper_section9_e2e_producer_publish_discover_receive() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("OrdersCreated", "Order", "PublicBroker"));

        // Producer emits.
        bus.emit(
            "OrdersCreated",
            TypedPayload::scalar(json!({"id": 42, "total": 19.99})),
        )
        .await
        .unwrap();

        // Channel is published, then discovered through the capability.
        let cap = bus
            .publish("OrdersCreated", "PublicBroker")
            .await
            .unwrap();
        let discovered = bus.discover(&cap).await.unwrap();
        assert_eq!(discovered.name, "OrdersCreated");

        // Consumer receives the original event.
        let event = bus.receive("OrdersCreated").await.unwrap();
        if let TypedPayload::Scalar(order) = event.payload {
            assert_eq!(order["id"], 42);
            assert_eq!(order["total"], 19.99);
        } else {
            panic!("expected scalar Order payload");
        }
    }

    // ------------------------------------------------------------------
    // Miscellaneous
    // ------------------------------------------------------------------

    #[test]
    fn error_display_includes_useful_context() {
        let not_found = TypedChannelError::ChannelNotFound {
            name: "X".to_string(),
            registered: vec!["A".to_string(), "B".to_string()],
        };
        let rendered = not_found.to_string();
        assert!(rendered.contains("'X'"));
        assert!(rendered.contains("[\"A\", \"B\"]"));

        let violation = TypedChannelError::LifetimeViolation {
            name: "Once".to_string(),
            count: 2,
        };
        let rendered = violation.to_string();
        assert!(rendered.contains("Once"));
        assert!(rendered.contains("2"));
    }

    #[tokio::test]
    async fn capability_ids_are_unique() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        let first = bus.publish("Orders", "Σ").await.unwrap();
        let second = bus.publish("Orders", "Σ").await.unwrap();
        assert_ne!(first.capability_id, second.capability_id);
        assert_eq!(bus.issued_capabilities(), 2);
    }

    #[tokio::test]
    async fn close_all_drains_state() {
        let bus = TypedEventBus::new();
        bus.register(publishable_handle("Orders", "Order", "Σ"));
        bus.register(handle_with_qos("Bus", "Tick", "broadcast"));
        let _sub = bus.subscribe_broadcast("Bus").unwrap();
        let _cap = bus.publish("Orders", "Σ").await.unwrap();
        assert_eq!(bus.issued_capabilities(), 1);

        bus.close_all();

        assert_eq!(bus.issued_capabilities(), 0);
        // Emitting on a broadcast channel with no subscribers still succeeds.
        bus.emit("Bus", TypedPayload::scalar(json!("after-close")))
            .await
            .unwrap();
    }
}