1use super::error::EngineError;
47use crate::channel::{ChannelHandle, Event};
48use orcs_event::{EventCategory, Request, Signal};
49use orcs_hook::SharedHookRegistry;
50use orcs_types::{ChannelId, ComponentId, RequestId};
51use parking_lot::RwLock;
52use serde_json::Value;
53use std::collections::{HashMap, HashSet};
54use std::sync::Arc;
55use std::time::Duration;
56use tokio::sync::{broadcast, mpsc, oneshot};
57
/// Reference-counted, lock-protected map from [`ChannelId`] to its
/// [`ChannelHandle`]. `EventBus::shared_handles` hands out clones of the `Arc`.
pub type SharedChannelHandles = Arc<RwLock<HashMap<ChannelId, ChannelHandle>>>;
65
/// Reference-counted, lock-protected map from a `String` key to a
/// [`ChannelId`]. In this file the key is always `ComponentId::fqn()`
/// (see `EventBus::register_component_channel` and `EventBus::request`).
pub type SharedComponentChannelMap = Arc<RwLock<HashMap<String, ChannelId>>>;
72
/// Routing hub that tracks registered components, their category
/// subscriptions, channel handles, and an optional hook registry.
pub struct EventBus {
    /// Per-component request queues; `request` falls back to these when no
    /// request-capable channel handle is mapped for the target.
    request_senders: HashMap<ComponentId, mpsc::Sender<Request>>,
    /// Reply slots for requests sent through `request_senders`; completed by
    /// `respond` and removed on send failure or timeout.
    pending_responses: HashMap<RequestId, oneshot::Sender<Result<Value, EngineError>>>,
    /// Broadcast sender for signals; `register` subscribes one receiver per
    /// component.
    signal_tx: broadcast::Sender<Signal>,
    /// Components subscribed to each event category; consulted by
    /// `subscribers` and `publish`.
    subscriptions: HashMap<EventCategory, HashSet<ComponentId>>,
    /// Shared map of channel handles used by `inject`, `try_inject`,
    /// `broadcast`, `broadcast_async`, and the channel path of `request`.
    channel_handles: SharedChannelHandles,
    /// Shared map from a component's fully qualified name to the channel
    /// that serves its requests.
    component_channel_map: SharedComponentChannelMap,
    /// Optional hook registry; when `None`, `dispatch_hook` returns
    /// `HookAction::Continue` without consulting any hooks.
    hook_registry: Option<SharedHookRegistry>,
}
109
110impl EventBus {
111 #[must_use]
113 pub fn new() -> Self {
114 let (signal_tx, _) = broadcast::channel(64);
115 Self {
116 request_senders: HashMap::new(),
117 pending_responses: HashMap::new(),
118 signal_tx,
119 subscriptions: HashMap::new(),
120 channel_handles: Arc::new(RwLock::new(HashMap::new())),
121 component_channel_map: Arc::new(RwLock::new(HashMap::new())),
122 hook_registry: None,
123 }
124 }
125
126 pub fn set_hook_registry(&mut self, registry: SharedHookRegistry) {
128 self.hook_registry = Some(registry);
129 }
130
131 fn dispatch_hook(
135 &self,
136 point: orcs_hook::HookPoint,
137 target_id: &ComponentId,
138 payload: serde_json::Value,
139 ) -> orcs_hook::HookAction {
140 let channel_id = ChannelId::new();
141
142 let ctx = orcs_hook::HookContext::new(
143 point,
144 target_id.clone(),
145 channel_id,
146 orcs_types::Principal::System,
147 0,
148 payload,
149 );
150
151 let Some(registry) = &self.hook_registry else {
152 return orcs_hook::HookAction::Continue(Box::new(ctx));
153 };
154
155 let guard = registry.read().unwrap_or_else(|poisoned| {
156 tracing::warn!("hook registry lock poisoned, using inner value");
157 poisoned.into_inner()
158 });
159 guard.dispatch(point, target_id, None, ctx)
160 }
161
162 #[must_use]
167 pub fn shared_handles(&self) -> SharedChannelHandles {
168 Arc::clone(&self.channel_handles)
169 }
170
171 #[must_use]
176 pub fn shared_component_channel_map(&self) -> SharedComponentChannelMap {
177 Arc::clone(&self.component_channel_map)
178 }
179
180 pub fn register(
200 &mut self,
201 id: ComponentId,
202 subscriptions: Vec<EventCategory>,
203 ) -> ComponentHandle {
204 let (req_tx, req_rx) = mpsc::channel(32);
205 let signal_rx = self.signal_tx.subscribe();
206
207 self.request_senders.insert(id.clone(), req_tx);
208
209 let sub_strings: Vec<String> = subscriptions.iter().map(|c| format!("{c:?}")).collect();
211 for category in subscriptions {
212 self.subscriptions
213 .entry(category)
214 .or_default()
215 .insert(id.clone());
216 }
217
218 let payload = serde_json::json!({
220 "component_id": id.fqn(),
221 "subscriptions": sub_strings,
222 });
223 let _ = self.dispatch_hook(orcs_hook::HookPoint::BusOnRegister, &id, payload);
224
225 ComponentHandle {
226 component_id: id,
227 request_rx: req_rx,
228 signal_rx,
229 }
230 }
231
232 #[must_use]
234 pub fn subscribers(&self, category: &EventCategory) -> Vec<&ComponentId> {
235 self.subscriptions
236 .get(category)
237 .map(|set| set.iter().collect())
238 .unwrap_or_default()
239 }
240
241 pub fn unregister(&mut self, id: &ComponentId) {
243 let payload = serde_json::json!({
245 "component_id": id.fqn(),
246 });
247 let _ = self.dispatch_hook(orcs_hook::HookPoint::BusOnUnregister, id, payload);
248
249 self.request_senders.remove(id);
250 for subscribers in self.subscriptions.values_mut() {
252 subscribers.remove(id);
253 }
254 }
255
256 pub async fn request(&mut self, req: Request) -> Result<Value, EngineError> {
270 let request_id = req.id;
271 let timeout_ms = req.timeout_ms;
272
273 let Some(target) = &req.target else {
274 return Err(EngineError::NoTarget);
275 };
276
277 let resolved_channel_id = self
284 .component_channel_map
285 .read()
286 .get(&target.fqn())
287 .copied();
288 if let Some(channel_id) = resolved_channel_id {
289 let handle = {
290 let handles = self.channel_handles.read();
291 handles.get(&channel_id).cloned()
292 };
293
294 if let Some(handle) = handle {
295 if handle.accepts_requests() {
296 let (reply_tx, reply_rx) = oneshot::channel();
297 if handle.send_request(req, reply_tx).await.is_err() {
298 return Err(EngineError::SendFailed("request channel closed".into()));
299 }
300
301 let timeout_duration = Duration::from_millis(timeout_ms);
302 return match tokio::time::timeout(timeout_duration, reply_rx).await {
303 Ok(Ok(result)) => result.map_err(EngineError::ComponentFailed),
304 Ok(Err(_)) => Err(EngineError::ChannelClosed),
305 Err(_) => Err(EngineError::Timeout(request_id)),
306 };
307 }
308 }
309 }
310
311 let Some(sender) = self.request_senders.get(target) else {
313 return Err(EngineError::ComponentNotFound(target.clone()));
314 };
315
316 let (tx, rx) = oneshot::channel();
317 self.pending_responses.insert(request_id, tx);
318
319 if sender.send(req).await.is_err() {
320 self.pending_responses.remove(&request_id);
321 return Err(EngineError::SendFailed("channel closed".into()));
322 }
323
324 let timeout_duration = Duration::from_millis(timeout_ms);
325 match tokio::time::timeout(timeout_duration, rx).await {
326 Ok(Ok(result)) => result,
327 Ok(Err(_)) => Err(EngineError::ChannelClosed),
328 Err(_) => {
329 self.pending_responses.remove(&request_id);
330 Err(EngineError::Timeout(request_id))
331 }
332 }
333 }
334
335 pub async fn publish(&mut self, req: Request) -> Result<Value, EngineError> {
360 let category = req.category.clone();
361
362 let target = self
364 .subscriptions
365 .get(&category)
366 .and_then(|set| set.iter().next().cloned())
367 .ok_or(EngineError::NoSubscriber(category))?;
368
369 let req_with_target = req.with_target(target);
371 self.request(req_with_target).await
372 }
373
374 pub fn respond(&mut self, request_id: RequestId, result: Result<Value, String>) {
382 if let Some(tx) = self.pending_responses.remove(&request_id) {
383 let mapped = result.map_err(EngineError::ComponentFailed);
384 let _ = tx.send(mapped);
385 }
386 }
387
388 #[must_use]
390 pub fn component_count(&self) -> usize {
391 self.request_senders.len()
392 }
393
394 pub fn register_channel(&mut self, handle: ChannelHandle) {
401 let mut handles = self.channel_handles.write();
402 handles.insert(handle.id, handle);
403 }
404
405 pub fn register_component_channel(
416 &mut self,
417 component_id: &ComponentId,
418 channel_id: ChannelId,
419 ) {
420 self.component_channel_map
421 .write()
422 .insert(component_id.fqn(), channel_id);
423 }
424
425 pub fn unregister_channel(&mut self, id: &ChannelId) {
429 let mut handles = self.channel_handles.write();
430 handles.remove(id);
431 self.component_channel_map
433 .write()
434 .retain(|_, cid| cid != id);
435 }
436
437 pub async fn inject(&self, channel_id: ChannelId, event: Event) -> Result<(), EngineError> {
453 let handle = {
454 let handles = self.channel_handles.read();
455 handles
456 .get(&channel_id)
457 .cloned()
458 .ok_or(EngineError::ChannelNotFound(channel_id))?
459 };
460
461 handle
462 .inject_direct(event)
463 .await
464 .map_err(|_| EngineError::SendFailed("channel closed".into()))
465 }
466
467 pub fn try_inject(&self, channel_id: ChannelId, event: Event) -> Result<(), EngineError> {
475 let handles = self.channel_handles.read();
476 let handle = handles
477 .get(&channel_id)
478 .ok_or(EngineError::ChannelNotFound(channel_id))?;
479
480 handle
481 .try_inject_direct(event)
482 .map_err(|e| EngineError::SendFailed(e.to_string()))
483 }
484
485 pub fn broadcast(&self, event: Event) -> usize {
503 let bus_id = ComponentId::builtin("eventbus");
504
505 let pre_payload = serde_json::json!({
507 "category": format!("{:?}", event.category),
508 "operation": event.operation,
509 "source": event.source.fqn(),
510 });
511 let pre_action =
512 self.dispatch_hook(orcs_hook::HookPoint::BusPreBroadcast, &bus_id, pre_payload);
513 match &pre_action {
514 orcs_hook::HookAction::Abort { .. } | orcs_hook::HookAction::Skip(_) => {
515 return 0;
516 }
517 _ => {} }
519
520 let handles = self.channel_handles.read();
521 let mut delivered = 0;
522 for handle in handles.values() {
523 if handle.try_inject(event.clone()).is_ok() {
524 delivered += 1;
525 }
526 }
527
528 let post_payload = serde_json::json!({
530 "category": format!("{:?}", event.category),
531 "operation": event.operation,
532 "delivered": delivered,
533 });
534 let _ = self.dispatch_hook(
535 orcs_hook::HookPoint::BusPostBroadcast,
536 &bus_id,
537 post_payload,
538 );
539
540 delivered
541 }
542
543 pub async fn broadcast_async(&self, event: Event) -> usize {
556 let bus_id = ComponentId::builtin("eventbus");
557
558 let pre_payload = serde_json::json!({
560 "category": format!("{:?}", event.category),
561 "operation": event.operation,
562 "source": event.source.fqn(),
563 });
564 let pre_action =
565 self.dispatch_hook(orcs_hook::HookPoint::BusPreBroadcast, &bus_id, pre_payload);
566 match &pre_action {
567 orcs_hook::HookAction::Abort { .. } | orcs_hook::HookAction::Skip(_) => {
568 return 0;
569 }
570 _ => {}
571 }
572
573 let handles: Vec<_> = {
575 let h = self.channel_handles.read();
576 h.values().cloned().collect()
577 };
578
579 let mut delivered = 0;
580 for handle in handles {
581 if handle.inject(event.clone()).await.is_ok() {
582 delivered += 1;
583 }
584 }
585
586 let post_payload = serde_json::json!({
588 "category": format!("{:?}", event.category),
589 "operation": event.operation,
590 "delivered": delivered,
591 });
592 let _ = self.dispatch_hook(
593 orcs_hook::HookPoint::BusPostBroadcast,
594 &bus_id,
595 post_payload,
596 );
597
598 delivered
599 }
600
601 #[must_use]
603 pub fn channel_count(&self) -> usize {
604 let handles = self.channel_handles.read();
605 handles.len()
606 }
607}
608
#[cfg(test)]
impl EventBus {
    /// Test-only helper that pushes `signal` onto the broadcast channel.
    /// The send result is deliberately discarded.
    pub fn signal(&self, signal: Signal) {
        self.signal_tx.send(signal).ok();
    }
}
617
618impl Default for EventBus {
619 fn default() -> Self {
620 Self::new()
621 }
622}
623
/// Receiving side handed to a component by `EventBus::register`.
pub struct ComponentHandle {
    /// Id the component was registered under.
    component_id: ComponentId,
    /// Receiver for requests queued by the bus for this component.
    request_rx: mpsc::Receiver<Request>,
    /// Receiver subscribed to the bus's signal broadcast channel.
    signal_rx: broadcast::Receiver<Signal>,
}
630
631impl ComponentHandle {
632 #[must_use]
634 pub fn component_id(&self) -> &ComponentId {
635 &self.component_id
636 }
637
638 pub fn try_recv_request(&mut self) -> Option<Request> {
640 self.request_rx.try_recv().ok()
641 }
642
643 pub fn try_recv_signal(&mut self) -> Option<Signal> {
645 self.signal_rx.try_recv().ok()
646 }
647
648 pub async fn recv_request(&mut self) -> Option<Request> {
652 self.request_rx.recv().await
653 }
654
655 pub async fn recv_signal(&mut self) -> Result<Signal, broadcast::error::RecvError> {
661 self.signal_rx.recv().await
662 }
663}
664
665#[cfg(test)]
666mod tests {
667 use super::*;
668 use crate::Principal;
669 use orcs_component::EventCategory;
670 use orcs_types::ChannelId;
671
672 #[test]
673 fn eventbus_creation() {
674 let bus = EventBus::new();
675 assert_eq!(bus.component_count(), 0);
676 }
677
678 #[test]
679 fn register_component() {
680 let mut bus = EventBus::new();
681 let id = ComponentId::builtin("test");
682 let _handle = bus.register(id, vec![EventCategory::Lifecycle]);
683
684 assert_eq!(bus.component_count(), 1);
685 }
686
687 #[test]
688 fn unregister_component() {
689 let mut bus = EventBus::new();
690 let id = ComponentId::builtin("test");
691 let _handle = bus.register(id.clone(), vec![EventCategory::Lifecycle]);
692
693 bus.unregister(&id);
694 assert_eq!(bus.component_count(), 0);
695 }
696
697 #[tokio::test]
698 async fn signal_broadcast() {
699 let mut bus = EventBus::new();
700 let id = ComponentId::builtin("test");
701 let mut handle = bus.register(id, vec![EventCategory::Lifecycle]);
702
703 let principal = Principal::System;
704 bus.signal(Signal::veto(principal));
705 tokio::task::yield_now().await;
706
707 let signal = handle.try_recv_signal();
708 assert!(signal.is_some());
709 assert!(signal.expect("should receive veto signal").is_veto());
710 }
711
712 #[tokio::test]
713 async fn request_to_nonexistent_target() {
714 use orcs_types::ErrorCode;
715
716 let mut bus = EventBus::new();
717 let source = ComponentId::builtin("source");
718 let target = ComponentId::builtin("nonexistent");
719 let channel = ChannelId::new();
720
721 let req = Request::new(EventCategory::Echo, "test", source, channel, Value::Null)
722 .with_target(target);
723
724 let result = bus.request(req).await;
725 assert!(result.is_err());
726 assert_eq!(
727 result
728 .expect_err("request to nonexistent target should fail")
729 .code(),
730 "ENGINE_COMPONENT_NOT_FOUND"
731 );
732 }
733
734 #[tokio::test]
735 async fn request_without_target() {
736 use orcs_types::ErrorCode;
737
738 let mut bus = EventBus::new();
739 let source = ComponentId::builtin("source");
740 let channel = ChannelId::new();
741
742 let req = Request::new(EventCategory::Echo, "test", source, channel, Value::Null);
743
744 let result = bus.request(req).await;
745 assert!(result.is_err());
746 assert_eq!(
747 result
748 .expect_err("request without target should fail")
749 .code(),
750 "ENGINE_NO_TARGET"
751 );
752 }
753
754 #[tokio::test]
755 async fn request_respond_flow() {
756 let mut bus = EventBus::new();
757 let source = ComponentId::builtin("source");
758 let target = ComponentId::builtin("target");
759 let channel = ChannelId::new();
760
761 let mut handle = bus.register(target.clone(), vec![EventCategory::Echo]);
762
763 let req = Request::new(
764 EventCategory::Echo,
765 "echo",
766 source,
767 channel,
768 Value::String("hello".into()),
769 )
770 .with_target(target);
771 let request_id = req.id;
772
773 let (tx, rx) = tokio::sync::oneshot::channel();
774
775 tokio::spawn(async move {
776 if let Some(req) = handle.recv_request().await {
777 tx.send(req).ok();
778 }
779 });
780
781 let _response_task = tokio::spawn(async move {
782 let mut bus = bus;
783 bus.request(req).await
784 });
785
786 let received_req = rx.await.expect("should receive request via oneshot");
787 assert_eq!(received_req.id, request_id);
788 }
789
790 #[tokio::test]
791 async fn respond_completes_request() {
792 let mut bus = EventBus::new();
793 let source = ComponentId::builtin("source");
794 let target = ComponentId::builtin("target");
795 let channel = ChannelId::new();
796
797 let mut handle = bus.register(target.clone(), vec![EventCategory::Echo]);
798
799 let req = Request::new(
800 EventCategory::Echo,
801 "echo",
802 source,
803 channel,
804 Value::String("test".into()),
805 )
806 .with_target(target.clone());
807 let request_id = req.id;
808
809 let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::<Result<Value, String>>();
810 tokio::spawn(async move {
811 if let Some(req) = handle.recv_request().await {
812 resp_tx.send(Ok(req.payload)).ok();
813 }
814 });
815
816 let (tx, rx) = tokio::sync::oneshot::channel();
817 bus.pending_responses.insert(request_id, tx);
818
819 if let Some(sender) = bus.request_senders.get(&target) {
820 sender
821 .send(req)
822 .await
823 .expect("send request to target should succeed");
824 }
825
826 let result = resp_rx.await.expect("should receive response from handler");
827 bus.respond(request_id, result);
828
829 let received = rx
830 .await
831 .expect("should receive completed response via oneshot");
832 assert!(received.is_ok());
833 }
834
835 #[test]
836 fn respond_to_pending_request() {
837 let mut bus = EventBus::new();
838 let request_id = RequestId::new();
839
840 let (tx, mut rx) = tokio::sync::oneshot::channel();
841 bus.pending_responses.insert(request_id, tx);
842
843 bus.respond(request_id, Ok(Value::String("result".into())));
844
845 let received = rx.try_recv();
846 assert!(received.is_ok());
847 assert!(received.expect("should receive response value").is_ok());
848 }
849
850 #[test]
851 fn respond_to_unknown_request_is_noop() {
852 let mut bus = EventBus::new();
853 let request_id = RequestId::new();
854
855 bus.respond(request_id, Ok(Value::Null));
856 }
857
858 #[tokio::test]
859 async fn multiple_signal_receivers() {
860 let mut bus = EventBus::new();
861 let id1 = ComponentId::builtin("comp1");
862 let id2 = ComponentId::builtin("comp2");
863
864 let mut handle1 = bus.register(id1, vec![EventCategory::Lifecycle]);
865 let mut handle2 = bus.register(id2, vec![EventCategory::Lifecycle]);
866
867 let principal = Principal::System;
868 bus.signal(Signal::veto(principal));
869 tokio::task::yield_now().await;
870
871 assert!(handle1.try_recv_signal().is_some());
872 assert!(handle2.try_recv_signal().is_some());
873 }
874
875 #[test]
876 fn component_handle_getters() {
877 let mut bus = EventBus::new();
878 let id = ComponentId::builtin("test");
879 let handle = bus.register(id.clone(), vec![EventCategory::Lifecycle]);
880
881 assert_eq!(handle.component_id(), &id);
882 }
883
884 #[tokio::test]
885 async fn request_timeout() {
886 use orcs_types::ErrorCode;
887
888 let mut bus = EventBus::new();
889 let source = ComponentId::builtin("source");
890 let target = ComponentId::builtin("target");
891 let channel = ChannelId::new();
892
893 let _handle = bus.register(target.clone(), vec![EventCategory::Echo]);
894
895 let req = Request::new(EventCategory::Echo, "slow_op", source, channel, Value::Null)
896 .with_target(target)
897 .with_timeout(10);
898
899 let result = bus.request(req).await;
900
901 assert!(result.is_err());
902 let err = result.expect_err("request should timeout");
903 assert_eq!(err.code(), "ENGINE_TIMEOUT");
904 assert!(err.is_recoverable());
905 }
906
907 #[test]
908 fn register_with_multiple_subscriptions() {
909 let mut bus = EventBus::new();
910 let id = ComponentId::builtin("hil");
911
912 let _handle = bus.register(
913 id.clone(),
914 vec![EventCategory::Hil, EventCategory::Lifecycle],
915 );
916
917 assert_eq!(bus.component_count(), 1);
918 assert_eq!(bus.subscribers(&EventCategory::Hil).len(), 1);
919 assert_eq!(bus.subscribers(&EventCategory::Lifecycle).len(), 1);
920 assert_eq!(bus.subscribers(&EventCategory::Echo).len(), 0);
921 }
922
923 #[test]
924 fn unregister_removes_subscriptions() {
925 let mut bus = EventBus::new();
926 let id = ComponentId::builtin("hil");
927
928 let _handle = bus.register(id.clone(), vec![EventCategory::Hil, EventCategory::Echo]);
929
930 assert_eq!(bus.subscribers(&EventCategory::Hil).len(), 1);
931
932 bus.unregister(&id);
933
934 assert_eq!(bus.subscribers(&EventCategory::Hil).len(), 0);
935 assert_eq!(bus.subscribers(&EventCategory::Echo).len(), 0);
936 }
937
938 #[tokio::test]
939 async fn publish_no_subscriber_error() {
940 use orcs_types::ErrorCode;
941
942 let mut bus = EventBus::new();
943 let source = ComponentId::builtin("source");
944 let channel = ChannelId::new();
945
946 let req = Request::new(EventCategory::Hil, "submit", source, channel, Value::Null);
948
949 let result = bus.publish(req).await;
950
951 assert!(result.is_err());
952 let err = result.expect_err("publish without subscriber should fail");
953 assert_eq!(err.code(), "ENGINE_NO_SUBSCRIBER");
954 }
955
956 #[tokio::test]
957 async fn publish_routes_to_subscriber() {
958 let mut bus = EventBus::new();
959 let source = ComponentId::builtin("source");
960 let hil_id = ComponentId::builtin("hil");
961 let channel = ChannelId::new();
962
963 let mut handle = bus.register(hil_id, vec![EventCategory::Hil]);
965
966 let req = Request::new(EventCategory::Hil, "submit", source, channel, Value::Null);
967 let request_id = req.id;
968
969 let (tx, rx) = tokio::sync::oneshot::channel();
971 tokio::spawn(async move {
972 if let Some(received) = handle.recv_request().await {
973 tx.send(received).ok();
974 }
975 });
976
977 let _publish_task = tokio::spawn(async move {
979 let mut bus = bus;
980 bus.publish(req).await
981 });
982
983 let received = rx.await.expect("should receive published request");
984 assert_eq!(received.id, request_id);
985 assert_eq!(received.operation, "submit");
986 }
987
988 #[test]
991 fn broadcast_to_no_channels() {
992 let bus = EventBus::new();
993 let source = ComponentId::builtin("source");
994
995 let event = Event {
996 category: EventCategory::UserInput,
997 operation: "input".to_string(),
998 source,
999 payload: serde_json::json!({"message": "hello"}),
1000 };
1001
1002 let delivered = bus.broadcast(event);
1003 assert_eq!(delivered, 0);
1004 }
1005
1006 #[tokio::test]
1007 async fn broadcast_to_multiple_channels() {
1008 use crate::channel::{ChannelConfig, World, WorldManager};
1009
1010 let mut world = World::new();
1011 let ch1 = world.create_channel(ChannelConfig::default());
1012 let ch2 = world.create_channel(ChannelConfig::default());
1013
1014 let (manager, _world_tx) = WorldManager::with_world(world);
1015 let _manager_task = tokio::spawn(manager.run());
1016
1017 let mut bus = EventBus::new();
1018
1019 let (tx1, mut rx1) = tokio::sync::mpsc::channel(32);
1021 let (tx2, mut rx2) = tokio::sync::mpsc::channel(32);
1022
1023 let handle1 = ChannelHandle::new(ch1, tx1);
1024 let handle2 = ChannelHandle::new(ch2, tx2);
1025
1026 bus.register_channel(handle1);
1027 bus.register_channel(handle2);
1028
1029 let source = ComponentId::builtin("source");
1030 let event = Event {
1031 category: EventCategory::UserInput,
1032 operation: "input".to_string(),
1033 source,
1034 payload: serde_json::json!({"message": "broadcast test"}),
1035 };
1036
1037 let delivered = bus.broadcast(event);
1038 assert_eq!(delivered, 2);
1039
1040 let evt1 = rx1.try_recv();
1042 let evt2 = rx2.try_recv();
1043 assert!(evt1.is_ok());
1044 assert!(evt2.is_ok());
1045 assert_eq!(
1046 evt1.expect("channel 1 should receive broadcast")
1047 .into_event()
1048 .operation,
1049 "input"
1050 );
1051 assert_eq!(
1052 evt2.expect("channel 2 should receive broadcast")
1053 .into_event()
1054 .operation,
1055 "input"
1056 );
1057 }
1058
1059 #[tokio::test]
1060 async fn broadcast_async_to_channels() {
1061 let mut bus = EventBus::new();
1062
1063 let ch1 = ChannelId::new();
1064 let (tx1, mut rx1) = tokio::sync::mpsc::channel(32);
1065 let handle1 = ChannelHandle::new(ch1, tx1);
1066 bus.register_channel(handle1);
1067
1068 let source = ComponentId::builtin("source");
1069 let event = Event {
1070 category: EventCategory::UserInput,
1071 operation: "async_input".to_string(),
1072 source,
1073 payload: serde_json::json!({"message": "async broadcast"}),
1074 };
1075
1076 let delivered = bus.broadcast_async(event).await;
1077 assert_eq!(delivered, 1);
1078
1079 let evt = rx1.try_recv();
1080 assert!(evt.is_ok());
1081 assert_eq!(
1082 evt.expect("channel should receive async broadcast")
1083 .into_event()
1084 .operation,
1085 "async_input"
1086 );
1087 }
1088
1089 async fn setup_runner_with_request_channel() -> (
1095 ChannelHandle,
1096 tokio::task::JoinHandle<crate::channel::RunnerResult>,
1097 ChannelId,
1098 tokio::sync::broadcast::Sender<orcs_event::Signal>,
1099 tokio::sync::mpsc::Sender<crate::channel::WorldCommand>,
1100 tokio::task::JoinHandle<()>,
1101 ) {
1102 use crate::channel::ChannelRunner;
1103 use crate::channel::{ChannelConfig, World, WorldManager};
1104 use orcs_component::{ComponentError, Status};
1105 use orcs_event::SignalResponse;
1106 use tokio::sync::broadcast;
1107
1108 struct EchoRpcComponent {
1109 id: ComponentId,
1110 }
1111 impl orcs_component::Component for EchoRpcComponent {
1112 fn id(&self) -> &ComponentId {
1113 &self.id
1114 }
1115 fn status(&self) -> Status {
1116 Status::Idle
1117 }
1118 fn on_request(
1119 &mut self,
1120 request: &orcs_event::Request,
1121 ) -> Result<Value, ComponentError> {
1122 Ok(request.payload.clone())
1123 }
1124 fn on_signal(&mut self, signal: &orcs_event::Signal) -> SignalResponse {
1125 if signal.is_veto() {
1126 SignalResponse::Abort
1127 } else {
1128 SignalResponse::Handled
1129 }
1130 }
1131 fn abort(&mut self) {}
1132 }
1133
1134 let mut world = World::new();
1135 let channel_id = world.create_channel(ChannelConfig::interactive());
1136 let (manager, world_tx) = WorldManager::with_world(world);
1137 let world_handle = manager.world();
1138 let manager_task = tokio::spawn(manager.run());
1139 let (signal_tx, _) = broadcast::channel(64);
1140
1141 let component: Box<dyn orcs_component::Component> = Box::new(EchoRpcComponent {
1142 id: ComponentId::builtin("rpc_target"),
1143 });
1144 let signal_rx = signal_tx.subscribe();
1145 let (runner, handle) = ChannelRunner::builder(
1146 channel_id,
1147 world_tx.clone(),
1148 world_handle,
1149 signal_rx,
1150 component,
1151 )
1152 .with_request_channel()
1153 .build();
1154
1155 let runner_task = tokio::spawn(runner.run());
1156
1157 (
1158 handle,
1159 runner_task,
1160 channel_id,
1161 signal_tx,
1162 world_tx,
1163 manager_task,
1164 )
1165 }
1166
1167 #[tokio::test]
1168 async fn request_via_channel_handle_routes_and_responds() {
1169 let (handle, runner_task, channel_id, signal_tx, world_tx, manager_task) =
1170 setup_runner_with_request_channel().await;
1171
1172 let mut bus = EventBus::new();
1173 let source = ComponentId::builtin("source");
1174 let target = ComponentId::builtin("rpc_target");
1175
1176 bus.register_channel(handle);
1177 bus.register_component_channel(&target, channel_id);
1178
1179 let req = Request::new(
1180 EventCategory::Echo,
1181 "echo",
1182 source,
1183 channel_id,
1184 Value::String("rpc_test".into()),
1185 )
1186 .with_target(target);
1187
1188 let result = bus.request(req).await;
1189 assert!(result.is_ok());
1190 assert_eq!(
1191 result.expect("RPC request should succeed"),
1192 Value::String("rpc_test".into())
1193 );
1194
1195 signal_tx
1197 .send(orcs_event::Signal::cancel(
1198 channel_id,
1199 crate::Principal::System,
1200 ))
1201 .expect("send cancel signal for cleanup should succeed");
1202 let _ = tokio::time::timeout(std::time::Duration::from_millis(100), runner_task).await;
1203 let _ = world_tx.send(crate::channel::WorldCommand::Shutdown).await;
1204 let _ = manager_task.await;
1205 }
1206
1207 #[tokio::test]
1208 async fn request_via_channel_handle_timeout() {
1209 use orcs_types::ErrorCode;
1210
1211 let (handle, runner_task, channel_id, signal_tx, world_tx, manager_task) =
1213 setup_runner_with_request_channel().await;
1214
1215 let mut bus = EventBus::new();
1216 let source = ComponentId::builtin("source");
1217 let target = ComponentId::builtin("rpc_target");
1218
1219 bus.register_channel(handle);
1220 bus.register_component_channel(&target, channel_id);
1221
1222 signal_tx
1224 .send(orcs_event::Signal::veto(crate::Principal::System))
1225 .expect("send veto signal to stop runner should succeed");
1226 let _ = tokio::time::timeout(std::time::Duration::from_millis(200), runner_task).await;
1227
1228 let req = Request::new(
1229 EventCategory::Echo,
1230 "slow_op",
1231 source,
1232 channel_id,
1233 Value::Null,
1234 )
1235 .with_target(target)
1236 .with_timeout(50);
1237
1238 let result = bus.request(req).await;
1239
1240 assert!(result.is_err());
1243 let err = result.expect_err("request after runner stop should fail");
1244 let code = err.code();
1245 assert!(
1246 code == "ENGINE_SEND_FAILED" || code == "ENGINE_CHANNEL_CLOSED",
1247 "expected send_failed or channel_closed, got: {}",
1248 code,
1249 );
1250
1251 let _ = world_tx.send(crate::channel::WorldCommand::Shutdown).await;
1252 let _ = manager_task.await;
1253 }
1254
1255 #[tokio::test]
1256 async fn request_falls_back_to_component_handle_when_no_channel_map() {
1257 let mut bus = EventBus::new();
1258 let source = ComponentId::builtin("source");
1259 let target = ComponentId::builtin("target");
1260 let channel = ChannelId::new();
1261
1262 let mut handle = bus.register(target.clone(), vec![EventCategory::Echo]);
1264
1265 let req = Request::new(
1266 EventCategory::Echo,
1267 "echo",
1268 source,
1269 channel,
1270 Value::String("fallback".into()),
1271 )
1272 .with_target(target);
1273 let request_id = req.id;
1274
1275 let (tx, rx) = tokio::sync::oneshot::channel();
1277 tokio::spawn(async move {
1278 if let Some(req) = handle.recv_request().await {
1279 tx.send(req).ok();
1280 }
1281 });
1282
1283 let _response_task = tokio::spawn(async move {
1284 let mut bus = bus;
1285 bus.request(req).await
1286 });
1287
1288 let received = rx
1289 .await
1290 .expect("should receive request via fallback ComponentHandle path");
1291 assert_eq!(received.id, request_id);
1292 }
1293
1294 #[tokio::test]
1295 async fn unregister_channel_cleans_up_component_channel_map() {
1296 let mut bus = EventBus::new();
1297 let target = ComponentId::builtin("target");
1298 let channel_id = ChannelId::new();
1299
1300 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(32);
1301 let handle = ChannelHandle::new(channel_id, event_tx);
1302
1303 bus.register_channel(handle);
1304 bus.register_component_channel(&target, channel_id);
1305
1306 assert!(bus.component_channel_map.read().contains_key(&target.fqn()));
1308
1309 bus.unregister_channel(&channel_id);
1311 assert!(!bus.component_channel_map.read().contains_key(&target.fqn()));
1312 }
1313
1314 mod hook_tests {
1317 use super::*;
1318 use orcs_hook::testing::MockHook;
1319 use orcs_hook::HookPoint;
1320 use serde_json::json;
1321
1322 fn setup_with_hooks() -> (EventBus, orcs_hook::SharedHookRegistry) {
1323 let registry = orcs_hook::shared_hook_registry();
1324 let mut bus = EventBus::new();
1325 bus.set_hook_registry(Arc::clone(®istry));
1326 (bus, registry)
1327 }
1328
1329 #[test]
1330 fn bus_on_register_fires() {
1331 let (mut bus, registry) = setup_with_hooks();
1332
1333 let hook = MockHook::pass_through("reg-observer", "*::*", HookPoint::BusOnRegister);
1334 let counter = hook.call_count.clone();
1335 registry
1336 .write()
1337 .expect("hook registry write lock should succeed")
1338 .register(Box::new(hook));
1339
1340 let _handle =
1341 bus.register(ComponentId::builtin("test"), vec![EventCategory::Lifecycle]);
1342 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1343 }
1344
1345 #[test]
1346 fn bus_on_register_payload_contains_component_info() {
1347 let (mut bus, registry) = setup_with_hooks();
1348
1349 let hook = MockHook::modifier("reg-checker", "*::*", HookPoint::BusOnRegister, |ctx| {
1350 assert_eq!(ctx.payload["component_id"], "builtin::test-comp");
1351 let subs = ctx.payload["subscriptions"]
1352 .as_array()
1353 .expect("subscriptions should be an array");
1354 assert!(!subs.is_empty());
1355 });
1356 let counter = hook.call_count.clone();
1357 registry
1358 .write()
1359 .expect("hook registry write lock should succeed")
1360 .register(Box::new(hook));
1361
1362 let _handle = bus.register(
1363 ComponentId::builtin("test-comp"),
1364 vec![EventCategory::Lifecycle],
1365 );
1366 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1367 }
1368
1369 #[test]
1370 fn bus_on_unregister_fires() {
1371 let (mut bus, registry) = setup_with_hooks();
1372
1373 let id = ComponentId::builtin("test");
1374 let _handle = bus.register(id.clone(), vec![EventCategory::Lifecycle]);
1375
1376 let hook = MockHook::pass_through("unreg-observer", "*::*", HookPoint::BusOnUnregister);
1377 let counter = hook.call_count.clone();
1378 registry
1379 .write()
1380 .expect("hook registry write lock should succeed")
1381 .register(Box::new(hook));
1382
1383 bus.unregister(&id);
1384 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
1385 }
1386
/// Verifies that the payload given to a `BusOnUnregister` hook names the
/// component being removed.
#[test]
fn bus_on_unregister_payload_contains_component_id() {
    use std::sync::atomic::Ordering;

    let (mut bus, registry) = setup_with_hooks();

    let component = ComponentId::builtin("my-comp");
    let _handle = bus.register(component.clone(), vec![EventCategory::Echo]);

    // The closure checks the payload when the hook is invoked.
    let checker = MockHook::modifier(
        "unreg-checker",
        "*::*",
        HookPoint::BusOnUnregister,
        |ctx| {
            let payload = &ctx.payload;
            assert_eq!(payload["component_id"], "builtin::my-comp");
        },
    );
    let calls = checker.call_count.clone();
    registry
        .write()
        .expect("hook registry write lock should succeed")
        .register(Box::new(checker));

    bus.unregister(&component);

    assert_eq!(calls.load(Ordering::SeqCst), 1);
}
1407
/// An aborting `BusPreBroadcast` hook must stop a broadcast entirely.
///
/// Checks both the delivery count returned by `broadcast` and that the
/// registered channel's receiver is still empty afterwards, so a broadcast
/// that delivered an event while reporting zero would be caught.
#[test]
fn bus_pre_broadcast_abort_prevents_delivery() {
    let (mut bus, registry) = setup_with_hooks();

    // One registered channel gives the broadcast a potential recipient.
    let ch = ChannelId::new();
    let (tx, mut rx) = tokio::sync::mpsc::channel(32);
    bus.register_channel(ChannelHandle::new(ch, tx));

    let hook = MockHook::aborter(
        "block-broadcast",
        "*::*",
        HookPoint::BusPreBroadcast,
        "policy",
    );
    registry
        .write()
        .expect("hook registry write lock should succeed")
        .register(Box::new(hook));

    let event = Event {
        category: EventCategory::UserInput,
        operation: "input".to_string(),
        source: ComponentId::builtin("source"),
        payload: json!({"msg": "hello"}),
    };

    let delivered = bus.broadcast(event);
    assert_eq!(delivered, 0);
    // The reported count alone does not prove nothing was sent; inspect the channel too.
    assert!(
        rx.try_recv().is_err(),
        "aborted broadcast must not reach the channel"
    );
}
1437
/// A skipping `BusPreBroadcast` hook must stop a broadcast entirely.
///
/// Checks both the delivery count returned by `broadcast` and that the
/// registered channel's receiver is still empty afterwards, so a broadcast
/// that delivered an event while reporting zero would be caught.
#[test]
fn bus_pre_broadcast_skip_prevents_delivery() {
    let (mut bus, registry) = setup_with_hooks();

    // One registered channel gives the broadcast a potential recipient.
    let ch = ChannelId::new();
    let (tx, mut rx) = tokio::sync::mpsc::channel(32);
    bus.register_channel(ChannelHandle::new(ch, tx));

    let hook = MockHook::skipper(
        "skip-broadcast",
        "*::*",
        HookPoint::BusPreBroadcast,
        json!(null),
    );
    registry
        .write()
        .expect("hook registry write lock should succeed")
        .register(Box::new(hook));

    let event = Event {
        category: EventCategory::UserInput,
        operation: "input".to_string(),
        source: ComponentId::builtin("source"),
        payload: json!({}),
    };

    let delivered = bus.broadcast(event);
    assert_eq!(delivered, 0);
    // The reported count alone does not prove nothing was sent; inspect the channel too.
    assert!(
        rx.try_recv().is_err(),
        "skipped broadcast must not reach the channel"
    );
}
1467
/// A pass-through `BusPreBroadcast` hook must not block delivery: the event
/// reaches the registered channel and the hook is invoked once.
#[test]
fn bus_pre_broadcast_continue_allows_delivery() {
    use std::sync::atomic::Ordering;

    let (mut bus, registry) = setup_with_hooks();

    // Single recipient channel; the receiver is inspected at the end.
    let channel_id = ChannelId::new();
    let (sender, mut receiver) = tokio::sync::mpsc::channel(32);
    bus.register_channel(ChannelHandle::new(channel_id, sender));

    let observer = MockHook::pass_through("allow-broadcast", "*::*", HookPoint::BusPreBroadcast);
    let calls = observer.call_count.clone();
    registry
        .write()
        .expect("hook registry write lock should succeed")
        .register(Box::new(observer));

    let delivered = bus.broadcast(Event {
        category: EventCategory::UserInput,
        operation: String::from("input"),
        source: ComponentId::builtin("source"),
        payload: json!({"msg": "ok"}),
    });

    assert_eq!(delivered, 1);
    assert_eq!(calls.load(Ordering::SeqCst), 1);
    assert!(receiver.try_recv().is_ok());
}
1496
/// Verifies that a `BusPostBroadcast` hook is invoked once per broadcast and
/// that its payload carries the `delivered` count and the event's `operation`.
#[test]
fn bus_post_broadcast_fires_with_delivered_count() {
    use std::sync::atomic::Ordering;

    let (mut bus, registry) = setup_with_hooks();

    // One registered channel, so the hook expects `delivered` to be 1.
    let channel_id = ChannelId::new();
    let (sender, _receiver) = tokio::sync::mpsc::channel(32);
    bus.register_channel(ChannelHandle::new(channel_id, sender));

    let checker = MockHook::modifier(
        "post-checker",
        "*::*",
        HookPoint::BusPostBroadcast,
        |ctx| {
            let payload = &ctx.payload;
            assert_eq!(payload["delivered"], 1);
            assert_eq!(payload["operation"], "input");
        },
    );
    let calls = checker.call_count.clone();
    registry
        .write()
        .expect("hook registry write lock should succeed")
        .register(Box::new(checker));

    // The return value is intentionally ignored; the hook inspects the count.
    let _ = bus.broadcast(Event {
        category: EventCategory::UserInput,
        operation: String::from("input"),
        source: ComponentId::builtin("source"),
        payload: json!({}),
    });

    assert_eq!(calls.load(Ordering::SeqCst), 1);
}
1526
1527 #[tokio::test]
1528 async fn bus_pre_broadcast_abort_prevents_async_delivery() {
1529 let (mut bus, registry) = setup_with_hooks();
1530
1531 let ch = ChannelId::new();
1532 let (tx, _rx) = tokio::sync::mpsc::channel(32);
1533 bus.register_channel(ChannelHandle::new(ch, tx));
1534
1535 let hook =
1536 MockHook::aborter("block-async", "*::*", HookPoint::BusPreBroadcast, "blocked");
1537 registry
1538 .write()
1539 .expect("hook registry write lock should succeed")
1540 .register(Box::new(hook));
1541
1542 let event = Event {
1543 category: EventCategory::UserInput,
1544 operation: "input".to_string(),
1545 source: ComponentId::builtin("source"),
1546 payload: json!({}),
1547 };
1548
1549 let delivered = bus.broadcast_async(event).await;
1550 assert_eq!(delivered, 0);
1551 }
1552
/// With no hook registry installed on the bus, a broadcast is delivered to
/// the registered channel as usual.
#[test]
fn no_hook_registry_passthrough() {
    // A freshly constructed bus; `set_hook_registry` is never called here.
    let mut bus = EventBus::new();

    let channel_id = ChannelId::new();
    let (sender, mut receiver) = tokio::sync::mpsc::channel(32);
    bus.register_channel(ChannelHandle::new(channel_id, sender));

    let delivered = bus.broadcast(Event {
        category: EventCategory::UserInput,
        operation: String::from("input"),
        source: ComponentId::builtin("source"),
        payload: json!({"msg": "normal"}),
    });

    assert_eq!(delivered, 1);
    assert!(receiver.try_recv().is_ok());
}
1573 }
1574}