Skip to main content

ssh_commander_core/
event_bus.rs

1//! Event bus — a single channel for all async-to-sync boundary crossings.
2//!
3//! The event bus decouples the Rust domain layer from any particular consumer
4//! (Tauri WebSocket server, native macOS FFI bridge, CLI harness, …). Every
5//! "push" event (PTY output, transfer progress, connection status change) is
6//! serialised into a `CoreEvent` and sent over a broadcast channel. A single
7//! external callback (registered via `set_event_callback`) drains the channel
8//! and dispatches to the native layer.
9//!
10//! This avoids a sprawling FFI surface where every event type needs its own
11//! callback registration.
12//!
13//! # Thread Safety
14//!
15//! - `CoreEvent`: `Send + Sync`.
16//! - The sender and receiver are behind `OnceLock` — safe to call from any
17//!   thread.
18//! - The broadcast channel has a fixed capacity (1024). If the consumer falls
19//!   behind, the oldest events are dropped. This is intentional — PTY output
20//!   and progress events are latency-sensitive, not reliability-sensitive.
21
22#![allow(dead_code)]
23
24use std::sync::OnceLock;
25
26use tokio::sync::broadcast;
27
28/// All event kinds that cross the async-to-sync boundary.
29#[derive(Debug, Clone)]
30pub enum CoreEvent {
31    /// PTY output data for a connection.
32    ///
33    /// `generation` is the PTY-session counter the publisher captured at
34    /// `start_pty_connection` time. Consumers (Swift native, future
35    /// CLI harness) compare it against the latest generation they've
36    /// seen to discard frames from a PTY session that has since been
37    /// torn down and replaced — without it, the brief tail of in-flight
38    /// output from an old session can spill into a new one.
39    PtyOutput {
40        connection_id: String,
41        generation: u64,
42        data: Vec<u8>,
43    },
44    /// A connection's status changed.
45    ConnectionStatus {
46        connection_id: String,
47        status: ConnectionStatus,
48    },
49    /// File transfer progress.
50    TransferProgress {
51        connection_id: String,
52        path: String,
53        bytes_transferred: u64,
54        total_bytes: u64,
55    },
56    /// One line from a streaming `tcpdump` capture. Stderr lines (e.g.
57    /// libpcap startup banner) carry `is_stderr = true` so the UI can
58    /// dim them.
59    TcpdumpLine {
60        capture_id: u64,
61        line: String,
62        is_stderr: bool,
63    },
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum ConnectionStatus {
68    Connected,
69    Disconnected,
70    Error { reason: &'static str },
71}
72
73// ---------------------------------------------------------------------------
74// Singleton channel
75// ---------------------------------------------------------------------------
76
77const EVENT_CHANNEL_CAPACITY: usize = 1024;
78
79static EVENT_TX: OnceLock<broadcast::Sender<CoreEvent>> = OnceLock::new();
80
81/// Get a sender handle to the event bus. The channel is lazily created on
82/// first call with a capacity of 1024 events.
83pub fn event_sender() -> Option<broadcast::Sender<CoreEvent>> {
84    let tx = EVENT_TX.get_or_init(|| {
85        let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
86        tx
87    });
88    Some(tx.clone())
89}
90
91/// Subscribe to all events. Returns a receiver that starts with a `RecvError::Lagged`
92/// for any events produced before this subscription.
93pub fn subscribe() -> broadcast::Receiver<CoreEvent> {
94    let tx = EVENT_TX.get_or_init(|| {
95        let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
96        tx
97    });
98    tx.subscribe()
99}
100
101// ---------------------------------------------------------------------------
102// Tests — uses a private channel to avoid interference from the global static
103// ---------------------------------------------------------------------------
104
105#[cfg(test)]
106mod tests {
107    use super::*;
108    use tokio::sync::broadcast;
109
110    #[test]
111    fn event_bus_send_and_receive() {
112        let (tx, mut rx) = broadcast::channel(16);
113        tx.send(CoreEvent::ConnectionStatus {
114            connection_id: "test-1".into(),
115            status: ConnectionStatus::Connected,
116        })
117        .ok();
118
119        let received = rx.try_recv().expect("should have event");
120        match received {
121            CoreEvent::ConnectionStatus {
122                connection_id,
123                status,
124            } => {
125                assert_eq!(connection_id, "test-1");
126                assert_eq!(status, ConnectionStatus::Connected);
127            }
128            other => panic!("unexpected event: {:?}", other),
129        }
130    }
131
132    #[test]
133    fn multiple_subscribers_get_all_events() {
134        let (tx, mut rx1) = broadcast::channel(16);
135        let mut rx2 = tx.subscribe();
136
137        tx.send(CoreEvent::PtyOutput {
138            connection_id: "c1".into(),
139            generation: 1,
140            data: vec![1, 2, 3],
141        })
142        .ok();
143
144        assert!(rx1.try_recv().is_ok());
145        assert!(rx2.try_recv().is_ok());
146    }
147}