xi/
lib.rs

1//! A functional reactive stream library for rust.
2//!
3//! * Small (~20 operations)
4//! * Synchronous
5//! * No dependencies
6//! * Is FRP (ha!)
7//!
8//! Modelled on André Staltz' javascript library [xstream][xstrem] which nicely distills
9//! the ideas of [reactive extensions (Rx)][reactx] down to the essential minimum.
10//!
11//! This library is not FRP (Functional Reactive Programming) in the way it was
12//! defined by Conal Elliot, but as a paradigm that is both functional and reactive.
13//! [Why I cannot say FRP but I just did][notfrp].
14//!
15//! [xstrem]: https://github.com/staltz/xstream
16//! [reactx]: http://reactivex.io
17//! [notfrp]: https://medium.com/@andrestaltz/why-i-cannot-say-frp-but-i-just-did-d5ffaa23973b
18//!
19//! ## Example
20//!
21//! ```
22//! use xi::{Sink, Stream};
23//!
24//! // A sink is an originator of events that form a stream.
25//! let sink: Sink<u32> = Stream::sink();
26//!
27//! // Map the even numbers to their square.
28//! let stream: Stream<u32> = sink.stream()
29//!     .filter(|i| i % 2 == 0)
30//!     .map(|i| i * i);
31//!
32//! // Print the result
33//! stream.subscribe(|i| if let Some(i) = i {
34//!     println!("{}", i)
35//! });
36//!
37//! // Send numbers into the sink.
38//! for i in 0..10 {
39//!     sink.update(i);
40//! }
41//! sink.end();
42//! ```
43//!
44//! # Idea
45//!
46//! Functional Reactive Programming is a good foundation for functional programming (FP).
47//! The step-by-step approach of composing interlocked operations, is a relatively
48//! easy way to make an FP structure to a piece of software.
49//!
50//! ## Synchronous
51//!
52//! Libraries that deals with streams as values-over-time (or events) often conflate the
53//! idea of moving data from point A to B, with the operators that transform the data. The
54//! result is that the library must deal with queues of data, queue lengths and backpressure.
55//!
56//! _Xi has no queues_
57//!
58//! Every [`Sink::update()`](struct.Sink.html#method.update) of data into the tree of
59//! operations executes synchronously. Xi has no operators that dispatches "later",
60//! i.e. no `delay()` or other time shifting operations.
61//!
62//! That also means xi also has no internal threads, futures or otherwise.
63//!
64//! ## Thread safe
65//!
66//! Every part of the xi tree is thread safe. You can move a `Sink` into another thread,
67//! or subscribe and propagate on a UI main thread. The thread that calls `Sink::update()` is
68//! the thread executing the entire tree.
69//!
70//! That safety comes at a cost, xi is not a zero cost abstraction library. Every part of
71//! the tree is protected by a mutex lock. This is fine for most applications since a lock
72//! without contention is not much overhead in the execution. But if you plan on having
73//! lots of threads simultaneously updating many values into the tree, you might
74//! experience a performance hit due to lock contention.
75//!
76//! ## Be out of your way
77//!
78//! Xi tries to impose a minimum of cognitive load when using it.
79//!
80//! * Every operator is an `FnMut(&T)` to make it the most usable possible.
81//! * Not require `Sync` and/or `Send` on operator functions.
82//! * Xi stream instances themselves are `Sync` and `Send`.
83//! * Impose a minimum of constraints the event value `T`.
84//!
85//! ## Subscription lifetimes
86//!
87//! See [`Subscription`](struct.Subscription.html#subscription-lifetimes)
88
89#![warn(clippy::all)]
90#![allow(clippy::new_without_default)]
91
92use std::sync::atomic::{AtomicUsize, Ordering};
93use std::sync::{Arc, Condvar, Mutex};
94
95mod imit;
96mod inner;
97mod peg;
98mod sub;
99
100pub use crate::imit::Imitator;
101use crate::inner::{MemoryMode, SafeInner, IMITATORS};
102use crate::peg::Peg;
103pub use crate::sub::Subscription;
104
105/// A stream of events, values in time.
106///
107/// Streams have combinators to build "execution trees" working over events.
108///
109/// ## Memory
110///
111/// Some streams have "memory". Streams with memory keeps a copy of the last value they
112/// produced so that any new subscriber will syncronously receive the value.
113///
114/// Streams with memory are explicitly created using
115/// [`.remember()`](struct.Stream.html#method.remember), but also by other combinators
116/// such as [`.fold()`](struct.Stream.html#method.fold) and
117/// [`.start_with()`](struct.Stream.html#method.start_with).
118pub struct Stream<T: 'static> {
119    #[allow(dead_code)]
120    peg: Peg,
121    inner: SafeInner<T>,
122}
123
124impl<T> Stream<T> {
125    //
126
127    /// Create a sink that is used to push values into a stream.
128    ///
129    /// ```
130    /// let sink = xi::Stream::sink();
131    ///
132    /// // collect values going into the sink
133    /// let coll = sink.stream().collect();
134    ///
135    /// sink.update(0);
136    /// sink.update(1);
137    /// sink.update(2);
138    /// sink.end();
139    ///
140    /// assert_eq!(coll.wait(), vec![0, 1, 2]);
141    /// ```
142    pub fn sink() -> Sink<T> {
143        Sink::new()
144    }
145
146    /// Create a stream with memory that only emits one single value to anyone subscribing.
147    ///
148    /// ```
149    /// let value = xi::Stream::of(42);
150    ///
151    /// // both collectors will receive the value
152    /// let coll1 = value.collect();
153    /// let coll2 = value.collect();
154    ///
155    /// // use .take() since stream doesn't end
156    /// assert_eq!(coll1.take(), [42]);
157    /// assert_eq!(coll2.take(), [42]);
158    /// ```
159    pub fn of(value: T) -> Stream<T>
160    where
161        T: Clone,
162    {
163        let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(value));
164        Stream {
165            peg: Peg::new_fake(),
166            inner,
167        }
168    }
169
170    /// Create a stream that never emits any value and never ends.
171    ///
172    /// ```
173    /// use xi::Stream;
174    ///
175    /// let never: Stream<u32> = Stream::never();
176    /// let coll = never.collect();
177    /// assert_eq!(coll.take(), vec![]);
178    /// ```
179    pub fn never() -> Stream<T> {
180        let inner = SafeInner::new(MemoryMode::NoMemory, None);
181        Stream {
182            peg: Peg::new_fake(),
183            inner,
184        }
185    }
186
187    /// Check if this stream has "memory".
188    ///
189    /// Streams with memory keeps a copy of the last value they produced so that any
190    /// new subscriber will syncronously receive the value.
191    ///
192    /// Streams with memory are explicitly created using `.remember()`, but also by
193    /// other combinators such as `.fold()` and `.start_with()`.
194    ///
195    /// The memory is not inherited to child combinators. I.e.
196    ///
197    /// ```
198    /// let sink = xi::Stream::sink();
199    /// sink.update(0);
200    ///
201    /// // This stream has memory.
202    /// let rem = sink.stream().remember();
203    ///
204    /// // This filtered stream has _NO_ memory.
205    /// let filt = rem.filter(|t| *t > 10);
206    ///
207    /// assert!(rem.has_memory());
208    /// assert!(!filt.has_memory());
209    /// ```
210    pub fn has_memory(&self) -> bool {
211        self.inner.lock().memory_mode().is_memory()
212    }
213
214    /// Creates an imitator. Imitators are used to make cyclic streams.
215    ///
216    ///
217    pub fn imitator() -> Imitator<T>
218    where
219        T: Clone,
220    {
221        Imitator::new()
222    }
223
224    /// Subscribe to events from this stream. The returned subscription can be used to
225    /// unsubscribe at a future time.
226    ///
227    /// Each value is wrapped in an `Option`, there will be exactly one None event when
228    /// the stream ends.
229    ///
230    /// ```
231    /// let sink = xi::Stream::sink();
232    /// let stream = sink.stream();
233    ///
234    /// let handle = std::thread::spawn(move || {
235    ///
236    ///   // values are Some(0), Some(1), Some(2), None
237    ///   stream.subscribe(|v| if let Some(v) = v {
238    ///       println!("Got value: {}", v);
239    ///   });
240    ///
241    ///   // stall thread until stream ends.
242    ///   stream.wait();
243    /// });
244    ///
245    /// sink.update(0);
246    /// sink.update(1);
247    /// sink.update(2);
248    /// sink.end();
249    ///
250    /// handle.join();
251    /// ```
252    pub fn subscribe<F>(&self, f: F) -> Subscription
253    where
254        F: FnMut(Option<&T>) + 'static,
255    {
256        let peg = self.inner.lock().add(f);
257        peg.keep_mode();
258        Subscription::new(peg)
259    }
260
261    /// Internal subscribe that stops subscribing if the subscription goes out of scope.
262    fn internal_subscribe<F: FnMut(Option<&T>) + 'static>(&self, f: F) -> Peg {
263        let mut peg = self.inner.lock().add(f);
264        peg.add_related(self.peg.clone());
265        peg
266    }
267
268    /// Collect events into a `Collector`. This is mostly interesting for testing.
269    ///
270    /// ```
271    /// let sink = xi::Stream::sink();
272    ///
273    /// // collect all values emitted into the sink
274    /// let coll = sink.stream().collect();
275    ///
276    /// std::thread::spawn(move || {
277    ///   sink.update(0);
278    ///   sink.update(1);
279    ///   sink.update(2);
280    ///   sink.end();
281    /// });
282    ///
283    /// let result = coll.wait(); // wait for stream to end
284    /// assert_eq!(result, vec![0, 1, 2]);
285    /// ```
286    pub fn collect(&self) -> Collector<T>
287    where
288        T: Clone,
289    {
290        let state = Arc::new((Mutex::new((false, Some(vec![]))), Condvar::new()));
291        let clone = state.clone();
292        let peg = self.internal_subscribe(move |t| {
293            let mut lock = clone.0.lock().unwrap();
294            if let Some(t) = t {
295                if let Some(v) = lock.1.as_mut() {
296                    v.push(t.clone());
297                }
298            } else {
299                lock.0 = true;
300                clone.1.notify_all();
301            }
302        });
303        Collector { peg, state }
304    }
305
306    /// Dedupe stream by the event itself.
307    ///
308    /// This clones every event to compare with the next.
309    ///
310    /// ```
311    /// let sink = xi::Stream::sink();
312    ///
313    /// let deduped = sink.stream().dedupe();
314    ///
315    /// let coll = deduped.collect();
316    ///
317    /// sink.update(0);
318    /// sink.update(0);
319    /// sink.update(1);
320    /// sink.update(1);
321    /// sink.end();
322    ///
323    /// assert_eq!(coll.wait(), vec![0, 1]);
324    /// ```
325    pub fn dedupe(&self) -> Stream<T>
326    where
327        T: Clone + PartialEq,
328    {
329        self.dedupe_by(|v| v.clone())
330    }
331
332    /// Dedupe stream by some extracted value.
333    ///
334    /// ```
335    /// use xi::{Stream, Sink};
336    ///
337    /// #[derive(Clone, Debug)]
338    /// struct Foo(&'static str, usize);
339    ///
340    /// let sink: Sink<Foo> = Stream::sink();
341    ///
342    /// // dedupe this stream of Foo on the contained usize
343    /// let deduped = sink.stream().dedupe_by(|v| v.1);
344    ///
345    /// let coll = deduped.collect();
346    ///
347    /// sink.update(Foo("yo", 1));
348    /// sink.update(Foo("bro", 1));
349    /// sink.update(Foo("lo", 2));
350    /// sink.update(Foo("lo", 2));
351    /// sink.end();
352    ///
353    /// assert_eq!(format!("{:?}", coll.wait()),
354    ///     "[Foo(\"yo\", 1), Foo(\"lo\", 2)]");
355    /// ```
356    pub fn dedupe_by<U, F>(&self, mut f: F) -> Stream<T>
357    where
358        U: PartialEq + 'static,
359        F: FnMut(&T) -> U + 'static,
360    {
361        let inner = SafeInner::new(MemoryMode::NoMemory, None);
362        let inner_clone = inner.clone();
363        let mut prev: Option<U> = None;
364        let peg = self.internal_subscribe(move |t| {
365            if let Some(t) = t {
366                let propagate = match (prev.take(), f(t)) {
367                    (None, u) => {
368                        // no previous value, save this and propagate
369                        prev = Some(u);
370                        true
371                    }
372                    (Some(pu), u) => {
373                        if pu != u {
374                            // new value is different to previous, save and propagate
375                            prev = Some(u);
376                            true
377                        } else {
378                            // new value is same as before, don't propagate
379                            false
380                        }
381                    }
382                };
383                if propagate {
384                    inner_clone.lock().update_borrowed(Some(t));
385                }
386            } else {
387                inner_clone.lock().update_borrowed(t);
388            }
389        });
390        Stream { peg, inner }
391    }
392
393    /// Drop an amount of initial values.
394    ///
395    /// ```
396    /// let sink = xi::Stream::sink();
397    ///
398    /// // drop 2 initial values
399    /// let dropped = sink.stream().drop(2);
400    ///
401    /// let coll = dropped.collect();
402    ///
403    /// sink.update(0);
404    /// sink.update(1);
405    /// sink.update(2);
406    /// sink.update(3);
407    /// sink.end();
408    ///
409    /// assert_eq!(coll.wait(), vec![2, 3]);
410    /// ```
411    pub fn drop(&self, amount: usize) -> Stream<T> {
412        let mut todo = amount + 1;
413        self.drop_while(move |_| {
414            if todo > 0 {
415                todo -= 1;
416            }
417            todo > 0
418        })
419    }
420
421    /// Don't take values while some condition holds true. Once the condition is false,
422    /// the resulting stream emits all events.
423    ///
424    /// ```
425    /// let sink = xi::Stream::sink();
426    ///
427    /// // drop initial odd values
428    /// let dropped = sink.stream().drop_while(|v| v % 2 == 1);
429    ///
430    /// let coll = dropped.collect();
431    ///
432    /// sink.update(1);
433    /// sink.update(3);
434    /// sink.update(4);
435    /// sink.update(5); // not dropped
436    /// sink.end();
437    ///
438    /// assert_eq!(coll.wait(), vec![4, 5]);
439    /// ```
440    pub fn drop_while<F>(&self, mut f: F) -> Stream<T>
441    where
442        F: FnMut(&T) -> bool + 'static,
443    {
444        let inner = SafeInner::new(MemoryMode::NoMemory, None);
445        let inner_clone = inner.clone();
446        let mut dropping = true;
447        let peg = self.internal_subscribe(move |t| {
448            if let Some(t) = t {
449                if dropping && !f(t) {
450                    dropping = false;
451                }
452                if dropping {
453                    return;
454                }
455                inner_clone.lock().update_borrowed(Some(t));
456            } else {
457                inner_clone.lock().update_borrowed(t);
458            }
459        });
460        Stream { peg, inner }
461    }
462
463    /// Produce a stream that ends when some other stream ends.
464    ///
465    /// ```
466    /// use xi::Stream;
467    ///
468    /// let sink1 = Stream::sink();
469    /// let sink2 = Stream::sink();
470    ///
471    /// // ending shows values of sink1, but ends when sink2 does.
472    /// let ending = sink1.stream().end_when(&sink2.stream());
473    ///
474    /// let coll = ending.collect();
475    ///
476    /// sink1.update(0);
477    /// sink2.update("yo");
478    /// sink1.update(1);
479    /// sink2.end();
480    /// sink1.update(2); // collector never sees this value
481    ///
482    /// assert_eq!(coll.wait(), [0, 1]);
483    /// ```
484    pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T> {
485        let inner = SafeInner::new(MemoryMode::NoMemory, None);
486        let inner_clone1 = inner.clone();
487        let inner_clone2 = inner.clone();
488        let peg1 = other.internal_subscribe(move |o| {
489            if o.is_none() {
490                inner_clone1.lock().update_borrowed(None);
491            }
492        });
493        let peg2 = self.internal_subscribe(move |t| {
494            inner_clone2.lock().update_borrowed(t);
495        });
496        let peg = Peg::many(vec![peg1, peg2]);
497        Stream { peg, inner }
498    }
499
500    /// Filter out a subset of the events in the stream.
501    ///
502    /// ```
503    /// let sink = xi::Stream::sink();
504    ///
505    /// // keep even numbers
506    /// let filtered = sink.stream().filter(|v| v % 2 == 0);
507    ///
508    /// let coll = filtered.collect();
509    ///
510    /// sink.update(0);
511    /// sink.update(1);
512    /// sink.update(2);
513    /// sink.end();
514    ///
515    /// assert_eq!(coll.wait(), vec![0, 2]);
516    /// ```
517    pub fn filter<F>(&self, mut f: F) -> Stream<T>
518    where
519        F: FnMut(&T) -> bool + 'static,
520    {
521        let inner = SafeInner::new(MemoryMode::NoMemory, None);
522        let inner_clone = inner.clone();
523        let peg = self.internal_subscribe(move |t| {
524            if let Some(t) = t {
525                if f(t) {
526                    inner_clone.lock().update_borrowed(Some(t));
527                }
528            } else {
529                inner_clone.lock().update_borrowed(t);
530            }
531        });
532        Stream { peg, inner }
533    }
534
535    /// Combine events from the past, with new events to produce an output.
536    ///
537    /// This is roughly equivalent to a "fold" or "reduce" over an array. For each event we
538    /// emit the latest state out. The seed value is emitted straight away.
539    ///
540    /// The result is always a "memory" stream.
541    ///
542    /// ```
543    /// let sink = xi::Stream::sink();
544    ///
545    /// let folded = sink.stream()
546    ///     .fold(40.5, |prev, next| prev + (*next as f32) / 2.0);
547    ///
548    /// let coll = folded.collect();
549    ///
550    /// sink.update(0);
551    /// sink.update(1);
552    /// sink.update(2);
553    /// sink.end();
554    ///
555    /// assert_eq!(coll.wait(), vec![40.5, 40.5, 41.0, 42.0]);
556    /// ```
557    pub fn fold<U, F>(&self, seed: U, mut f: F) -> Stream<U>
558    where
559        U: 'static,
560        F: FnMut(U, &T) -> U + 'static,
561    {
562        let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(seed));
563        let inner_clone = inner.clone();
564        let peg = self.internal_subscribe(move |t| {
565            if let Some(t) = t {
566                let mut lock = inner_clone.lock();
567                if let Some(prev) = lock.take_memory() {
568                    let next = f(prev, t);
569                    lock.update_owned(Some(next));
570                } else {
571                    panic!("fold without a previous value");
572                }
573            } else {
574                inner_clone.lock().update_owned(None);
575            }
576        });
577        Stream { peg, inner }
578    }
579
580    /// Internal imitate for imitator.
581    fn imitate(&self, imitator: SafeInner<T>) -> Peg
582    where
583        T: Clone,
584    {
585        self.internal_subscribe(move |t| {
586            let imitator_clone = imitator.clone();
587            if t.is_some() {
588                let t_clone = t.cloned();
589                IMITATORS.with(|imit_cell| {
590                    let mut imit = imit_cell.borrow_mut();
591                    imit.push(Box::new(move || {
592                        // this is one clone too many. if we could use
593                        // Box<FnOnce> on stable, we would do that instead
594                        let t = t_clone.clone();
595                        imitator_clone.lock().update_owned(t.clone());
596                    }));
597                });
598            } else {
599                imitator_clone.lock().update_owned(None);
600            }
601        })
602    }
603
604    /// Emits the last seen event when the stream closes.
605    ///
606    /// ```
607    /// let sink = xi::Stream::sink();
608    ///
609    /// let coll = sink.stream().last().collect();
610    ///
611    /// sink.update(0);
612    /// sink.update(1);
613    /// sink.update(2);
614    /// sink.end();
615    ///
616    /// assert_eq!(coll.wait(), vec![2]);
617    /// ```
618    pub fn last(&self) -> Stream<T>
619    where
620        T: Clone,
621    {
622        let inner = SafeInner::new(MemoryMode::NoMemory, None);
623        let inner_clone = inner.clone();
624        let last = Mutex::new(None);
625        let peg = self.internal_subscribe(move |t| {
626            let mut lock = last.lock().unwrap();
627            if t.is_some() {
628                *lock = t.cloned();
629            } else {
630                let mut ilock = inner_clone.lock();
631                if let Some(l) = lock.take() {
632                    ilock.update_owned(Some(l));
633                }
634                ilock.update_owned(None);
635            }
636        });
637        Stream { peg, inner }
638    }
639
640    /// Transform events.
641    ///
642    /// ```
643    /// let sink = xi::Stream::sink();
644    ///
645    /// let mapped = sink.stream().map(|v| format!("yo {}", v));
646    ///
647    /// let coll = mapped.collect();
648    ///
649    /// sink.update(41);
650    /// sink.update(42);
651    /// sink.end();
652    ///
653    /// assert_eq!(coll.wait(),
654    ///     vec!["yo 41".to_string(), "yo 42".to_string()]);
655    /// ```
656    pub fn map<U, F>(&self, mut f: F) -> Stream<U>
657    where
658        U: 'static,
659        F: FnMut(&T) -> U + 'static,
660    {
661        let inner = SafeInner::new(MemoryMode::NoMemory, None);
662        let inner_clone = inner.clone();
663        let peg = self.internal_subscribe(move |t| {
664            if let Some(t) = t {
665                let u = f(t);
666                inner_clone.lock().update_owned(Some(u));
667            } else {
668                inner_clone.lock().update_owned(None);
669            }
670        });
671        Stream { peg, inner }
672    }
673
674    /// For every event, emit a static value.
675    ///
676    /// ```
677    /// let sink = xi::Stream::sink();
678    ///
679    /// let mapped = sink.stream().map_to(42.0);
680    ///
681    /// let coll = mapped.collect();
682    ///
683    /// sink.update(0);
684    /// sink.update(1);
685    /// sink.end();
686    ///
687    /// assert_eq!(coll.wait(), vec![42.0, 42.0]);
688    /// ```
689    pub fn map_to<U>(&self, u: U) -> Stream<U>
690    where
691        U: Clone + 'static,
692    {
693        self.map(move |_| u.clone())
694    }
695
696    /// Merge events from a bunch of streams to one stream.
697    ///
698    /// ```
699    /// use xi::Stream;
700    ///
701    /// let sink1 = Stream::sink();
702    /// let sink2 = Stream::sink();
703    ///
704    /// let merged = Stream::merge(vec![
705    ///     sink1.stream(),
706    ///     sink2.stream()
707    /// ]);
708    ///
709    /// let coll = merged.collect();
710    ///
711    /// sink1.update(0);
712    /// sink2.update(10);
713    /// sink1.update(1);
714    /// sink2.update(11);
715    /// sink1.end();
716    /// sink2.end();
717    ///
718    /// assert_eq!(coll.wait(), vec![0, 10, 1, 11]);
719    /// ```
720    pub fn merge(streams: Vec<Stream<T>>) -> Stream<T> {
721        let inner = SafeInner::new(MemoryMode::NoMemory, None);
722        let inner_clone = inner.clone();
723        let active = Arc::new(AtomicUsize::new(streams.len()));
724        let pegs: Vec<_> = streams
725            .into_iter()
726            .map(|stream| {
727                let inner_clone = inner_clone.clone();
728                let active = active.clone();
729                stream.internal_subscribe(move |t| {
730                    if t.is_some() {
731                        inner_clone.lock().update_borrowed(t);
732                    } else if active.fetch_sub(1, Ordering::SeqCst) == 1 {
733                        // all streams are ended. close the merged one
734                        inner_clone.lock().update_borrowed(None);
735                    }
736                })
737            })
738            .collect();
739        let peg = Peg::many(pegs);
740        Stream { peg, inner }
741    }
742
743    /// Make a stream in memory mode. Each value is remembered for future subscribers.
744    ///
745    /// ```
746    /// let sink = xi::Stream::sink();
747    ///
748    /// let rem = sink.stream().remember();
749    ///
750    /// sink.update(0);
751    /// sink.update(1);
752    ///
753    /// // receives last "remembered" value
754    /// let coll = rem.collect();
755    ///
756    /// sink.update(2);
757    /// sink.end();
758    ///
759    /// assert_eq!(coll.wait(), vec![1, 2]);
760    /// ```
761    pub fn remember(&self) -> Stream<T>
762    where
763        T: Clone,
764    {
765        self.remember_mode(MemoryMode::KeepUntilEnd)
766    }
767
768    /// Internal remember where we can chose "mode"
769    fn remember_mode(&self, mode: MemoryMode) -> Stream<T>
770    where
771        T: Clone,
772    {
773        let inner = SafeInner::new(mode, None);
774        let inner_clone = inner.clone();
775        let peg = self.internal_subscribe(move |t| {
776            let t = t.cloned();
777            inner_clone.lock().update_owned(t);
778        });
779        Stream { peg, inner }
780    }
781
782    /// On every event in this stream, combine with the last value of the other stream.
783    ///
784    /// ```
785    /// use xi::Stream;
786    ///
787    /// let sink1 = Stream::sink();
788    /// let sink2 = Stream::sink();
789    ///
790    /// let comb = sink1.stream().sample_combine(&sink2.stream());
791    ///
792    /// let coll = comb.collect();
793    ///
794    /// sink1.update(0);     // lost, because no value in sink2
795    /// sink2.update("foo"); // doesn't trigger combine
796    /// sink1.update(1);
797    /// sink1.update(2);
798    /// sink2.update("bar");
799    /// sink2.end();         // sink2 is "bar" forever
800    /// sink1.update(3);
801    /// sink1.end();
802    ///
803    /// assert_eq!(coll.wait(),
804    ///   vec![(1, "foo"), (2, "foo"), (3, "bar")])
805    /// ```
806    pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
807    where
808        T: Clone,
809        U: Clone,
810    {
811        let inner = SafeInner::new(MemoryMode::NoMemory, None);
812        let inner_clone = inner.clone();
813        let rem = other.remember_mode(MemoryMode::KeepAfterEnd);
814        let peg = self.internal_subscribe(move |t| {
815            if let Some(t) = t {
816                let rlock = rem.inner.lock();
817                if let Some(u) = rlock.peek_memory().as_ref() {
818                    // we have both t and u
819                    let v = (t.clone(), u.clone());
820                    inner_clone.lock().update_owned(Some(v));
821                }
822            } else {
823                inner_clone.lock().update_borrowed(None);
824            }
825        });
826        Stream { peg, inner }
827    }
828
829    /// Prepend a start value to the stream. The result is a memory stream.
830    ///
831    /// ```
832    /// let sink = xi::Stream::sink();
833    ///
834    /// sink.update(0); // lost
835    ///
836    /// let started = sink.stream().start_with(1);
837    ///
838    /// let coll = started.collect(); // receives 1 and following
839    ///
840    /// sink.update(2);
841    /// sink.end();
842    ///
843    /// assert_eq!(coll.wait(), vec![1, 2]);
844    /// ```
845    pub fn start_with(&self, start: T) -> Stream<T> {
846        let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(start));
847        let inner_clone = inner.clone();
848        let peg = self.internal_subscribe(move |t| {
849            inner_clone.lock().update_borrowed(t);
850        });
851        Stream { peg, inner }
852    }
853
854    /// Take a number of events, then end the stream.
855    ///
856    /// ```
857    /// let sink = xi::Stream::sink();
858    ///
859    /// let take2 = sink.stream().take(2);
860    ///
861    /// let coll = take2.collect();
862    ///
863    /// sink.update(0);
864    /// sink.update(1); // take2 ends here
865    /// sink.update(2);
866    ///
867    /// assert_eq!(coll.wait(), vec![0, 1]);
868    /// ```
869    pub fn take(&self, amount: usize) -> Stream<T> {
870        let mut todo = amount + 1;
871        self.take_while(move |_| {
872            if todo > 0 {
873                todo -= 1;
874            }
875            todo > 0
876        })
877    }
878
879    /// Take events from the stream as long as a condition holds true.
880    ///
881    /// ```
882    /// let sink = xi::Stream::sink();
883    ///
884    /// // take events as long as they are even
885    /// let take = sink.stream().take_while(|v| *v % 2 == 0);
886    ///
887    /// let coll = take.collect();
888    ///
889    /// sink.update(0);
890    /// sink.update(2);
891    /// sink.update(3); // take ends here
892    /// sink.update(4);
893    ///
894    /// assert_eq!(coll.wait(), vec![0, 2]);
895    /// ```
896    pub fn take_while<F>(&self, mut f: F) -> Stream<T>
897    where
898        F: FnMut(&T) -> bool + 'static,
899    {
900        let inner = SafeInner::new(MemoryMode::NoMemory, None);
901        let inner_clone = inner.clone();
902        let peg = self.internal_subscribe(move |t| {
903            if let Some(t) = t {
904                if f(t) {
905                    inner_clone.lock().update_borrowed(Some(t));
906                } else {
907                    inner_clone.lock().update_borrowed(None);
908                }
909            } else {
910                inner_clone.lock().update_borrowed(t);
911            }
912        });
913        Stream { peg, inner }
914    }
915
916    /// Stalls calling thread until the stream ends.
917    ///
918    /// ```
919    /// let sink = xi::Stream::sink();
920    /// let stream = sink.stream();
921    ///
922    /// std::thread::spawn(move || {
923    ///   sink.update(0);
924    ///   sink.update(1);
925    ///   sink.update(2);
926    ///   sink.end(); // this releases the wait
927    /// });
928    ///
929    /// stream.wait(); // wait for other thread
930    /// ```
931    #[allow(clippy::mutex_atomic)]
932    pub fn wait(&self) {
933        let pair = Arc::new((Mutex::new(false), Condvar::new()));
934        let pair2 = pair.clone();
935        let _sub = self.internal_subscribe(move |t| {
936            if t.is_none() {
937                let mut lock = pair2.0.lock().unwrap();
938                *lock = true;
939                pair2.1.notify_all();
940            }
941        });
942        let mut lock = pair.0.lock().unwrap();
943        while !*lock {
944            lock = pair.1.wait(lock).unwrap();
945        }
946    }
947}
948
949impl<T> Stream<Stream<T>> {
950    //
951
952    /// Flatten out a stream of streams, sequentially.
953    ///
954    /// For each new stream, unsubscribe from the previous, and subscribe to the new. The new
955    /// stream "interrupts" the previous stream.
956    ///
957    /// ```
958    /// use xi::{Stream, Sink};
959    ///
960    /// let sink1: Sink<Stream<u32>> = Stream::sink();
961    /// let sink2: Sink<u32> = Stream::sink();
962    /// let sink3: Sink<u32> = Stream::sink();
963    ///
964    /// let flat = sink1.stream().flatten();
965    ///
966    /// let coll = flat.collect();
967    ///
968    /// sink2.update(0); // lost
969    ///
970    /// sink1.update(sink2.stream());
971    /// sink2.update(1);
972    /// sink2.update(2);
973    /// sink2.end(); // does not end sink1
974    ///
975    /// sink3.update(10); // lost
976    ///
977    /// sink1.update(sink3.stream());
978    /// sink3.update(11);
979    ///
980    /// sink1.end();
981    ///
982    /// assert_eq!(coll.wait(), vec![1, 2, 11]);
983    /// ```
984    pub fn flatten(&self) -> Stream<T> {
985        let inner = SafeInner::new(MemoryMode::NoMemory, None);
986        let inner_clone = inner.clone();
987        let mut ipeg = None;
988        let peg = self.internal_subscribe(move |ts| {
989            if let Some(ts) = ts {
990                let inner_clone = inner_clone.clone();
991                ipeg = Some(ts.internal_subscribe(move |tv| {
992                    if let Some(tv) = tv {
993                        inner_clone.lock().update_borrowed(Some(tv));
994                    } else {
995                        // inner stream end does nothing to outer
996                    }
997                }));
998            } else {
999                ipeg.take();
1000                inner_clone.lock().update_borrowed(None);
1001            }
1002        });
1003        Stream { peg, inner }
1004    }
1005
1006    /// Flatten out a stream of streams, concurrently.
1007    ///
1008    /// For each new stream, keep the previous, and subscribe to the new.
1009    ///
1010    /// ```
1011    /// use xi::{Stream, Sink};
1012    ///
1013    /// let sink1: Sink<Stream<u32>> = Stream::sink();
1014    /// let sink2: Sink<u32> = Stream::sink();
1015    /// let sink3: Sink<u32> = Stream::sink();
1016    ///
1017    /// let flat = sink1.stream().flatten_concurrently();
1018    ///
1019    /// let coll = flat.collect();
1020    ///
1021    /// sink2.update(0); // lost
1022    ///
1023    /// sink1.update(sink2.stream());
1024    /// sink2.update(1);
1025    /// sink2.update(2);
1026    ///
1027    /// sink3.update(10); // lost
1028    ///
1029    /// sink1.update(sink3.stream());
1030    /// sink3.update(11);
1031    /// sink2.update(3);
1032    /// sink3.update(12);
1033    ///
1034    /// sink1.end();
1035    ///
1036    /// assert_eq!(coll.wait(), vec![1, 2, 11, 3, 12]);
1037    /// ```
1038    pub fn flatten_concurrently(&self) -> Stream<T> {
1039        let inner = SafeInner::new(MemoryMode::NoMemory, None);
1040        let inner_clone = inner.clone();
1041        let peg = self.internal_subscribe(move |ts| {
1042            if let Some(ts) = ts {
1043                let inner_clone = inner_clone.clone();
1044                let ipeg = ts.internal_subscribe(move |tv| {
1045                    if let Some(tv) = tv {
1046                        inner_clone.lock().update_borrowed(Some(tv));
1047                    } else {
1048                        // inner stream end does nothing to outer
1049                    }
1050                });
1051                ipeg.keep_mode(); // we drop ipeg, but keep listening
1052            } else {
1053                inner_clone.lock().update_borrowed(None);
1054            }
1055        });
1056        Stream { peg, inner }
1057    }
1058}
1059
1060include!("./comb.rs");
1061
1062/// A sink is a producer of events. Created by [`Stream::sink()`](struct.Stream.html#method.sink).
1063pub struct Sink<T: 'static> {
1064    inner: SafeInner<T>,
1065}
1066impl<T> Sink<T> {
1067    /// Create a new sink that in turn is used to stream events.
1068    fn new() -> Self {
1069        Sink {
1070            inner: SafeInner::new(MemoryMode::NoMemory, None),
1071        }
1072    }
1073
1074    /// Get a stream of events from this sink. One stream instance is created for each call,
1075    /// and they all receive the events from the sink.
1076    ///
1077    /// ```
1078    /// let sink = xi::Stream::sink();
1079    ///
1080    /// let stream1 = sink.stream();
1081    /// let stream2 = sink.stream();
1082    ///
1083    /// let coll1 = stream1.collect();
1084    /// let coll2 = stream1.collect();
1085    ///
1086    /// sink.update(42);
1087    /// sink.end();
1088    ///
1089    /// assert_eq!(coll1.wait(), vec![42]);
1090    /// assert_eq!(coll2.wait(), vec![42]);
1091    /// ```
1092    pub fn stream(&self) -> Stream<T> {
1093        Stream {
1094            peg: Peg::new_fake(),
1095            inner: self.inner.clone(),
1096        }
1097    }
1098
1099    /// Update a value into this sink.
1100    ///
1101    /// The execution of the combinators "hanging" off this sink is (thread safe) and
1102    /// synchronous. In other words, there is nothing in xi itself that will still be
1103    /// "to do" once the `update()` call returns.
1104    ///
1105    /// Each value is wrapped in an `Option` towards subscribers of the streams.
1106    ///
1107    /// ```
1108    /// let sink = xi::Stream::sink();
1109    /// let stream = sink.stream();
1110    ///
1111    /// stream.subscribe(|v| {
1112    ///     // v is Some(0), Some(1), None
1113    /// });
1114    ///
1115    /// sink.update(0);
1116    /// sink.update(1);
1117    /// sink.end();
1118    /// ```
1119    pub fn update(&self, next: T) {
1120        self.inner.lock().update_and_imitate(Some(next));
1121    }
1122
1123    /// End the stream of events. Consumes the instance since no more values are to go into it.
1124    ///
1125    /// Subscribers will se a `None` value.
1126    ///
1127    /// Every stream hanging directly off this sink will also end. The exception is streams
1128    /// combining input from multiple source streams.
1129    pub fn end(self) {
1130        self.inner.lock().update_and_imitate(None);
1131    }
1132}
1133
1134/// The collector instance collects values from a stream. Created by
1135/// [`Stream::collect()`](struct.Stream.html#method.collect).
1136pub struct Collector<T> {
1137    #[allow(dead_code)]
1138    peg: Peg,
1139    #[allow(clippy::type_complexity)]
1140    state: Arc<(Mutex<(bool, Option<Vec<T>>)>, Condvar)>,
1141}
1142
1143impl<T> Collector<T> {
1144    /// Stall the thread and wait for the stream to end.
1145    pub fn wait(self) -> Vec<T> {
1146        let mut lock = self.state.0.lock().unwrap();
1147        while !lock.0 {
1148            lock = self.state.1.wait(lock).unwrap();
1149        }
1150        lock.1.take().unwrap()
1151    }
1152
1153    /// Take whatever is there, without the stream ending, and stop collecting.
1154    pub fn take(self) -> Vec<T> {
1155        let mut lock = self.state.0.lock().unwrap();
1156        lock.1.take().unwrap()
1157    }
1158}
1159
1160impl<T> Clone for Stream<T> {
1161    fn clone(&self) -> Self {
1162        Stream {
1163            peg: self.peg.clone(),
1164            inner: self.inner.clone(),
1165        }
1166    }
1167}
1168
1169#[cfg(test)]
1170mod test {
1171    use super::*;
1172    use std::sync::mpsc::sync_channel;
1173
1174    #[test]
1175    fn test_sink_auto_traits() {
1176        fn f<X: Sync + Send>(_: X) {}
1177        let sink: Sink<u32> = Sink::new();
1178        f(sink);
1179    }
1180
1181    #[test]
1182    fn test_stream_auto_traits() {
1183        fn f<X: Sync + Send + Clone>(_: X) {}
1184        struct Foo(); // not clonable, but Stream<Foo> should be
1185        let sink: Sink<Foo> = Sink::new();
1186        f(sink.stream());
1187    }
1188
1189    #[test]
1190    fn test_subscription_auto_traits() {
1191        fn f<X: Sync + Send + Clone>(_: X) {}
1192        let sink: Sink<u32> = Sink::new();
1193        let sub = sink.stream().subscribe(|_| {});
1194        f(sub);
1195    }
1196
1197    #[test]
1198    fn test_chained_maps() {
1199        let sink: Sink<u32> = Sink::new();
1200        // the risk is that the intermediary map drops the subscription
1201        // and the entire chain stalls.
1202        let map = sink.stream().map(|x| x + 1).map(|x| x * 2);
1203        let coll = map.collect();
1204        sink.update(0);
1205        sink.update(1);
1206        sink.update(2);
1207        sink.end();
1208        assert_eq!(coll.wait(), vec![2, 4, 6]);
1209    }
1210
1211    #[test]
1212    fn test_of() {
1213        let stream = Stream::of(42);
1214        let (tx, rx) = sync_channel(1);
1215        stream.subscribe(move |x| tx.send(*x.unwrap()).unwrap());
1216        assert_eq!(rx.recv().unwrap(), 42);
1217    }
1218
1219    #[test]
1220    fn test_imitate() {
1221        let sink: Sink<u32> = Sink::new();
1222        let imit: Imitator<u32> = Imitator::new();
1223        let map = sink.stream().map(|x| x * 2);
1224        let coll = imit.stream().collect();
1225        imit.imitate(&map);
1226        sink.update(0);
1227        sink.update(1);
1228        sink.update(2);
1229        sink.end();
1230        assert_eq!(coll.wait(), vec![0, 2, 4]);
1231    }
1232
1233    #[test]
1234    fn test_fold_and_last() {
1235        let sink: Sink<u32> = Sink::new();
1236        // this potentially creates an edge case where
1237        // last might hang on to the rc value that fold has in memory
1238        let fold = sink
1239            .stream()
1240            .fold("|".to_string(), |p, c| format!("{} {}", p, c))
1241            .last();
1242        let coll = fold.collect();
1243        sink.update(42);
1244        sink.end();
1245        assert_eq!(coll.wait(), vec!["| 42".to_string()]);
1246    }
1247
1248    #[test]
1249    fn test_fold_and_remember() {
1250        let sink: Sink<u32> = Sink::new();
1251        // this potentially creates an edge case where
1252        // remember might hang on to the rc value that fold has in memory
1253        let fold = sink
1254            .stream()
1255            .fold("|".to_string(), |p, c| format!("{} {}", p, c))
1256            .remember();
1257        let coll = fold.collect();
1258        sink.update(42);
1259        sink.end();
1260        assert_eq!(coll.wait(), vec!["|".to_string(), "| 42".to_string()]);
1261    }
1262
1263    #[test]
1264    fn test_imitate_cycle() {
1265        let imitator = Stream::imitator();
1266
1267        let fold = imitator
1268            .stream()
1269            .fold(1, |p, c| if *c < 10 { p + c } else { p })
1270            .dedupe();
1271
1272        let sink = Stream::sink();
1273
1274        let merge = Stream::merge(vec![fold, sink.stream()]);
1275        imitator.imitate(&merge);
1276
1277        let coll = merge.collect();
1278
1279        sink.update(1);
1280        assert_eq!(coll.take(), vec![1, 2, 4, 8, 16]);
1281    }
1282
1283    #[test]
1284    fn test_combine() {
1285        let sink1 = Stream::sink();
1286        let sink2 = Stream::sink();
1287
1288        let comb = Stream::combine2(&sink1.stream(), &sink2.stream());
1289
1290        let coll = comb.collect();
1291
1292        sink1.update(0.0);
1293        sink2.update(10);
1294        sink1.update(1.0);
1295        sink1.update(2.0);
1296        sink2.update(11);
1297        sink1.update(3.0);
1298        sink1.end();
1299        sink2.end();
1300
1301        assert_eq!(
1302            coll.wait(),
1303            vec![(0.0, 10), (1.0, 10), (2.0, 10), (2.0, 11), (3.0, 11)]
1304        );
1305    }
1306
1307}