Skip to main content

orcs_runtime/channel/runner/
emitter.rs

1//! Event emitter for Components to emit events.
2//!
3//! Components can emit events to:
4//! - Their owning Channel (via `emit`) - processed by ClientRunner
5//! - All Components (via `broadcast`) - Signal broadcast
6//!
7//! # Usage
8//!
9//! ```ignore
10//! // Component receives emitter via set_emitter()
11//! fn set_emitter(&mut self, emitter: Box<dyn Emitter>) {
12//!     self.emitter = Some(emitter);
13//! }
14//!
15//! // Later, emit an Output event
16//! if let Some(emitter) = &self.emitter {
17//!     emitter.emit_output("Task completed successfully");
18//! }
19//! ```
20
21use super::base::OutputSender;
22use super::Event;
23use crate::board::{BoardEntry, BoardEntryKind, SharedBoard};
24use crate::engine::{SharedChannelHandles, SharedComponentChannelMap};
25use orcs_component::Emitter;
26use orcs_event::{EventCategory, Signal};
27use orcs_types::{ChannelId, ComponentId};
28use serde_json::Value;
29use tokio::sync::broadcast;
30
31/// Event emitter for Components.
32///
33/// Allows Components to emit events to their owning Channel
34/// or broadcast signals to all Components.
35///
36/// Output events can be routed to a separate IO channel when configured,
37/// enabling ChannelRunner components to display output via ClientRunner.
38#[derive(Clone)]
39pub struct EventEmitter {
40    /// Sender to emit events to the owning Channel.
41    /// All events are sent as [`InboundEvent::Direct`] since these are
42    /// internal emissions, not broadcasts.
43    channel_tx: OutputSender,
44    /// Sender for Output events to IO channel.
45    /// If set, emit_output sends here instead of channel_tx.
46    output_tx: Option<OutputSender>,
47    /// Sender to broadcast signals to all Components.
48    signal_tx: broadcast::Sender<Signal>,
49    /// Component ID for event source.
50    source_id: ComponentId,
51    /// Shared channel handles for broadcasting events to all channels.
52    shared_handles: Option<SharedChannelHandles>,
53    /// Shared ComponentId → ChannelId mapping for RPC routing.
54    component_channel_map: Option<SharedComponentChannelMap>,
55    /// Channel ID of this emitter's owning ChannelRunner.
56    channel_id: Option<ChannelId>,
57    /// Shared Board for auto-recording emitted events.
58    board: Option<SharedBoard>,
59}
60
61impl EventEmitter {
62    /// Creates a new EventEmitter.
63    ///
64    /// # Arguments
65    ///
66    /// * `channel_tx` - Sender for the owning Channel's event_rx
67    /// * `signal_tx` - Broadcast sender for signals
68    /// * `source_id` - Component ID to use as event source
69    #[must_use]
70    pub(crate) fn new(
71        channel_tx: OutputSender,
72        signal_tx: broadcast::Sender<Signal>,
73        source_id: ComponentId,
74    ) -> Self {
75        Self {
76            channel_tx,
77            output_tx: None,
78            signal_tx,
79            source_id,
80            shared_handles: None,
81            component_channel_map: None,
82            channel_id: None,
83            board: None,
84        }
85    }
86
87    /// Sets the output channel for routing Output events to IO channel.
88    ///
89    /// When set, `emit_output()` will send to this channel instead of
90    /// the owning channel. This enables ChannelRunner components to
91    /// display output via ClientRunner's IOBridge.
92    ///
93    /// # Arguments
94    ///
95    /// * `output_tx` - Sender for the IO channel's event_rx
96    #[must_use]
97    pub(crate) fn with_output_channel(mut self, output_tx: OutputSender) -> Self {
98        self.output_tx = Some(output_tx);
99        self
100    }
101
102    /// Sets the shared channel handles for event broadcasting.
103    ///
104    /// When set, `emit_event()` will broadcast Extension events to all
105    /// registered channels via these handles.
106    #[must_use]
107    pub(crate) fn with_shared_handles(mut self, handles: SharedChannelHandles) -> Self {
108        self.shared_handles = Some(handles);
109        self
110    }
111
112    /// Sets the shared component-to-channel mapping for RPC routing.
113    ///
114    /// When set, `request()` can resolve target ComponentId to ChannelId
115    /// and send RPC requests via ChannelHandle.
116    #[must_use]
117    pub(crate) fn with_component_channel_map(
118        mut self,
119        map: SharedComponentChannelMap,
120        channel_id: ChannelId,
121    ) -> Self {
122        self.component_channel_map = Some(map);
123        self.channel_id = Some(channel_id);
124        self
125    }
126
127    /// Sets the shared Board for auto-recording emitted events.
128    ///
129    /// When set, `emit_output()` and `emit_event()` will automatically
130    /// append entries to the Board for cross-component visibility.
131    #[must_use]
132    pub(crate) fn with_board(mut self, board: SharedBoard) -> Self {
133        self.board = Some(board);
134        self
135    }
136
137    /// Appends an entry to the Board (if attached).
138    fn record_to_board(&self, kind: BoardEntryKind, operation: &str, payload: &serde_json::Value) {
139        if let Some(board) = &self.board {
140            let entry = BoardEntry {
141                timestamp: chrono::Utc::now(),
142                source: self.source_id.clone(),
143                kind,
144                operation: operation.to_string(),
145                payload: payload.clone(),
146            };
147            board.write().append(entry);
148        }
149    }
150
151    /// Emits an event to the owning Channel.
152    ///
153    /// The ClientRunner will receive this event and process it.
154    /// For `Output` category events, ClientRunner will send to IOBridge.
155    ///
156    /// # Arguments
157    ///
158    /// * `event` - The event to emit
159    ///
160    /// # Returns
161    ///
162    /// `true` if the event was sent successfully, `false` if the channel is full or closed.
163    pub fn emit(&self, event: Event) -> bool {
164        self.channel_tx.try_send_direct(event).is_ok()
165    }
166
167    /// Emits an Output event with a message.
168    ///
169    /// If an output channel is configured (via `with_output_channel`),
170    /// the event is sent there. Otherwise, it's sent to the owning channel.
171    /// ClientRunner will send this to IOBridge for display.
172    ///
173    /// # Arguments
174    ///
175    /// * `message` - The message to display
176    pub fn emit_output(&self, message: &str) {
177        let payload = serde_json::json!({
178            "message": message,
179            "level": "info"
180        });
181        self.record_to_board(
182            BoardEntryKind::Output {
183                level: "info".to_string(),
184            },
185            "display",
186            &payload,
187        );
188        let event = Event {
189            category: EventCategory::Output,
190            operation: "display".to_string(),
191            source: self.source_id.clone(),
192            payload,
193        };
194        let ok = self.emit_to_output(event);
195        tracing::debug!(source = %self.source_id.fqn(), success = ok, "emit_output");
196    }
197
198    /// Emits an Output event with a specific level.
199    ///
200    /// If an output channel is configured, sends there; otherwise to owning channel.
201    ///
202    /// # Arguments
203    ///
204    /// * `message` - The message to display
205    /// * `level` - Log level ("info", "warn", "error")
206    pub fn emit_output_with_level(&self, message: &str, level: &str) {
207        let payload = serde_json::json!({
208            "message": message,
209            "level": level
210        });
211        self.record_to_board(
212            BoardEntryKind::Output {
213                level: level.to_string(),
214            },
215            "display",
216            &payload,
217        );
218        let event = Event {
219            category: EventCategory::Output,
220            operation: "display".to_string(),
221            source: self.source_id.clone(),
222            payload,
223        };
224        let _ = self.emit_to_output(event);
225    }
226
227    /// Emits an event to the output channel (or owning channel if not configured).
228    ///
229    /// This is the internal method used by emit_output variants.
230    fn emit_to_output(&self, event: Event) -> bool {
231        if let Some(output_tx) = &self.output_tx {
232            match output_tx.try_send_direct(event) {
233                Ok(()) => true,
234                Err(e) => {
235                    tracing::warn!(
236                        source = %self.source_id.fqn(),
237                        "emit_to_output: output_tx send failed (channel full or closed): {}",
238                        e
239                    );
240                    false
241                }
242            }
243        } else {
244            tracing::warn!(
245                source = %self.source_id.fqn(),
246                "emit_to_output: output_tx is None, falling back to own channel"
247            );
248            self.emit(event)
249        }
250    }
251
252    /// Broadcasts a signal to all Components.
253    ///
254    /// Use this when the Component needs to notify all other
255    /// Components of something (e.g., state change).
256    ///
257    /// # Arguments
258    ///
259    /// * `signal` - The signal to broadcast
260    ///
261    /// # Returns
262    ///
263    /// `true` if the signal was sent successfully.
264    pub fn broadcast(&self, signal: Signal) -> bool {
265        let ok = self.signal_tx.send(signal).is_ok();
266        tracing::debug!(source = %self.source_id.fqn(), success = ok, "broadcast signal");
267        ok
268    }
269
270    /// Broadcasts a custom Extension event to all registered channels.
271    ///
272    /// Creates an `Extension { namespace: "lua", kind: category }` event
273    /// and broadcasts it to all channels via shared handles. Channels
274    /// subscribed to the matching Extension category will process it.
275    ///
276    /// Falls back to emitting to own channel if shared handles are not set.
277    ///
278    /// # Arguments
279    ///
280    /// * `category` - Extension kind string (e.g., "tool:result")
281    /// * `operation` - Operation name (e.g., "complete")
282    /// * `payload` - Event payload data
283    ///
284    /// # Returns
285    ///
286    /// `true` if at least one channel received the event.
287    pub fn emit_event(&self, category: &str, operation: &str, payload: serde_json::Value) -> bool {
288        self.record_to_board(
289            BoardEntryKind::Event {
290                category: category.to_string(),
291            },
292            operation,
293            &payload,
294        );
295        let event = Event {
296            category: EventCategory::Extension {
297                namespace: "lua".to_string(),
298                kind: category.to_string(),
299            },
300            operation: operation.to_string(),
301            source: self.source_id.clone(),
302            payload,
303        };
304
305        if let Some(handles) = &self.shared_handles {
306            let handles = handles.read();
307            let mut delivered = 0usize;
308            for handle in handles.values() {
309                if handle.try_inject(event.clone()).is_ok() {
310                    delivered += 1;
311                }
312            }
313            tracing::debug!(
314                source = %self.source_id.fqn(),
315                category = category,
316                operation = operation,
317                delivered = delivered,
318                "emit_event broadcast"
319            );
320            delivered > 0
321        } else {
322            // Fallback: emit to own channel only
323            let ok = self.emit(event);
324            tracing::debug!(
325                source = %self.source_id.fqn(),
326                category = category,
327                success = ok,
328                "emit_event fallback"
329            );
330            ok
331        }
332    }
333
334    /// Returns the source Component ID.
335    #[must_use]
336    pub fn source_id(&self) -> &ComponentId {
337        &self.source_id
338    }
339}
340
341impl std::fmt::Debug for EventEmitter {
342    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
343        f.debug_struct("EventEmitter")
344            .field("source_id", &self.source_id)
345            .finish_non_exhaustive()
346    }
347}
348
349// Implement Emitter trait from orcs-component
350impl Emitter for EventEmitter {
351    fn emit_output(&self, message: &str) {
352        EventEmitter::emit_output(self, message);
353    }
354
355    fn emit_output_with_level(&self, message: &str, level: &str) {
356        EventEmitter::emit_output_with_level(self, message, level);
357    }
358
359    fn emit_event(&self, category: &str, operation: &str, payload: serde_json::Value) -> bool {
360        EventEmitter::emit_event(self, category, operation, payload)
361    }
362
363    fn board_recent(&self, n: usize) -> Vec<serde_json::Value> {
364        match self.board.as_ref() {
365            Some(board) => board.read().recent_as_json(n),
366            None => Vec::new(),
367        }
368    }
369
370    fn request(
371        &self,
372        target: &str,
373        operation: &str,
374        payload: Value,
375        timeout_ms: Option<u64>,
376    ) -> Result<Value, String> {
377        let map = self
378            .component_channel_map
379            .as_ref()
380            .ok_or("component_channel_map not configured")?;
381        let handles = self
382            .shared_handles
383            .as_ref()
384            .ok_or("shared_handles not configured")?;
385
386        let timeout = timeout_ms.unwrap_or(orcs_event::DEFAULT_TIMEOUT_MS);
387        let source_channel = self.channel_id.unwrap_or_else(ChannelId::new);
388
389        // block_in_place + block_on: safe from within a tokio multi-threaded runtime
390        tokio::task::block_in_place(|| {
391            tokio::runtime::Handle::current().block_on(super::rpc::resolve_and_send_rpc(
392                super::rpc::RpcParams {
393                    component_channel_map: map,
394                    shared_handles: handles,
395                    target_fqn: target,
396                    operation,
397                    source_id: self.source_id.clone(),
398                    source_channel,
399                    payload,
400                    timeout_ms: timeout,
401                },
402            ))
403        })
404    }
405
406    fn is_alive(&self, target_fqn: &str) -> bool {
407        let Some(map) = self.component_channel_map.as_ref() else {
408            return false;
409        };
410        let Some(handles) = self.shared_handles.as_ref() else {
411            return false;
412        };
413
414        let Some(channel_id) = super::rpc::resolve_fqn(map, target_fqn) else {
415            return false;
416        };
417
418        handles
419            .read()
420            .get(&channel_id)
421            .is_some_and(|h| h.is_alive())
422    }
423
424    fn clone_box(&self) -> Box<dyn Emitter> {
425        Box::new(self.clone())
426    }
427}
428
429#[cfg(test)]
430mod tests {
431    use super::*;
432    use crate::channel::runner::base::OutputReceiver;
433    use orcs_types::{Principal, SignalScope};
434
435    fn setup() -> (EventEmitter, OutputReceiver, broadcast::Receiver<Signal>) {
436        let (channel_tx, channel_rx) = OutputSender::channel(64);
437        let (signal_tx, signal_rx) = broadcast::channel(64);
438        let source_id = ComponentId::builtin("test");
439
440        let emitter = EventEmitter::new(channel_tx, signal_tx, source_id);
441        (emitter, channel_rx, signal_rx)
442    }
443
444    #[test]
445    fn emit_event() {
446        let (emitter, mut channel_rx, _signal_rx) = setup();
447
448        let event = Event {
449            category: EventCategory::Echo,
450            operation: "test".to_string(),
451            source: ComponentId::builtin("test"),
452            payload: serde_json::json!({"data": "value"}),
453        };
454
455        assert!(emitter.emit(event));
456
457        let received = channel_rx.try_recv().expect("should receive emitted event");
458        assert_eq!(received.category, EventCategory::Echo);
459        assert_eq!(received.operation, "test");
460    }
461
462    #[test]
463    fn emit_output() {
464        let (emitter, mut channel_rx, _signal_rx) = setup();
465
466        emitter.emit_output("Hello, World!");
467
468        let received = channel_rx
469            .try_recv()
470            .expect("should receive emit_output event");
471        assert_eq!(received.category, EventCategory::Output);
472        assert_eq!(received.operation, "display");
473        assert_eq!(received.payload["message"], "Hello, World!");
474        assert_eq!(received.payload["level"], "info");
475    }
476
477    #[test]
478    fn emit_output_with_level() {
479        let (emitter, mut channel_rx, _signal_rx) = setup();
480
481        emitter.emit_output_with_level("Warning message", "warn");
482
483        let received = channel_rx
484            .try_recv()
485            .expect("should receive emit_output_with_level event");
486        assert_eq!(received.category, EventCategory::Output);
487        assert_eq!(received.payload["message"], "Warning message");
488        assert_eq!(received.payload["level"], "warn");
489    }
490
491    #[tokio::test]
492    async fn broadcast_signal() {
493        let (emitter, _channel_rx, mut signal_rx) = setup();
494
495        let signal = Signal::new(
496            orcs_event::SignalKind::Pause,
497            SignalScope::Global,
498            Principal::System,
499        );
500
501        assert!(emitter.broadcast(signal));
502
503        let received = signal_rx
504            .recv()
505            .await
506            .expect("should receive broadcast signal");
507        assert!(matches!(received.kind, orcs_event::SignalKind::Pause));
508    }
509
510    #[test]
511    fn source_id() {
512        let (emitter, _channel_rx, _signal_rx) = setup();
513        assert_eq!(emitter.source_id().name, "test");
514    }
515
516    #[test]
517    fn clone_emitter() {
518        let (emitter, mut channel_rx, _signal_rx) = setup();
519
520        let cloned = emitter.clone();
521        cloned.emit_output("From clone");
522
523        let received = channel_rx
524            .try_recv()
525            .expect("should receive event from cloned emitter");
526        assert_eq!(received.payload["message"], "From clone");
527    }
528
529    #[test]
530    fn emit_event_without_shared_handles_falls_back_to_own_channel() {
531        let (emitter, mut channel_rx, _signal_rx) = setup();
532
533        let result = emitter.emit_event(
534            "tool:result",
535            "complete",
536            serde_json::json!({"tool": "read", "success": true}),
537        );
538        assert!(result);
539
540        let received = channel_rx
541            .try_recv()
542            .expect("should receive Extension event via fallback");
543        assert_eq!(
544            received.category,
545            EventCategory::Extension {
546                namespace: "lua".to_string(),
547                kind: "tool:result".to_string(),
548            }
549        );
550        assert_eq!(received.operation, "complete");
551        assert_eq!(received.payload["tool"], "read");
552        assert_eq!(received.payload["success"], true);
553    }
554
555    #[test]
556    fn emit_event_with_shared_handles_broadcasts() {
557        use crate::channel::runner::base::ChannelHandle;
558        use orcs_types::ChannelId;
559        use parking_lot::RwLock;
560        use std::collections::HashMap;
561        use std::sync::Arc;
562
563        let (channel_tx, _channel_rx) = OutputSender::channel(64);
564        let (signal_tx, _signal_rx) = broadcast::channel(64);
565        let source_id = ComponentId::builtin("test");
566
567        // Create two target channels
568        let ch1 = ChannelId::new();
569        let ch2 = ChannelId::new();
570        let (tx1, mut rx1) = tokio::sync::mpsc::channel(32);
571        let (tx2, mut rx2) = tokio::sync::mpsc::channel(32);
572
573        let mut handles = HashMap::new();
574        handles.insert(ch1, ChannelHandle::new(ch1, tx1));
575        handles.insert(ch2, ChannelHandle::new(ch2, tx2));
576        let shared = Arc::new(RwLock::new(handles));
577
578        let emitter =
579            EventEmitter::new(channel_tx, signal_tx, source_id).with_shared_handles(shared);
580
581        let result = emitter.emit_event(
582            "tool:result",
583            "complete",
584            serde_json::json!({"data": "test"}),
585        );
586        assert!(result);
587
588        // Both channels should receive the event as Broadcast
589        let evt1 = rx1
590            .try_recv()
591            .expect("channel 1 should receive broadcast event");
592        let evt2 = rx2
593            .try_recv()
594            .expect("channel 2 should receive broadcast event");
595
596        assert!(!evt1.is_direct()); // Should be Broadcast
597        assert!(!evt2.is_direct());
598
599        let e1 = evt1.into_event();
600        let e2 = evt2.into_event();
601
602        assert_eq!(
603            e1.category,
604            EventCategory::Extension {
605                namespace: "lua".to_string(),
606                kind: "tool:result".to_string(),
607            }
608        );
609        assert_eq!(e1.operation, "complete");
610        assert_eq!(e2.payload["data"], "test");
611    }
612
613    #[test]
614    fn emit_event_via_trait() {
615        let (emitter, mut channel_rx, _signal_rx) = setup();
616
617        let boxed: Box<dyn Emitter> = Box::new(emitter);
618        let result = boxed.emit_event(
619            "custom:event",
620            "notify",
621            serde_json::json!({"key": "value"}),
622        );
623        assert!(result);
624
625        let received = channel_rx
626            .try_recv()
627            .expect("should receive Extension event via trait");
628        assert_eq!(
629            received.category,
630            EventCategory::Extension {
631                namespace: "lua".to_string(),
632                kind: "custom:event".to_string(),
633            }
634        );
635    }
636
637    // === Board integration tests ===
638
639    fn setup_with_board() -> (EventEmitter, OutputReceiver, crate::board::SharedBoard) {
640        let (channel_tx, channel_rx) = OutputSender::channel(64);
641        let (signal_tx, _signal_rx) = broadcast::channel(64);
642        let source_id = ComponentId::builtin("test");
643        let board = crate::board::shared_board();
644
645        let emitter = EventEmitter::new(channel_tx, signal_tx, source_id).with_board(board.clone());
646        (emitter, channel_rx, board)
647    }
648
649    #[test]
650    fn emit_output_records_to_board() {
651        let (emitter, _channel_rx, board) = setup_with_board();
652
653        emitter.emit_output("hello board");
654
655        let b = board.read();
656        assert_eq!(b.len(), 1);
657        let entries = b.recent(1);
658        assert_eq!(entries[0].payload["message"], "hello board");
659        assert_eq!(
660            entries[0].kind,
661            crate::board::BoardEntryKind::Output {
662                level: "info".into()
663            }
664        );
665    }
666
667    #[test]
668    fn emit_output_with_level_records_to_board() {
669        let (emitter, _channel_rx, board) = setup_with_board();
670
671        emitter.emit_output_with_level("warning!", "warn");
672
673        let b = board.read();
674        assert_eq!(b.len(), 1);
675        let entries = b.recent(1);
676        assert_eq!(entries[0].payload["level"], "warn");
677        assert_eq!(
678            entries[0].kind,
679            crate::board::BoardEntryKind::Output {
680                level: "warn".into()
681            }
682        );
683    }
684
685    #[test]
686    fn emit_event_records_to_board() {
687        let (emitter, _channel_rx, board) = setup_with_board();
688
689        emitter.emit_event(
690            "tool:result",
691            "complete",
692            serde_json::json!({"tool": "read"}),
693        );
694
695        let b = board.read();
696        assert_eq!(b.len(), 1);
697        let entries = b.recent(1);
698        assert_eq!(entries[0].operation, "complete");
699        assert_eq!(
700            entries[0].kind,
701            crate::board::BoardEntryKind::Event {
702                category: "tool:result".into()
703            }
704        );
705    }
706
707    #[test]
708    fn board_recent_via_trait() {
709        let (emitter, _channel_rx, _board) = setup_with_board();
710
711        emitter.emit_output("msg1");
712        emitter.emit_output("msg2");
713        emitter.emit_output("msg3");
714
715        let boxed: Box<dyn Emitter> = Box::new(emitter);
716        let recent = boxed.board_recent(2);
717        assert_eq!(recent.len(), 2);
718        assert_eq!(recent[0]["payload"]["message"], "msg2");
719        assert_eq!(recent[1]["payload"]["message"], "msg3");
720    }
721
722    #[test]
723    fn board_recent_without_board_returns_empty() {
724        let (emitter, _channel_rx, _signal_rx) = setup();
725
726        emitter.emit_output("no board");
727        let boxed: Box<dyn Emitter> = Box::new(emitter);
728        let recent = boxed.board_recent(10);
729        assert!(recent.is_empty());
730    }
731
732    // === output_channel routing tests ===
733
734    fn setup_with_output_channel() -> (
735        EventEmitter,
736        OutputReceiver,
737        OutputReceiver,
738        broadcast::Receiver<Signal>,
739    ) {
740        let (channel_tx, channel_rx) = OutputSender::channel(64);
741        let (output_tx, output_rx) = OutputSender::channel(64);
742        let (signal_tx, signal_rx) = broadcast::channel(64);
743        let source_id = ComponentId::builtin("test-output-routing");
744
745        let emitter =
746            EventEmitter::new(channel_tx, signal_tx, source_id).with_output_channel(output_tx);
747        (emitter, channel_rx, output_rx, signal_rx)
748    }
749
750    #[test]
751    fn emit_output_routes_to_output_channel() {
752        let (emitter, mut channel_rx, mut output_rx, _signal_rx) = setup_with_output_channel();
753
754        emitter.emit_output("Hello via output channel!");
755
756        // Output event should arrive on output_rx (IO channel)
757        let received = output_rx
758            .try_recv()
759            .expect("Output event should arrive on output channel");
760        assert_eq!(received.category, EventCategory::Output);
761        assert_eq!(received.operation, "display");
762        assert_eq!(received.payload["message"], "Hello via output channel!");
763        assert_eq!(received.payload["level"], "info");
764
765        // Own channel should NOT receive the Output event
766        assert!(
767            channel_rx.try_recv().is_err(),
768            "Own channel should NOT receive Output event when output_channel is configured"
769        );
770    }
771
772    #[test]
773    fn emit_output_with_level_routes_to_output_channel() {
774        let (emitter, mut channel_rx, mut output_rx, _signal_rx) = setup_with_output_channel();
775
776        emitter.emit_output_with_level("Warning!", "warn");
777
778        let received = output_rx
779            .try_recv()
780            .expect("Output event should arrive on output channel");
781        assert_eq!(received.category, EventCategory::Output);
782        assert_eq!(received.payload["message"], "Warning!");
783        assert_eq!(received.payload["level"], "warn");
784
785        assert!(
786            channel_rx.try_recv().is_err(),
787            "Own channel should NOT receive Output event when output_channel is configured"
788        );
789    }
790
791    #[test]
792    fn emit_output_with_output_channel_also_records_to_board() {
793        let (channel_tx, _channel_rx) = OutputSender::channel(64);
794        let (output_tx, mut output_rx) = OutputSender::channel(64);
795        let (signal_tx, _signal_rx) = broadcast::channel(64);
796        let source_id = ComponentId::builtin("test-board-routing");
797        let board = crate::board::shared_board();
798
799        let emitter = EventEmitter::new(channel_tx, signal_tx, source_id)
800            .with_output_channel(output_tx)
801            .with_board(board.clone());
802
803        emitter.emit_output("Board + IO test");
804
805        // Should arrive on output_rx
806        let received = output_rx
807            .try_recv()
808            .expect("Output event should arrive on output channel");
809        assert_eq!(received.payload["message"], "Board + IO test");
810
811        // Should also be recorded on Board
812        let b = board.read();
813        assert_eq!(b.len(), 1, "Board should have exactly 1 entry");
814        let entries = b.recent(1);
815        assert_eq!(entries[0].payload["message"], "Board + IO test");
816    }
817
818    // === is_alive tests ===
819
820    fn setup_with_rpc_wiring() -> (
821        EventEmitter,
822        ChannelId,
823        tokio::sync::mpsc::Receiver<crate::channel::runner::base::InboundEvent>,
824    ) {
825        use crate::channel::runner::base::ChannelHandle;
826        use crate::engine::{SharedChannelHandles, SharedComponentChannelMap};
827        use parking_lot::RwLock;
828        use std::collections::HashMap;
829        use std::sync::Arc;
830
831        let (channel_tx, _channel_rx) = OutputSender::channel(64);
832        let (signal_tx, _signal_rx) = broadcast::channel(64);
833        let source_id = ComponentId::builtin("test-is-alive");
834
835        let target_channel = ChannelId::new();
836        let (target_tx, target_rx) = tokio::sync::mpsc::channel(32);
837
838        let mut handles_map = HashMap::new();
839        handles_map.insert(
840            target_channel,
841            ChannelHandle::new(target_channel, target_tx),
842        );
843        let shared_handles: SharedChannelHandles = Arc::new(RwLock::new(handles_map));
844
845        let mut comp_map = HashMap::new();
846        comp_map.insert("ns::target_comp".to_string(), target_channel);
847        let shared_comp_map: SharedComponentChannelMap = Arc::new(RwLock::new(comp_map));
848
849        let emitter = EventEmitter::new(channel_tx, signal_tx, source_id)
850            .with_shared_handles(shared_handles)
851            .with_component_channel_map(shared_comp_map, ChannelId::new());
852
853        (emitter, target_channel, target_rx)
854    }
855
856    #[test]
857    fn is_alive_returns_true_when_runner_active() {
858        let (emitter, _ch, _rx) = setup_with_rpc_wiring();
859        let boxed: Box<dyn Emitter> = Box::new(emitter);
860
861        // Receiver is alive (held by _rx)
862        assert!(
863            boxed.is_alive("ns::target_comp"),
864            "should return true when target runner is alive"
865        );
866    }
867
868    #[test]
869    fn is_alive_returns_false_after_receiver_dropped() {
870        let (emitter, _ch, rx) = setup_with_rpc_wiring();
871        let boxed: Box<dyn Emitter> = Box::new(emitter);
872
873        // Drop the receiver to simulate runner termination
874        drop(rx);
875
876        assert!(
877            !boxed.is_alive("ns::target_comp"),
878            "should return false after target receiver is dropped"
879        );
880    }
881
882    #[test]
883    fn is_alive_returns_true_with_short_name() {
884        let (emitter, _ch, _rx) = setup_with_rpc_wiring();
885        let boxed: Box<dyn Emitter> = Box::new(emitter);
886
887        // Short-name fallback: "target_comp" should resolve to "ns::target_comp"
888        assert!(
889            boxed.is_alive("target_comp"),
890            "should resolve short name via suffix matching"
891        );
892    }
893
894    #[test]
895    fn is_alive_returns_false_for_unknown_fqn() {
896        let (emitter, _ch, _rx) = setup_with_rpc_wiring();
897        let boxed: Box<dyn Emitter> = Box::new(emitter);
898
899        assert!(
900            !boxed.is_alive("ns::nonexistent"),
901            "should return false for unknown FQN"
902        );
903    }
904
905    #[test]
906    fn is_alive_returns_false_without_wiring() {
907        // Emitter without shared_handles / component_channel_map
908        let (emitter, _channel_rx, _signal_rx) = setup();
909        let boxed: Box<dyn Emitter> = Box::new(emitter);
910
911        assert!(
912            !boxed.is_alive("anything"),
913            "should return false when no RPC wiring is configured"
914        );
915    }
916}