reactive_mutiny/
mutiny_stream.rs1use crate::types::ChannelConsumer;
4use std::{
5 pin::Pin,
6 sync::Arc,
7 fmt::Debug,
8 task::{Context, Poll},
9};
10use std::marker::PhantomData;
11use futures::stream::Stream;
12
13
14pub struct MutinyStream<'a, ItemType: Debug + Send + Sync + 'a,
24 ChannelConsumerType: ChannelConsumer<'a, DerivedItemType> + ?Sized,
25 DerivedItemType: 'a + Debug> {
26 stream_id: u32,
27 events_source: Arc<ChannelConsumerType>,
28 _phantom: PhantomData<(&'a ItemType, &'a DerivedItemType)>,
29
30}
31
32impl<'a, ItemType: Debug + Send + Sync,
33 ChannelConsumerType: ChannelConsumer<'a, DerivedItemType>,
34 DerivedItemType: Debug>
35MutinyStream<'a, ItemType, ChannelConsumerType, DerivedItemType> {
36
37 pub fn new(stream_id: u32, events_source: &Arc<ChannelConsumerType>) -> Self {
38 Self {
39 stream_id,
40 events_source: events_source.clone(),
41 _phantom: PhantomData,
42 }
43 }
44
45}
46
47impl<'a, ItemType: Debug + Send + Sync + 'a,
48 ChannelConsumerType: ChannelConsumer<'a, DerivedItemType>,
49 DerivedItemType: Debug>
50Stream for
51MutinyStream<'a, ItemType, ChannelConsumerType, DerivedItemType> {
52
53 type Item = DerivedItemType;
54
55 #[inline(always)]
56 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 let event = self.events_source.consume(self.stream_id);
58 match event {
59 Some(_) => Poll::Ready(event),
60 None => {
61 if self.events_source.keep_stream_running(self.stream_id) {
62 self.events_source.register_stream_waker(self.stream_id, cx.waker());
63 Poll::Pending
64 } else {
65 Poll::Ready(None)
66 }
67 },
68 }
69 }
70}
71
72impl<'a, ItemType: Debug + Send + Sync,
73 ChannelConsumerType: ChannelConsumer<'a, DerivedItemType> + ?Sized,
74 DerivedItemType: Debug>
75Drop
76for MutinyStream<'a, ItemType, ChannelConsumerType, DerivedItemType> {
77 fn drop(&mut self) {
78 self.events_source.drop_resources(self.stream_id);
79 }
80}