1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
//! [![travis](https://img.shields.io/travis/mexus/lazy-queue.svg)](https://travis-ci.org/mexus/lazy-queue)
//! [![crates.io](https://img.shields.io/crates/v/lazy-queue.svg)](https://crates.io/crates/lazy-queue)
//! [![docs.rs](https://docs.rs/lazy-queue/badge.svg)](https://docs.rs/lazy-queue)
//!
//! [Master docs](https://mexus.github.io/lazy-queue/lazy_queue/index.html)
//!
//! Lazy future-driven queue processing.
//!
//! Some typical use cases would be:
//!
//! * Offloading work from multiple threads into a single-threaded processor.
//! * Posponing computation.
//! * Processing multiple tasks in a batch.
//! * ...
//!
//! ## License
//!
//! Licensed under either of
//!
//!  * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
//!  * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
//!
//! at your option.
//!
//! ### Contribution
//!
//! Unless you explicitly state otherwise, any contribution intentionally submitted
//! for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any
//! additional terms or conditions.

#![deny(missing_docs)]

/// This macro creates `LazyQueue<Item>` and `QueueProcessor<Item, P, I>` (where `I: IntoFuture`)
/// and implementations for them.
macro_rules! implement {
    (
        $doc:literal,
        $sender:ty,
        $receiver:ty,
        ($($new_args:ident: $new_args_ty:ty),*),
        $make_chan:expr,
    ) => {
        #[doc = $doc]
        pub struct LazyQueue<Item> {
            sender: $sender,
        }

        impl<Item> Clone for LazyQueue<Item> {
            fn clone(&self) -> Self {
                LazyQueue {
                    sender: self.sender.clone(),
                }
            }
        }

        /// Lazy queue processor.
        #[must_use = "futures do nothing unless polled"]
        pub struct QueueProcessor<Item, P, I>
        where
            I: ::std::future::Future,
        {
            inner: crate::inner::StreamProcessor<$receiver, P, I>,
        }

        impl<Item> LazyQueue<Item> {
            /// Creates a new lazy queue using given processor.
            pub fn new<F, I>(processor: F, $($new_args: $new_args_ty),*) -> (Self, QueueProcessor<Item, F, I>)
            where
                F: FnMut(Item) -> I,
                I: ::std::future::Future,
            {
                let (sender, receiver) = $make_chan;
                (
                    LazyQueue { sender },
                    QueueProcessor {
                        inner: StreamProcessor::new(receiver, processor),
                    },
                )
            }
        }

        impl<Item, P, I> ::std::future::Future for QueueProcessor<Item, P, I>
        where
            P: FnMut(Item) -> I,
            I: ::std::future::Future,
        {
            type Output = ();

            fn poll(self: ::std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>)
                -> ::std::task::Poll<Self::Output> {
                let inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
                inner.poll(ctx)
            }
        }
    };
}

mod inner;
mod sync;
pub use crate::sync::{bounded, unbounded};