#![allow(dead_code)]
use std::sync::OnceLock;
use tokio::sync::broadcast;
/// Event payloads carried over the module's shared broadcast channel.
///
/// Every variant owns its data, so a value can be cloned independently for
/// each receiver. Field meanings that are hedged below are inferred from
/// names only; the producers are not visible in this file.
#[derive(Clone, Debug)]
pub enum CoreEvent {
    /// A chunk of raw bytes tied to a connection (presumably terminal
    /// output, going by the variant name — confirm against the producer).
    PtyOutput {
        /// Identifier of the connection the bytes belong to.
        connection_id: String,
        /// Generation counter supplied by the producer; its exact semantics
        /// are not defined in this file.
        generation: u64,
        /// The raw byte payload.
        data: Vec<u8>,
    },
    /// Reports a [`ConnectionStatus`] for a connection.
    ConnectionStatus {
        /// Identifier of the connection being reported on.
        connection_id: String,
        /// The reported status.
        status: ConnectionStatus,
    },
    /// Progress report for a transfer on a connection.
    TransferProgress {
        /// Identifier of the connection performing the transfer.
        connection_id: String,
        /// Path of the item being transferred (local vs. remote is not
        /// specified here).
        path: String,
        /// Amount transferred so far (bytes, per the field name).
        bytes_transferred: u64,
        /// Total amount expected (bytes, per the field name).
        total_bytes: u64,
    },
    /// One line of text from a capture (presumably a `tcpdump` process,
    /// going by the variant name).
    TcpdumpLine {
        /// Identifier of the capture that produced the line.
        capture_id: u64,
        /// The line contents.
        line: String,
        /// Whether the line came from stderr, per the field name.
        is_stderr: bool,
    },
}
/// Status value reported for a connection.
///
/// The type is `Copy`: the only payload is a `&'static str`, so values can
/// be passed around and compared without allocation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionStatus {
    /// The connection is established.
    Connected,
    /// The connection is not established.
    Disconnected,
    /// The connection is in an error state.
    Error {
        /// Static description of the error.
        reason: &'static str,
    },
}
/// Capacity passed to `broadcast::channel` when the shared event channel is
/// created. Per tokio's broadcast semantics this bounds how many events are
/// retained for receivers that have not yet read them.
const EVENT_CHANNEL_CAPACITY: usize = 1024;
/// Process-wide sender for [`CoreEvent`]s. Starts out empty and is filled in
/// on first use by `event_sender` or `subscribe`.
static EVENT_TX: OnceLock<broadcast::Sender<CoreEvent>> = OnceLock::new();
/// Returns the process-wide broadcast sender, creating the channel on first
/// use.
///
/// Both public entry points go through this helper so the channel is
/// constructed in exactly one place. The receiver half returned by
/// `broadcast::channel` is dropped immediately; consumers obtain their own
/// receivers through [`subscribe`].
fn global_sender() -> &'static broadcast::Sender<CoreEvent> {
    EVENT_TX.get_or_init(|| broadcast::channel(EVENT_CHANNEL_CAPACITY).0)
}

/// Returns a clone of the shared event sender.
///
/// The underlying channel is created lazily on the first call to either this
/// function or [`subscribe`].
///
/// # Returns
///
/// The current implementation always returns `Some`. The `Option` wrapper is
/// kept so existing callers that handle `None` continue to compile.
pub fn event_sender() -> Option<broadcast::Sender<CoreEvent>> {
    Some(global_sender().clone())
}

/// Creates a new receiver attached to the shared event channel.
///
/// The underlying channel is created lazily on the first call to either this
/// function or [`event_sender`]. Per tokio's broadcast semantics, the returned
/// receiver observes events sent after this call.
pub fn subscribe() -> broadcast::Receiver<CoreEvent> {
    global_sender().subscribe()
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::broadcast;

    /// A single event sent on a fresh channel is received with its fields
    /// intact.
    #[test]
    fn event_bus_send_and_receive() {
        let (sender, mut receiver) = broadcast::channel::<CoreEvent>(16);
        let outgoing = CoreEvent::ConnectionStatus {
            connection_id: String::from("test-1"),
            status: ConnectionStatus::Connected,
        };
        // The send result is deliberately ignored; the receive below is the
        // actual check.
        let _ = sender.send(outgoing);

        let incoming = receiver.try_recv().expect("should have event");
        if let CoreEvent::ConnectionStatus {
            connection_id,
            status,
        } = &incoming
        {
            assert_eq!(connection_id.as_str(), "test-1");
            assert_eq!(*status, ConnectionStatus::Connected);
        } else {
            panic!("unexpected event: {:?}", incoming);
        }
    }

    /// Every receiver subscribed before a send gets its own copy of the
    /// event.
    #[test]
    fn multiple_subscribers_get_all_events() {
        let (sender, mut first) = broadcast::channel::<CoreEvent>(16);
        let mut second = sender.subscribe();

        let payload = CoreEvent::PtyOutput {
            connection_id: String::from("c1"),
            generation: 1,
            data: vec![1, 2, 3],
        };
        // The send result is deliberately ignored; the receives below are
        // the actual check.
        let _ = sender.send(payload);

        let first_result = first.try_recv();
        assert!(first_result.is_ok());
        let second_result = second.try_recv();
        assert!(second_result.is_ok());
    }
}