lazy_queue/
lib.rs

1//! [![travis](https://img.shields.io/travis/mexus/lazy-queue.svg)](https://travis-ci.org/mexus/lazy-queue)
2//! [![crates.io](https://img.shields.io/crates/v/lazy-queue.svg)](https://crates.io/crates/lazy-queue)
3//! [![docs.rs](https://docs.rs/lazy-queue/badge.svg)](https://docs.rs/lazy-queue)
4//!
5//! [Master docs](https://mexus.github.io/lazy-queue/lazy_queue/index.html)
6//!
7//! Lazy future-driven queue processing.
8//!
9//! Some typical use cases would be:
10//!
11//! * Offloading work from multiple threads into a single-threaded processor.
12//! * Postponing computation.
13//! * Processing multiple tasks in a batch.
14//! * ...
15//!
16//! ## License
17//!
18//! Licensed under either of
19//!
20//!  * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
21//!  * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
22//!
23//! at your option.
24//!
25//! ### Contribution
26//!
27//! Unless you explicitly state otherwise, any contribution intentionally submitted
28//! for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any
29//! additional terms or conditions.
30
31#![deny(missing_docs)]
32
/// Generates the `LazyQueue<Item>` handle and the `QueueProcessor<Item, P, I>` future
/// (where `I: Future`), together with their implementations.
///
/// Arguments, in order:
///
/// * a string literal used as the documentation of the generated `LazyQueue`;
/// * the type of the sending half stored inside `LazyQueue` (it must be `Clone` and may
///   mention the generic parameter `Item`);
/// * the type of the receiving half handed over to `crate::inner::StreamProcessor`;
/// * a parenthesised list of `name: Type` pairs: extra arguments that `LazyQueue::new`
///   accepts after the processor;
/// * an expression evaluating to a `(sender, receiver)` pair.
///
/// The generated code refers to `crate::inner::StreamProcessor` by its full path, so the
/// invoking module does not have to import it.
macro_rules! implement {
    (
        $doc:literal,
        $sender:ty,
        $receiver:ty,
        ($($new_args:ident: $new_args_ty:ty),*),
        $make_chan:expr,
    ) => {
        #[doc = $doc]
        pub struct LazyQueue<Item> {
            sender: $sender,
        }

        // Implemented by hand: cloning the handle only clones the sender, so no
        // `Item: Clone` bound is required.
        impl<Item> Clone for LazyQueue<Item> {
            fn clone(&self) -> Self {
                LazyQueue {
                    sender: self.sender.clone(),
                }
            }
        }

        /// Lazy queue processor.
        #[must_use = "futures do nothing unless polled"]
        pub struct QueueProcessor<Item, P, I>
        where
            I: ::std::future::Future,
        {
            inner: crate::inner::StreamProcessor<$receiver, P, I>,
        }

        impl<Item> LazyQueue<Item> {
            /// Creates a new lazy queue using given processor.
            ///
            /// Returns the queue handle together with the processor future built around
            /// the receiving half of the channel.
            pub fn new<F, I>(processor: F, $($new_args: $new_args_ty),*) -> (Self, QueueProcessor<Item, F, I>)
            where
                F: FnMut(Item) -> I,
                I: ::std::future::Future,
            {
                let (sender, receiver) = $make_chan;
                (
                    LazyQueue { sender },
                    QueueProcessor {
                        // Fully qualified so that the expansion does not depend on the
                        // imports present at the invocation site.
                        inner: crate::inner::StreamProcessor::new(receiver, processor),
                    },
                )
            }
        }

        impl<Item, P, I> ::std::future::Future for QueueProcessor<Item, P, I>
        where
            P: FnMut(Item) -> I,
            I: ::std::future::Future,
        {
            type Output = ();

            fn poll(
                self: ::std::pin::Pin<&mut Self>,
                ctx: &mut ::std::task::Context<'_>,
            ) -> ::std::task::Poll<Self::Output> {
                // SAFETY: `inner` is only ever reached through this pinned projection and
                // is never moved out of `self` by the code generated here; the generated
                // type has neither a `Drop` impl nor a manual `Unpin` impl that could
                // break the pinning guarantee.
                let inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
                inner.poll(ctx)
            }
        }
    };
}
97
98mod inner;
99mod sync;
100pub use crate::sync::{bounded, unbounded};