use std::collections::{HashSet, VecDeque};
use std::os::unix::net::UnixStream;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
use crate::protocol::{
self, EventEnvelope, EventTopic, SubscribeOk, S_EVENT, S_EVENT_OVERFLOW, S_SUBSCRIBE_OK,
};
/// Maximum number of events held in a single `EventQueue`. When a push finds
/// the queue at this length, the oldest queued entry is evicted to make room.
pub(crate) const QUEUE_CAPACITY: usize = 256;
/// One entry of a subscriber's outbound queue, waiting to be written to the
/// subscriber's socket by its worker thread.
struct OutboundEvent {
/// When `true` the frame is written with the `S_EVENT_OVERFLOW` tag instead
/// of `S_EVENT`.
overflow: bool,
/// Serialized to JSON to form the frame payload.
envelope: EventEnvelope,
}
/// Bounded FIFO of outbound events with drop-oldest overflow behaviour and a
/// blocking pop, shared (via `Arc`) between the bus and one worker thread.
pub(crate) struct EventQueue {
/// Queue contents and the `closed` flag; both are only accessed under this lock.
inner: Mutex<EventQueueInner>,
/// Notified on every push and on close so blocked `pop_blocking` callers wake up.
cv: Condvar,
}
/// State protected by `EventQueue::inner`.
struct EventQueueInner {
/// Pending events, oldest at the front.
queue: VecDeque<OutboundEvent>,
/// Set by `EventQueue::close`; once set, pushes are rejected and poppers
/// return `None` after the remaining entries are drained.
closed: bool,
}
impl EventQueue {
    /// Creates an open, empty queue with storage reserved for
    /// `QUEUE_CAPACITY` entries.
    fn new() -> Self {
        let state = EventQueueInner {
            queue: VecDeque::with_capacity(QUEUE_CAPACITY),
            closed: false,
        };
        Self {
            inner: Mutex::new(state),
            cv: Condvar::new(),
        }
    }

    /// Appends `evt`, evicting the oldest entry first if the queue already
    /// holds `QUEUE_CAPACITY` entries, and wakes one waiting consumer.
    ///
    /// Returns `Ok(true)` when nothing had to be evicted, `Ok(false)` when the
    /// queue was at capacity, and `Err(())` when the queue has been closed
    /// (in which case `evt` is discarded).
    ///
    /// Panics if the internal mutex is poisoned.
    fn push_drop_oldest(&self, evt: OutboundEvent) -> Result<bool, ()> {
        let mut state = self.inner.lock().unwrap();
        if state.closed {
            return Err(());
        }
        let at_capacity = state.queue.len() >= QUEUE_CAPACITY;
        if at_capacity {
            state.queue.pop_front();
        }
        state.queue.push_back(evt);
        self.cv.notify_one();
        Ok(!at_capacity)
    }

    /// Removes and returns the oldest entry, blocking while the queue is
    /// empty and still open.
    ///
    /// Entries queued before `close` are still handed out; `None` is returned
    /// only once the queue is both closed and empty.
    ///
    /// Panics if the internal mutex is poisoned.
    fn pop_blocking(&self) -> Option<OutboundEvent> {
        let guard = self.inner.lock().unwrap();
        // Sleep only while there is nothing to hand out and more may arrive.
        let mut state = self
            .cv
            .wait_while(guard, |s| s.queue.is_empty() && !s.closed)
            .unwrap();
        state.queue.pop_front()
    }

    /// Marks the queue closed and wakes every blocked consumer. Subsequent
    /// pushes fail with `Err(())`.
    fn close(&self) {
        let mut state = self.inner.lock().unwrap();
        state.closed = true;
        self.cv.notify_all();
    }

    /// Number of entries currently queued (test-only helper).
    #[cfg(test)]
    fn len(&self) -> usize {
        let state = self.inner.lock().unwrap();
        state.queue.len()
    }
}
/// Bus-side record of one registered subscriber. Dropping it closes the
/// queue and joins the worker thread.
pub(crate) struct Subscriber {
/// Identifier assigned at registration and reported in the subscribe ack.
pub(crate) id: u64,
/// Topics this subscriber receives; events on other topics are skipped.
topics: HashSet<EventTopic>,
/// Outbound queue shared with the worker thread. Wrapped in `Option` so
/// `Drop` can take it.
queue: Option<Arc<EventQueue>>,
/// Count of pushes that evicted an entry since the last overflow notice
/// was queued.
dropped_since: u64,
/// Worker thread that drains `queue` onto the socket. Wrapped in `Option`
/// so `Drop` can take and join it.
handle: Option<JoinHandle<()>>,
/// When `Some`, only events whose envelope `session` equals this value are
/// delivered.
filter_session: Option<String>,
}
impl Drop for Subscriber {
    /// Shuts the subscriber down: closes the queue so the worker leaves its
    /// blocking pop once the remaining entries are drained, then waits for
    /// the worker thread to exit. A panic inside the worker is ignored.
    fn drop(&mut self) {
        // Close before joining; otherwise the worker would stay parked in
        // `pop_blocking` and the join below would never return.
        if let Some(queue) = self.queue.take() {
            queue.close();
        }
        if let Some(worker) = self.handle.take() {
            let _ = worker.join();
        }
    }
}
/// Fan-out point that delivers events to every registered subscriber whose
/// topic and session filters match.
pub(crate) struct EventBus {
/// Currently registered subscribers.
subscribers: Vec<Subscriber>,
/// Id handed to the next successful registration; starts at 1.
next_id: u64,
/// Session name stamped into envelopes built by the `emit_*` helpers.
session_name: String,
}
impl EventBus {
/// Creates a bus with no subscribers for the session called `session_name`.
/// Subscriber ids start at 1.
pub(crate) fn new(session_name: impl Into<String>) -> Self {
    let session_name = session_name.into();
    Self {
        next_id: 1,
        session_name,
        subscribers: Vec::new(),
    }
}
/// Name of the session this bus was created for.
#[allow(dead_code)]
pub(crate) fn session_name(&self) -> &str {
    self.session_name.as_str()
}
/// Returns `true` if at least one registered subscriber listens on `topic`.
/// Session filters are not consulted.
pub(crate) fn has_subscriber_for(&self, topic: EventTopic) -> bool {
    self.subscribers
        .iter()
        .map(|sub| &sub.topics)
        .any(|topics| topics.contains(&topic))
}
/// Registers a new subscriber on `conn` and starts its worker thread.
///
/// The `S_SUBSCRIBE_OK` acknowledgement, carrying the assigned id and the
/// requested topics, is written on the calling thread before the worker is
/// spawned. The worker then owns `conn` and writes every queued event to it.
///
/// `filter_session`, when `Some`, restricts delivery to events whose
/// envelope session equals it.
///
/// Returns the new subscriber id, or `None` if the acknowledgement could not
/// be serialized or written, or if the worker thread could not be spawned.
/// On failure the subscriber is not added to the bus.
pub(crate) fn register(
    &mut self,
    topics: Vec<EventTopic>,
    filter_session: Option<String>,
    mut conn: UnixStream,
) -> Option<u64> {
    let id = self.next_id;
    let ack = SubscribeOk {
        subscriber_id: id,
        topics: topics.clone(),
    };
    let ack_bytes = serde_json::to_vec(&ack).ok()?;
    protocol::write_msg(&mut conn, S_SUBSCRIBE_OK, &ack_bytes).ok()?;
    // The id has now been announced to the peer, so it is consumed even if
    // spawning the worker fails below.
    self.next_id += 1;
    let queue = Arc::new(EventQueue::new());
    let queue_for_worker = Arc::clone(&queue);
    // Thread creation can fail at the OS level (e.g. resource limits). Report
    // that as a failed registration rather than panicking the whole process.
    let handle = std::thread::Builder::new()
        .name(format!("ezpn-events-{id}"))
        .spawn(move || run_subscriber(id, conn, queue_for_worker))
        .ok()?;
    self.subscribers.push(Subscriber {
        id,
        topics: topics.into_iter().collect(),
        queue: Some(queue),
        dropped_since: 0,
        handle: Some(handle),
        filter_session,
    });
    Some(id)
}
/// Queues `envelope` for every subscriber that listens on `envelope_topic`
/// and whose session filter (if any) equals `envelope.session`.
///
/// Each recipient gets its own clone of the envelope. Delivery is
/// non-blocking: a full queue evicts its oldest entry instead of stalling
/// the caller.
pub(crate) fn emit(&mut self, envelope_topic: EventTopic, envelope: EventEnvelope) {
    let recipients = self.subscribers.iter_mut().filter(|sub| {
        sub.topics.contains(&envelope_topic)
            && sub
                .filter_session
                .as_ref()
                .map_or(true, |want| *want == envelope.session)
    });
    for sub in recipients {
        let outbound = OutboundEvent {
            overflow: false,
            envelope: envelope.clone(),
        };
        send_with_drop_oldest(sub, outbound);
    }
}
/// Removes every subscriber whose worker thread has finished (or that has no
/// worker handle at all). Removed subscribers are dropped, which closes
/// their queue and joins the already-finished thread.
pub(crate) fn reap_dead(&mut self) {
    self.subscribers
        .retain(|sub| matches!(sub.handle.as_ref(), Some(worker) if !worker.is_finished()));
}
#[allow(dead_code)]
pub(crate) fn subscriber_count(&self) -> usize {
self.subscribers.len()
}
/// Emits a `pane.created` event on the `Pane` topic with the pane's id, tab
/// index, command and size. Does nothing when no subscriber listens on that
/// topic, so the envelope is never built in that case.
#[allow(dead_code)]
pub(crate) fn emit_pane_created(
    &mut self,
    pane_id: usize,
    tab_index: usize,
    command: &str,
    cols: u16,
    rows: u16,
) {
    if self.has_subscriber_for(EventTopic::Pane) {
        let envelope = EventEnvelope {
            v: 1,
            ts: EventEnvelope::now_ts(),
            topic: "pane",
            type_: "pane.created",
            session: self.session_name.clone(),
            data: serde_json::json!({
                "pane_id": pane_id,
                "tab_index": tab_index,
                "command": command,
                "cols": cols,
                "rows": rows,
            }),
        };
        self.emit(EventTopic::Pane, envelope);
    }
}
/// Emits a `pane.exited` event on the `Pane` topic with the pane's id, tab
/// index and optional exit code (serialized as JSON `null` when `None`).
/// Does nothing when no subscriber listens on that topic.
#[allow(dead_code)]
pub(crate) fn emit_pane_exited(
    &mut self,
    pane_id: usize,
    tab_index: usize,
    exit_code: Option<u32>,
) {
    if self.has_subscriber_for(EventTopic::Pane) {
        let envelope = EventEnvelope {
            v: 1,
            ts: EventEnvelope::now_ts(),
            topic: "pane",
            type_: "pane.exited",
            session: self.session_name.clone(),
            data: serde_json::json!({
                "pane_id": pane_id,
                "tab_index": tab_index,
                "exit_code": exit_code,
            }),
        };
        self.emit(EventTopic::Pane, envelope);
    }
}
/// Emits a `client.attached` event on the `Client` topic with the client id,
/// attach mode (as `"steal"`, `"shared"` or `"readonly"`) and terminal size.
/// Does nothing when no subscriber listens on that topic.
pub(crate) fn emit_client_attached(
    &mut self,
    client_id: u64,
    mode: protocol::AttachMode,
    cols: u16,
    rows: u16,
) {
    if self.has_subscriber_for(EventTopic::Client) {
        let mode_str = match mode {
            protocol::AttachMode::Steal => "steal",
            protocol::AttachMode::Shared => "shared",
            protocol::AttachMode::Readonly => "readonly",
        };
        let envelope = EventEnvelope {
            v: 1,
            ts: EventEnvelope::now_ts(),
            topic: "client",
            type_: "client.attached",
            session: self.session_name.clone(),
            data: serde_json::json!({
                "client_id": client_id,
                "mode": mode_str,
                "cols": cols,
                "rows": rows,
            }),
        };
        self.emit(EventTopic::Client, envelope);
    }
}
/// Emits a `client.detached` event on the `Client` topic with the client id
/// and a free-form `reason`. Does nothing when no subscriber listens on that
/// topic.
pub(crate) fn emit_client_detached(&mut self, client_id: u64, reason: &str) {
    if self.has_subscriber_for(EventTopic::Client) {
        let envelope = EventEnvelope {
            v: 1,
            ts: EventEnvelope::now_ts(),
            topic: "client",
            type_: "client.detached",
            session: self.session_name.clone(),
            data: serde_json::json!({
                "client_id": client_id,
                "reason": reason,
            }),
        };
        self.emit(EventTopic::Client, envelope);
    }
}
/// Emits a `tab.switched` event on the `Tab` topic with the previous index,
/// the new index and the tab `name`. Does nothing when no subscriber listens
/// on that topic.
pub(crate) fn emit_tab_switched(&mut self, from_index: usize, to_index: usize, name: &str) {
    if self.has_subscriber_for(EventTopic::Tab) {
        let envelope = EventEnvelope {
            v: 1,
            ts: EventEnvelope::now_ts(),
            topic: "tab",
            type_: "tab.switched",
            session: self.session_name.clone(),
            data: serde_json::json!({
                "from_index": from_index,
                "to_index": to_index,
                "name": name,
            }),
        };
        self.emit(EventTopic::Tab, envelope);
    }
}
}
/// Queues `evt` for `sub` without blocking, evicting the oldest queued entry
/// if the subscriber's queue is full.
///
/// If earlier pushes evicted entries, an overflow notice (topic `_meta`,
/// type `overflow`) carrying the number of evictions and the subscriber id is
/// queued ahead of `evt`, and the counter is restarted. The notice goes
/// through the same drop-oldest path; if queuing it evicts an entry, that
/// eviction is carried over into the next notice rather than being lost.
///
/// Returns `true` only when `evt` was queued without evicting anything.
/// Returns `false` when an entry had to be evicted, when the queue is closed,
/// or when the subscriber has no queue.
fn send_with_drop_oldest(sub: &mut Subscriber, evt: OutboundEvent) -> bool {
    let Some(queue) = sub.queue.as_ref() else {
        return false;
    };
    if sub.dropped_since > 0 {
        let notice = OutboundEvent {
            overflow: true,
            envelope: EventEnvelope {
                v: 1,
                ts: EventEnvelope::now_ts(),
                topic: "_meta",
                type_: "overflow",
                session: evt.envelope.session.clone(),
                data: serde_json::json!({
                    "dropped": sub.dropped_since,
                    "subscriber_id": sub.id,
                }),
            },
        };
        // The notice reports everything dropped so far, so the counter
        // restarts. If the queue was still full, the notice itself pushed an
        // entry out, and that eviction must count toward the next notice.
        sub.dropped_since = match queue.push_drop_oldest(notice) {
            Ok(false) => 1,
            Ok(true) | Err(()) => 0,
        };
    }
    match queue.push_drop_oldest(evt) {
        Ok(true) => true,
        Ok(false) => {
            sub.dropped_since = sub.dropped_since.saturating_add(1);
            false
        }
        // Queue already closed: the event is discarded.
        Err(()) => false,
    }
}
/// Worker-thread body for one subscriber: pops events from `queue` and
/// writes each as a frame on `conn` until the queue is closed and drained or
/// a write fails.
///
/// Events that fail to serialize are skipped. Overflow notices are written
/// with the `S_EVENT_OVERFLOW` tag, everything else with `S_EVENT`. `_id` is
/// currently unused.
fn run_subscriber(_id: u64, mut conn: UnixStream, queue: Arc<EventQueue>) {
    loop {
        let Some(evt) = queue.pop_blocking() else {
            break;
        };
        let Ok(payload) = serde_json::to_vec(&evt.envelope) else {
            continue;
        };
        let tag = match evt.overflow {
            true => S_EVENT_OVERFLOW,
            false => S_EVENT,
        };
        if protocol::write_msg(&mut conn, tag, &payload).is_err() {
            // Peer is gone or the socket is broken; exit the thread.
            break;
        }
    }
}
// Tests for the event bus and its queue. Socket-based tests use a connected
// `UnixStream` pair: one end is handed to the bus, the other is read by the test.
#[cfg(test)]
mod tests {
use super::*;
use std::io::Read;
use std::os::unix::net::UnixStream as Std;
use std::time::Duration;
// Builds a `pane.created` envelope for `session` carrying `pane_id`.
fn pane_envelope(session: &str, pane_id: u64) -> EventEnvelope {
EventEnvelope {
v: 1,
ts: EventEnvelope::now_ts(),
topic: "pane",
type_: "pane.created",
session: session.to_string(),
data: serde_json::json!({"pane_id": pane_id, "cols": 80, "rows": 24}),
}
}
// A matching emit after registration yields the subscribe ack followed by
// exactly one S_EVENT frame with the expected JSON fields.
#[test]
fn register_then_emit_writes_one_frame() {
let (peer_a, peer_b) = Std::pair().unwrap();
let mut bus = EventBus::new("test");
let _id = bus.register(vec![EventTopic::Pane], None, peer_a);
// Bound the reads so a missing frame fails the test instead of hanging it.
peer_b
.set_read_timeout(Some(Duration::from_millis(500)))
.unwrap();
let (ack_tag, _) = protocol::read_msg(&mut &peer_b).expect("read S_SUBSCRIBE_OK");
assert_eq!(ack_tag, S_SUBSCRIBE_OK);
bus.emit(EventTopic::Pane, pane_envelope("test", 7));
let (tag, payload) = protocol::read_msg(&mut &peer_b).expect("read S_EVENT");
assert_eq!(tag, S_EVENT);
let json: serde_json::Value = serde_json::from_slice(&payload).unwrap();
assert_eq!(json["topic"], "pane");
assert_eq!(json["type"], "pane.created");
assert_eq!(json["data"]["pane_id"], 7);
}
// A subscriber registered for Tab only must not receive a Pane event.
#[test]
fn topic_filter_skips_non_matching_subscriber() {
let (peer_a, peer_b) = Std::pair().unwrap();
let mut bus = EventBus::new("test");
let _ = bus.register(vec![EventTopic::Tab], None, peer_a);
peer_b
.set_read_timeout(Some(Duration::from_millis(500)))
.unwrap();
let (ack_tag, _) = protocol::read_msg(&mut &peer_b).expect("ack");
assert_eq!(ack_tag, S_SUBSCRIBE_OK);
bus.emit(EventTopic::Pane, pane_envelope("test", 1));
peer_b
.set_read_timeout(Some(Duration::from_millis(150)))
.unwrap();
// Dropping the bus drops its subscribers, which closes the queue and joins
// the worker; anything it was going to write has been written by now.
drop(bus);
let mut buf = [0u8; 1];
// A read error (e.g. timeout) is treated the same as reading zero bytes.
let n = (&peer_b).read(&mut buf).unwrap_or(0);
assert_eq!(n, 0, "non-matching topic must not produce a frame");
}
// A subscriber filtering on "other-session" must not receive an event whose
// envelope session is "test".
#[test]
fn session_filter_skips_other_sessions() {
let (peer_a, peer_b) = Std::pair().unwrap();
let mut bus = EventBus::new("test");
let _ = bus.register(
vec![EventTopic::Pane],
Some("other-session".to_string()),
peer_a,
);
peer_b
.set_read_timeout(Some(Duration::from_millis(500)))
.unwrap();
let (ack_tag, _) = protocol::read_msg(&mut &peer_b).expect("ack");
assert_eq!(ack_tag, S_SUBSCRIBE_OK);
bus.emit(EventTopic::Pane, pane_envelope("test", 1));
peer_b
.set_read_timeout(Some(Duration::from_millis(150)))
.unwrap();
// As above: dropping the bus joins the worker before the final read.
drop(bus);
let mut buf = [0u8; 1];
let n = (&peer_b).read(&mut buf).unwrap_or(0);
assert_eq!(n, 0, "session filter must drop non-matching event");
}
// Once the peer has hung up, the worker's next write fails, the thread exits,
// and `reap_dead` removes the subscriber.
#[test]
fn reap_dead_drops_disconnected_subscriber() {
let (peer_a, peer_b) = Std::pair().unwrap();
let mut bus = EventBus::new("test");
let _ = bus.register(vec![EventTopic::Pane], None, peer_a);
assert_eq!(bus.subscriber_count(), 1);
drop(peer_b);
bus.emit(EventTopic::Pane, pane_envelope("test", 1));
// NOTE(review): timing-dependent — assumes 50 ms is enough for the worker
// to attempt the write, see the error, and finish.
std::thread::sleep(Duration::from_millis(50));
bus.reap_dead();
assert_eq!(bus.subscriber_count(), 0, "dead subscriber must be reaped");
}
// Pushing QUEUE_CAPACITY + 5 entries keeps the newest QUEUE_CAPACITY in
// insertion order; the five oldest (i = 0..=4) are evicted.
#[test]
fn event_queue_drops_oldest_when_full() {
let q = EventQueue::new();
for i in 0..(QUEUE_CAPACITY + 5) {
let evt = OutboundEvent {
overflow: false,
envelope: EventEnvelope {
v: 1,
ts: i as u64,
topic: "pane",
type_: "pane.created",
session: "test".to_string(),
data: serde_json::json!({ "i": i }),
},
};
let _ = q.push_drop_oldest(evt);
}
assert_eq!(q.len(), QUEUE_CAPACITY);
// Close first so the drain loop below ends with `None` instead of blocking
// once the queue is empty.
q.close();
let first = q.pop_blocking().expect("non-empty after fill");
assert_eq!(first.envelope.data["i"].as_u64(), Some(5));
let mut last_i = 5u64;
let mut count = 1usize;
while let Some(evt) = q.pop_blocking() {
let i = evt.envelope.data["i"].as_u64().unwrap();
assert!(i > last_i, "drop-oldest must preserve insertion order");
last_i = i;
count += 1;
}
assert_eq!(count, QUEUE_CAPACITY);
assert_eq!(last_i, (QUEUE_CAPACITY + 4) as u64);
}
// A thread blocked in `pop_blocking` on an empty queue is released with
// `None` when the queue is closed.
#[test]
fn event_queue_close_unblocks_waiter() {
let q = Arc::new(EventQueue::new());
let qc = Arc::clone(&q);
let handle = std::thread::spawn(move || qc.pop_blocking());
// Give the spawned thread a chance to start waiting; the assertion holds
// either way because `pop_blocking` checks `closed` before it waits.
std::thread::sleep(Duration::from_millis(50));
q.close();
let result = handle.join().expect("thread didn't panic");
assert!(result.is_none(), "close must unblock the waiter with None");
}
// `has_subscriber_for` is true for exactly the topics passed to `register`.
#[test]
fn has_subscriber_for_reflects_topic_membership() {
// `_peer_b` is kept alive so the socket pair stays connected for the test.
let (peer_a, _peer_b) = Std::pair().unwrap();
let mut bus = EventBus::new("test");
let _ = bus.register(vec![EventTopic::Pane, EventTopic::Tab], None, peer_a);
assert!(bus.has_subscriber_for(EventTopic::Pane));
assert!(bus.has_subscriber_for(EventTopic::Tab));
assert!(!bus.has_subscriber_for(EventTopic::Layout));
}
}