//! Crate root: defines [`Trigger`] and re-exports the crate's public stream types.
//!
//! [`Trigger`] is a handle that sends `true` on a `tokio::sync::watch` channel when it is
//! dropped. [`StreamExt`], [`TakeUntilIf`] and [`Tripwire`] are re-exported from the private
//! `combinator` module, and [`Valve`] and [`Valved`] from the private `wrapper` module.
//!
//! As exercised by the tests in this file, `Valved::new(stream)` returns a
//! `(Trigger, wrapped_stream)` pair, while `Valve::new()` returns a `(Trigger, Valve)` pair
//! whose `Valve::wrap` can wrap several streams. The tests expect the wrapped streams to stop
//! yielding items once the associated [`Trigger`] has been dropped.
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]
use tokio::sync::watch;
// Private implementation modules; their public items are re-exported below.
mod combinator;
mod wrapper;
pub use crate::combinator::{StreamExt, TakeUntilIf, Tripwire};
pub use crate::wrapper::{Valve, Valved};
/// A handle that sends `true` on its `watch` channel when it is dropped.
///
/// The sender is kept in an `Option` so that [`Trigger::disable`] can remove it; a disabled
/// `Trigger` sends nothing when it is dropped. Otherwise, dropping the handle (or calling
/// [`Trigger::cancel`], which simply drops it) publishes `true` to the channel.
///
/// NOTE(review): the receiving half of the channel lives in the `combinator` / `wrapper`
/// modules, which are not shown here; presumably the streams they produce end once they
/// observe `true`. Confirm against those modules.
#[derive(Debug)]
pub struct Trigger(Option<watch::Sender<bool>>);
impl Trigger {
    /// Fire the trigger immediately by consuming it.
    ///
    /// This is exactly the same as letting the `Trigger` go out of scope: the `Drop`
    /// implementation sends `true` on the underlying watch channel.
    pub fn cancel(self) {
        // All of the work happens in `Drop for Trigger`.
        std::mem::drop(self);
    }

    /// Defuse the trigger so that it never sends its signal.
    ///
    /// The watch sender is discarded without sending anything, so the `Drop`
    /// implementation that runs afterwards finds nothing to send on.
    pub fn disable(mut self) {
        // Drop the sender first; `self` is then dropped at the end of this function with
        // its slot already set to `None`, which makes `Drop for Trigger` a no-op.
        drop(self.0.take());
    }
}
impl Drop for Trigger {
    /// Sends `true` on the watch channel unless the sender has already been taken
    /// (which is what `Trigger::disable` does).
    fn drop(&mut self) {
        let sender = match self.0.take() {
            Some(sender) => sender,
            // Already disabled: there is nothing to signal.
            None => return,
        };
        // The result is ignored on purpose: a destructor must not panic. Per the tokio
        // documentation, `send` fails when no receiver is left to observe the value.
        let _ = sender.send(true);
    }
}
#[cfg(test)]
mod tests {
use super::*;
use futures::prelude::*;
use futures_util::stream::select;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_stream::wrappers::TcpListenerStream;
/// Runs an echo server on a dedicated thread with its own runtime and checks that the
/// thread finishes once the `Trigger` handed back by `Valved::new` is dropped.
#[test]
fn tokio_run() {
    use std::thread;

    let runtime = tokio::runtime::Runtime::new().unwrap();
    let listener = runtime
        .block_on(tokio::net::TcpListener::bind("127.0.0.1:0"))
        .unwrap();
    // Carries the trigger from the server thread back to the test thread.
    let (trigger_tx, trigger_rx) = tokio::sync::oneshot::channel();
    let server = thread::spawn(move || {
        // Signals that the accept loop below has run to completion.
        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
        runtime.block_on(async move {
            let (trigger, mut incoming) = Valved::new(TcpListenerStream::new(listener));
            trigger_tx.send(trigger).unwrap();
            // Echo every accepted connection until the stream ends.
            while let Some(conn) = incoming.next().await {
                let mut sock = conn.unwrap();
                tokio::spawn(async move {
                    let (mut rd, mut wr) = sock.split();
                    tokio::io::copy(&mut rd, &mut wr).await.unwrap();
                });
            }
            done_tx.send(()).unwrap();
        });
        runtime.block_on(done_rx).unwrap();
    });

    let trigger = futures::executor::block_on(trigger_rx);
    // The accept loop only ends when `incoming` yields `None`; the test expects dropping
    // the trigger to cause that, otherwise `join` below would never return.
    drop(trigger);
    server.join().unwrap();
}
/// Spawns an echo server as a task on the test runtime, receives its `Trigger` and drops
/// it. The test makes no assertions beyond completing without a panic.
#[tokio::test]
async fn tokio_rt_on_idle() {
    // Carries the trigger from the spawned server task back to the test body.
    let (trigger_tx, trigger_rx) = tokio::sync::oneshot::channel();
    tokio::spawn(async move {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let (trigger, mut incoming) = Valved::new(TcpListenerStream::new(listener));
        trigger_tx.send(trigger).unwrap();
        // Echo every accepted connection until the stream ends.
        while let Some(conn) = incoming.next().await {
            let mut sock = conn.unwrap();
            tokio::spawn(async move {
                let (mut rd, mut wr) = sock.split();
                tokio::io::copy(&mut rd, &mut wr).await.unwrap();
            });
        }
    });

    let trigger = trigger_rx.await;
    drop(trigger);
}
/// Wraps two listener streams with the same `Valve`, merges them with `select`, and drops
/// the shared `Trigger` straight after spawning the server task. The test makes no
/// assertions beyond completing without a panic.
#[tokio::test]
async fn multi_interrupt() {
    let (trigger, valve) = Valve::new();
    tokio::spawn(async move {
        let first = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let second = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let guarded_first = valve.wrap(TcpListenerStream::new(first));
        let guarded_second = valve.wrap(TcpListenerStream::new(second));
        let mut incoming = select(guarded_first, guarded_second);
        // Echo every accepted connection from either listener until the merged stream ends.
        while let Some(conn) = incoming.next().await {
            let mut sock = conn.unwrap();
            tokio::spawn(async move {
                let (mut rd, mut wr) = sock.split();
                tokio::io::copy(&mut rd, &mut wr).await.unwrap();
            });
        }
    });

    drop(trigger);
}
/// Checks that a stream wrapped by a `Valve` keeps yielding connections while the
/// `Trigger` is still alive: two sequential clients must both be accepted and echoed.
#[tokio::test]
async fn yields_many() {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    let (trigger, valve) = Valve::new();
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    // Number of connections accepted by the server task, shared with the test body.
    let accepted = Arc::new(AtomicUsize::new(0));
    let observed = accepted.clone();

    tokio::spawn(async move {
        let mut incoming = valve.wrap(TcpListenerStream::new(listener));
        while let Some(conn) = incoming.next().await {
            let mut sock = conn.unwrap();
            // Counted before the echo task is spawned, so a client that has received its
            // echo is guaranteed to have been counted already.
            accepted.fetch_add(1, Ordering::SeqCst);
            tokio::spawn(async move {
                let (mut rd, mut wr) = sock.split();
                tokio::io::copy(&mut rd, &mut wr).await.unwrap();
            });
        }
    });

    // One round-trip per payload; each client is closed before the next one connects.
    for payload in [b"hello", b"world"].iter() {
        let mut client = tokio::net::TcpStream::connect(&addr).await.unwrap();
        client.write_all(*payload).await.unwrap();
        let mut echoed = [0; 5];
        client.read_exact(&mut echoed[..]).await.unwrap();
        assert_eq!(&echoed, *payload);
    }

    assert_eq!(observed.load(Ordering::SeqCst), 2);
    drop(trigger);
}
/// Checks that two listener streams wrapped by the same `Valve` and merged with `select`
/// both yield connections while the `Trigger` is still alive: one client connects to each
/// listener and both must be accepted and echoed.
#[tokio::test]
async fn yields_some() {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    let (trigger, valve) = Valve::new();
    let first = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let second = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let first_addr = first.local_addr().unwrap();
    let second_addr = second.local_addr().unwrap();
    // Number of connections accepted by the server task, shared with the test body.
    let accepted = Arc::new(AtomicUsize::new(0));
    let observed = accepted.clone();

    tokio::spawn(async move {
        let guarded_first = valve.wrap(TcpListenerStream::new(first));
        let guarded_second = valve.wrap(TcpListenerStream::new(second));
        let mut incoming = select(guarded_first, guarded_second);
        while let Some(conn) = incoming.next().await {
            let mut sock = conn.unwrap();
            // Counted before the echo task is spawned, so a client that has received its
            // echo is guaranteed to have been counted already.
            accepted.fetch_add(1, Ordering::SeqCst);
            tokio::spawn(async move {
                let (mut rd, mut wr) = sock.split();
                tokio::io::copy(&mut rd, &mut wr).await.unwrap();
            });
        }
    });

    // One round-trip per listener; each client is closed before the next one connects.
    for (addr, payload) in [(first_addr, b"hello"), (second_addr, b"world")].iter() {
        let mut client = tokio::net::TcpStream::connect(addr).await.unwrap();
        client.write_all(*payload).await.unwrap();
        let mut echoed = [0; 5];
        client.read_exact(&mut echoed[..]).await.unwrap();
        assert_eq!(&echoed, *payload);
    }

    assert_eq!(observed.load(Ordering::SeqCst), 2);
    drop(trigger);
}
}