use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
/// A mutex-protected FIFO buffer with a wake-up hook for a reader.
///
/// Values are appended with [`Stream::enqueue`]; [`Stream::done`] and
/// [`Stream::error`] record how the stream ended. Each of those calls notifies
/// the `Waker` stored in `read_waker`, if there is one.
///
/// `Stream<T>` is `Send + Sync` whenever `T: Send`, so an `Arc<Stream<T>>` can
/// be handed to producer threads.
pub struct Stream<T> {
    /// Values pushed by `enqueue`, oldest at the front.
    queue: Arc<Mutex<VecDeque<T>>>,
    /// Waker notified by `enqueue`, `done` and `error`.
    ///
    /// NOTE(review): nothing in this file stores a waker here; presumably an
    /// async reader installs one — confirm against the consumer.
    read_waker: Arc<Mutex<Option<std::task::Waker>>>,
    /// Set to `true` by `done` (the `Drop` impl of `Stream` sets it as well).
    done: Arc<Mutex<bool>>,
    /// The error most recently passed to `error`, if any.
    error: Arc<Mutex<Option<Box<dyn std::error::Error + Send + Sync>>>>,
    /// Callback handed to `new`.
    ///
    /// The boxed `FnOnce` is `Send` but not `Sync`; keeping it behind a `Mutex`
    /// is what allows the whole stream to be `Sync`, and it lets the callback be
    /// taken and invoked through a shared reference.
    ///
    /// NOTE(review): the code in this file stores the callback but never calls
    /// it — confirm who is expected to.
    returned_fn: Mutex<Option<Box<dyn FnOnce() + Send>>>,
    /// Initialised to `false`; not touched by anything else in this file.
    started: Arc<Mutex<bool>>,
}

// No hand-written `unsafe impl Send` is needed: every field is `Send + Sync` when
// `T: Send`, so the compiler derives both auto traits on its own. This closure is
// never run; it only makes the build fail if a future field breaks that property,
// which an `unsafe impl` would have silently hidden.
const _: fn() = || {
    fn assert_send_sync<S: Send + Sync>() {}
    fn check<T: Send>() {
        assert_send_sync::<Stream<T>>();
    }
    check::<()>();
};

impl<T: Send + 'static> Stream<T> {
    /// Creates an empty stream that is neither done nor in error.
    ///
    /// `returned` is an optional callback that is stored alongside the stream.
    pub fn new(returned: Option<impl FnOnce() + Send + 'static>) -> Self {
        Stream {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            read_waker: Arc::new(Mutex::new(None)),
            done: Arc::new(Mutex::new(false)),
            error: Arc::new(Mutex::new(None)),
            returned_fn: Mutex::new(
                returned.map(|f| Box::new(f) as Box<dyn FnOnce() + Send>),
            ),
            started: Arc::new(Mutex::new(false)),
        }
    }

    /// Appends `value` to the back of the queue.
    ///
    /// The reader is woken only when the queue goes from empty to non-empty.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn enqueue(&self, value: T) {
        // Release the queue lock before waking so the woken side can take it.
        let was_empty = {
            let mut queue = self.queue.lock().unwrap();
            queue.push_back(value);
            queue.len() == 1
        };
        if was_empty {
            self.wake_reader();
        }
    }

    /// Marks the stream as finished and wakes the reader.
    ///
    /// # Panics
    ///
    /// Panics if the `done` mutex is poisoned.
    pub fn done(&self) {
        *self.done.lock().unwrap() = true;
        self.wake_reader();
    }

    /// Records `err` (replacing any earlier error) and wakes the reader.
    ///
    /// # Panics
    ///
    /// Panics if the `error` mutex is poisoned.
    pub fn error(&self, err: Box<dyn std::error::Error + Send + Sync>) {
        *self.error.lock().unwrap() = Some(err);
        self.wake_reader();
    }

    /// Wakes the registered reader, if any, leaving the waker registered.
    fn wake_reader(&self) {
        // Clone the waker and drop the guard *before* waking: a waker is arbitrary
        // code, and running it under the lock would deadlock if it re-registers
        // itself and would poison the mutex if it panics. A poisoned mutex is
        // recovered rather than skipped so the reader is not left parked forever.
        let waker = self
            .read_waker
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .clone();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}
impl<T> Stream<T>
where
    T: Send + 'static,
{
    /// Consumes a shared handle to the stream and returns an iterator that
    /// pops items from the stream's queue.
    pub fn iter(self: Arc<Self>) -> StreamIter<T> {
        let stream = self;
        StreamIter { stream }
    }
}
/// Iterator handle over a shared [`Stream`].
///
/// The struct itself carries no trait bounds; the `impl` blocks that need
/// `T: Send + 'static` state that requirement themselves.
pub struct StreamIter<T> {
    /// The stream whose queue this iterator pops from.
    stream: Arc<Stream<T>>,
}
impl<T> Iterator for StreamIter<T>
where
    T: Send + 'static,
{
    type Item = T;

    /// Removes and returns the oldest queued item.
    ///
    /// This never blocks: it returns `None` whenever the queue is empty at the
    /// moment of the call, without consulting the stream's `done` or `error`
    /// state. A later call can therefore yield `Some` again once more items
    /// have been enqueued.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    fn next(&mut self) -> Option<T> {
        let mut pending = self.stream.queue.lock().unwrap();
        pending.pop_front()
    }
}
impl<T> Drop for Stream<T> {
    /// Marks the stream as done when it is dropped.
    ///
    /// A poisoned `done` mutex is recovered instead of unwrapped: panicking
    /// inside `drop` while the thread is already unwinding aborts the process,
    /// and the flag is a plain `bool` that cannot be left half-updated.
    fn drop(&mut self) {
        let mut finished = self
            .done
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *finished = true;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Items come back in the order they were enqueued, then the iterator runs dry.
    #[test]
    fn test_stream_enqueue_dequeue() {
        let stream: Arc<Stream<i32>> = Arc::new(Stream::new(None::<fn()>));
        stream.enqueue(1);
        stream.enqueue(2);
        let mut iter = stream.iter();
        assert_eq!(iter.next(), Some(1));
        assert_eq!(iter.next(), Some(2));
        assert_eq!(iter.next(), None);
    }

    /// Calling `done` does not discard items that are already queued.
    #[test]
    fn test_stream_done() {
        let stream = Arc::new(Stream::new(None::<fn()>));
        stream.enqueue(1);
        stream.done();
        let mut iter = stream.iter();
        assert_eq!(iter.next(), Some(1));
        assert_eq!(iter.next(), None);
    }
}