use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
use std::sync::{Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
/// Capacity of the bounded channel created for each subscriber in
/// `EventBus::subscribe`. Once a subscriber has this many undelivered events
/// queued, `EventBus::publish` counts further events for it as dropped instead
/// of blocking.
pub const MAX_QUEUE_DEPTH: usize = 1000;
/// A notification delivered over the event bus.
///
/// Serialized by serde with the variant name stored under the `"type"` key
/// next to the variant's own fields. Every variant carries an explicit dotted
/// `rename` (for example `"pane.spawned"`); `Event::type_tag` returns the same
/// string for each variant.
///
/// `ts` is a floating-point timestamp. The one event built inside this file
/// (`EventsDropped`) fills it from `now_ts()`, i.e. seconds since the Unix
/// epoch. NOTE(review): other producers presumably do the same — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Event {
/// Wire type `session.created`.
#[serde(rename = "session.created")]
SessionCreated { session: String, ts: f64 },
/// Wire type `session.detached`.
#[serde(rename = "session.detached")]
SessionDetached { session: String, ts: f64 },
/// Wire type `pane.spawned`. `cwd` is left out of the serialized output
/// when it is `None`.
#[serde(rename = "pane.spawned")]
PaneSpawned {
session: String,
pane: usize,
command: String,
#[serde(skip_serializing_if = "Option::is_none")]
cwd: Option<String>,
ts: f64,
},
/// Wire type `pane.exited`. `exit_code` is left out of the serialized
/// output when it is `None`.
#[serde(rename = "pane.exited")]
PaneExited {
session: String,
pane: usize,
#[serde(skip_serializing_if = "Option::is_none")]
exit_code: Option<i32>,
ts: f64,
},
/// Wire type `pane.focused`.
#[serde(rename = "pane.focused")]
PaneFocused {
session: String,
pane: usize,
ts: f64,
},
/// Wire type `pane.cwd_changed`.
#[serde(rename = "pane.cwd_changed")]
PaneCwdChanged {
session: String,
pane: usize,
cwd: String,
ts: f64,
},
/// Wire type `pane.prompt`. `exit_code` is left out of the serialized
/// output when it is `None`.
#[serde(rename = "pane.prompt")]
PanePrompt {
session: String,
pane: usize,
#[serde(skip_serializing_if = "Option::is_none")]
exit_code: Option<i32>,
ts: f64,
},
/// Wire type `tab.added`.
#[serde(rename = "tab.added")]
TabAdded {
session: String,
tab: usize,
ts: f64,
},
/// Wire type `tab.renamed`.
#[serde(rename = "tab.renamed")]
TabRenamed {
session: String,
tab: usize,
name: String,
ts: f64,
},
/// Wire type `config.reloaded`. Carries no session, so it never passes a
/// subscriber's session filter.
#[serde(rename = "config.reloaded")]
ConfigReloaded { ok: bool, ts: f64 },
/// Wire type `snapshot.saved`.
#[serde(rename = "snapshot.saved")]
SnapshotSaved {
session: String,
path: String,
ts: f64,
},
/// Wire type `events.dropped`. Built by `EventBus::publish` for a
/// subscriber whose queue was full; `count` is the number of events that
/// subscriber missed since its last successfully queued notice. Carries no
/// session, so it never passes a subscriber's session filter when published
/// through the normal path.
#[serde(rename = "events.dropped")]
EventsDropped { count: u64, ts: f64 },
}
impl Event {
pub fn type_tag(&self) -> &'static str {
match self {
Event::SessionCreated { .. } => "session.created",
Event::SessionDetached { .. } => "session.detached",
Event::PaneSpawned { .. } => "pane.spawned",
Event::PaneExited { .. } => "pane.exited",
Event::PaneFocused { .. } => "pane.focused",
Event::PaneCwdChanged { .. } => "pane.cwd_changed",
Event::PanePrompt { .. } => "pane.prompt",
Event::TabAdded { .. } => "tab.added",
Event::TabRenamed { .. } => "tab.renamed",
Event::ConfigReloaded { .. } => "config.reloaded",
Event::SnapshotSaved { .. } => "snapshot.saved",
Event::EventsDropped { .. } => "events.dropped",
}
}
pub fn session(&self) -> Option<&str> {
match self {
Event::SessionCreated { session, .. }
| Event::SessionDetached { session, .. }
| Event::PaneSpawned { session, .. }
| Event::PaneExited { session, .. }
| Event::PaneFocused { session, .. }
| Event::PaneCwdChanged { session, .. }
| Event::PanePrompt { session, .. }
| Event::TabAdded { session, .. }
| Event::TabRenamed { session, .. }
| Event::SnapshotSaved { session, .. } => Some(session.as_str()),
Event::ConfigReloaded { .. } | Event::EventsDropped { .. } => None,
}
}
}
/// Returns the current wall-clock time as fractional seconds since the Unix
/// epoch.
///
/// If the system clock reports a time before the epoch, `0.0` is returned
/// instead of an error.
pub fn now_ts() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
/// Handle returned by `EventBus::subscribe`.
///
/// Dropping the handle removes the matching subscriber entry from the bus
/// (see the `Drop` impl).
pub struct Subscription {
/// Receiving end of this subscriber's bounded event channel.
pub rx: Receiver<Event>,
/// Identifier the bus assigned to this subscription.
pub id: u64,
// Bus that created this subscription; used to unsubscribe on drop.
bus: &'static EventBus,
}
impl Drop for Subscription {
    /// Removes this subscription's entry from the bus that created it.
    fn drop(&mut self) {
        let subscription_id = self.id;
        self.bus.unsubscribe(subscription_id);
    }
}
/// Bus-side record for one subscription.
struct Subscriber {
// Identifier shared with the matching `Subscription`.
id: u64,
// Sending end of the subscriber's bounded channel.
tx: SyncSender<Event>,
// When set, only events whose `session()` equals this value are delivered.
session_filter: Option<String>,
// When set, only events whose `type_tag()` appears in this list are delivered.
type_filter: Option<Vec<String>>,
// Events that could not be queued since the last delivered drop notice.
dropped: u64,
}
/// Fan-out bus that hands published events to every matching subscriber.
pub struct EventBus {
// Subscriber list and id counter, guarded by a single mutex.
inner: Mutex<BusInner>,
}
/// Mutable state of an `EventBus`, kept behind its mutex.
struct BusInner {
// Identifier handed to the next subscriber; incremented on each subscribe.
next_id: u64,
// Currently registered subscribers, in subscription order.
subs: Vec<Subscriber>,
}
// Process-wide bus instance, initialized on the first `EventBus::global()` call.
static BUS: OnceLock<EventBus> = OnceLock::new();
impl EventBus {
pub fn global() -> &'static EventBus {
BUS.get_or_init(|| EventBus {
inner: Mutex::new(BusInner {
next_id: 1,
subs: Vec::new(),
}),
})
}
pub fn subscribe(
&'static self,
session_filter: Option<String>,
type_filter: Option<Vec<String>>,
) -> Subscription {
let (tx, rx) = std::sync::mpsc::sync_channel(MAX_QUEUE_DEPTH);
let mut inner = self.inner.lock().expect("event bus poisoned");
let id = inner.next_id;
inner.next_id += 1;
inner.subs.push(Subscriber {
id,
tx,
session_filter,
type_filter,
dropped: 0,
});
Subscription { rx, id, bus: self }
}
fn unsubscribe(&self, id: u64) {
let mut inner = self.inner.lock().expect("event bus poisoned");
inner.subs.retain(|s| s.id != id);
}
#[cfg(test)]
pub fn subscriber_count(&self) -> usize {
self.inner.lock().map(|inner| inner.subs.len()).unwrap_or(0)
}
pub fn publish(&self, event: Event) {
let mut inner = self.inner.lock().expect("event bus poisoned");
let mut to_remove: Vec<u64> = Vec::new();
for sub in inner.subs.iter_mut() {
if !subscriber_matches(sub, &event) {
continue;
}
if sub.dropped > 0 {
let notice = Event::EventsDropped {
count: sub.dropped,
ts: now_ts(),
};
match sub.tx.try_send(notice) {
Ok(()) => sub.dropped = 0,
Err(TrySendError::Full(_)) => {
sub.dropped = sub.dropped.saturating_add(1);
continue;
}
Err(TrySendError::Disconnected(_)) => {
to_remove.push(sub.id);
continue;
}
}
}
match sub.tx.try_send(event.clone()) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
sub.dropped = sub.dropped.saturating_add(1);
}
Err(TrySendError::Disconnected(_)) => {
to_remove.push(sub.id);
}
}
}
if !to_remove.is_empty() {
inner.subs.retain(|s| !to_remove.contains(&s.id));
}
}
}
/// Publishes `event` on the process-wide bus returned by
/// `EventBus::global()`.
pub fn publish(event: Event) {
    let bus = EventBus::global();
    bus.publish(event);
}
/// Reports whether `event` passes both of `sub`'s filters.
///
/// An unset filter accepts everything. A set session filter requires the
/// event to carry exactly that session, so session-less events are rejected.
/// A set type filter requires the event's type tag to be listed.
fn subscriber_matches(sub: &Subscriber, event: &Event) -> bool {
    let session_ok = match sub.session_filter.as_deref() {
        None => true,
        Some(wanted) => event.session() == Some(wanted),
    };
    if !session_ok {
        return false;
    }
    match sub.type_filter.as_deref() {
        None => true,
        Some(allowed) => {
            let tag = event.type_tag();
            allowed.iter().any(|candidate| candidate.as_str() == tag)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::RecvTimeoutError;
    use std::time::Duration;

    /// Builds a private bus for a single test.
    ///
    /// Tests used to share `EventBus::global()`, so under the parallel test
    /// runner one test's events (notably the flood test's) reached other
    /// tests' subscribers. `subscribe` needs a `'static` bus, so the instance
    /// is leaked; that is a few bytes per test.
    fn fresh_bus() -> &'static EventBus {
        Box::leak(Box::new(EventBus {
            inner: Mutex::new(BusInner {
                next_id: 1,
                subs: Vec::new(),
            }),
        }))
    }

    /// Convenience constructor for a `pane.spawned` event.
    fn pane_spawned(session: &str, pane: usize) -> Event {
        Event::PaneSpawned {
            session: session.to_string(),
            pane,
            command: "zsh".to_string(),
            cwd: None,
            ts: 0.0,
        }
    }

    #[test]
    fn event_serializes_with_type_tag() {
        let evt = pane_spawned("work", 4);
        let json = serde_json::to_string(&evt).unwrap();
        assert!(json.contains("\"type\":\"pane.spawned\""));
        assert!(json.contains("\"session\":\"work\""));
        assert!(json.contains("\"pane\":4"));
    }

    #[test]
    fn event_omits_optional_cwd_when_none() {
        let evt = pane_spawned("work", 1);
        let json = serde_json::to_string(&evt).unwrap();
        assert!(!json.contains("\"cwd\""));
    }

    #[test]
    fn type_tag_matches_serde_rename() {
        let cases = vec![
            pane_spawned("a", 0),
            Event::SessionCreated {
                session: "x".into(),
                ts: 0.0,
            },
            Event::ConfigReloaded { ok: true, ts: 0.0 },
            Event::EventsDropped { count: 5, ts: 0.0 },
        ];
        for evt in cases {
            let json = serde_json::to_string(&evt).unwrap();
            let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed["type"].as_str().unwrap(), evt.type_tag());
        }
    }

    #[test]
    fn global_returns_the_same_instance() {
        assert!(std::ptr::eq(EventBus::global(), EventBus::global()));
    }

    #[test]
    fn subscribe_publish_delivers_event() {
        let bus = fresh_bus();
        let sub = bus.subscribe(None, None);
        bus.publish(pane_spawned("alpha", 1));
        let evt = sub.rx.recv_timeout(Duration::from_millis(200)).unwrap();
        match evt {
            Event::PaneSpawned { session, pane, .. } => {
                assert_eq!(session, "alpha");
                assert_eq!(pane, 1);
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[test]
    fn session_filter_drops_other_sessions() {
        let bus = fresh_bus();
        let sub = bus.subscribe(Some("work".to_string()), None);
        bus.publish(pane_spawned("home", 1));
        bus.publish(pane_spawned("work", 2));
        let evt = sub.rx.recv_timeout(Duration::from_millis(200)).unwrap();
        match evt {
            Event::PaneSpawned { session, pane, .. } => {
                assert_eq!(session, "work");
                assert_eq!(pane, 2);
            }
            other => panic!("unexpected: {:?}", other),
        }
        assert!(matches!(
            sub.rx.recv_timeout(Duration::from_millis(50)),
            Err(RecvTimeoutError::Timeout)
        ));
    }

    #[test]
    fn type_filter_keeps_only_matching_types() {
        let bus = fresh_bus();
        let sub = bus.subscribe(None, Some(vec!["pane.exited".to_string()]));
        bus.publish(pane_spawned("a", 1));
        bus.publish(Event::PaneExited {
            session: "a".into(),
            pane: 1,
            exit_code: Some(0),
            ts: 0.0,
        });
        let evt = sub.rx.recv_timeout(Duration::from_millis(200)).unwrap();
        assert_eq!(evt.type_tag(), "pane.exited");
        assert!(matches!(
            sub.rx.recv_timeout(Duration::from_millis(50)),
            Err(RecvTimeoutError::Timeout)
        ));
    }

    #[test]
    fn drop_unsubscribes() {
        let bus = fresh_bus();
        let id = {
            let sub = bus.subscribe(None, None);
            assert_eq!(bus.subscriber_count(), 1);
            sub.id
        };
        assert_eq!(bus.subscriber_count(), 0);
        let inner = bus.inner.lock().unwrap();
        assert!(!inner.subs.iter().any(|s| s.id == id));
    }

    #[test]
    fn slow_subscriber_emits_drop_notice() {
        let bus = fresh_bus();
        let sub = bus.subscribe(None, None);
        // Fill the queue exactly, then overflow it by two events.
        for i in 0..MAX_QUEUE_DEPTH {
            bus.publish(pane_spawned("flood", i));
        }
        bus.publish(pane_spawned("flood", MAX_QUEUE_DEPTH));
        bus.publish(pane_spawned("flood", MAX_QUEUE_DEPTH + 1));
        // Free three slots so the notice and the next event both fit.
        for _ in 0..3 {
            let _ = sub.rx.recv_timeout(Duration::from_millis(100)).unwrap();
        }
        bus.publish(pane_spawned("flood", 9999));
        let mut saw_dropped = false;
        for _ in 0..MAX_QUEUE_DEPTH + 5 {
            match sub.rx.recv_timeout(Duration::from_millis(50)) {
                Ok(Event::EventsDropped { count, .. }) => {
                    // The bus is private to this test, so the count is exact.
                    assert_eq!(count, 2, "expected exactly 2 drops, got {}", count);
                    saw_dropped = true;
                    break;
                }
                Ok(_) => continue,
                Err(_) => break,
            }
        }
        assert!(saw_dropped, "drop notice never delivered");
    }
}