1use super::base::OutputSender;
22use super::Event;
23use crate::board::{BoardEntry, BoardEntryKind, SharedBoard};
24use crate::engine::{SharedChannelHandles, SharedComponentChannelMap};
25use orcs_component::Emitter;
26use orcs_event::{EventCategory, Signal};
27use orcs_types::{ChannelId, ComponentId};
28use serde_json::Value;
29use tokio::sync::broadcast;
30
31#[derive(Clone)]
39pub struct EventEmitter {
40 channel_tx: OutputSender,
44 output_tx: Option<OutputSender>,
47 signal_tx: broadcast::Sender<Signal>,
49 source_id: ComponentId,
51 shared_handles: Option<SharedChannelHandles>,
53 component_channel_map: Option<SharedComponentChannelMap>,
55 channel_id: Option<ChannelId>,
57 board: Option<SharedBoard>,
59}
60
61impl EventEmitter {
62 #[must_use]
70 pub(crate) fn new(
71 channel_tx: OutputSender,
72 signal_tx: broadcast::Sender<Signal>,
73 source_id: ComponentId,
74 ) -> Self {
75 Self {
76 channel_tx,
77 output_tx: None,
78 signal_tx,
79 source_id,
80 shared_handles: None,
81 component_channel_map: None,
82 channel_id: None,
83 board: None,
84 }
85 }
86
87 #[must_use]
97 pub(crate) fn with_output_channel(mut self, output_tx: OutputSender) -> Self {
98 self.output_tx = Some(output_tx);
99 self
100 }
101
102 #[must_use]
107 pub(crate) fn with_shared_handles(mut self, handles: SharedChannelHandles) -> Self {
108 self.shared_handles = Some(handles);
109 self
110 }
111
112 #[must_use]
117 pub(crate) fn with_component_channel_map(
118 mut self,
119 map: SharedComponentChannelMap,
120 channel_id: ChannelId,
121 ) -> Self {
122 self.component_channel_map = Some(map);
123 self.channel_id = Some(channel_id);
124 self
125 }
126
127 #[must_use]
132 pub(crate) fn with_board(mut self, board: SharedBoard) -> Self {
133 self.board = Some(board);
134 self
135 }
136
137 fn record_to_board(&self, kind: BoardEntryKind, operation: &str, payload: &serde_json::Value) {
139 if let Some(board) = &self.board {
140 let entry = BoardEntry {
141 timestamp: chrono::Utc::now(),
142 source: self.source_id.clone(),
143 kind,
144 operation: operation.to_string(),
145 payload: payload.clone(),
146 };
147 board.write().append(entry);
148 }
149 }
150
151 pub fn emit(&self, event: Event) -> bool {
164 self.channel_tx.try_send_direct(event).is_ok()
165 }
166
167 pub fn emit_output(&self, message: &str) {
177 let payload = serde_json::json!({
178 "message": message,
179 "level": "info"
180 });
181 self.record_to_board(
182 BoardEntryKind::Output {
183 level: "info".to_string(),
184 },
185 "display",
186 &payload,
187 );
188 let event = Event {
189 category: EventCategory::Output,
190 operation: "display".to_string(),
191 source: self.source_id.clone(),
192 payload,
193 };
194 let ok = self.emit_to_output(event);
195 tracing::debug!(source = %self.source_id.fqn(), success = ok, "emit_output");
196 }
197
198 pub fn emit_output_with_level(&self, message: &str, level: &str) {
207 let payload = serde_json::json!({
208 "message": message,
209 "level": level
210 });
211 self.record_to_board(
212 BoardEntryKind::Output {
213 level: level.to_string(),
214 },
215 "display",
216 &payload,
217 );
218 let event = Event {
219 category: EventCategory::Output,
220 operation: "display".to_string(),
221 source: self.source_id.clone(),
222 payload,
223 };
224 let _ = self.emit_to_output(event);
225 }
226
227 fn emit_to_output(&self, event: Event) -> bool {
231 if let Some(output_tx) = &self.output_tx {
232 match output_tx.try_send_direct(event) {
233 Ok(()) => true,
234 Err(e) => {
235 tracing::warn!(
236 source = %self.source_id.fqn(),
237 "emit_to_output: output_tx send failed (channel full or closed): {}",
238 e
239 );
240 false
241 }
242 }
243 } else {
244 tracing::warn!(
245 source = %self.source_id.fqn(),
246 "emit_to_output: output_tx is None, falling back to own channel"
247 );
248 self.emit(event)
249 }
250 }
251
252 pub fn broadcast(&self, signal: Signal) -> bool {
265 let ok = self.signal_tx.send(signal).is_ok();
266 tracing::debug!(source = %self.source_id.fqn(), success = ok, "broadcast signal");
267 ok
268 }
269
270 pub fn emit_event(&self, category: &str, operation: &str, payload: serde_json::Value) -> bool {
288 self.record_to_board(
289 BoardEntryKind::Event {
290 category: category.to_string(),
291 },
292 operation,
293 &payload,
294 );
295 let event = Event {
296 category: EventCategory::Extension {
297 namespace: "lua".to_string(),
298 kind: category.to_string(),
299 },
300 operation: operation.to_string(),
301 source: self.source_id.clone(),
302 payload,
303 };
304
305 if let Some(handles) = &self.shared_handles {
306 let handles = handles.read();
307 let mut delivered = 0usize;
308 for handle in handles.values() {
309 if handle.try_inject(event.clone()).is_ok() {
310 delivered += 1;
311 }
312 }
313 tracing::debug!(
314 source = %self.source_id.fqn(),
315 category = category,
316 operation = operation,
317 delivered = delivered,
318 "emit_event broadcast"
319 );
320 delivered > 0
321 } else {
322 let ok = self.emit(event);
324 tracing::debug!(
325 source = %self.source_id.fqn(),
326 category = category,
327 success = ok,
328 "emit_event fallback"
329 );
330 ok
331 }
332 }
333
334 #[must_use]
336 pub fn source_id(&self) -> &ComponentId {
337 &self.source_id
338 }
339}
340
341impl std::fmt::Debug for EventEmitter {
342 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343 f.debug_struct("EventEmitter")
344 .field("source_id", &self.source_id)
345 .finish_non_exhaustive()
346 }
347}
348
349impl Emitter for EventEmitter {
351 fn emit_output(&self, message: &str) {
352 EventEmitter::emit_output(self, message);
353 }
354
355 fn emit_output_with_level(&self, message: &str, level: &str) {
356 EventEmitter::emit_output_with_level(self, message, level);
357 }
358
359 fn emit_event(&self, category: &str, operation: &str, payload: serde_json::Value) -> bool {
360 EventEmitter::emit_event(self, category, operation, payload)
361 }
362
363 fn board_recent(&self, n: usize) -> Vec<serde_json::Value> {
364 match self.board.as_ref() {
365 Some(board) => board.read().recent_as_json(n),
366 None => Vec::new(),
367 }
368 }
369
370 fn request(
371 &self,
372 target: &str,
373 operation: &str,
374 payload: Value,
375 timeout_ms: Option<u64>,
376 ) -> Result<Value, String> {
377 let map = self
378 .component_channel_map
379 .as_ref()
380 .ok_or("component_channel_map not configured")?;
381 let handles = self
382 .shared_handles
383 .as_ref()
384 .ok_or("shared_handles not configured")?;
385
386 let timeout = timeout_ms.unwrap_or(orcs_event::DEFAULT_TIMEOUT_MS);
387 let source_channel = self.channel_id.unwrap_or_else(ChannelId::new);
388
389 tokio::task::block_in_place(|| {
391 tokio::runtime::Handle::current().block_on(super::rpc::resolve_and_send_rpc(
392 super::rpc::RpcParams {
393 component_channel_map: map,
394 shared_handles: handles,
395 target_fqn: target,
396 operation,
397 source_id: self.source_id.clone(),
398 source_channel,
399 payload,
400 timeout_ms: timeout,
401 },
402 ))
403 })
404 }
405
406 fn is_alive(&self, target_fqn: &str) -> bool {
407 let Some(map) = self.component_channel_map.as_ref() else {
408 return false;
409 };
410 let Some(handles) = self.shared_handles.as_ref() else {
411 return false;
412 };
413
414 let Some(channel_id) = super::rpc::resolve_fqn(map, target_fqn) else {
415 return false;
416 };
417
418 handles
419 .read()
420 .get(&channel_id)
421 .is_some_and(|h| h.is_alive())
422 }
423
424 fn clone_box(&self) -> Box<dyn Emitter> {
425 Box::new(self.clone())
426 }
427}
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432 use crate::channel::runner::base::OutputReceiver;
433 use orcs_types::{Principal, SignalScope};
434
435 fn setup() -> (EventEmitter, OutputReceiver, broadcast::Receiver<Signal>) {
436 let (channel_tx, channel_rx) = OutputSender::channel(64);
437 let (signal_tx, signal_rx) = broadcast::channel(64);
438 let source_id = ComponentId::builtin("test");
439
440 let emitter = EventEmitter::new(channel_tx, signal_tx, source_id);
441 (emitter, channel_rx, signal_rx)
442 }
443
444 #[test]
445 fn emit_event() {
446 let (emitter, mut channel_rx, _signal_rx) = setup();
447
448 let event = Event {
449 category: EventCategory::Echo,
450 operation: "test".to_string(),
451 source: ComponentId::builtin("test"),
452 payload: serde_json::json!({"data": "value"}),
453 };
454
455 assert!(emitter.emit(event));
456
457 let received = channel_rx.try_recv().expect("should receive emitted event");
458 assert_eq!(received.category, EventCategory::Echo);
459 assert_eq!(received.operation, "test");
460 }
461
462 #[test]
463 fn emit_output() {
464 let (emitter, mut channel_rx, _signal_rx) = setup();
465
466 emitter.emit_output("Hello, World!");
467
468 let received = channel_rx
469 .try_recv()
470 .expect("should receive emit_output event");
471 assert_eq!(received.category, EventCategory::Output);
472 assert_eq!(received.operation, "display");
473 assert_eq!(received.payload["message"], "Hello, World!");
474 assert_eq!(received.payload["level"], "info");
475 }
476
477 #[test]
478 fn emit_output_with_level() {
479 let (emitter, mut channel_rx, _signal_rx) = setup();
480
481 emitter.emit_output_with_level("Warning message", "warn");
482
483 let received = channel_rx
484 .try_recv()
485 .expect("should receive emit_output_with_level event");
486 assert_eq!(received.category, EventCategory::Output);
487 assert_eq!(received.payload["message"], "Warning message");
488 assert_eq!(received.payload["level"], "warn");
489 }
490
491 #[tokio::test]
492 async fn broadcast_signal() {
493 let (emitter, _channel_rx, mut signal_rx) = setup();
494
495 let signal = Signal::new(
496 orcs_event::SignalKind::Pause,
497 SignalScope::Global,
498 Principal::System,
499 );
500
501 assert!(emitter.broadcast(signal));
502
503 let received = signal_rx
504 .recv()
505 .await
506 .expect("should receive broadcast signal");
507 assert!(matches!(received.kind, orcs_event::SignalKind::Pause));
508 }
509
510 #[test]
511 fn source_id() {
512 let (emitter, _channel_rx, _signal_rx) = setup();
513 assert_eq!(emitter.source_id().name, "test");
514 }
515
516 #[test]
517 fn clone_emitter() {
518 let (emitter, mut channel_rx, _signal_rx) = setup();
519
520 let cloned = emitter.clone();
521 cloned.emit_output("From clone");
522
523 let received = channel_rx
524 .try_recv()
525 .expect("should receive event from cloned emitter");
526 assert_eq!(received.payload["message"], "From clone");
527 }
528
529 #[test]
530 fn emit_event_without_shared_handles_falls_back_to_own_channel() {
531 let (emitter, mut channel_rx, _signal_rx) = setup();
532
533 let result = emitter.emit_event(
534 "tool:result",
535 "complete",
536 serde_json::json!({"tool": "read", "success": true}),
537 );
538 assert!(result);
539
540 let received = channel_rx
541 .try_recv()
542 .expect("should receive Extension event via fallback");
543 assert_eq!(
544 received.category,
545 EventCategory::Extension {
546 namespace: "lua".to_string(),
547 kind: "tool:result".to_string(),
548 }
549 );
550 assert_eq!(received.operation, "complete");
551 assert_eq!(received.payload["tool"], "read");
552 assert_eq!(received.payload["success"], true);
553 }
554
555 #[test]
556 fn emit_event_with_shared_handles_broadcasts() {
557 use crate::channel::runner::base::ChannelHandle;
558 use orcs_types::ChannelId;
559 use parking_lot::RwLock;
560 use std::collections::HashMap;
561 use std::sync::Arc;
562
563 let (channel_tx, _channel_rx) = OutputSender::channel(64);
564 let (signal_tx, _signal_rx) = broadcast::channel(64);
565 let source_id = ComponentId::builtin("test");
566
567 let ch1 = ChannelId::new();
569 let ch2 = ChannelId::new();
570 let (tx1, mut rx1) = tokio::sync::mpsc::channel(32);
571 let (tx2, mut rx2) = tokio::sync::mpsc::channel(32);
572
573 let mut handles = HashMap::new();
574 handles.insert(ch1, ChannelHandle::new(ch1, tx1));
575 handles.insert(ch2, ChannelHandle::new(ch2, tx2));
576 let shared = Arc::new(RwLock::new(handles));
577
578 let emitter =
579 EventEmitter::new(channel_tx, signal_tx, source_id).with_shared_handles(shared);
580
581 let result = emitter.emit_event(
582 "tool:result",
583 "complete",
584 serde_json::json!({"data": "test"}),
585 );
586 assert!(result);
587
588 let evt1 = rx1
590 .try_recv()
591 .expect("channel 1 should receive broadcast event");
592 let evt2 = rx2
593 .try_recv()
594 .expect("channel 2 should receive broadcast event");
595
596 assert!(!evt1.is_direct()); assert!(!evt2.is_direct());
598
599 let e1 = evt1.into_event();
600 let e2 = evt2.into_event();
601
602 assert_eq!(
603 e1.category,
604 EventCategory::Extension {
605 namespace: "lua".to_string(),
606 kind: "tool:result".to_string(),
607 }
608 );
609 assert_eq!(e1.operation, "complete");
610 assert_eq!(e2.payload["data"], "test");
611 }
612
613 #[test]
614 fn emit_event_via_trait() {
615 let (emitter, mut channel_rx, _signal_rx) = setup();
616
617 let boxed: Box<dyn Emitter> = Box::new(emitter);
618 let result = boxed.emit_event(
619 "custom:event",
620 "notify",
621 serde_json::json!({"key": "value"}),
622 );
623 assert!(result);
624
625 let received = channel_rx
626 .try_recv()
627 .expect("should receive Extension event via trait");
628 assert_eq!(
629 received.category,
630 EventCategory::Extension {
631 namespace: "lua".to_string(),
632 kind: "custom:event".to_string(),
633 }
634 );
635 }
636
637 fn setup_with_board() -> (EventEmitter, OutputReceiver, crate::board::SharedBoard) {
640 let (channel_tx, channel_rx) = OutputSender::channel(64);
641 let (signal_tx, _signal_rx) = broadcast::channel(64);
642 let source_id = ComponentId::builtin("test");
643 let board = crate::board::shared_board();
644
645 let emitter = EventEmitter::new(channel_tx, signal_tx, source_id).with_board(board.clone());
646 (emitter, channel_rx, board)
647 }
648
649 #[test]
650 fn emit_output_records_to_board() {
651 let (emitter, _channel_rx, board) = setup_with_board();
652
653 emitter.emit_output("hello board");
654
655 let b = board.read();
656 assert_eq!(b.len(), 1);
657 let entries = b.recent(1);
658 assert_eq!(entries[0].payload["message"], "hello board");
659 assert_eq!(
660 entries[0].kind,
661 crate::board::BoardEntryKind::Output {
662 level: "info".into()
663 }
664 );
665 }
666
667 #[test]
668 fn emit_output_with_level_records_to_board() {
669 let (emitter, _channel_rx, board) = setup_with_board();
670
671 emitter.emit_output_with_level("warning!", "warn");
672
673 let b = board.read();
674 assert_eq!(b.len(), 1);
675 let entries = b.recent(1);
676 assert_eq!(entries[0].payload["level"], "warn");
677 assert_eq!(
678 entries[0].kind,
679 crate::board::BoardEntryKind::Output {
680 level: "warn".into()
681 }
682 );
683 }
684
685 #[test]
686 fn emit_event_records_to_board() {
687 let (emitter, _channel_rx, board) = setup_with_board();
688
689 emitter.emit_event(
690 "tool:result",
691 "complete",
692 serde_json::json!({"tool": "read"}),
693 );
694
695 let b = board.read();
696 assert_eq!(b.len(), 1);
697 let entries = b.recent(1);
698 assert_eq!(entries[0].operation, "complete");
699 assert_eq!(
700 entries[0].kind,
701 crate::board::BoardEntryKind::Event {
702 category: "tool:result".into()
703 }
704 );
705 }
706
707 #[test]
708 fn board_recent_via_trait() {
709 let (emitter, _channel_rx, _board) = setup_with_board();
710
711 emitter.emit_output("msg1");
712 emitter.emit_output("msg2");
713 emitter.emit_output("msg3");
714
715 let boxed: Box<dyn Emitter> = Box::new(emitter);
716 let recent = boxed.board_recent(2);
717 assert_eq!(recent.len(), 2);
718 assert_eq!(recent[0]["payload"]["message"], "msg2");
719 assert_eq!(recent[1]["payload"]["message"], "msg3");
720 }
721
722 #[test]
723 fn board_recent_without_board_returns_empty() {
724 let (emitter, _channel_rx, _signal_rx) = setup();
725
726 emitter.emit_output("no board");
727 let boxed: Box<dyn Emitter> = Box::new(emitter);
728 let recent = boxed.board_recent(10);
729 assert!(recent.is_empty());
730 }
731
732 fn setup_with_output_channel() -> (
735 EventEmitter,
736 OutputReceiver,
737 OutputReceiver,
738 broadcast::Receiver<Signal>,
739 ) {
740 let (channel_tx, channel_rx) = OutputSender::channel(64);
741 let (output_tx, output_rx) = OutputSender::channel(64);
742 let (signal_tx, signal_rx) = broadcast::channel(64);
743 let source_id = ComponentId::builtin("test-output-routing");
744
745 let emitter =
746 EventEmitter::new(channel_tx, signal_tx, source_id).with_output_channel(output_tx);
747 (emitter, channel_rx, output_rx, signal_rx)
748 }
749
750 #[test]
751 fn emit_output_routes_to_output_channel() {
752 let (emitter, mut channel_rx, mut output_rx, _signal_rx) = setup_with_output_channel();
753
754 emitter.emit_output("Hello via output channel!");
755
756 let received = output_rx
758 .try_recv()
759 .expect("Output event should arrive on output channel");
760 assert_eq!(received.category, EventCategory::Output);
761 assert_eq!(received.operation, "display");
762 assert_eq!(received.payload["message"], "Hello via output channel!");
763 assert_eq!(received.payload["level"], "info");
764
765 assert!(
767 channel_rx.try_recv().is_err(),
768 "Own channel should NOT receive Output event when output_channel is configured"
769 );
770 }
771
772 #[test]
773 fn emit_output_with_level_routes_to_output_channel() {
774 let (emitter, mut channel_rx, mut output_rx, _signal_rx) = setup_with_output_channel();
775
776 emitter.emit_output_with_level("Warning!", "warn");
777
778 let received = output_rx
779 .try_recv()
780 .expect("Output event should arrive on output channel");
781 assert_eq!(received.category, EventCategory::Output);
782 assert_eq!(received.payload["message"], "Warning!");
783 assert_eq!(received.payload["level"], "warn");
784
785 assert!(
786 channel_rx.try_recv().is_err(),
787 "Own channel should NOT receive Output event when output_channel is configured"
788 );
789 }
790
791 #[test]
792 fn emit_output_with_output_channel_also_records_to_board() {
793 let (channel_tx, _channel_rx) = OutputSender::channel(64);
794 let (output_tx, mut output_rx) = OutputSender::channel(64);
795 let (signal_tx, _signal_rx) = broadcast::channel(64);
796 let source_id = ComponentId::builtin("test-board-routing");
797 let board = crate::board::shared_board();
798
799 let emitter = EventEmitter::new(channel_tx, signal_tx, source_id)
800 .with_output_channel(output_tx)
801 .with_board(board.clone());
802
803 emitter.emit_output("Board + IO test");
804
805 let received = output_rx
807 .try_recv()
808 .expect("Output event should arrive on output channel");
809 assert_eq!(received.payload["message"], "Board + IO test");
810
811 let b = board.read();
813 assert_eq!(b.len(), 1, "Board should have exactly 1 entry");
814 let entries = b.recent(1);
815 assert_eq!(entries[0].payload["message"], "Board + IO test");
816 }
817
818 fn setup_with_rpc_wiring() -> (
821 EventEmitter,
822 ChannelId,
823 tokio::sync::mpsc::Receiver<crate::channel::runner::base::InboundEvent>,
824 ) {
825 use crate::channel::runner::base::ChannelHandle;
826 use crate::engine::{SharedChannelHandles, SharedComponentChannelMap};
827 use parking_lot::RwLock;
828 use std::collections::HashMap;
829 use std::sync::Arc;
830
831 let (channel_tx, _channel_rx) = OutputSender::channel(64);
832 let (signal_tx, _signal_rx) = broadcast::channel(64);
833 let source_id = ComponentId::builtin("test-is-alive");
834
835 let target_channel = ChannelId::new();
836 let (target_tx, target_rx) = tokio::sync::mpsc::channel(32);
837
838 let mut handles_map = HashMap::new();
839 handles_map.insert(
840 target_channel,
841 ChannelHandle::new(target_channel, target_tx),
842 );
843 let shared_handles: SharedChannelHandles = Arc::new(RwLock::new(handles_map));
844
845 let mut comp_map = HashMap::new();
846 comp_map.insert("ns::target_comp".to_string(), target_channel);
847 let shared_comp_map: SharedComponentChannelMap = Arc::new(RwLock::new(comp_map));
848
849 let emitter = EventEmitter::new(channel_tx, signal_tx, source_id)
850 .with_shared_handles(shared_handles)
851 .with_component_channel_map(shared_comp_map, ChannelId::new());
852
853 (emitter, target_channel, target_rx)
854 }
855
856 #[test]
857 fn is_alive_returns_true_when_runner_active() {
858 let (emitter, _ch, _rx) = setup_with_rpc_wiring();
859 let boxed: Box<dyn Emitter> = Box::new(emitter);
860
861 assert!(
863 boxed.is_alive("ns::target_comp"),
864 "should return true when target runner is alive"
865 );
866 }
867
868 #[test]
869 fn is_alive_returns_false_after_receiver_dropped() {
870 let (emitter, _ch, rx) = setup_with_rpc_wiring();
871 let boxed: Box<dyn Emitter> = Box::new(emitter);
872
873 drop(rx);
875
876 assert!(
877 !boxed.is_alive("ns::target_comp"),
878 "should return false after target receiver is dropped"
879 );
880 }
881
882 #[test]
883 fn is_alive_returns_true_with_short_name() {
884 let (emitter, _ch, _rx) = setup_with_rpc_wiring();
885 let boxed: Box<dyn Emitter> = Box::new(emitter);
886
887 assert!(
889 boxed.is_alive("target_comp"),
890 "should resolve short name via suffix matching"
891 );
892 }
893
894 #[test]
895 fn is_alive_returns_false_for_unknown_fqn() {
896 let (emitter, _ch, _rx) = setup_with_rpc_wiring();
897 let boxed: Box<dyn Emitter> = Box::new(emitter);
898
899 assert!(
900 !boxed.is_alive("ns::nonexistent"),
901 "should return false for unknown FQN"
902 );
903 }
904
905 #[test]
906 fn is_alive_returns_false_without_wiring() {
907 let (emitter, _channel_rx, _signal_rx) = setup();
909 let boxed: Box<dyn Emitter> = Box::new(emitter);
910
911 assert!(
912 !boxed.is_alive("anything"),
913 "should return false when no RPC wiring is configured"
914 );
915 }
916}