use std::sync::mpsc::SyncSender;
use crate::sink::Sink;
use crate::SondaError;
pub struct ChannelSink {
tx: SyncSender<Vec<u8>>,
}
impl ChannelSink {
pub fn new(tx: SyncSender<Vec<u8>>) -> Self {
Self { tx }
}
}
impl Sink for ChannelSink {
fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
self.tx
.send(data.to_vec())
.map_err(|e| SondaError::Sink(std::io::Error::other(e.to_string())))
}
fn flush(&mut self) -> Result<(), SondaError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use super::*;
use crate::sink::Sink;
#[test]
fn write_sends_exact_bytes_to_receiver() {
let (tx, rx) = mpsc::sync_channel(10);
let mut sink = ChannelSink::new(tx);
sink.write(b"hello\n").unwrap();
let received = rx.recv().expect("receiver should get data");
assert_eq!(received, b"hello\n");
}
#[test]
fn write_empty_slice_sends_empty_vec() {
let (tx, rx) = mpsc::sync_channel(10);
let mut sink = ChannelSink::new(tx);
sink.write(b"").unwrap();
let received = rx.recv().expect("receiver should get empty vec");
assert!(received.is_empty());
}
#[test]
fn multiple_writes_send_in_order() {
let (tx, rx) = mpsc::sync_channel(10);
let mut sink = ChannelSink::new(tx);
sink.write(b"first\n").unwrap();
sink.write(b"second\n").unwrap();
sink.write(b"third\n").unwrap();
assert_eq!(rx.recv().unwrap(), b"first\n");
assert_eq!(rx.recv().unwrap(), b"second\n");
assert_eq!(rx.recv().unwrap(), b"third\n");
}
#[test]
fn flush_always_returns_ok() {
let (tx, _rx) = mpsc::sync_channel(10);
let mut sink = ChannelSink::new(tx);
assert!(sink.flush().is_ok());
}
#[test]
fn flush_does_not_affect_channel_contents() {
let (tx, rx) = mpsc::sync_channel(10);
let mut sink = ChannelSink::new(tx);
sink.write(b"data").unwrap();
sink.flush().unwrap();
let received = rx.recv().unwrap();
assert_eq!(received, b"data");
}
#[test]
fn write_after_receiver_dropped_returns_err() {
let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(10);
let mut sink = ChannelSink::new(tx);
drop(rx);
let result = sink.write(b"orphaned");
assert!(
result.is_err(),
"write to disconnected channel should return Err"
);
}
#[test]
fn bounded_channel_provides_backpressure_without_oom() {
let capacity = 10usize;
let total_writes = 20usize;
let (tx, rx) = mpsc::sync_channel(capacity);
let mut sink = ChannelSink::new(tx);
let receiver_handle = thread::spawn(move || {
let mut count = 0usize;
while count < total_writes {
if rx.recv_timeout(Duration::from_secs(5)).is_ok() {
count += 1;
thread::sleep(Duration::from_millis(5));
}
}
count
});
for i in 0..total_writes {
let data = format!("item-{i}\n");
sink.write(data.as_bytes()).expect("write should succeed");
}
let received_count = receiver_handle
.join()
.expect("receiver thread should not panic");
assert_eq!(
received_count, total_writes,
"receiver should get all {total_writes} items"
);
}
#[test]
fn channel_sink_write_count_matches_receive_count() {
let (tx, rx) = mpsc::sync_channel(100);
let mut sink = ChannelSink::new(tx);
let n = 50usize;
for i in 0..n {
sink.write(format!("line {i}").as_bytes()).unwrap();
}
drop(sink);
let count = rx.into_iter().count();
assert_eq!(count, n, "should receive exactly {n} items");
}
#[test]
fn channel_sink_is_send() {
fn assert_send<T: Send>() {}
assert_send::<ChannelSink>();
}
#[test]
fn channel_sink_is_sync() {
fn assert_sync<T: Sync>() {}
assert_sync::<ChannelSink>();
}
#[test]
fn channel_sink_usable_as_boxed_sink_trait_object() {
let (tx, rx) = mpsc::sync_channel(10);
let mut sink: Box<dyn Sink> = Box::new(ChannelSink::new(tx));
sink.write(b"trait object test").unwrap();
sink.flush().unwrap();
let data = rx.recv().unwrap();
assert_eq!(data, b"trait object test");
}
}