callbag/
lib.rs

1//! # Rust implementation of the [callbag spec][callbag-spec] for reactive/iterable programming
2//!
3//! Basic [callbag][callbag-spec] factories and operators to get started with.
4//!
5//! **Highlights:**
6//!
7//! - Supports reactive stream programming
8//! - Supports iterable programming (also!)
9//! - Same operator works for both of the above
10//! - Extensible
11//!
12//! Imagine a hybrid between an [Observable][tc39-observable] and an
13//! [(Async)Iterable][tc39-async-iteration], that's what callbags are all about. It's all done with
14//! a few simple callbacks, following the [callbag spec][callbag-spec].
15//!
16//! # Examples
17//!
18//! ## Reactive programming examples
19//!
20//! Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:
21//!
22//! ```
23//! use async_nursery::Nursery;
24//! use crossbeam_queue::SegQueue;
25//! use std::{sync::Arc, time::Duration};
26//!
27//! use callbag::{filter, for_each, interval, map, pipe, take};
28//!
29//! let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);
30//!
31//! let actual = Arc::new(SegQueue::new());
32//!
33//! pipe!(
34//!     interval(Duration::from_millis(1_000), nursery.clone()),
35//!     map(|x| x + 1),
36//!     filter(|x| x % 2 == 1),
37//!     take(5),
38//!     for_each({
39//!         let actual = Arc::clone(&actual);
40//!         move |x| {
41//!             println!("{}", x);
42//!             actual.push(x);
43//!         }
44//!     }),
45//! );
46//!
47//! drop(nursery);
48//! async_std::task::block_on(nursery_out);
49//!
50//! assert_eq!(
51//!     &{
52//!         let mut v = vec![];
53//!         for _i in 0..actual.len() {
54//!             v.push(actual.pop().unwrap());
55//!         }
56//!         v
57//!     }[..],
58//!     [1, 3, 5, 7, 9]
59//! );
60//! ```
61//!
62//! ## Iterable programming examples
63//!
64//! From a range of numbers, pick 5 of them and divide them by 4, then start pulling those one by one:
65//!
66//! ```
67//! use crossbeam_queue::SegQueue;
68//! use std::sync::Arc;
69//!
70//! use callbag::{for_each, from_iter, map, pipe, take};
71//!
72//! #[derive(Clone)]
73//! struct Range {
74//!     i: usize,
75//!     to: usize,
76//! }
77//!
78//! impl Range {
79//!     fn new(from: usize, to: usize) -> Self {
80//!         Range { i: from, to }
81//!     }
82//! }
83//!
84//! impl Iterator for Range {
85//!     type Item = usize;
86//!
87//!     fn next(&mut self) -> Option<Self::Item> {
88//!         let i = self.i;
89//!         if i <= self.to {
90//!             self.i += 1;
91//!             Some(i)
92//!         } else {
93//!             None
94//!         }
95//!     }
96//! }
97//!
98//! let actual = Arc::new(SegQueue::new());
99//!
100//! pipe!(
101//!     from_iter(Range::new(40, 99)),
102//!     take(5),
103//!     map(|x| x as f64 / 4.0),
104//!     for_each({
105//!         let actual = Arc::clone(&actual);
106//!         move |x| {
107//!             println!("{}", x);
108//!             actual.push(x);
109//!         }
110//!     }),
111//! );
112//!
113//! assert_eq!(
114//!     &{
115//!         let mut v = vec![];
116//!         for _i in 0..actual.len() {
117//!             v.push(actual.pop().unwrap());
118//!         }
119//!         v
120//!     }[..],
121//!     [10.0, 10.25, 10.5, 10.75, 11.0]
122//! );
123//! ```
124//!
125//! # API
126//!
127//! The list below shows what's included.
128//!
129//! ## Source factories
130//!
131//! - [from_iter][crate::from_iter()]
132//! - [interval][crate::interval()]
133//!
134//! ## Sink factories
135//!
136//! - [for_each][crate::for_each()]
137//!
138//! ## Transformation operators
139//!
140//! - [map][crate::map()]
141//! - [scan][crate::scan()]
142//! - [flatten][crate::flatten()]
143//!
144//! ## Filtering operators
145//!
146//! - [take][crate::take()]
147//! - [skip][crate::skip()]
148//! - [filter][crate::filter()]
149//!
150//! ## Combination operators
151//!
152//! - [merge!][crate::merge!]
153//! - [concat!][crate::concat!]
154//! - [combine!][crate::combine!]
155//!
156//! ## Utilities
157//!
158//! - [share][crate::share()]
159//! - [pipe!][crate::pipe!]
160//!
161//! # Terminology
162//!
163//! - **source**: a callbag that delivers data
164//! - **sink**: a callbag that receives data
165//! - **puller sink**: a sink that actively requests data from the source
166//! - **pullable source**: a source that delivers data only on demand (on receiving a request)
167//! - **listener sink**: a sink that passively receives data from the source
168//! - **listenable source**: source which sends data to the sink without waiting for requests
169//! - **operator**: a callbag based on another callbag which applies some operation
170//!
171//! [callbag-spec]: https://github.com/callbag/callbag
172//! [tc39-async-iteration]: https://github.com/tc39/proposal-async-iteration
173//! [tc39-observable]: https://github.com/tc39/proposal-observable
174
175#[cfg(feature = "combine")]
176pub use crate::combine::combine;
177#[cfg(feature = "concat")]
178pub use crate::concat::concat;
179pub use crate::core::*;
180#[cfg(feature = "filter")]
181pub use crate::filter::filter;
182#[cfg(feature = "flatten")]
183pub use crate::flatten::flatten;
184#[cfg(feature = "for_each")]
185pub use crate::for_each::for_each;
186#[cfg(feature = "from_iter")]
187pub use crate::from_iter::from_iter;
188#[cfg(feature = "interval")]
189pub use crate::interval::interval;
190#[cfg(feature = "map")]
191pub use crate::map::map;
192#[cfg(feature = "merge")]
193pub use crate::merge::merge;
194#[cfg(feature = "scan")]
195pub use crate::scan::scan;
196#[cfg(feature = "share")]
197pub use crate::share::share;
198#[cfg(feature = "skip")]
199pub use crate::skip::skip;
200#[cfg(feature = "take")]
201pub use crate::take::take;
202
203#[cfg(feature = "combine")]
204mod combine;
205#[cfg(feature = "concat")]
206mod concat;
207mod core;
208#[cfg(feature = "filter")]
209mod filter;
210#[cfg(feature = "flatten")]
211mod flatten;
212#[cfg(feature = "for_each")]
213mod for_each;
214#[cfg(feature = "from_iter")]
215mod from_iter;
216#[cfg(feature = "interval")]
217mod interval;
218#[cfg(feature = "map")]
219mod map;
220#[cfg(feature = "merge")]
221mod merge;
222#[cfg(feature = "pipe")]
223mod pipe;
224#[cfg(feature = "scan")]
225mod scan;
226#[cfg(feature = "share")]
227mod share;
228#[cfg(feature = "skip")]
229mod skip;
230#[cfg(feature = "take")]
231mod take;
232
233#[doc = include_str!("../README.md")]
234#[cfg(doctest)]
235pub struct ReadmeDoctests;