#![warn(clippy::all)]
#![allow(clippy::new_without_default)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
mod imit;
mod inner;
mod peg;
mod sub;
pub use crate::imit::Imitator;
use crate::inner::{MemoryMode, SafeInner, IMITATORS};
use crate::peg::Peg;
pub use crate::sub::Subscription;
/// A stream of values of type `T`.
///
/// Subscribers receive `Some(&value)` for every value and `None` as the end
/// signal (see [`Stream::subscribe`]).
pub struct Stream<T: 'static> {
// Never read directly, hence the `dead_code` allowance.
// NOTE(review): presumably held only so the upstream subscription stays alive
// for as long as this stream does — confirm against `Peg`.
#[allow(dead_code)]
peg: Peg,
// Shared state through which subscribers are added and updates are pushed.
inner: SafeInner<T>,
}
impl<T> Stream<T> {
/// Creates a new [`Sink`], the handle used to push values into a stream.
pub fn sink() -> Sink<T> {
    Sink::<T>::new()
}
/// Creates a stream whose memory is pre-loaded with `value`.
///
/// The stream uses `MemoryMode::KeepUntilEnd` and has no upstream source.
pub fn of(value: T) -> Stream<T>
where
    T: Clone,
{
    Stream {
        inner: SafeInner::new(MemoryMode::KeepUntilEnd, Some(value)),
        peg: Peg::new_fake(),
    }
}
/// Creates a stream with no memory and no upstream source, so nothing in
/// this file ever pushes a value or an end signal into it.
pub fn never() -> Stream<T> {
    Stream {
        inner: SafeInner::new(MemoryMode::NoMemory, None),
        peg: Peg::new_fake(),
    }
}
/// Reports whether this stream's memory mode is one that keeps a value
/// (as decided by `MemoryMode::is_memory`).
pub fn has_memory(&self) -> bool {
self.inner.lock().memory_mode().is_memory()
}
/// Creates a new [`Imitator`] for values of type `T`.
pub fn imitator() -> Imitator<T>
where
    T: Clone,
{
    Imitator::<T>::new()
}
/// Registers `f` as a subscriber of this stream.
///
/// `f` is called with `Some(&value)` for each value and with `None` as the
/// end signal. The returned [`Subscription`] wraps the subscriber's peg.
pub fn subscribe<F>(&self, f: F) -> Subscription
where
    F: FnMut(Option<&T>) + 'static,
{
    let handle = self.inner.lock().add(f);
    // NOTE(review): `keep_mode` presumably keeps the subscriber registered even
    // when the returned handle is dropped — confirm in `peg`.
    handle.keep_mode();
    Subscription::new(handle)
}
/// Registers `f` as a subscriber for use by the combinators in this file.
///
/// The returned peg is linked to this stream's own peg via `add_related`.
fn internal_subscribe<F>(&self, f: F) -> Peg
where
    F: FnMut(Option<&T>) + 'static,
{
    let mut subscriber = self.inner.lock().add(f);
    let upstream = self.peg.clone();
    subscriber.add_related(upstream);
    subscriber
}
/// Starts collecting clones of every value of this stream into a
/// [`Collector`].
///
/// The collector's "ended" flag is set, and any waiter notified, when the
/// end signal (`None`) arrives.
pub fn collect(&self) -> Collector<T>
where
    T: Clone,
{
    let state = Arc::new((Mutex::new((false, Some(Vec::new()))), Condvar::new()));
    let shared = Arc::clone(&state);
    let peg = self.internal_subscribe(move |event| {
        let (mutex, condvar) = &*shared;
        let mut guard = mutex.lock().unwrap();
        match event {
            Some(value) => {
                // The vector is gone once the collector has handed it out.
                if let Some(values) = guard.1.as_mut() {
                    values.push(value.clone());
                }
            }
            None => {
                guard.0 = true;
                condvar.notify_all();
            }
        }
    });
    Collector { peg, state }
}
pub fn dedupe(&self) -> Stream<T>
where
T: Clone + PartialEq,
{
self.dedupe_by(|v| v.clone())
}
/// Suppresses consecutive values whose key, as computed by `f`, equals the
/// key of the most recently forwarded value.
///
/// The first value is always forwarded. The end signal (`None`) is passed
/// through unchanged.
pub fn dedupe_by<U, F>(&self, mut f: F) -> Stream<T>
where
    U: PartialEq + 'static,
    F: FnMut(&T) -> U + 'static,
{
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let inner_clone = inner.clone();
    let mut prev: Option<U> = None;
    let peg = self.internal_subscribe(move |t| {
        if let Some(t) = t {
            let key = f(t);
            // Compare against a borrow of the previous key so that a duplicate
            // leaves it in place; taking it out would forget the key and let
            // the next duplicate through.
            let changed = match &prev {
                Some(last) => *last != key,
                None => true,
            };
            if changed {
                prev = Some(key);
                inner_clone.lock().update_borrowed(Some(t));
            }
        } else {
            inner_clone.lock().update_borrowed(None);
        }
    });
    Stream { peg, inner }
}
/// Skips the first `amount` values and forwards everything after them.
///
/// Implemented on top of [`Stream::drop_while`].
pub fn drop(&self, amount: usize) -> Stream<T> {
    // Counting down from `amount` (instead of from `amount + 1`) cannot
    // overflow when `amount` is `usize::MAX`.
    let mut remaining = amount;
    self.drop_while(move |_| {
        let skip = remaining > 0;
        if skip {
            remaining -= 1;
        }
        skip
    })
}
/// Skips values for as long as `f` returns `true`, then forwards every
/// later value.
///
/// `f` is no longer called once it has returned `false`. The end signal
/// (`None`) is always passed through.
pub fn drop_while<F>(&self, mut f: F) -> Stream<T>
where
    F: FnMut(&T) -> bool + 'static,
{
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let target = inner.clone();
    let mut dropping = true;
    let peg = self.internal_subscribe(move |event| match event {
        Some(value) => {
            // Short-circuit keeps `f` from being consulted after it has
            // returned `false` once.
            dropping = dropping && f(value);
            if !dropping {
                target.lock().update_borrowed(Some(value));
            }
        }
        None => {
            target.lock().update_borrowed(None);
        }
    });
    Stream { peg, inner }
}
/// Mirrors this stream, and additionally emits the end signal when `other`
/// ends.
///
/// Values of `other` are ignored; only its end signal (`None`) matters.
pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T> {
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let end_target = inner.clone();
    let value_target = inner.clone();
    // Subscribe to `other` first, then to `self`, as separate pegs.
    let other_peg = other.internal_subscribe(move |signal| {
        if signal.is_none() {
            end_target.lock().update_borrowed(None);
        }
    });
    let self_peg = self.internal_subscribe(move |event| {
        value_target.lock().update_borrowed(event);
    });
    Stream {
        peg: Peg::many(vec![other_peg, self_peg]),
        inner,
    }
}
/// Forwards only the values for which `f` returns `true`.
///
/// The end signal (`None`) is always passed through.
pub fn filter<F>(&self, mut f: F) -> Stream<T>
where
    F: FnMut(&T) -> bool + 'static,
{
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let target = inner.clone();
    let peg = self.internal_subscribe(move |event| match event {
        Some(value) => {
            if f(value) {
                target.lock().update_borrowed(Some(value));
            }
        }
        None => {
            target.lock().update_borrowed(None);
        }
    });
    Stream { peg, inner }
}
/// Accumulates values with `f`, starting from `seed`.
///
/// The resulting stream uses `MemoryMode::KeepUntilEnd` with `seed` as its
/// initial memory. For every incoming value the remembered accumulator is
/// taken out, combined with the value through `f`, and the result is pushed
/// as the new value.
///
/// # Panics
///
/// Panics if a value arrives while no accumulator is in memory.
pub fn fold<U, F>(&self, seed: U, mut f: F) -> Stream<U>
where
    U: 'static,
    F: FnMut(U, &T) -> U + 'static,
{
    let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(seed));
    let target = inner.clone();
    let peg = self.internal_subscribe(move |event| match event {
        Some(value) => {
            // The lock is held across `f`, so the take and the update form
            // one step.
            let mut state = target.lock();
            let prev = state
                .take_memory()
                .expect("fold without a previous value");
            let next = f(prev, value);
            state.update_owned(Some(next));
        }
        None => {
            target.lock().update_owned(None);
        }
    });
    Stream { peg, inner }
}
/// Feeds this stream into `imitator`.
///
/// Values are not pushed immediately: for each value a closure is queued on
/// the thread-local `IMITATORS` list, and that closure pushes a clone of the
/// value into `imitator` when it is run. The end signal (`None`) is pushed
/// into `imitator` right away.
fn imitate(&self, imitator: SafeInner<T>) -> Peg
where
    T: Clone,
{
    self.internal_subscribe(move |t| {
        if let Some(value) = t {
            // One clone to own the value past this callback. The queued
            // closure clones once more per run so that it only borrows its
            // captures.
            let value = value.clone();
            let target = imitator.clone();
            IMITATORS.with(|imit_cell| {
                imit_cell.borrow_mut().push(Box::new(move || {
                    target.lock().update_owned(Some(value.clone()));
                }));
            });
        } else {
            imitator.lock().update_owned(None);
        }
    })
}
/// Emits only the final value of this stream, at the moment it ends.
///
/// Each incoming value replaces the remembered one. When the end signal
/// (`None`) arrives, the remembered value (if any) is emitted, followed by
/// the end signal.
pub fn last(&self) -> Stream<T>
where
    T: Clone,
{
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let inner_clone = inner.clone();
    // The closure is `FnMut` and is the sole owner of this state, so a plain
    // captured local suffices and no lock is needed.
    let mut last: Option<T> = None;
    let peg = self.internal_subscribe(move |t| match t {
        Some(value) => {
            last = Some(value.clone());
        }
        None => {
            let mut target = inner_clone.lock();
            if let Some(value) = last.take() {
                target.update_owned(Some(value));
            }
            target.update_owned(None);
        }
    });
    Stream { peg, inner }
}
/// Transforms every value with `f`.
///
/// The end signal (`None`) is passed through.
pub fn map<U, F>(&self, mut f: F) -> Stream<U>
where
    U: 'static,
    F: FnMut(&T) -> U + 'static,
{
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let target = inner.clone();
    let peg = self.internal_subscribe(move |event| {
        // Run `f` before taking the lock on the output stream.
        let mapped: Option<U> = event.map(&mut f);
        target.lock().update_owned(mapped);
    });
    Stream { peg, inner }
}
/// Replaces every value with a clone of `u`.
///
/// Delegates to [`Stream::map`].
pub fn map_to<U>(&self, u: U) -> Stream<U>
where
    U: Clone + 'static,
{
    let template = u;
    self.map(move |_| template.clone())
}
/// Merges several streams into one that forwards the values of all of them.
///
/// The merged stream emits the end signal only once every input stream has
/// ended. With an empty `streams` vector there is nothing to subscribe to,
/// so the result never receives a value or an end signal.
pub fn merge(streams: Vec<Stream<T>>) -> Stream<T> {
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    // Number of input streams that have not ended yet.
    let active = Arc::new(AtomicUsize::new(streams.len()));
    let mut pegs = Vec::with_capacity(streams.len());
    for stream in streams {
        let target = inner.clone();
        let remaining = Arc::clone(&active);
        pegs.push(stream.internal_subscribe(move |event| match event {
            Some(value) => {
                target.lock().update_borrowed(Some(value));
            }
            None => {
                // `fetch_sub` returns the previous count; 1 means this was
                // the last stream still active.
                if remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
                    target.lock().update_borrowed(None);
                }
            }
        }));
    }
    Stream {
        peg: Peg::many(pegs),
        inner,
    }
}
/// Returns a stream that mirrors this one using
/// `MemoryMode::KeepUntilEnd`.
pub fn remember(&self) -> Stream<T>
where
    T: Clone,
{
    let mode = MemoryMode::KeepUntilEnd;
    self.remember_mode(mode)
}
/// Returns a stream created with the given memory `mode` that receives an
/// owned clone of every value of this stream, and its end signal.
fn remember_mode(&self, mode: MemoryMode) -> Stream<T>
where
    T: Clone,
{
    let inner = SafeInner::new(mode, None);
    let target = inner.clone();
    let peg = self.internal_subscribe(move |event| {
        // Clone before taking the lock on the output stream.
        let owned: Option<T> = event.cloned();
        target.lock().update_owned(owned);
    });
    Stream { peg, inner }
}
/// Pairs every value of this stream with the value currently remembered
/// from `other`.
///
/// Values of `self` that arrive while nothing is remembered from `other`
/// are skipped. The result ends when `self` ends. `other` is tracked through
/// an internal `MemoryMode::KeepAfterEnd` stream.
pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
where
    T: Clone,
    U: Clone,
{
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let target = inner.clone();
    let remembered = other.remember_mode(MemoryMode::KeepAfterEnd);
    let peg = self.internal_subscribe(move |event| match event {
        Some(left) => {
            let sampled = remembered.inner.lock();
            if let Some(right) = sampled.peek_memory().as_ref() {
                let pair = (left.clone(), right.clone());
                target.lock().update_owned(Some(pair));
            }
        }
        None => {
            target.lock().update_borrowed(None);
        }
    });
    Stream { peg, inner }
}
/// Returns a stream that mirrors this one but starts out with `start` in
/// its memory (`MemoryMode::KeepUntilEnd`).
pub fn start_with(&self, start: T) -> Stream<T> {
    let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(start));
    let target = inner.clone();
    let peg = self.internal_subscribe(move |event| {
        target.lock().update_borrowed(event);
    });
    Stream { peg, inner }
}
/// Forwards the first `amount` values.
///
/// Implemented on top of [`Stream::take_while`]: the end signal is emitted
/// when the value after the first `amount` arrives, or when the source ends.
pub fn take(&self, amount: usize) -> Stream<T> {
    // Counting down from `amount` (instead of from `amount + 1`) cannot
    // overflow when `amount` is `usize::MAX`.
    let mut remaining = amount;
    self.take_while(move |_| {
        let keep = remaining > 0;
        if keep {
            remaining -= 1;
        }
        keep
    })
}
/// Forwards values while `f` returns `true`; a value for which `f` returns
/// `false` is replaced by the end signal (`None`).
///
/// The end signal of the source is passed through as well.
pub fn take_while<F>(&self, mut f: F) -> Stream<T>
where
    F: FnMut(&T) -> bool + 'static,
{
    let inner = SafeInner::new(MemoryMode::NoMemory, None);
    let target = inner.clone();
    let peg = self.internal_subscribe(move |event| match event {
        Some(value) => {
            // NOTE(review): `f` keeps being consulted after it has failed
            // once, so a later `true` pushes another value into `target`.
            // Presumably the inner state ignores updates after the end
            // signal — confirm in `inner`.
            let forwarded = if f(value) { Some(value) } else { None };
            target.lock().update_borrowed(forwarded);
        }
        None => {
            target.lock().update_borrowed(None);
        }
    });
    Stream { peg, inner }
}
/// Blocks the calling thread until this stream delivers its end signal.
#[allow(clippy::mutex_atomic)]
pub fn wait(&self) {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let notifier = Arc::clone(&pair);
    // Kept alive until this function returns.
    let _sub = self.internal_subscribe(move |event| {
        if event.is_none() {
            let (flag, signal) = &*notifier;
            let mut ended = flag.lock().unwrap();
            *ended = true;
            signal.notify_all();
        }
    });
    let (flag, signal) = &*pair;
    let mut ended = flag.lock().unwrap();
    // Loop to guard against spurious wake-ups.
    while !*ended {
        ended = signal.wait(ended).unwrap();
    }
}
}
impl<T> Stream<Stream<T>> {
pub fn flatten(&self) -> Stream<T> {
let inner = SafeInner::new(MemoryMode::NoMemory, None);
let inner_clone = inner.clone();
let mut ipeg = None;
let peg = self.internal_subscribe(move |ts| {
if let Some(ts) = ts {
let inner_clone = inner_clone.clone();
ipeg = Some(ts.internal_subscribe(move |tv| {
if let Some(tv) = tv {
inner_clone.lock().update_borrowed(Some(tv));
} else {
}
}));
} else {
ipeg.take();
inner_clone.lock().update_borrowed(None);
}
});
Stream { peg, inner }
}
pub fn flatten_concurrently(&self) -> Stream<T> {
let inner = SafeInner::new(MemoryMode::NoMemory, None);
let inner_clone = inner.clone();
let peg = self.internal_subscribe(move |ts| {
if let Some(ts) = ts {
let inner_clone = inner_clone.clone();
let ipeg = ts.internal_subscribe(move |tv| {
if let Some(tv) = tv {
inner_clone.lock().update_borrowed(Some(tv));
} else {
}
});
ipeg.keep_mode(); } else {
inner_clone.lock().update_borrowed(None);
}
});
Stream { peg, inner }
}
}
// Textually includes `comb.rs` at this point.
// NOTE(review): presumably this supplies the `combineN` constructors (the tests
// call `Stream::combine2`, which is not defined in this file) — confirm.
include!("./comb.rs");
/// The sending side of a stream: values pushed with `update` and the end
/// signal sent with `end` reach the streams obtained from `stream`.
pub struct Sink<T: 'static> {
// State shared with every `Stream` handed out by `stream`.
inner: SafeInner<T>,
}
impl<T> Sink<T> {
    /// Creates a sink whose shared state has no memory.
    fn new() -> Self {
        let inner = SafeInner::new(MemoryMode::NoMemory, None);
        Self { inner }
    }

    /// Returns a stream sharing this sink's state.
    pub fn stream(&self) -> Stream<T> {
        let peg = Peg::new_fake();
        let inner = self.inner.clone();
        Stream { peg, inner }
    }

    /// Pushes `next` into the stream.
    pub fn update(&self, next: T) {
        let event = Some(next);
        self.inner.lock().update_and_imitate(event);
    }

    /// Sends the end signal, consuming the sink.
    pub fn end(self) {
        let event = None;
        self.inner.lock().update_and_imitate(event);
    }
}
/// Accumulates the values of a stream; created by `Stream::collect`.
pub struct Collector<T> {
// Peg of the collecting subscription; never read directly, hence the
// `dead_code` allowance.
#[allow(dead_code)]
peg: Peg,
// Shared with the subscriber closure: an "ended" flag and the collected
// values behind a mutex, plus the condvar that is notified when the flag is set.
#[allow(clippy::type_complexity)]
state: Arc<(Mutex<(bool, Option<Vec<T>>)>, Condvar)>,
}
impl<T> Collector<T> {
    /// Blocks until the collected stream has ended, then returns every
    /// collected value.
    pub fn wait(self) -> Vec<T> {
        let (mutex, condvar) = &*self.state;
        let mut guard = mutex.lock().unwrap();
        // Loop to guard against spurious wake-ups.
        while !guard.0 {
            guard = condvar.wait(guard).unwrap();
        }
        guard.1.take().unwrap()
    }

    /// Returns the values collected so far without waiting for the stream
    /// to end.
    pub fn take(self) -> Vec<T> {
        let (mutex, _) = &*self.state;
        let mut guard = mutex.lock().unwrap();
        guard.1.take().unwrap()
    }
}
/// Cloning a stream clones its peg and its handle to the shared state; no
/// `T: Clone` bound is required.
impl<T> Clone for Stream<T> {
    fn clone(&self) -> Self {
        let peg = self.peg.clone();
        let inner = self.inner.clone();
        Stream { peg, inner }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc::sync_channel;

    /// `Sink` must be `Send + Sync`.
    #[test]
    fn test_sink_auto_traits() {
        fn assert_traits<X: Sync + Send>(_: X) {}
        let sink = Sink::<u32>::new();
        assert_traits(sink);
    }

    /// `Stream` must be `Send + Sync + Clone` even for a plain payload type.
    #[test]
    fn test_stream_auto_traits() {
        struct Foo();
        fn assert_traits<X: Sync + Send + Clone>(_: X) {}
        let sink = Sink::<Foo>::new();
        assert_traits(sink.stream());
    }

    /// `Subscription` must be `Send + Sync + Clone`.
    #[test]
    fn test_subscription_auto_traits() {
        fn assert_traits<X: Sync + Send + Clone>(_: X) {}
        let sink = Sink::<u32>::new();
        let subscription = sink.stream().subscribe(|_| {});
        assert_traits(subscription);
    }

    /// Two chained `map` calls apply in order.
    #[test]
    fn test_chained_maps() {
        let sink: Sink<u32> = Sink::new();
        let mapped = sink.stream().map(|x| x + 1).map(|x| x * 2);
        let collector = mapped.collect();
        for value in 0..3 {
            sink.update(value);
        }
        sink.end();
        assert_eq!(collector.wait(), vec![2, 4, 6]);
    }

    /// A subscriber of `Stream::of` receives the initial value.
    #[test]
    fn test_of() {
        let source = Stream::of(42);
        let (sender, receiver) = sync_channel(1);
        source.subscribe(move |x| sender.send(*x.unwrap()).unwrap());
        assert_eq!(receiver.recv().unwrap(), 42);
    }

    /// An imitator replays the values of the stream it imitates.
    #[test]
    fn test_imitate() {
        let sink: Sink<u32> = Sink::new();
        let imitator: Imitator<u32> = Imitator::new();
        let doubled = sink.stream().map(|x| x * 2);
        let collector = imitator.stream().collect();
        imitator.imitate(&doubled);
        for value in 0..3 {
            sink.update(value);
        }
        sink.end();
        assert_eq!(collector.wait(), vec![0, 2, 4]);
    }

    /// `fold` followed by `last` yields only the final accumulator.
    #[test]
    fn test_fold_and_last() {
        let sink: Sink<u32> = Sink::new();
        let folded = sink
            .stream()
            .fold("|".to_string(), |p, c| format!("{} {}", p, c))
            .last();
        let collector = folded.collect();
        sink.update(42);
        sink.end();
        assert_eq!(collector.wait(), vec!["| 42".to_string()]);
    }

    /// `fold` followed by `remember` yields the seed and each accumulator.
    #[test]
    fn test_fold_and_remember() {
        let sink: Sink<u32> = Sink::new();
        let folded = sink
            .stream()
            .fold("|".to_string(), |p, c| format!("{} {}", p, c))
            .remember();
        let collector = folded.collect();
        sink.update(42);
        sink.end();
        assert_eq!(
            collector.wait(),
            vec!["|".to_string(), "| 42".to_string()]
        );
    }

    /// A cycle closed through an imitator runs until `dedupe` stops it.
    #[test]
    fn test_imitate_cycle() {
        let imitator = Stream::imitator();
        let folded = imitator
            .stream()
            .fold(1, |p, c| if *c < 10 { p + c } else { p })
            .dedupe();
        let sink = Stream::sink();
        let merged = Stream::merge(vec![folded, sink.stream()]);
        imitator.imitate(&merged);
        let collector = merged.collect();
        sink.update(1);
        assert_eq!(collector.take(), vec![1, 2, 4, 8, 16]);
    }

    /// `combine2` pairs the latest values of both inputs.
    #[test]
    fn test_combine() {
        let left = Stream::sink();
        let right = Stream::sink();
        let combined = Stream::combine2(&left.stream(), &right.stream());
        let collector = combined.collect();
        left.update(0.0);
        right.update(10);
        left.update(1.0);
        left.update(2.0);
        right.update(11);
        left.update(3.0);
        left.end();
        right.end();
        let expected = vec![(0.0, 10), (1.0, 10), (2.0, 10), (2.0, 11), (3.0, 11)];
        assert_eq!(collector.wait(), expected);
    }
}