ssh_commander_core/event_bus.rs
1//! Event bus — a single channel for all async-to-sync boundary crossings.
2//!
3//! The event bus decouples the Rust domain layer from any particular consumer
4//! (Tauri WebSocket server, native macOS FFI bridge, CLI harness, …). Every
5//! "push" event (PTY output, transfer progress, connection status change) is
6//! serialised into a `CoreEvent` and sent over a broadcast channel. A single
7//! external callback (registered via `set_event_callback`) drains the channel
8//! and dispatches to the native layer.
9//!
10//! This avoids a sprawling FFI surface where every event type needs its own
11//! callback registration.
12//!
13//! # Thread Safety
14//!
15//! - `CoreEvent`: `Send + Sync`.
16//! - The sender and receiver are behind `OnceLock` — safe to call from any
17//! thread.
18//! - The broadcast channel has a fixed capacity (1024). If the consumer falls
19//! behind, the oldest events are dropped. This is intentional — PTY output
20//! and progress events are latency-sensitive, not reliability-sensitive.
21
22#![allow(dead_code)]
23
24use std::sync::OnceLock;
25
26use tokio::sync::broadcast;
27
28/// All event kinds that cross the async-to-sync boundary.
29#[derive(Debug, Clone)]
30pub enum CoreEvent {
31 /// PTY output data for a connection.
32 ///
33 /// `generation` is the PTY-session counter the publisher captured at
34 /// `start_pty_connection` time. Consumers (Swift native, future
35 /// CLI harness) compare it against the latest generation they've
36 /// seen to discard frames from a PTY session that has since been
37 /// torn down and replaced — without it, the brief tail of in-flight
38 /// output from an old session can spill into a new one.
39 PtyOutput {
40 connection_id: String,
41 generation: u64,
42 data: Vec<u8>,
43 },
44 /// A connection's status changed.
45 ConnectionStatus {
46 connection_id: String,
47 status: ConnectionStatus,
48 },
49 /// File transfer progress.
50 TransferProgress {
51 connection_id: String,
52 path: String,
53 bytes_transferred: u64,
54 total_bytes: u64,
55 },
56 /// One line from a streaming `tcpdump` capture. Stderr lines (e.g.
57 /// libpcap startup banner) carry `is_stderr = true` so the UI can
58 /// dim them.
59 TcpdumpLine {
60 capture_id: u64,
61 line: String,
62 is_stderr: bool,
63 },
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum ConnectionStatus {
68 Connected,
69 Disconnected,
70 Error { reason: &'static str },
71}
72
73// ---------------------------------------------------------------------------
74// Singleton channel
75// ---------------------------------------------------------------------------
76
77const EVENT_CHANNEL_CAPACITY: usize = 1024;
78
79static EVENT_TX: OnceLock<broadcast::Sender<CoreEvent>> = OnceLock::new();
80
81/// Get a sender handle to the event bus. The channel is lazily created on
82/// first call with a capacity of 1024 events.
83pub fn event_sender() -> Option<broadcast::Sender<CoreEvent>> {
84 let tx = EVENT_TX.get_or_init(|| {
85 let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
86 tx
87 });
88 Some(tx.clone())
89}
90
91/// Subscribe to all events. Returns a receiver that starts with a `RecvError::Lagged`
92/// for any events produced before this subscription.
93pub fn subscribe() -> broadcast::Receiver<CoreEvent> {
94 let tx = EVENT_TX.get_or_init(|| {
95 let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
96 tx
97 });
98 tx.subscribe()
99}
100
101// ---------------------------------------------------------------------------
102// Tests — uses a private channel to avoid interference from the global static
103// ---------------------------------------------------------------------------
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108 use tokio::sync::broadcast;
109
110 #[test]
111 fn event_bus_send_and_receive() {
112 let (tx, mut rx) = broadcast::channel(16);
113 tx.send(CoreEvent::ConnectionStatus {
114 connection_id: "test-1".into(),
115 status: ConnectionStatus::Connected,
116 })
117 .ok();
118
119 let received = rx.try_recv().expect("should have event");
120 match received {
121 CoreEvent::ConnectionStatus {
122 connection_id,
123 status,
124 } => {
125 assert_eq!(connection_id, "test-1");
126 assert_eq!(status, ConnectionStatus::Connected);
127 }
128 other => panic!("unexpected event: {:?}", other),
129 }
130 }
131
132 #[test]
133 fn multiple_subscribers_get_all_events() {
134 let (tx, mut rx1) = broadcast::channel(16);
135 let mut rx2 = tx.subscribe();
136
137 tx.send(CoreEvent::PtyOutput {
138 connection_id: "c1".into(),
139 generation: 1,
140 data: vec![1, 2, 3],
141 })
142 .ok();
143
144 assert!(rx1.try_recv().is_ok());
145 assert!(rx2.try_recv().is_ok());
146 }
147}