use std::{
collections::VecDeque,
io::{self, Write},
os::{
fd::{AsFd, BorrowedFd, OwnedFd},
unix::net::{UnixListener, UnixStream},
},
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, Receiver, SyncSender, TrySendError},
Arc, LazyLock,
},
thread,
};
use anyhow::Context;
use nix::{
errno::Errno,
poll::{self, PollFd, PollFlags, PollTimeout},
unistd,
};
use serde_derive::Serialize;
use tracing::{error, info, warn};
const SUBSCRIBER_QUEUE_DEPTH: usize = 64;
const EVENT_CHANNEL_CAP: usize = 4096;
#[derive(Serialize, Debug)]
#[serde(tag = "type")]
#[allow(clippy::enum_variant_names)]
pub enum Event {
#[serde(rename = "session.created")]
SessionCreated,
#[serde(rename = "session.attached")]
SessionAttached,
#[serde(rename = "session.detached")]
SessionDetached,
#[serde(rename = "session.removed")]
SessionRemoved,
}
pub struct EventBus {
event_tx: SyncSender<Arc<str>>,
wake_tx: OwnedFd,
sink_dead_logged: AtomicBool,
}
impl EventBus {
pub fn start(socket_path: PathBuf) -> anyhow::Result<(Arc<Self>, EventBusHandle)> {
if socket_path.exists() {
std::fs::remove_file(&socket_path)
.with_context(|| format!("removing stale events socket {:?}", socket_path))?;
}
let listener = UnixListener::bind(&socket_path)
.with_context(|| format!("binding events socket {:?}", socket_path))?;
listener.set_nonblocking(true).context("setting events listener non-blocking")?;
info!("events socket listening at {:?}", socket_path);
let (event_tx, event_rx) = mpsc::sync_channel(EVENT_CHANNEL_CAP);
let (wake_rx, wake_tx) = make_self_pipe().context("creating events wake pipe")?;
let (shutdown_rx, shutdown_tx) =
make_self_pipe().context("creating events shutdown pipe")?;
let bus = Arc::new(Self { event_tx, wake_tx, sink_dead_logged: AtomicBool::new(false) });
let sink = Sink {
listener,
event_rx,
wake_rx,
shutdown_rx,
_guard: ListenerGuard { path: socket_path },
};
let join = thread::Builder::new()
.name("events-sink".into())
.spawn(move || sink.run())
.context("spawning events sink thread")?;
Ok((bus, EventBusHandle { shutdown_tx, sink: Some(join) }))
}
pub fn publish(&self, event: &Event) {
match self.event_tx.try_send(serialize_line(event)) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
warn!("events channel full; sink is wedged");
return;
}
Err(TrySendError::Disconnected(_)) => {
if !self.sink_dead_logged.swap(true, Ordering::Relaxed) {
error!("events sink died; subsequent events will be dropped");
}
return;
}
}
match unistd::write(&self.wake_tx, b"\0") {
Ok(_) | Err(Errno::EAGAIN) => {}
Err(e) => warn!("waking events sink: {e}"),
}
}
}
pub struct EventBusHandle {
shutdown_tx: OwnedFd,
sink: Option<thread::JoinHandle<()>>,
}
impl Drop for EventBusHandle {
fn drop(&mut self) {
match unistd::write(&self.shutdown_tx, b"\0") {
Ok(_) | Err(Errno::EPIPE) => {}
Err(e) => warn!("signaling events sink shutdown: {e}"),
}
if let Some(join) = self.sink.take() {
if let Err(e) = join.join() {
warn!("joining events sink thread: {:?}", e);
}
}
}
}
pub fn socket_path(main_socket: &Path) -> PathBuf {
let mut path = main_socket.to_path_buf();
path.set_file_name("events.socket");
path
}
pub struct ListenerGuard {
path: PathBuf,
}
impl Drop for ListenerGuard {
fn drop(&mut self) {
match std::fs::remove_file(&self.path) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) => warn!("removing events socket {:?}: {:?}", self.path, e),
}
}
}
struct Sink {
listener: UnixListener,
event_rx: Receiver<Arc<str>>,
wake_rx: OwnedFd,
shutdown_rx: OwnedFd,
_guard: ListenerGuard,
}
impl Sink {
fn run(self) {
let Sink { listener, event_rx, wake_rx, shutdown_rx, _guard } = self;
let mut subs: Vec<SubscriberWriter> = Vec::new();
let mut wake_buf = [0u8; 4096];
let mut sub_pollfd_idx: Vec<usize> = Vec::new();
let mut sub_revents: Vec<PollFlags> = Vec::new();
const WAKE_FD_IDX: usize = 0;
const SHUTDOWN_FD_IDX: usize = 2;
const SUB_FDS_START: usize = 3;
loop {
sub_pollfd_idx.clear();
sub_revents.clear();
let mut fds: Vec<PollFd> = Vec::with_capacity(SUB_FDS_START + subs.len());
fds.push(PollFd::new(wake_rx.as_fd(), PollFlags::POLLIN));
fds.push(PollFd::new(listener.as_fd(), PollFlags::POLLIN));
fds.push(PollFd::new(shutdown_rx.as_fd(), PollFlags::POLLIN));
for (i, sub) in subs.iter().enumerate() {
if sub.wants_pollout() {
fds.push(PollFd::new(sub.as_fd(), PollFlags::POLLOUT));
sub_pollfd_idx.push(i);
}
}
match poll::poll(&mut fds, PollTimeout::NONE) {
Ok(_) => {}
Err(Errno::EINTR) => continue,
Err(e) => panic!("events sink poll: {:?}", e),
}
let wake_revents = fds[WAKE_FD_IDX].revents().unwrap_or(PollFlags::empty());
let shutdown_revents = fds[SHUTDOWN_FD_IDX].revents().unwrap_or(PollFlags::empty());
sub_revents.extend(
(0..sub_pollfd_idx.len())
.map(|k| fds[SUB_FDS_START + k].revents().unwrap_or(PollFlags::empty())),
);
drop(fds);
if !shutdown_revents.is_empty() {
return;
}
loop {
match listener.accept() {
Ok((stream, _addr)) => match SubscriberWriter::new(stream) {
Ok(sub) => subs.push(sub),
Err(e) => warn!("registering events subscriber: {:?}", e),
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => {
error!("events listener accept: {:?}", e);
break;
}
}
}
if !wake_revents.is_empty() {
loop {
match unistd::read(&wake_rx, &mut wake_buf) {
Ok(0) => return,
Ok(_) => continue,
Err(Errno::EAGAIN) => break,
Err(Errno::EINTR) => continue,
Err(e) => panic!("events sink reading wake fd: {:?}", e),
}
}
while let Ok(line) = event_rx.try_recv() {
for sub in subs.iter_mut() {
if sub.dropped {
continue;
}
let was_empty = !sub.wants_pollout();
if let Err(Overflow::CapExceeded) = sub.enqueue(Arc::clone(&line)) {
warn!("dropping events subscriber: queue full");
sub.dropped = true;
continue;
}
if was_empty {
if let Err(e) = sub.drive() {
info!("events subscriber gone: {:?}", e);
sub.dropped = true;
}
}
}
}
}
for (k, &i) in sub_pollfd_idx.iter().enumerate() {
if subs[i].dropped {
continue;
}
let revents = sub_revents[k];
if revents.intersects(PollFlags::POLLERR | PollFlags::POLLHUP | PollFlags::POLLNVAL)
{
info!("events subscriber gone: peer error/hangup");
subs[i].dropped = true;
} else if revents.contains(PollFlags::POLLOUT) {
if let Err(e) = subs[i].drive() {
info!("events subscriber gone: {:?}", e);
subs[i].dropped = true;
}
}
}
subs.retain(|sub| !sub.dropped);
}
}
}
struct SubscriberWriter {
stream: UnixStream,
pending: VecDeque<Arc<str>>,
front_offset: usize,
dropped: bool,
}
#[derive(Debug)]
enum Overflow {
CapExceeded,
}
#[derive(Debug, Eq, PartialEq)]
enum DriveOutcome {
AllFlushed,
WouldBlock,
}
impl SubscriberWriter {
fn new(stream: UnixStream) -> anyhow::Result<Self> {
stream.set_nonblocking(true).context("setting events subscriber stream non-blocking")?;
Ok(Self { stream, pending: VecDeque::new(), front_offset: 0, dropped: false })
}
fn enqueue(&mut self, line: Arc<str>) -> Result<(), Overflow> {
if self.pending.len() >= SUBSCRIBER_QUEUE_DEPTH {
return Err(Overflow::CapExceeded);
}
self.pending.push_back(line);
Ok(())
}
fn drive(&mut self) -> io::Result<DriveOutcome> {
drive_pending(&mut self.stream, &mut self.pending, &mut self.front_offset)
}
fn wants_pollout(&self) -> bool {
!self.pending.is_empty()
}
}
impl AsFd for SubscriberWriter {
fn as_fd(&self) -> BorrowedFd<'_> {
self.stream.as_fd()
}
}
fn drive_pending<W: Write>(
stream: &mut W,
pending: &mut VecDeque<Arc<str>>,
front_offset: &mut usize,
) -> io::Result<DriveOutcome> {
while let Some(front) = pending.front() {
let bytes = &front.as_bytes()[*front_offset..];
match stream.write(bytes) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"events subscriber stream returned 0 bytes",
));
}
Ok(n) => {
*front_offset += n;
if *front_offset >= front.len() {
pending.pop_front();
*front_offset = 0;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(DriveOutcome::WouldBlock);
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
Ok(DriveOutcome::AllFlushed)
}
fn serialize_line(event: &Event) -> Arc<str> {
fn build(e: Event) -> Arc<str> {
let s = serde_json::to_string(&e).expect("Event variants are infallible to serialize");
Arc::from(format!("{s}\n"))
}
static CREATED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionCreated));
static ATTACHED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionAttached));
static DETACHED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionDetached));
static REMOVED: LazyLock<Arc<str>> = LazyLock::new(|| build(Event::SessionRemoved));
match event {
Event::SessionCreated => Arc::clone(&CREATED),
Event::SessionAttached => Arc::clone(&ATTACHED),
Event::SessionDetached => Arc::clone(&DETACHED),
Event::SessionRemoved => Arc::clone(&REMOVED),
}
}
fn make_self_pipe() -> io::Result<(OwnedFd, OwnedFd)> {
let (rx, tx) = UnixStream::pair()?;
rx.set_nonblocking(true)?;
tx.set_nonblocking(true)?;
Ok((OwnedFd::from(rx), OwnedFd::from(tx)))
}
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::Mutex;
use std::{
io::{BufRead, BufReader, Read},
time::{Duration, Instant},
};
fn json(event: &Event) -> String {
serde_json::to_string(event).unwrap()
}
struct Harness {
_handle: EventBusHandle,
bus: Arc<EventBus>,
path: PathBuf,
_dir: tempfile::TempDir,
}
fn harness() -> Harness {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.socket");
let (bus, _handle) = EventBus::start(path.clone()).unwrap();
Harness { _handle, bus, path, _dir: dir }
}
const READ_TIMEOUT: Duration = Duration::from_secs(10);
const CREATED_LINE: &str = "{\"type\":\"session.created\"}\n";
fn read_line(stream: &mut UnixStream) -> String {
stream.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
let mut reader = BufReader::new(stream);
let mut line = String::new();
reader.read_line(&mut line).unwrap();
line
}
fn read_n_lines(stream: &mut UnixStream, n: usize) -> Vec<String> {
stream.set_read_timeout(Some(READ_TIMEOUT)).unwrap();
let mut reader = BufReader::new(stream);
(0..n)
.map(|_| {
let mut line = String::new();
reader.read_line(&mut line).unwrap();
line
})
.collect()
}
fn connect_registered(path: &Path, bus: &EventBus) -> UnixStream {
let mut stream = UnixStream::connect(path).unwrap();
thread::sleep(Duration::from_millis(1));
bus.publish(&Event::SessionCreated);
let _ = read_line(&mut stream);
stream
}
fn connect_n_registered(path: &Path, bus: &EventBus, n: usize) -> Vec<UnixStream> {
let mut streams: Vec<UnixStream> =
(0..n).map(|_| UnixStream::connect(path).unwrap()).collect();
thread::sleep(Duration::from_millis(1));
bus.publish(&Event::SessionCreated);
for s in streams.iter_mut() {
let _ = read_line(s);
}
streams
}
#[test]
fn events_serialize_with_only_type() {
let cases = [
(Event::SessionCreated, r#"{"type":"session.created"}"#),
(Event::SessionAttached, r#"{"type":"session.attached"}"#),
(Event::SessionDetached, r#"{"type":"session.detached"}"#),
(Event::SessionRemoved, r#"{"type":"session.removed"}"#),
];
for (event, expected) in &cases {
assert_eq!(json(event), *expected, "variant {event:?}");
}
}
#[test]
fn bus_publish_with_no_subscribers_is_a_noop() {
let dir = tempfile::tempdir().unwrap();
let (bus, _handle) = EventBus::start(dir.path().join("events.socket")).unwrap();
bus.publish(&Event::SessionCreated);
}
#[test]
fn bus_publish_reaches_subscriber() {
let h = harness();
let mut stream = connect_registered(&h.path, &h.bus);
h.bus.publish(&Event::SessionCreated);
assert_eq!(read_line(&mut stream), CREATED_LINE);
}
#[test]
fn bus_drops_subscriber_whose_peer_closed() {
let h = harness();
let victim = connect_registered(&h.path, &h.bus);
let mut probe = connect_registered(&h.path, &h.bus);
drop(victim);
h.bus.publish(&Event::SessionAttached);
assert_eq!(read_line(&mut probe), "{\"type\":\"session.attached\"}\n");
}
#[test]
#[ignore = "timing-sensitive; run on demand with --ignored"]
fn bus_publish_with_many_subscribers_is_not_quadratic() {
let h = harness();
let n = 200;
let _streams = connect_n_registered(&h.path, &h.bus, n);
let start = Instant::now();
for _ in 0..1000 {
h.bus.publish(&Event::SessionCreated);
}
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"1000 publishes with {n} subs took {elapsed:?}"
);
}
#[test]
fn bus_concurrent_publish_under_outer_lock_delivers_all_events() {
let h = harness();
let mut stream = connect_registered(&h.path, &h.bus);
let outer: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
let n_threads = 4;
let n_per_thread = 8;
let total = n_threads * n_per_thread;
let handles: Vec<_> = (0..n_threads)
.map(|_| {
let bus = Arc::clone(&h.bus);
let outer = Arc::clone(&outer);
thread::spawn(move || {
for _ in 0..n_per_thread {
let _g = outer.lock();
bus.publish(&Event::SessionCreated);
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
for line in read_n_lines(&mut stream, total) {
assert_eq!(line, CREATED_LINE);
}
}
#[test]
fn dropping_handle_stops_sink_and_unlinks_socket() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.socket");
let (_bus, handle) = EventBus::start(path.clone()).unwrap();
assert!(path.exists(), "socket file should exist while the sink runs");
drop(handle);
assert!(
!path.exists(),
"socket file should be unlinked once the sink is shut down and joined"
);
}
#[test]
fn accept_loop_registers_concurrent_subscribers() {
let h = harness();
let n = 20;
let mut streams: Vec<UnixStream> = (0..n)
.map(|_| {
let path = h.path.clone();
thread::spawn(move || UnixStream::connect(&path).unwrap())
})
.collect::<Vec<_>>()
.into_iter()
.map(|jh| jh.join().unwrap())
.collect();
h.bus.publish(&Event::SessionCreated);
for stream in streams.iter_mut() {
assert_eq!(read_line(stream), CREATED_LINE);
}
}
#[test]
fn burst_load_within_capacity_reaches_every_subscriber() {
let h = harness();
let m = 4;
let mut streams = connect_n_registered(&h.path, &h.bus, m);
let n_events = 32;
for _ in 0..n_events {
h.bus.publish(&Event::SessionCreated);
}
let expected = CREATED_LINE;
for stream in streams.iter_mut() {
for line in read_n_lines(stream, n_events) {
assert_eq!(line, expected);
}
}
}
#[test]
fn events_arrive_in_publish_order() {
let h = harness();
let mut stream = connect_registered(&h.path, &h.bus);
h.bus.publish(&Event::SessionCreated);
h.bus.publish(&Event::SessionAttached);
h.bus.publish(&Event::SessionDetached);
h.bus.publish(&Event::SessionRemoved);
let lines = read_n_lines(&mut stream, 4);
assert_eq!(
lines,
[
CREATED_LINE,
"{\"type\":\"session.attached\"}\n",
"{\"type\":\"session.detached\"}\n",
"{\"type\":\"session.removed\"}\n",
]
);
}
#[test]
fn slow_subscriber_drop_does_not_affect_fast_through_sink() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.socket");
let (bus, _handle) = EventBus::start(path.clone()).unwrap();
let _slow = UnixStream::connect(&path).unwrap();
nix::sys::socket::setsockopt(&_slow, nix::sys::socket::sockopt::RcvBuf, &1024).unwrap();
let mut fast = UnixStream::connect(&path).unwrap();
bus.publish(&Event::SessionCreated);
let _ = read_line(&mut fast);
let expected = CREATED_LINE;
fast.set_read_timeout(Some(Duration::from_secs(10))).unwrap();
let mut reader = BufReader::new(&mut fast);
for _ in 0..1000 {
bus.publish(&Event::SessionCreated);
let mut line = String::new();
reader.read_line(&mut line).unwrap();
assert_eq!(line, expected);
}
}
struct FakeWriter {
plan: VecDeque<FakeWrite>,
written: Vec<u8>,
}
enum FakeWrite {
Accept(usize),
WouldBlock,
Interrupted,
Err(io::ErrorKind),
}
impl Write for FakeWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.plan.pop_front() {
Some(FakeWrite::Accept(n)) => {
let take = n.min(buf.len());
self.written.extend_from_slice(&buf[..take]);
Ok(take)
}
Some(FakeWrite::WouldBlock) | None => {
Err(io::Error::new(io::ErrorKind::WouldBlock, "fake EAGAIN"))
}
Some(FakeWrite::Interrupted) => {
Err(io::Error::new(io::ErrorKind::Interrupted, "fake EINTR"))
}
Some(FakeWrite::Err(kind)) => Err(io::Error::new(kind, "fake error")),
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[test]
fn drive_pending_flushes_a_single_complete_write() {
let mut w = FakeWriter { plan: VecDeque::from([FakeWrite::Accept(100)]), written: vec![] };
let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("hello\n")]);
let mut offset = 0;
let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
assert_eq!(outcome, DriveOutcome::AllFlushed);
assert_eq!(w.written, b"hello\n");
assert!(pending.is_empty());
assert_eq!(offset, 0);
}
#[test]
fn drive_pending_resumes_after_partial_then_wouldblock() {
let mut w = FakeWriter {
plan: VecDeque::from([
FakeWrite::Accept(3),
FakeWrite::WouldBlock,
FakeWrite::Accept(3),
FakeWrite::WouldBlock,
]),
written: vec![],
};
let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("hello\n")]);
let mut offset = 0;
let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
assert_eq!(outcome, DriveOutcome::WouldBlock);
assert_eq!(w.written, b"hel");
assert_eq!(offset, 3);
assert_eq!(pending.len(), 1);
let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
assert_eq!(outcome, DriveOutcome::AllFlushed);
assert_eq!(w.written, b"hello\n");
assert_eq!(offset, 0);
assert!(pending.is_empty());
}
#[test]
fn drive_pending_retries_on_eintr() {
let mut w = FakeWriter {
plan: VecDeque::from([FakeWrite::Interrupted, FakeWrite::Accept(100)]),
written: vec![],
};
let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("ok\n")]);
let mut offset = 0;
let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
assert_eq!(outcome, DriveOutcome::AllFlushed);
assert_eq!(w.written, b"ok\n");
}
#[test]
fn drive_pending_propagates_other_errors() {
let mut w = FakeWriter {
plan: VecDeque::from([FakeWrite::Err(io::ErrorKind::BrokenPipe)]),
written: vec![],
};
let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("x\n")]);
let mut offset = 0;
let err = drive_pending(&mut w, &mut pending, &mut offset).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);
}
#[test]
fn drive_pending_treats_zero_byte_write_as_error() {
let mut w = FakeWriter { plan: VecDeque::from([FakeWrite::Accept(0)]), written: vec![] };
let mut pending: VecDeque<Arc<str>> = VecDeque::from([Arc::from("x\n")]);
let mut offset = 0;
let err = drive_pending(&mut w, &mut pending, &mut offset).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::WriteZero);
}
#[test]
fn drive_pending_scripted_scenarios_flush_completely() {
enum Op {
Enqueue(&'static str),
Drive(Vec<FakeWrite>),
}
use FakeWrite::*;
let scripts: Vec<Vec<Op>> = vec![
vec![Op::Enqueue("a\n"), Op::Drive(vec![Accept(1), WouldBlock])],
vec![
Op::Enqueue("a\n"),
Op::Drive(vec![Accept(1)]),
Op::Enqueue("b\n"),
Op::Drive(vec![Accept(1), Interrupted, Accept(2), Accept(2)]),
],
vec![
Op::Enqueue("hello\n"),
Op::Drive(vec![WouldBlock]),
Op::Enqueue("world\n"),
Op::Drive(vec![Accept(2), Interrupted, WouldBlock]),
Op::Enqueue("again\n"),
],
vec![
Op::Enqueue("x\n"),
Op::Enqueue("y\n"),
Op::Enqueue("z\n"),
Op::Drive(vec![Accept(1), Accept(1), WouldBlock]),
Op::Drive(vec![Accept(4), Accept(2)]),
],
];
for (i, script) in scripts.into_iter().enumerate() {
let mut pending: VecDeque<Arc<str>> = VecDeque::new();
let mut offset = 0;
let mut all_written = Vec::new();
let mut expected = Vec::new();
for op in script {
match op {
Op::Enqueue(s) => {
pending.push_back(Arc::from(s));
expected.extend_from_slice(s.as_bytes());
}
Op::Drive(plan) => {
let mut w = FakeWriter { plan: plan.into(), written: vec![] };
let _ = drive_pending(&mut w, &mut pending, &mut offset);
all_written.extend_from_slice(&w.written);
}
}
}
let mut w = FakeWriter {
plan: std::iter::repeat_with(|| FakeWrite::Accept(usize::MAX)).take(64).collect(),
written: vec![],
};
let outcome = drive_pending(&mut w, &mut pending, &mut offset).unwrap();
assert_eq!(outcome, DriveOutcome::AllFlushed, "script {i}");
all_written.extend_from_slice(&w.written);
assert_eq!(all_written, expected, "script {i}: bytes lost or reordered");
assert!(pending.is_empty(), "script {i}: pending not drained");
assert_eq!(offset, 0, "script {i}: offset not reset");
}
}
#[test]
fn enqueue_overflows_at_cap() {
let (a, _b) = UnixStream::pair().unwrap();
let mut sub = SubscriberWriter::new(a).unwrap();
for i in 0..SUBSCRIBER_QUEUE_DEPTH {
sub.enqueue(format!("event-{i}\n").into()).unwrap();
}
let err = sub.enqueue("one-too-many\n".into());
assert!(matches!(err, Err(Overflow::CapExceeded)));
}
#[test]
fn subscriber_writer_overflows_when_peer_blocks() {
let (server, _client) = UnixStream::pair().unwrap();
nix::sys::socket::setsockopt(&server, nix::sys::socket::sockopt::SndBuf, &1024).unwrap();
let mut sub = SubscriberWriter::new(server).unwrap();
let line: Arc<str> = CREATED_LINE.into();
let mut overflowed = false;
for _ in 0..(SUBSCRIBER_QUEUE_DEPTH * 1000) {
let was_empty = !sub.wants_pollout();
match sub.enqueue(Arc::clone(&line)) {
Ok(()) => {
if was_empty {
let _ = sub.drive();
}
}
Err(Overflow::CapExceeded) => {
overflowed = true;
break;
}
}
}
assert!(overflowed, "expected SubscriberWriter to overflow");
}
#[test]
fn subscriber_writer_resumes_after_peer_drains() {
let (server, client) = UnixStream::pair().unwrap();
nix::sys::socket::setsockopt(&server, nix::sys::socket::sockopt::SndBuf, &1024).unwrap();
client.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
let mut sub = SubscriberWriter::new(server).unwrap();
let line: Arc<str> = CREATED_LINE.into();
for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
sub.enqueue(Arc::clone(&line)).unwrap();
}
assert_eq!(sub.drive().unwrap(), DriveOutcome::WouldBlock);
let pending_after_first = sub.pending.len();
assert!(pending_after_first > 0);
let mut buf = vec![0u8; 4096];
let drained = (&client).read(&mut buf).unwrap();
assert!(drained > 0);
let _ = sub.drive().unwrap();
assert!(
sub.pending.len() < pending_after_first,
"drive must advance pending after peer drains: {} -> {}",
pending_after_first,
sub.pending.len()
);
}
#[test]
fn fast_writer_unaffected_when_slow_overflows() {
let (slow_server, _slow_client) = UnixStream::pair().unwrap();
nix::sys::socket::setsockopt(&slow_server, nix::sys::socket::sockopt::SndBuf, &1024)
.unwrap();
let mut slow = SubscriberWriter::new(slow_server).unwrap();
let (fast_server, fast_client) = UnixStream::pair().unwrap();
let mut fast = SubscriberWriter::new(fast_server).unwrap();
let drainer = thread::spawn(move || {
let mut buf = [0u8; 4096];
let mut total = 0usize;
loop {
match (&fast_client).read(&mut buf) {
Ok(0) => break,
Ok(n) => total += n,
Err(_) => break,
}
}
total
});
let line: Arc<str> = CREATED_LINE.into();
let mut slow_dropped = false;
for _ in 0..(SUBSCRIBER_QUEUE_DEPTH * 100) {
if !slow_dropped {
let was_empty = !slow.wants_pollout();
match slow.enqueue(Arc::clone(&line)) {
Ok(()) => {
if was_empty {
let _ = slow.drive();
}
}
Err(Overflow::CapExceeded) => {
slow_dropped = true;
}
}
}
let was_empty = !fast.wants_pollout();
fast.enqueue(Arc::clone(&line)).expect("fast must not overflow");
if was_empty {
let _ = fast.drive();
}
if slow_dropped {
break;
}
}
assert!(slow_dropped, "slow should have overflowed");
drop(fast);
let bytes_received = drainer.join().unwrap();
assert!(bytes_received > 0, "fast should have received events");
}
}