reactive_mutiny/multi/channels/reference/
mmap_log.rs

1//! Resting place for the reference-based [MmapLog] Zero-Copy Multi Channel
2
3use crate::{
4    ogre_std::ogre_queues::{
5            meta_topic::MetaTopic,
6            log_topics::mmap_meta::MMapMeta,
7            meta_publisher::MetaPublisher,
8            meta_subscriber::MetaSubscriber,
9            log_topics::mmap_meta::MMapMetaSubscriber,
10        },
11    types::{ChannelCommon, ChannelMulti, ChannelProducer, ChannelConsumer, FullDuplexMultiChannel},
12    streams_manager::StreamsManagerBase,
13    mutiny_stream::MutinyStream,
14};
15use std::{
16    time::Duration,
17    sync::Arc,
18    fmt::Debug,
19    task::Waker,
20};
21use std::future::Future;
22
23
/// Capacity requested from [MMapMeta::new] when the log file is mapped (`2^38`); it is also the value
/// reported through [FullDuplexMultiChannel::BUFFER_SIZE].
/// NOTE(review): `1 << 38` does not fit a 32-bit `usize` -- presumably only 64-bit targets are supported; confirm.
const BUFFER_SIZE: usize = 1 << 38;
25
26/// ...
27pub struct MmapLog<'a, ItemType:          Send + Sync + Debug,
28                       const MAX_STREAMS: usize = 16> {
29
30    /// common code for dealing with streams
31    streams_manager:     StreamsManagerBase<MAX_STREAMS>,
32    /// backing storage for events
33    log_queue:           Arc<MMapMeta<'a, ItemType>>,
34    /// tracking of each Stream's next event to send
35    subscribers:         [MMapMetaSubscriber<'a, ItemType>; MAX_STREAMS],
36}
37
38impl<'a, ItemType:          Send + Sync + Debug + 'a,
39         const MAX_STREAMS: usize>
40MmapLog<'a, ItemType, MAX_STREAMS> {
41
42    /// NOTE: currently the file contents are ignored, but this is to change in the future: we should continue from where we left
43    fn from_file<IntoString: Into<String>>(mmap_file_path: IntoString) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
44        let mmap_file_path = mmap_file_path.into();
45        let log_queue = MMapMeta::new(&mmap_file_path, BUFFER_SIZE as u64)
46            .map_err(|err| format!("`mmap_log` channel couldn't mmap file '{mmap_file_path}': {err}"))?;
47        Ok(Arc::new(Self {
48            streams_manager: StreamsManagerBase::new(mmap_file_path),
49            log_queue:       log_queue.clone(),
50            subscribers:     [0; MAX_STREAMS].map(|_| MMapMetaSubscriber::Dynamic(log_queue.subscribe_to_new_events_only())),    // TODO 2023-05-28: Option<> to avoid unnecessary setting the values here?
51        }))
52    }
53}
54
55impl<'a, ItemType:          Send + Sync + Debug + 'a,
56         const MAX_STREAMS: usize>
57ChannelCommon<ItemType, &'static ItemType> for
58MmapLog<'a, ItemType, MAX_STREAMS> {
59
60    /// IMPLEMENTATION NOTE: use Self::from_file() instead for better control over the mmap file name and error handling
61    fn new<IntoString: Into<String>>(name: IntoString) -> Arc<Self> {
62        let name = name.into();
63        let mmap_file_path = format!("/tmp/{}.mmap", name.chars().map(|c| if c == ' ' || c >= '0' || c <= '9' || c >= 'A' || c <= 'z' { c } else { '_' }).collect::<String>());
64        Self::from_file(mmap_file_path).unwrap()
65    }
66
67    async fn flush(&self, timeout: Duration) -> u32 {
68        self.streams_manager.flush(timeout, || self.pending_items_count()).await
69    }
70
71    #[inline(always)]
72    fn is_channel_open(&self) -> bool {
73        self.streams_manager.is_any_stream_running()
74    }
75
76    async fn gracefully_end_stream(&self, stream_id: u32, timeout: Duration) -> bool {
77        self.streams_manager.end_stream(stream_id, timeout, || self.pending_items_count()).await
78    }
79
80    async fn gracefully_end_all_streams(&self, timeout: Duration) -> u32 {
81        self.streams_manager.end_all_streams(timeout, || self.pending_items_count()).await
82    }
83
84    fn cancel_all_streams(&self) {
85        self.streams_manager.cancel_all_streams();
86    }
87
88    #[inline(always)]
89    fn running_streams_count(&self) -> u32 {
90        self.streams_manager.running_streams_count()
91    }
92
93    #[inline(always)]
94    fn pending_items_count(&self) -> u32 {
95        self.streams_manager.used_streams().iter()
96            .take_while(|&&stream_id| stream_id != u32::MAX)
97            .map(|&stream_id| unsafe { self.subscribers.get_unchecked(stream_id as usize) }.remaining_elements_count())
98            .max().unwrap_or(0) as u32
99    }
100
101    #[inline(always)]
102    fn buffer_size(&self) -> u32 {
103        u32::MAX
104    }
105}
106
107
108impl<'a, ItemType:          Send + Sync + Debug + 'a,
109         const MAX_STREAMS: usize>
110ChannelMulti<'a, ItemType, &'static ItemType> for
111MmapLog<'a, ItemType, MAX_STREAMS> {
112
113    fn create_stream_for_old_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32) where Self: ChannelConsumer<'a, &'static ItemType> {
114        let ref_self: &Self = self;
115        let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
116        let stream_id = self.streams_manager.create_stream_id();
117        mutable_self.subscribers[stream_id as usize] = MMapMetaSubscriber::Fixed(self.log_queue.subscribe_to_old_events_only());
118        (MutinyStream::new(stream_id, self), stream_id)
119    }
120
121    fn create_stream_for_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32) {
122        let ref_self: &Self = self;
123        let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
124        let stream_id = self.streams_manager.create_stream_id();
125        mutable_self.subscribers[stream_id as usize] = MMapMetaSubscriber::Dynamic(self.log_queue.subscribe_to_new_events_only());
126        (MutinyStream::new(stream_id, self), stream_id)
127    }
128
129    fn create_streams_for_old_and_new_events(self: &Arc<Self>) -> ((MutinyStream<'a, ItemType, Self, &'static ItemType>, u32), (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32)) where Self: ChannelConsumer<'a, &'static ItemType> {
130        let ref_self: &Self = self;
131        let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
132        let (stream_of_oldies, stream_of_newies) = self.log_queue.subscribe_to_separated_old_and_new_events();
133        let stream_of_oldies_id = self.streams_manager.create_stream_id();
134        let stream_of_newies_id = self.streams_manager.create_stream_id();
135        mutable_self.subscribers[stream_of_oldies_id as usize] = MMapMetaSubscriber::Fixed(stream_of_oldies);
136        mutable_self.subscribers[stream_of_newies_id as usize] = MMapMetaSubscriber::Dynamic(stream_of_newies);
137        ( (MutinyStream::new(stream_of_oldies_id, self), stream_of_oldies_id),
138          (MutinyStream::new(stream_of_newies_id, self), stream_of_newies_id) )
139    }
140
141    fn create_stream_for_old_and_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32) where Self: ChannelConsumer<'a, &'static ItemType> {
142        let ref_self: &Self = self;
143        let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
144        let stream_id = self.streams_manager.create_stream_id();
145        mutable_self.subscribers[stream_id as usize] = MMapMetaSubscriber::Dynamic(self.log_queue.subscribe_to_joined_old_and_new_events());
146        (MutinyStream::new(stream_id, self), stream_id)
147    }
148}
149
150
151impl<'a, ItemType:          'a + Send + Sync + Debug,
152         const MAX_STREAMS: usize>
153ChannelProducer<'a, ItemType, &'static ItemType> for
154MmapLog<'a, ItemType, MAX_STREAMS> {
155
156    #[inline(always)]
157    fn send(&self, item: ItemType) -> keen_retry::RetryConsumerResult<(), ItemType, ()> {
158        match self.log_queue.publish_movable(item) {
159            (Some(_tail), _none_item) => {
160                let running_streams_count = self.streams_manager.running_streams_count();
161                let used_streams = self.streams_manager.used_streams();
162                for i in 0..running_streams_count {
163                    let stream_id = *unsafe { used_streams.get_unchecked(i as usize) };
164                    if stream_id != u32::MAX {
165                        self.streams_manager.wake_stream(stream_id);
166                    }
167                }
168                keen_retry::RetryResult::Ok { reported_input: (), output: () }
169            },
170            (None, some_item) => {
171                keen_retry::RetryResult::Transient { input: some_item.expect("reactive-mutiny: mmap_log::send() BUG! None `some_item`"), error: () }
172            }
173        }
174    }
175
176    #[inline(always)]
177    fn send_with<F: FnOnce(&mut ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()> {
178        match self.log_queue.publish(setter) {
179            (Some(_tail), _none_setter) => {
180                let running_streams_count = self.streams_manager.running_streams_count();
181                let used_streams = self.streams_manager.used_streams();
182                // TODO 2024-03-05: can this Stream awakening be optimized, like on the zero-copy channels? Tests should prove it.
183                for i in 0..running_streams_count {
184                    let stream_id = *unsafe { used_streams.get_unchecked(i as usize) };
185                    if stream_id != u32::MAX {
186                        self.streams_manager.wake_stream(stream_id);
187                    }
188                }
189                keen_retry::RetryResult::Ok { reported_input: (), output: () }
190            },
191            (None, some_setter) => {
192                keen_retry::RetryResult::Transient { input: some_setter.expect("reactive-mutiny: mmap_log::send_with() BUG! None `some_setter`"), error: () }
193            },
194        }
195    }
196
197    #[inline(always)]
198    async fn send_with_async<F:   FnOnce(&'a mut ItemType) -> Fut,
199                             Fut: Future<Output=&'a mut ItemType>>
200                            (&'a self,
201                             setter: F) -> keen_retry::RetryConsumerResult<(), F, ()> {
202        if let Some((slot, _slot_id)) = self.log_queue.leak_slot() {
203            let slot = setter(slot).await;
204            self.log_queue.publish_leaked_ref(slot);
205            let running_streams_count = self.streams_manager.running_streams_count();
206            let used_streams = self.streams_manager.used_streams();
207            // TODO 2024-03-05: can this Stream awakening be optimized, like on the zero-copy channels? Tests should prove it.
208            for i in 0..running_streams_count {
209                let stream_id = *unsafe { used_streams.get_unchecked(i as usize) };
210                if stream_id != u32::MAX {
211                    self.streams_manager.wake_stream(stream_id);
212                }
213            }
214            keen_retry::RetryResult::Ok { reported_input: (), output: () }
215        } else {
216            keen_retry::RetryResult::Transient { input: setter, error: () }
217        }
218    }
219
220    #[inline(always)]
221    fn send_derived(&self, _derived_item: &&'static ItemType) -> bool {
222        todo!("reactive_mutiny::multi::channels::references::MMapLog: `send_derived()` is not implemented for the MMapLog Multi channel '{}' -- it doesn't make sense to place a reference in an mmap", self.streams_manager.name())
223    }
224
225    #[inline(always)]
226    fn reserve_slot(&self) -> Option<&mut ItemType> {
227        self.log_queue.leak_slot()
228            .map(|(slot_ref, _slot_id)| slot_ref)
229    }
230
231    #[inline(always)]
232    fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool {
233        self.log_queue.publish_leaked_ref(reserved_slot).is_some()
234    }
235
236    #[inline(always)]
237    fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool {
238        self.log_queue.unleak_slot_ref(reserved_slot);
239        todo!("Complete this implementation");
240    }
241}
242
243
244impl<'a, ItemType:          'a + Send + Sync + Debug,
245         const MAX_STREAMS: usize>
246ChannelConsumer<'a, &'static ItemType>
247for MmapLog<'a, ItemType, MAX_STREAMS> {
248
249    #[inline(always)]
250    fn consume(&self, stream_id: u32) -> Option<&'static ItemType> {
251        let subscriber = unsafe { self.subscribers.get_unchecked(stream_id as usize) };
252        match subscriber {
253
254            // dynamic subscriber -- for new events (may include old events as well -- in a continuous stream): yields events until interrupted
255            MMapMetaSubscriber::Dynamic(subscriber) => {
256                subscriber.consume(|slot| unsafe {&*(slot as *const ItemType)},
257                                   || false,
258                                   |_len_after| {})
259            },
260
261            // fixed subscriber -- for old-only events: once the first empty event is consumed, it is over (interrupts itself automatically)
262            MMapMetaSubscriber::Fixed(subscriber) => {
263                subscriber.consume(|slot| unsafe {&*(slot as *const ItemType)},
264                                   || {
265                                       self.streams_manager.cancel_stream(stream_id);
266                                       false
267                                   },
268                                   |_len_after| {})
269            },
270
271        }
272
273    }
274
275    #[inline(always)]
276    fn keep_stream_running(&self, stream_id: u32) -> bool {
277        self.streams_manager.keep_stream_running(stream_id)
278    }
279
280    #[inline(always)]
281    fn register_stream_waker(&self, stream_id: u32, waker: &Waker) {
282        self.streams_manager.register_stream_waker(stream_id, waker)
283    }
284
285    #[inline(always)]
286    fn drop_resources(&self, stream_id: u32) {
287        self.streams_manager.report_stream_dropped(stream_id);
288    }
289}
290
291
292impl<'a, ItemType:          Send + Sync + Debug + 'a,
293         const MAX_STREAMS: usize>
294Drop for
295MmapLog<'a, ItemType, MAX_STREAMS> {
296    fn drop(&mut self) {
297        self.streams_manager.cancel_all_streams();
298    }
299}
300
301
302impl <ItemType:          'static + Debug + Send + Sync,
303      const MAX_STREAMS: usize>
304FullDuplexMultiChannel for
305MmapLog<'static, ItemType, MAX_STREAMS> {
306
307    const MAX_STREAMS: usize = MAX_STREAMS;
308    const BUFFER_SIZE: usize = BUFFER_SIZE;
309    type ItemType            = ItemType;
310    type DerivedItemType     = &'static ItemType;
311}