use crate::*;
pub(crate) struct Sender<Si> {
stream_id: Option<StreamId>,
sink: Option<Si>,
read_halt: HaltRead,
}
impl<Si> Unpin for Sender<Si> where Si: Unpin {}
impl<Si> std::fmt::Debug for Sender<Si> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Sender").finish()
}
}
impl<Si> Sender<Si> {
#[tracing::instrument(level = "trace", skip(sink, read_halt))]
pub(crate) fn new(sink: Si, read_halt: HaltRead) -> Self {
Self {
stream_id: None,
sink: Some(sink),
read_halt,
}
}
pub(crate) fn sink(&mut self) -> Option<&mut Si> {
self.sink.as_mut()
}
pub(crate) fn set_stream_id(&mut self, stream_id: StreamId) {
if let Some(old_id) = self.stream_id.replace(stream_id) {
panic!("Stream ID was already set to: {}", old_id);
}
}
pub(crate) fn stream_id(&self) -> StreamId {
self.stream_id.expect("Should have stream ID")
}
}
impl<Si> Drop for Sender<Si> {
#[tracing::instrument(level = "trace", skip(self))]
fn drop(&mut self) {
tracing::trace!("Sending dropped, halting read.");
let _ = self.sink.take().expect("Should still have sender.");
tracing::trace!("Signaling halt!");
self.read_halt.signal();
}
}
#[cfg(test)]
mod tests {
use futures::prelude::*;
use tokio::sync::mpsc;
#[tokio::test]
async fn sender_id() {
let (tx, rx) = mpsc::channel::<()>(10);
let (mut sender, mut reader) = crate::tests::sender_reader(tx, rx);
sender.set_stream_id(42);
reader.set_stream_id(42);
assert_eq!(42, sender.stream_id());
}
#[tokio::test]
async fn message() {
let (tx, rx) = mpsc::channel::<()>(10);
let (sender, mut reader) = crate::tests::sender_reader(tx, rx);
drop(sender);
assert!(reader.next().await.is_none());
}
}