xi/lib.rs
1//! A functional reactive stream library for rust.
2//!
3//! * Small (~20 operations)
4//! * Synchronous
5//! * No dependencies
6//! * Is FRP (ha!)
7//!
8//! Modelled on André Staltz' javascript library [xstream][xstrem] which nicely distills
9//! the ideas of [reactive extensions (Rx)][reactx] down to the essential minimum.
10//!
11//! This library is not FRP (Functional Reactive Programming) in the way it was
12//! defined by Conal Elliott, but as a paradigm that is both functional and reactive.
13//! [Why I cannot say FRP but I just did][notfrp].
14//!
15//! [xstrem]: https://github.com/staltz/xstream
16//! [reactx]: http://reactivex.io
17//! [notfrp]: https://medium.com/@andrestaltz/why-i-cannot-say-frp-but-i-just-did-d5ffaa23973b
18//!
19//! ## Example
20//!
21//! ```
22//! use xi::{Sink, Stream};
23//!
24//! // A sink is an originator of events that form a stream.
25//! let sink: Sink<u32> = Stream::sink();
26//!
27//! // Map the even numbers to their square.
28//! let stream: Stream<u32> = sink.stream()
29//! .filter(|i| i % 2 == 0)
30//! .map(|i| i * i);
31//!
32//! // Print the result
33//! stream.subscribe(|i| if let Some(i) = i {
34//! println!("{}", i)
35//! });
36//!
37//! // Send numbers into the sink.
38//! for i in 0..10 {
39//! sink.update(i);
40//! }
41//! sink.end();
42//! ```
43//!
44//! # Idea
45//!
46//! Functional Reactive Programming is a good foundation for functional programming (FP).
47//! The step-by-step approach of composing interlocked operations, is a relatively
48//! easy way to make an FP structure to a piece of software.
49//!
50//! ## Synchronous
51//!
52//! Libraries that deal with streams as values-over-time (or events) often conflate the
53//! idea of moving data from point A to B, with the operators that transform the data. The
54//! result is that the library must deal with queues of data, queue lengths and backpressure.
55//!
56//! _Xi has no queues_
57//!
58//! Every [`Sink::update()`](struct.Sink.html#method.update) of data into the tree of
59//! operations executes synchronously. Xi has no operators that dispatch "later",
60//! i.e. no `delay()` or other time shifting operations.
61//!
62//! That also means xi has no internal threads, futures or otherwise.
63//!
64//! ## Thread safe
65//!
66//! Every part of the xi tree is thread safe. You can move a `Sink` into another thread,
67//! or subscribe and propagate on a UI main thread. The thread that calls `Sink::update()` is
68//! the thread executing the entire tree.
69//!
70//! That safety comes at a cost, xi is not a zero cost abstraction library. Every part of
71//! the tree is protected by a mutex lock. This is fine for most applications since a lock
72//! without contention is not much overhead in the execution. But if you plan on having
73//! lots of threads simultaneously updating many values into the tree, you might
74//! experience a performance hit due to lock contention.
75//!
76//! ## Be out of your way
77//!
78//! Xi tries to impose a minimum of cognitive load when using it.
79//!
80//! * Every operator is an `FnMut(&T)` to make it the most usable possible.
81//! * Do not require `Sync` and/or `Send` on operator functions.
82//! * Xi stream instances themselves are `Sync` and `Send`.
83//! * Impose a minimum of constraints on the event value `T`.
84//!
85//! ## Subscription lifetimes
86//!
87//! See [`Subscription`](struct.Subscription.html#subscription-lifetimes)
88
89#![warn(clippy::all)]
90#![allow(clippy::new_without_default)]
91
92use std::sync::atomic::{AtomicUsize, Ordering};
93use std::sync::{Arc, Condvar, Mutex};
94
95mod imit;
96mod inner;
97mod peg;
98mod sub;
99
100pub use crate::imit::Imitator;
101use crate::inner::{MemoryMode, SafeInner, IMITATORS};
102use crate::peg::Peg;
103pub use crate::sub::Subscription;
104
105/// A stream of events, values in time.
106///
107/// Streams have combinators to build "execution trees" working over events.
108///
109/// ## Memory
110///
111/// Some streams have "memory". Streams with memory keep a copy of the last value they
112/// produced so that any new subscriber will synchronously receive the value.
113///
114/// Streams with memory are explicitly created using
115/// [`.remember()`](struct.Stream.html#method.remember), but also by other combinators
116/// such as [`.fold()`](struct.Stream.html#method.fold) and
117/// [`.start_with()`](struct.Stream.html#method.start_with).
118pub struct Stream<T: 'static> {
119 #[allow(dead_code)]
120 peg: Peg,
121 inner: SafeInner<T>,
122}
123
124impl<T> Stream<T> {
125 //
126
127 /// Create a sink that is used to push values into a stream.
128 ///
129 /// ```
130 /// let sink = xi::Stream::sink();
131 ///
132 /// // collect values going into the sink
133 /// let coll = sink.stream().collect();
134 ///
135 /// sink.update(0);
136 /// sink.update(1);
137 /// sink.update(2);
138 /// sink.end();
139 ///
140 /// assert_eq!(coll.wait(), vec![0, 1, 2]);
141 /// ```
142 pub fn sink() -> Sink<T> {
143 Sink::new()
144 }
145
146 /// Create a stream with memory that only emits one single value to anyone subscribing.
147 ///
148 /// ```
149 /// let value = xi::Stream::of(42);
150 ///
151 /// // both collectors will receive the value
152 /// let coll1 = value.collect();
153 /// let coll2 = value.collect();
154 ///
155 /// // use .take() since stream doesn't end
156 /// assert_eq!(coll1.take(), [42]);
157 /// assert_eq!(coll2.take(), [42]);
158 /// ```
159 pub fn of(value: T) -> Stream<T>
160 where
161 T: Clone,
162 {
163 let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(value));
164 Stream {
165 peg: Peg::new_fake(),
166 inner,
167 }
168 }
169
170 /// Create a stream that never emits any value and never ends.
171 ///
172 /// ```
173 /// use xi::Stream;
174 ///
175 /// let never: Stream<u32> = Stream::never();
176 /// let coll = never.collect();
177 /// assert_eq!(coll.take(), vec![]);
178 /// ```
179 pub fn never() -> Stream<T> {
180 let inner = SafeInner::new(MemoryMode::NoMemory, None);
181 Stream {
182 peg: Peg::new_fake(),
183 inner,
184 }
185 }
186
187 /// Check if this stream has "memory".
188 ///
189 /// Streams with memory keep a copy of the last value they produced so that any
190 /// new subscriber will synchronously receive the value.
191 ///
192 /// Streams with memory are explicitly created using `.remember()`, but also by
193 /// other combinators such as `.fold()` and `.start_with()`.
194 ///
195 /// The memory is not inherited to child combinators. I.e.
196 ///
197 /// ```
198 /// let sink = xi::Stream::sink();
199 /// sink.update(0);
200 ///
201 /// // This stream has memory.
202 /// let rem = sink.stream().remember();
203 ///
204 /// // This filtered stream has _NO_ memory.
205 /// let filt = rem.filter(|t| *t > 10);
206 ///
207 /// assert!(rem.has_memory());
208 /// assert!(!filt.has_memory());
209 /// ```
210 pub fn has_memory(&self) -> bool {
211 self.inner.lock().memory_mode().is_memory()
212 }
213
214 /// Creates an imitator. Imitators are used to make cyclic streams.
215 ///
216 ///
217 pub fn imitator() -> Imitator<T>
218 where
219 T: Clone,
220 {
221 Imitator::new()
222 }
223
224 /// Subscribe to events from this stream. The returned subscription can be used to
225 /// unsubscribe at a future time.
226 ///
227 /// Each value is wrapped in an `Option`, there will be exactly one None event when
228 /// the stream ends.
229 ///
230 /// ```
231 /// let sink = xi::Stream::sink();
232 /// let stream = sink.stream();
233 ///
234 /// let handle = std::thread::spawn(move || {
235 ///
236 /// // values are Some(0), Some(1), Some(2), None
237 /// stream.subscribe(|v| if let Some(v) = v {
238 /// println!("Got value: {}", v);
239 /// });
240 ///
241 /// // stall thread until stream ends.
242 /// stream.wait();
243 /// });
244 ///
245 /// sink.update(0);
246 /// sink.update(1);
247 /// sink.update(2);
248 /// sink.end();
249 ///
250 /// handle.join();
251 /// ```
252 pub fn subscribe<F>(&self, f: F) -> Subscription
253 where
254 F: FnMut(Option<&T>) + 'static,
255 {
256 let peg = self.inner.lock().add(f);
257 peg.keep_mode();
258 Subscription::new(peg)
259 }
260
261 /// Internal subscribe that stops subscribing if the subscription goes out of scope.
262 fn internal_subscribe<F: FnMut(Option<&T>) + 'static>(&self, f: F) -> Peg {
263 let mut peg = self.inner.lock().add(f);
264 peg.add_related(self.peg.clone());
265 peg
266 }
267
268 /// Collect events into a `Collector`. This is mostly interesting for testing.
269 ///
270 /// ```
271 /// let sink = xi::Stream::sink();
272 ///
273 /// // collect all values emitted into the sink
274 /// let coll = sink.stream().collect();
275 ///
276 /// std::thread::spawn(move || {
277 /// sink.update(0);
278 /// sink.update(1);
279 /// sink.update(2);
280 /// sink.end();
281 /// });
282 ///
283 /// let result = coll.wait(); // wait for stream to end
284 /// assert_eq!(result, vec![0, 1, 2]);
285 /// ```
286 pub fn collect(&self) -> Collector<T>
287 where
288 T: Clone,
289 {
290 let state = Arc::new((Mutex::new((false, Some(vec![]))), Condvar::new()));
291 let clone = state.clone();
292 let peg = self.internal_subscribe(move |t| {
293 let mut lock = clone.0.lock().unwrap();
294 if let Some(t) = t {
295 if let Some(v) = lock.1.as_mut() {
296 v.push(t.clone());
297 }
298 } else {
299 lock.0 = true;
300 clone.1.notify_all();
301 }
302 });
303 Collector { peg, state }
304 }
305
306 /// Dedupe stream by the event itself.
307 ///
308 /// This clones every event to compare with the next.
309 ///
310 /// ```
311 /// let sink = xi::Stream::sink();
312 ///
313 /// let deduped = sink.stream().dedupe();
314 ///
315 /// let coll = deduped.collect();
316 ///
317 /// sink.update(0);
318 /// sink.update(0);
319 /// sink.update(1);
320 /// sink.update(1);
321 /// sink.end();
322 ///
323 /// assert_eq!(coll.wait(), vec![0, 1]);
324 /// ```
325 pub fn dedupe(&self) -> Stream<T>
326 where
327 T: Clone + PartialEq,
328 {
329 self.dedupe_by(|v| v.clone())
330 }
331
332 /// Dedupe stream by some extracted value.
333 ///
334 /// ```
335 /// use xi::{Stream, Sink};
336 ///
337 /// #[derive(Clone, Debug)]
338 /// struct Foo(&'static str, usize);
339 ///
340 /// let sink: Sink<Foo> = Stream::sink();
341 ///
342 /// // dedupe this stream of Foo on the contained usize
343 /// let deduped = sink.stream().dedupe_by(|v| v.1);
344 ///
345 /// let coll = deduped.collect();
346 ///
347 /// sink.update(Foo("yo", 1));
348 /// sink.update(Foo("bro", 1));
349 /// sink.update(Foo("lo", 2));
350 /// sink.update(Foo("lo", 2));
351 /// sink.end();
352 ///
353 /// assert_eq!(format!("{:?}", coll.wait()),
354 /// "[Foo(\"yo\", 1), Foo(\"lo\", 2)]");
355 /// ```
356 pub fn dedupe_by<U, F>(&self, mut f: F) -> Stream<T>
357 where
358 U: PartialEq + 'static,
359 F: FnMut(&T) -> U + 'static,
360 {
361 let inner = SafeInner::new(MemoryMode::NoMemory, None);
362 let inner_clone = inner.clone();
363 let mut prev: Option<U> = None;
364 let peg = self.internal_subscribe(move |t| {
365 if let Some(t) = t {
366 let propagate = match (prev.take(), f(t)) {
367 (None, u) => {
368 // no previous value, save this and propagate
369 prev = Some(u);
370 true
371 }
372 (Some(pu), u) => {
373 if pu != u {
374 // new value is different to previous, save and propagate
375 prev = Some(u);
376 true
377 } else {
378 // new value is same as before, don't propagate
379 false
380 }
381 }
382 };
383 if propagate {
384 inner_clone.lock().update_borrowed(Some(t));
385 }
386 } else {
387 inner_clone.lock().update_borrowed(t);
388 }
389 });
390 Stream { peg, inner }
391 }
392
393 /// Drop an amount of initial values.
394 ///
395 /// ```
396 /// let sink = xi::Stream::sink();
397 ///
398 /// // drop 2 initial values
399 /// let dropped = sink.stream().drop(2);
400 ///
401 /// let coll = dropped.collect();
402 ///
403 /// sink.update(0);
404 /// sink.update(1);
405 /// sink.update(2);
406 /// sink.update(3);
407 /// sink.end();
408 ///
409 /// assert_eq!(coll.wait(), vec![2, 3]);
410 /// ```
411 pub fn drop(&self, amount: usize) -> Stream<T> {
412 let mut todo = amount + 1;
413 self.drop_while(move |_| {
414 if todo > 0 {
415 todo -= 1;
416 }
417 todo > 0
418 })
419 }
420
421 /// Don't take values while some condition holds true. Once the condition is false,
422 /// the resulting stream emits all events.
423 ///
424 /// ```
425 /// let sink = xi::Stream::sink();
426 ///
427 /// // drop initial odd values
428 /// let dropped = sink.stream().drop_while(|v| v % 2 == 1);
429 ///
430 /// let coll = dropped.collect();
431 ///
432 /// sink.update(1);
433 /// sink.update(3);
434 /// sink.update(4);
435 /// sink.update(5); // not dropped
436 /// sink.end();
437 ///
438 /// assert_eq!(coll.wait(), vec![4, 5]);
439 /// ```
440 pub fn drop_while<F>(&self, mut f: F) -> Stream<T>
441 where
442 F: FnMut(&T) -> bool + 'static,
443 {
444 let inner = SafeInner::new(MemoryMode::NoMemory, None);
445 let inner_clone = inner.clone();
446 let mut dropping = true;
447 let peg = self.internal_subscribe(move |t| {
448 if let Some(t) = t {
449 if dropping && !f(t) {
450 dropping = false;
451 }
452 if dropping {
453 return;
454 }
455 inner_clone.lock().update_borrowed(Some(t));
456 } else {
457 inner_clone.lock().update_borrowed(t);
458 }
459 });
460 Stream { peg, inner }
461 }
462
463 /// Produce a stream that ends when some other stream ends.
464 ///
465 /// ```
466 /// use xi::Stream;
467 ///
468 /// let sink1 = Stream::sink();
469 /// let sink2 = Stream::sink();
470 ///
471 /// // ending shows values of sink1, but ends when sink2 does.
472 /// let ending = sink1.stream().end_when(&sink2.stream());
473 ///
474 /// let coll = ending.collect();
475 ///
476 /// sink1.update(0);
477 /// sink2.update("yo");
478 /// sink1.update(1);
479 /// sink2.end();
480 /// sink1.update(2); // collector never sees this value
481 ///
482 /// assert_eq!(coll.wait(), [0, 1]);
483 /// ```
484 pub fn end_when<U>(&self, other: &Stream<U>) -> Stream<T> {
485 let inner = SafeInner::new(MemoryMode::NoMemory, None);
486 let inner_clone1 = inner.clone();
487 let inner_clone2 = inner.clone();
488 let peg1 = other.internal_subscribe(move |o| {
489 if o.is_none() {
490 inner_clone1.lock().update_borrowed(None);
491 }
492 });
493 let peg2 = self.internal_subscribe(move |t| {
494 inner_clone2.lock().update_borrowed(t);
495 });
496 let peg = Peg::many(vec![peg1, peg2]);
497 Stream { peg, inner }
498 }
499
500 /// Filter out a subset of the events in the stream.
501 ///
502 /// ```
503 /// let sink = xi::Stream::sink();
504 ///
505 /// // keep even numbers
506 /// let filtered = sink.stream().filter(|v| v % 2 == 0);
507 ///
508 /// let coll = filtered.collect();
509 ///
510 /// sink.update(0);
511 /// sink.update(1);
512 /// sink.update(2);
513 /// sink.end();
514 ///
515 /// assert_eq!(coll.wait(), vec![0, 2]);
516 /// ```
517 pub fn filter<F>(&self, mut f: F) -> Stream<T>
518 where
519 F: FnMut(&T) -> bool + 'static,
520 {
521 let inner = SafeInner::new(MemoryMode::NoMemory, None);
522 let inner_clone = inner.clone();
523 let peg = self.internal_subscribe(move |t| {
524 if let Some(t) = t {
525 if f(t) {
526 inner_clone.lock().update_borrowed(Some(t));
527 }
528 } else {
529 inner_clone.lock().update_borrowed(t);
530 }
531 });
532 Stream { peg, inner }
533 }
534
535 /// Combine events from the past, with new events to produce an output.
536 ///
537 /// This is roughly equivalent to a "fold" or "reduce" over an array. For each event we
538 /// emit the latest state out. The seed value is emitted straight away.
539 ///
540 /// The result is always a "memory" stream.
541 ///
542 /// ```
543 /// let sink = xi::Stream::sink();
544 ///
545 /// let folded = sink.stream()
546 /// .fold(40.5, |prev, next| prev + (*next as f32) / 2.0);
547 ///
548 /// let coll = folded.collect();
549 ///
550 /// sink.update(0);
551 /// sink.update(1);
552 /// sink.update(2);
553 /// sink.end();
554 ///
555 /// assert_eq!(coll.wait(), vec![40.5, 40.5, 41.0, 42.0]);
556 /// ```
557 pub fn fold<U, F>(&self, seed: U, mut f: F) -> Stream<U>
558 where
559 U: 'static,
560 F: FnMut(U, &T) -> U + 'static,
561 {
562 let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(seed));
563 let inner_clone = inner.clone();
564 let peg = self.internal_subscribe(move |t| {
565 if let Some(t) = t {
566 let mut lock = inner_clone.lock();
567 if let Some(prev) = lock.take_memory() {
568 let next = f(prev, t);
569 lock.update_owned(Some(next));
570 } else {
571 panic!("fold without a previous value");
572 }
573 } else {
574 inner_clone.lock().update_owned(None);
575 }
576 });
577 Stream { peg, inner }
578 }
579
580 /// Internal imitate for imitator.
581 fn imitate(&self, imitator: SafeInner<T>) -> Peg
582 where
583 T: Clone,
584 {
585 self.internal_subscribe(move |t| {
586 let imitator_clone = imitator.clone();
587 if t.is_some() {
588 let t_clone = t.cloned();
589 IMITATORS.with(|imit_cell| {
590 let mut imit = imit_cell.borrow_mut();
591 imit.push(Box::new(move || {
592 // this is one clone too many. if we could use
593 // Box<FnOnce> on stable, we would do that instead
594 let t = t_clone.clone();
595 imitator_clone.lock().update_owned(t.clone());
596 }));
597 });
598 } else {
599 imitator_clone.lock().update_owned(None);
600 }
601 })
602 }
603
604 /// Emits the last seen event when the stream closes.
605 ///
606 /// ```
607 /// let sink = xi::Stream::sink();
608 ///
609 /// let coll = sink.stream().last().collect();
610 ///
611 /// sink.update(0);
612 /// sink.update(1);
613 /// sink.update(2);
614 /// sink.end();
615 ///
616 /// assert_eq!(coll.wait(), vec![2]);
617 /// ```
618 pub fn last(&self) -> Stream<T>
619 where
620 T: Clone,
621 {
622 let inner = SafeInner::new(MemoryMode::NoMemory, None);
623 let inner_clone = inner.clone();
624 let last = Mutex::new(None);
625 let peg = self.internal_subscribe(move |t| {
626 let mut lock = last.lock().unwrap();
627 if t.is_some() {
628 *lock = t.cloned();
629 } else {
630 let mut ilock = inner_clone.lock();
631 if let Some(l) = lock.take() {
632 ilock.update_owned(Some(l));
633 }
634 ilock.update_owned(None);
635 }
636 });
637 Stream { peg, inner }
638 }
639
640 /// Transform events.
641 ///
642 /// ```
643 /// let sink = xi::Stream::sink();
644 ///
645 /// let mapped = sink.stream().map(|v| format!("yo {}", v));
646 ///
647 /// let coll = mapped.collect();
648 ///
649 /// sink.update(41);
650 /// sink.update(42);
651 /// sink.end();
652 ///
653 /// assert_eq!(coll.wait(),
654 /// vec!["yo 41".to_string(), "yo 42".to_string()]);
655 /// ```
656 pub fn map<U, F>(&self, mut f: F) -> Stream<U>
657 where
658 U: 'static,
659 F: FnMut(&T) -> U + 'static,
660 {
661 let inner = SafeInner::new(MemoryMode::NoMemory, None);
662 let inner_clone = inner.clone();
663 let peg = self.internal_subscribe(move |t| {
664 if let Some(t) = t {
665 let u = f(t);
666 inner_clone.lock().update_owned(Some(u));
667 } else {
668 inner_clone.lock().update_owned(None);
669 }
670 });
671 Stream { peg, inner }
672 }
673
674 /// For every event, emit a static value.
675 ///
676 /// ```
677 /// let sink = xi::Stream::sink();
678 ///
679 /// let mapped = sink.stream().map_to(42.0);
680 ///
681 /// let coll = mapped.collect();
682 ///
683 /// sink.update(0);
684 /// sink.update(1);
685 /// sink.end();
686 ///
687 /// assert_eq!(coll.wait(), vec![42.0, 42.0]);
688 /// ```
689 pub fn map_to<U>(&self, u: U) -> Stream<U>
690 where
691 U: Clone + 'static,
692 {
693 self.map(move |_| u.clone())
694 }
695
696 /// Merge events from a bunch of streams to one stream.
697 ///
698 /// ```
699 /// use xi::Stream;
700 ///
701 /// let sink1 = Stream::sink();
702 /// let sink2 = Stream::sink();
703 ///
704 /// let merged = Stream::merge(vec![
705 /// sink1.stream(),
706 /// sink2.stream()
707 /// ]);
708 ///
709 /// let coll = merged.collect();
710 ///
711 /// sink1.update(0);
712 /// sink2.update(10);
713 /// sink1.update(1);
714 /// sink2.update(11);
715 /// sink1.end();
716 /// sink2.end();
717 ///
718 /// assert_eq!(coll.wait(), vec![0, 10, 1, 11]);
719 /// ```
720 pub fn merge(streams: Vec<Stream<T>>) -> Stream<T> {
721 let inner = SafeInner::new(MemoryMode::NoMemory, None);
722 let inner_clone = inner.clone();
723 let active = Arc::new(AtomicUsize::new(streams.len()));
724 let pegs: Vec<_> = streams
725 .into_iter()
726 .map(|stream| {
727 let inner_clone = inner_clone.clone();
728 let active = active.clone();
729 stream.internal_subscribe(move |t| {
730 if t.is_some() {
731 inner_clone.lock().update_borrowed(t);
732 } else if active.fetch_sub(1, Ordering::SeqCst) == 1 {
733 // all streams are ended. close the merged one
734 inner_clone.lock().update_borrowed(None);
735 }
736 })
737 })
738 .collect();
739 let peg = Peg::many(pegs);
740 Stream { peg, inner }
741 }
742
743 /// Make a stream in memory mode. Each value is remembered for future subscribers.
744 ///
745 /// ```
746 /// let sink = xi::Stream::sink();
747 ///
748 /// let rem = sink.stream().remember();
749 ///
750 /// sink.update(0);
751 /// sink.update(1);
752 ///
753 /// // receives last "remembered" value
754 /// let coll = rem.collect();
755 ///
756 /// sink.update(2);
757 /// sink.end();
758 ///
759 /// assert_eq!(coll.wait(), vec![1, 2]);
760 /// ```
761 pub fn remember(&self) -> Stream<T>
762 where
763 T: Clone,
764 {
765 self.remember_mode(MemoryMode::KeepUntilEnd)
766 }
767
768 /// Internal remember where we can choose "mode"
769 fn remember_mode(&self, mode: MemoryMode) -> Stream<T>
770 where
771 T: Clone,
772 {
773 let inner = SafeInner::new(mode, None);
774 let inner_clone = inner.clone();
775 let peg = self.internal_subscribe(move |t| {
776 let t = t.cloned();
777 inner_clone.lock().update_owned(t);
778 });
779 Stream { peg, inner }
780 }
781
782 /// On every event in this stream, combine with the last value of the other stream.
783 ///
784 /// ```
785 /// use xi::Stream;
786 ///
787 /// let sink1 = Stream::sink();
788 /// let sink2 = Stream::sink();
789 ///
790 /// let comb = sink1.stream().sample_combine(&sink2.stream());
791 ///
792 /// let coll = comb.collect();
793 ///
794 /// sink1.update(0); // lost, because no value in sink2
795 /// sink2.update("foo"); // doesn't trigger combine
796 /// sink1.update(1);
797 /// sink1.update(2);
798 /// sink2.update("bar");
799 /// sink2.end(); // sink2 is "bar" forever
800 /// sink1.update(3);
801 /// sink1.end();
802 ///
803 /// assert_eq!(coll.wait(),
804 /// vec![(1, "foo"), (2, "foo"), (3, "bar")])
805 /// ```
806 pub fn sample_combine<U>(&self, other: &Stream<U>) -> Stream<(T, U)>
807 where
808 T: Clone,
809 U: Clone,
810 {
811 let inner = SafeInner::new(MemoryMode::NoMemory, None);
812 let inner_clone = inner.clone();
813 let rem = other.remember_mode(MemoryMode::KeepAfterEnd);
814 let peg = self.internal_subscribe(move |t| {
815 if let Some(t) = t {
816 let rlock = rem.inner.lock();
817 if let Some(u) = rlock.peek_memory().as_ref() {
818 // we have both t and u
819 let v = (t.clone(), u.clone());
820 inner_clone.lock().update_owned(Some(v));
821 }
822 } else {
823 inner_clone.lock().update_borrowed(None);
824 }
825 });
826 Stream { peg, inner }
827 }
828
829 /// Prepend a start value to the stream. The result is a memory stream.
830 ///
831 /// ```
832 /// let sink = xi::Stream::sink();
833 ///
834 /// sink.update(0); // lost
835 ///
836 /// let started = sink.stream().start_with(1);
837 ///
838 /// let coll = started.collect(); // receives 1 and following
839 ///
840 /// sink.update(2);
841 /// sink.end();
842 ///
843 /// assert_eq!(coll.wait(), vec![1, 2]);
844 /// ```
845 pub fn start_with(&self, start: T) -> Stream<T> {
846 let inner = SafeInner::new(MemoryMode::KeepUntilEnd, Some(start));
847 let inner_clone = inner.clone();
848 let peg = self.internal_subscribe(move |t| {
849 inner_clone.lock().update_borrowed(t);
850 });
851 Stream { peg, inner }
852 }
853
854 /// Take a number of events, then end the stream.
855 ///
856 /// ```
857 /// let sink = xi::Stream::sink();
858 ///
859 /// let take2 = sink.stream().take(2);
860 ///
861 /// let coll = take2.collect();
862 ///
863 /// sink.update(0);
864 /// sink.update(1); // take2 ends here
865 /// sink.update(2);
866 ///
867 /// assert_eq!(coll.wait(), vec![0, 1]);
868 /// ```
869 pub fn take(&self, amount: usize) -> Stream<T> {
870 let mut todo = amount + 1;
871 self.take_while(move |_| {
872 if todo > 0 {
873 todo -= 1;
874 }
875 todo > 0
876 })
877 }
878
879 /// Take events from the stream as long as a condition holds true.
880 ///
881 /// ```
882 /// let sink = xi::Stream::sink();
883 ///
884 /// // take events as long as they are even
885 /// let take = sink.stream().take_while(|v| *v % 2 == 0);
886 ///
887 /// let coll = take.collect();
888 ///
889 /// sink.update(0);
890 /// sink.update(2);
891 /// sink.update(3); // take ends here
892 /// sink.update(4);
893 ///
894 /// assert_eq!(coll.wait(), vec![0, 2]);
895 /// ```
896 pub fn take_while<F>(&self, mut f: F) -> Stream<T>
897 where
898 F: FnMut(&T) -> bool + 'static,
899 {
900 let inner = SafeInner::new(MemoryMode::NoMemory, None);
901 let inner_clone = inner.clone();
902 let peg = self.internal_subscribe(move |t| {
903 if let Some(t) = t {
904 if f(t) {
905 inner_clone.lock().update_borrowed(Some(t));
906 } else {
907 inner_clone.lock().update_borrowed(None);
908 }
909 } else {
910 inner_clone.lock().update_borrowed(t);
911 }
912 });
913 Stream { peg, inner }
914 }
915
916 /// Stalls calling thread until the stream ends.
917 ///
918 /// ```
919 /// let sink = xi::Stream::sink();
920 /// let stream = sink.stream();
921 ///
922 /// std::thread::spawn(move || {
923 /// sink.update(0);
924 /// sink.update(1);
925 /// sink.update(2);
926 /// sink.end(); // this releases the wait
927 /// });
928 ///
929 /// stream.wait(); // wait for other thread
930 /// ```
931 #[allow(clippy::mutex_atomic)]
932 pub fn wait(&self) {
933 let pair = Arc::new((Mutex::new(false), Condvar::new()));
934 let pair2 = pair.clone();
935 let _sub = self.internal_subscribe(move |t| {
936 if t.is_none() {
937 let mut lock = pair2.0.lock().unwrap();
938 *lock = true;
939 pair2.1.notify_all();
940 }
941 });
942 let mut lock = pair.0.lock().unwrap();
943 while !*lock {
944 lock = pair.1.wait(lock).unwrap();
945 }
946 }
947}
948
949impl<T> Stream<Stream<T>> {
950 //
951
952 /// Flatten out a stream of streams, sequentially.
953 ///
954 /// For each new stream, unsubscribe from the previous, and subscribe to the new. The new
955 /// stream "interrupts" the previous stream.
956 ///
957 /// ```
958 /// use xi::{Stream, Sink};
959 ///
960 /// let sink1: Sink<Stream<u32>> = Stream::sink();
961 /// let sink2: Sink<u32> = Stream::sink();
962 /// let sink3: Sink<u32> = Stream::sink();
963 ///
964 /// let flat = sink1.stream().flatten();
965 ///
966 /// let coll = flat.collect();
967 ///
968 /// sink2.update(0); // lost
969 ///
970 /// sink1.update(sink2.stream());
971 /// sink2.update(1);
972 /// sink2.update(2);
973 /// sink2.end(); // does not end sink1
974 ///
975 /// sink3.update(10); // lost
976 ///
977 /// sink1.update(sink3.stream());
978 /// sink3.update(11);
979 ///
980 /// sink1.end();
981 ///
982 /// assert_eq!(coll.wait(), vec![1, 2, 11]);
983 /// ```
984 pub fn flatten(&self) -> Stream<T> {
985 let inner = SafeInner::new(MemoryMode::NoMemory, None);
986 let inner_clone = inner.clone();
987 let mut ipeg = None;
988 let peg = self.internal_subscribe(move |ts| {
989 if let Some(ts) = ts {
990 let inner_clone = inner_clone.clone();
991 ipeg = Some(ts.internal_subscribe(move |tv| {
992 if let Some(tv) = tv {
993 inner_clone.lock().update_borrowed(Some(tv));
994 } else {
995 // inner stream end does nothing to outer
996 }
997 }));
998 } else {
999 ipeg.take();
1000 inner_clone.lock().update_borrowed(None);
1001 }
1002 });
1003 Stream { peg, inner }
1004 }
1005
1006 /// Flatten out a stream of streams, concurrently.
1007 ///
1008 /// For each new stream, keep the previous, and subscribe to the new.
1009 ///
1010 /// ```
1011 /// use xi::{Stream, Sink};
1012 ///
1013 /// let sink1: Sink<Stream<u32>> = Stream::sink();
1014 /// let sink2: Sink<u32> = Stream::sink();
1015 /// let sink3: Sink<u32> = Stream::sink();
1016 ///
1017 /// let flat = sink1.stream().flatten_concurrently();
1018 ///
1019 /// let coll = flat.collect();
1020 ///
1021 /// sink2.update(0); // lost
1022 ///
1023 /// sink1.update(sink2.stream());
1024 /// sink2.update(1);
1025 /// sink2.update(2);
1026 ///
1027 /// sink3.update(10); // lost
1028 ///
1029 /// sink1.update(sink3.stream());
1030 /// sink3.update(11);
1031 /// sink2.update(3);
1032 /// sink3.update(12);
1033 ///
1034 /// sink1.end();
1035 ///
1036 /// assert_eq!(coll.wait(), vec![1, 2, 11, 3, 12]);
1037 /// ```
1038 pub fn flatten_concurrently(&self) -> Stream<T> {
1039 let inner = SafeInner::new(MemoryMode::NoMemory, None);
1040 let inner_clone = inner.clone();
1041 let peg = self.internal_subscribe(move |ts| {
1042 if let Some(ts) = ts {
1043 let inner_clone = inner_clone.clone();
1044 let ipeg = ts.internal_subscribe(move |tv| {
1045 if let Some(tv) = tv {
1046 inner_clone.lock().update_borrowed(Some(tv));
1047 } else {
1048 // inner stream end does nothing to outer
1049 }
1050 });
1051 ipeg.keep_mode(); // we drop ipeg, but keep listening
1052 } else {
1053 inner_clone.lock().update_borrowed(None);
1054 }
1055 });
1056 Stream { peg, inner }
1057 }
1058}
1059
1060include!("./comb.rs");
1061
1062/// A sink is a producer of events. Created by [`Stream::sink()`](struct.Stream.html#method.sink).
1063pub struct Sink<T: 'static> {
1064 inner: SafeInner<T>,
1065}
1066impl<T> Sink<T> {
1067 /// Create a new sink that in turn is used to stream events.
1068 fn new() -> Self {
1069 Sink {
1070 inner: SafeInner::new(MemoryMode::NoMemory, None),
1071 }
1072 }
1073
1074 /// Get a stream of events from this sink. One stream instance is created for each call,
1075 /// and they all receive the events from the sink.
1076 ///
1077 /// ```
1078 /// let sink = xi::Stream::sink();
1079 ///
1080 /// let stream1 = sink.stream();
1081 /// let stream2 = sink.stream();
1082 ///
1083 /// let coll1 = stream1.collect();
1084 /// let coll2 = stream2.collect();
1085 ///
1086 /// sink.update(42);
1087 /// sink.end();
1088 ///
1089 /// assert_eq!(coll1.wait(), vec![42]);
1090 /// assert_eq!(coll2.wait(), vec![42]);
1091 /// ```
1092 pub fn stream(&self) -> Stream<T> {
1093 Stream {
1094 peg: Peg::new_fake(),
1095 inner: self.inner.clone(),
1096 }
1097 }
1098
1099 /// Update a value into this sink.
1100 ///
1101 /// The execution of the combinators "hanging" off this sink is (thread safe) and
1102 /// synchronous. In other words, there is nothing in xi itself that will still be
1103 /// "to do" once the `update()` call returns.
1104 ///
1105 /// Each value is wrapped in an `Option` towards subscribers of the streams.
1106 ///
1107 /// ```
1108 /// let sink = xi::Stream::sink();
1109 /// let stream = sink.stream();
1110 ///
1111 /// stream.subscribe(|v| {
1112 /// // v is Some(0), Some(1), None
1113 /// });
1114 ///
1115 /// sink.update(0);
1116 /// sink.update(1);
1117 /// sink.end();
1118 /// ```
1119 pub fn update(&self, next: T) {
1120 self.inner.lock().update_and_imitate(Some(next));
1121 }
1122
1123 /// End the stream of events. Consumes the instance since no more values are to go into it.
1124 ///
1125 /// Subscribers will see a `None` value.
1126 ///
1127 /// Every stream hanging directly off this sink will also end. The exception is streams
1128 /// combining input from multiple source streams.
1129 pub fn end(self) {
1130 self.inner.lock().update_and_imitate(None);
1131 }
1132}
1133
1134/// The collector instance collects values from a stream. Created by
1135/// [`Stream::collect()`](struct.Stream.html#method.collect).
1136pub struct Collector<T> {
1137 #[allow(dead_code)]
1138 peg: Peg,
1139 #[allow(clippy::type_complexity)]
1140 state: Arc<(Mutex<(bool, Option<Vec<T>>)>, Condvar)>,
1141}
1142
1143impl<T> Collector<T> {
1144 /// Stall the thread and wait for the stream to end.
1145 pub fn wait(self) -> Vec<T> {
1146 let mut lock = self.state.0.lock().unwrap();
1147 while !lock.0 {
1148 lock = self.state.1.wait(lock).unwrap();
1149 }
1150 lock.1.take().unwrap()
1151 }
1152
1153 /// Take whatever is there, without the stream ending, and stop collecting.
1154 pub fn take(self) -> Vec<T> {
1155 let mut lock = self.state.0.lock().unwrap();
1156 lock.1.take().unwrap()
1157 }
1158}
1159
1160impl<T> Clone for Stream<T> {
1161 fn clone(&self) -> Self {
1162 Stream {
1163 peg: self.peg.clone(),
1164 inner: self.inner.clone(),
1165 }
1166 }
1167}
1168
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::mpsc::sync_channel;

    /// A sink must be `Send` and `Sync`.
    #[test]
    fn test_sink_auto_traits() {
        fn assert_auto_traits<X>(_: X)
        where
            X: Sync + Send,
        {
        }
        let sink: Sink<u32> = Sink::new();
        assert_auto_traits(sink);
    }

    /// A stream must be `Send`, `Sync` and `Clone`, whatever its item type.
    #[test]
    fn test_stream_auto_traits() {
        fn assert_auto_traits<X>(_: X)
        where
            X: Sync + Send + Clone,
        {
        }
        // The item type is deliberately not clonable; Stream<NotClonable> must be anyway.
        struct NotClonable();
        let sink: Sink<NotClonable> = Sink::new();
        assert_auto_traits(sink.stream());
    }

    /// A subscription must be `Send`, `Sync` and `Clone`.
    #[test]
    fn test_subscription_auto_traits() {
        fn assert_auto_traits<X>(_: X)
        where
            X: Sync + Send + Clone,
        {
        }
        let sink: Sink<u32> = Sink::new();
        let subscription = sink.stream().subscribe(|_| {});
        assert_auto_traits(subscription);
    }

    #[test]
    fn test_chained_maps() {
        let sink: Sink<u32> = Sink::new();
        // If the intermediary map dropped its subscription, the whole
        // chain would stall and nothing would reach the collector.
        let mapped = sink.stream().map(|v| v + 1).map(|v| v * 2);
        let collector = mapped.collect();
        for value in 0..3 {
            sink.update(value);
        }
        sink.end();
        assert_eq!(collector.wait(), vec![2, 4, 6]);
    }

    #[test]
    fn test_of() {
        let stream = Stream::of(42);
        let (tx, rx) = sync_channel(1);
        // Forward the received value to the channel so it can be asserted on below.
        stream.subscribe(move |event| {
            let value = *event.unwrap();
            tx.send(value).unwrap()
        });
        let received = rx.recv().unwrap();
        assert_eq!(received, 42);
    }

    #[test]
    fn test_imitate() {
        let sink: Sink<u32> = Sink::new();
        let imitator: Imitator<u32> = Imitator::new();
        let doubled = sink.stream().map(|v| v * 2);
        // Start collecting from the imitator before it is told what to imitate.
        let collector = imitator.stream().collect();
        imitator.imitate(&doubled);
        for value in 0..3 {
            sink.update(value);
        }
        sink.end();
        assert_eq!(collector.wait(), vec![0, 2, 4]);
    }

    #[test]
    fn test_fold_and_last() {
        let sink: Sink<u32> = Sink::new();
        // Potential edge case: `last` might hang on to the rc value
        // that `fold` keeps in memory.
        let folded = sink
            .stream()
            .fold(String::from("|"), |acc, cur| format!("{} {}", acc, cur))
            .last();
        let collector = folded.collect();
        sink.update(42);
        sink.end();
        assert_eq!(collector.wait(), vec![String::from("| 42")]);
    }

    #[test]
    fn test_fold_and_remember() {
        let sink: Sink<u32> = Sink::new();
        // Potential edge case: `remember` might hang on to the rc value
        // that `fold` keeps in memory.
        let folded = sink
            .stream()
            .fold(String::from("|"), |acc, cur| format!("{} {}", acc, cur))
            .remember();
        let collector = folded.collect();
        sink.update(42);
        sink.end();
        assert_eq!(
            collector.wait(),
            vec![String::from("|"), String::from("| 42")]
        );
    }

    #[test]
    fn test_imitate_cycle() {
        let imitator = Stream::imitator();

        // The fold consumes the imitator's stream ...
        let folded = imitator
            .stream()
            .fold(1, |acc, cur| if *cur < 10 { acc + cur } else { acc })
            .dedupe();

        let sink = Stream::sink();

        // ... and the imitator in turn imitates the merge that contains the
        // fold, which closes the cycle.
        let merged = Stream::merge(vec![folded, sink.stream()]);
        imitator.imitate(&merged);

        let collector = merged.collect();

        sink.update(1);
        // The stream has not ended, so take what is there instead of waiting.
        assert_eq!(collector.take(), vec![1, 2, 4, 8, 16]);
    }

    #[test]
    fn test_combine() {
        let floats = Stream::sink();
        let ints = Stream::sink();

        let combined = Stream::combine2(&floats.stream(), &ints.stream());

        let collector = combined.collect();

        floats.update(0.0);
        ints.update(10);
        floats.update(1.0);
        floats.update(2.0);
        ints.update(11);
        floats.update(3.0);
        floats.end();
        ints.end();

        let expected = vec![(0.0, 10), (1.0, 10), (2.0, 10), (2.0, 11), (3.0, 11)];
        assert_eq!(collector.wait(), expected);
    }
}