reactive_mutiny/
mutiny_stream.rs

1//! Resting place for [MutinyStream]
2
3use crate::types::ChannelConsumer;
4use std::{
5    pin::Pin,
6    sync::Arc,
7    fmt::Debug,
8    task::{Context, Poll},
9};
10use std::marker::PhantomData;
11use futures::stream::Stream;
12
13
14/// Special `Stream` implementation to avoid using dynamic dispatching, so to allow
15/// the compiler to fully optimize the whole event consumption chain.\
16/// The following paths are covered:
17///   - from the container's `consume()` (providing `InItemType` items),
18///   - passing through this Stream implementation,
19///   - then through the user provided `pipeline_builder()`
20///   - and, finally, to the `StreamExecutor`.
21/// 
22/// ... allowing all of them to behave as a single function, that gets optimized together.
23pub struct MutinyStream<'a, ItemType:            Debug + Send + Sync + 'a,
24                            ChannelConsumerType: ChannelConsumer<'a, DerivedItemType> + ?Sized,
25                            DerivedItemType:     'a + Debug> {
26    stream_id:     u32,
27    events_source: Arc<ChannelConsumerType>,
28    _phantom:      PhantomData<(&'a ItemType, &'a DerivedItemType)>,
29
30}
31
32impl<'a, ItemType:            Debug + Send + Sync,
33         ChannelConsumerType: ChannelConsumer<'a, DerivedItemType>,
34         DerivedItemType:     Debug>
35MutinyStream<'a, ItemType, ChannelConsumerType, DerivedItemType> {
36
37    pub fn new(stream_id: u32, events_source: &Arc<ChannelConsumerType>) -> Self {
38        Self {
39            stream_id,
40            events_source: events_source.clone(),
41            _phantom:      PhantomData,
42        }
43    }
44
45}
46
47impl<'a, ItemType:            Debug + Send + Sync + 'a,
48         ChannelConsumerType: ChannelConsumer<'a, DerivedItemType>,
49         DerivedItemType:     Debug>
50Stream for
51MutinyStream<'a, ItemType, ChannelConsumerType, DerivedItemType> {
52
53    type Item = DerivedItemType;
54
55    #[inline(always)]
56    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        let event = self.events_source.consume(self.stream_id);
58        match event {
59            Some(_) => Poll::Ready(event),
60            None => {
61                if self.events_source.keep_stream_running(self.stream_id) {
62                    self.events_source.register_stream_waker(self.stream_id, cx.waker());
63                    Poll::Pending
64                } else {
65                    Poll::Ready(None)
66                }
67            },
68        }
69    }
70}
71
72impl<'a, ItemType:            Debug + Send + Sync,
73         ChannelConsumerType: ChannelConsumer<'a, DerivedItemType> + ?Sized,
74         DerivedItemType:     Debug>
75Drop
76for MutinyStream<'a, ItemType, ChannelConsumerType, DerivedItemType> {
77    fn drop(&mut self) {
78        self.events_source.drop_resources(self.stream_id);
79    }
80}