reactive_mutiny/multi/channels/reference/
mmap_log.rs1use crate::{
4 ogre_std::ogre_queues::{
5 meta_topic::MetaTopic,
6 log_topics::mmap_meta::MMapMeta,
7 meta_publisher::MetaPublisher,
8 meta_subscriber::MetaSubscriber,
9 log_topics::mmap_meta::MMapMetaSubscriber,
10 },
11 types::{ChannelCommon, ChannelMulti, ChannelProducer, ChannelConsumer, FullDuplexMultiChannel},
12 streams_manager::StreamsManagerBase,
13 mutiny_stream::MutinyStream,
14};
15use std::{
16 time::Duration,
17 sync::Arc,
18 fmt::Debug,
19 task::Waker,
20};
21use std::future::Future;
22
23
/// Size requested from `MMapMeta::new()` when mapping the backing file (passed there as `u64`)
/// and re-exported as `FullDuplexMultiChannel::BUFFER_SIZE`: 2^38.
// NOTE(review): the unit (bytes vs. elements) is defined by `MMapMeta::new()` -- confirm there.
const BUFFER_SIZE: usize = 1 << 38;
25
26pub struct MmapLog<'a, ItemType: Send + Sync + Debug,
28 const MAX_STREAMS: usize = 16> {
29
30 streams_manager: StreamsManagerBase<MAX_STREAMS>,
32 log_queue: Arc<MMapMeta<'a, ItemType>>,
34 subscribers: [MMapMetaSubscriber<'a, ItemType>; MAX_STREAMS],
36}
37
38impl<'a, ItemType: Send + Sync + Debug + 'a,
39 const MAX_STREAMS: usize>
40MmapLog<'a, ItemType, MAX_STREAMS> {
41
42 fn from_file<IntoString: Into<String>>(mmap_file_path: IntoString) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
44 let mmap_file_path = mmap_file_path.into();
45 let log_queue = MMapMeta::new(&mmap_file_path, BUFFER_SIZE as u64)
46 .map_err(|err| format!("`mmap_log` channel couldn't mmap file '{mmap_file_path}': {err}"))?;
47 Ok(Arc::new(Self {
48 streams_manager: StreamsManagerBase::new(mmap_file_path),
49 log_queue: log_queue.clone(),
50 subscribers: [0; MAX_STREAMS].map(|_| MMapMetaSubscriber::Dynamic(log_queue.subscribe_to_new_events_only())), }))
52 }
53}
54
55impl<'a, ItemType: Send + Sync + Debug + 'a,
56 const MAX_STREAMS: usize>
57ChannelCommon<ItemType, &'static ItemType> for
58MmapLog<'a, ItemType, MAX_STREAMS> {
59
60 fn new<IntoString: Into<String>>(name: IntoString) -> Arc<Self> {
62 let name = name.into();
63 let mmap_file_path = format!("/tmp/{}.mmap", name.chars().map(|c| if c == ' ' || c >= '0' || c <= '9' || c >= 'A' || c <= 'z' { c } else { '_' }).collect::<String>());
64 Self::from_file(mmap_file_path).unwrap()
65 }
66
67 async fn flush(&self, timeout: Duration) -> u32 {
68 self.streams_manager.flush(timeout, || self.pending_items_count()).await
69 }
70
71 #[inline(always)]
72 fn is_channel_open(&self) -> bool {
73 self.streams_manager.is_any_stream_running()
74 }
75
76 async fn gracefully_end_stream(&self, stream_id: u32, timeout: Duration) -> bool {
77 self.streams_manager.end_stream(stream_id, timeout, || self.pending_items_count()).await
78 }
79
80 async fn gracefully_end_all_streams(&self, timeout: Duration) -> u32 {
81 self.streams_manager.end_all_streams(timeout, || self.pending_items_count()).await
82 }
83
84 fn cancel_all_streams(&self) {
85 self.streams_manager.cancel_all_streams();
86 }
87
88 #[inline(always)]
89 fn running_streams_count(&self) -> u32 {
90 self.streams_manager.running_streams_count()
91 }
92
93 #[inline(always)]
94 fn pending_items_count(&self) -> u32 {
95 self.streams_manager.used_streams().iter()
96 .take_while(|&&stream_id| stream_id != u32::MAX)
97 .map(|&stream_id| unsafe { self.subscribers.get_unchecked(stream_id as usize) }.remaining_elements_count())
98 .max().unwrap_or(0) as u32
99 }
100
101 #[inline(always)]
102 fn buffer_size(&self) -> u32 {
103 u32::MAX
104 }
105}
106
107
108impl<'a, ItemType: Send + Sync + Debug + 'a,
109 const MAX_STREAMS: usize>
110ChannelMulti<'a, ItemType, &'static ItemType> for
111MmapLog<'a, ItemType, MAX_STREAMS> {
112
113 fn create_stream_for_old_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32) where Self: ChannelConsumer<'a, &'static ItemType> {
114 let ref_self: &Self = self;
115 let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
116 let stream_id = self.streams_manager.create_stream_id();
117 mutable_self.subscribers[stream_id as usize] = MMapMetaSubscriber::Fixed(self.log_queue.subscribe_to_old_events_only());
118 (MutinyStream::new(stream_id, self), stream_id)
119 }
120
121 fn create_stream_for_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32) {
122 let ref_self: &Self = self;
123 let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
124 let stream_id = self.streams_manager.create_stream_id();
125 mutable_self.subscribers[stream_id as usize] = MMapMetaSubscriber::Dynamic(self.log_queue.subscribe_to_new_events_only());
126 (MutinyStream::new(stream_id, self), stream_id)
127 }
128
129 fn create_streams_for_old_and_new_events(self: &Arc<Self>) -> ((MutinyStream<'a, ItemType, Self, &'static ItemType>, u32), (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32)) where Self: ChannelConsumer<'a, &'static ItemType> {
130 let ref_self: &Self = self;
131 let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
132 let (stream_of_oldies, stream_of_newies) = self.log_queue.subscribe_to_separated_old_and_new_events();
133 let stream_of_oldies_id = self.streams_manager.create_stream_id();
134 let stream_of_newies_id = self.streams_manager.create_stream_id();
135 mutable_self.subscribers[stream_of_oldies_id as usize] = MMapMetaSubscriber::Fixed(stream_of_oldies);
136 mutable_self.subscribers[stream_of_newies_id as usize] = MMapMetaSubscriber::Dynamic(stream_of_newies);
137 ( (MutinyStream::new(stream_of_oldies_id, self), stream_of_oldies_id),
138 (MutinyStream::new(stream_of_newies_id, self), stream_of_newies_id) )
139 }
140
141 fn create_stream_for_old_and_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, &'static ItemType>, u32) where Self: ChannelConsumer<'a, &'static ItemType> {
142 let ref_self: &Self = self;
143 let mutable_self = unsafe { &mut *(*(ref_self as *const Self as *const std::cell::UnsafeCell<Self>)).get() };
144 let stream_id = self.streams_manager.create_stream_id();
145 mutable_self.subscribers[stream_id as usize] = MMapMetaSubscriber::Dynamic(self.log_queue.subscribe_to_joined_old_and_new_events());
146 (MutinyStream::new(stream_id, self), stream_id)
147 }
148}
149
150
151impl<'a, ItemType: 'a + Send + Sync + Debug,
152 const MAX_STREAMS: usize>
153ChannelProducer<'a, ItemType, &'static ItemType> for
154MmapLog<'a, ItemType, MAX_STREAMS> {
155
156 #[inline(always)]
157 fn send(&self, item: ItemType) -> keen_retry::RetryConsumerResult<(), ItemType, ()> {
158 match self.log_queue.publish_movable(item) {
159 (Some(_tail), _none_item) => {
160 let running_streams_count = self.streams_manager.running_streams_count();
161 let used_streams = self.streams_manager.used_streams();
162 for i in 0..running_streams_count {
163 let stream_id = *unsafe { used_streams.get_unchecked(i as usize) };
164 if stream_id != u32::MAX {
165 self.streams_manager.wake_stream(stream_id);
166 }
167 }
168 keen_retry::RetryResult::Ok { reported_input: (), output: () }
169 },
170 (None, some_item) => {
171 keen_retry::RetryResult::Transient { input: some_item.expect("reactive-mutiny: mmap_log::send() BUG! None `some_item`"), error: () }
172 }
173 }
174 }
175
176 #[inline(always)]
177 fn send_with<F: FnOnce(&mut ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()> {
178 match self.log_queue.publish(setter) {
179 (Some(_tail), _none_setter) => {
180 let running_streams_count = self.streams_manager.running_streams_count();
181 let used_streams = self.streams_manager.used_streams();
182 for i in 0..running_streams_count {
184 let stream_id = *unsafe { used_streams.get_unchecked(i as usize) };
185 if stream_id != u32::MAX {
186 self.streams_manager.wake_stream(stream_id);
187 }
188 }
189 keen_retry::RetryResult::Ok { reported_input: (), output: () }
190 },
191 (None, some_setter) => {
192 keen_retry::RetryResult::Transient { input: some_setter.expect("reactive-mutiny: mmap_log::send_with() BUG! None `some_setter`"), error: () }
193 },
194 }
195 }
196
197 #[inline(always)]
198 async fn send_with_async<F: FnOnce(&'a mut ItemType) -> Fut,
199 Fut: Future<Output=&'a mut ItemType>>
200 (&'a self,
201 setter: F) -> keen_retry::RetryConsumerResult<(), F, ()> {
202 if let Some((slot, _slot_id)) = self.log_queue.leak_slot() {
203 let slot = setter(slot).await;
204 self.log_queue.publish_leaked_ref(slot);
205 let running_streams_count = self.streams_manager.running_streams_count();
206 let used_streams = self.streams_manager.used_streams();
207 for i in 0..running_streams_count {
209 let stream_id = *unsafe { used_streams.get_unchecked(i as usize) };
210 if stream_id != u32::MAX {
211 self.streams_manager.wake_stream(stream_id);
212 }
213 }
214 keen_retry::RetryResult::Ok { reported_input: (), output: () }
215 } else {
216 keen_retry::RetryResult::Transient { input: setter, error: () }
217 }
218 }
219
220 #[inline(always)]
221 fn send_derived(&self, _derived_item: &&'static ItemType) -> bool {
222 todo!("reactive_mutiny::multi::channels::references::MMapLog: `send_derived()` is not implemented for the MMapLog Multi channel '{}' -- it doesn't make sense to place a reference in an mmap", self.streams_manager.name())
223 }
224
225 #[inline(always)]
226 fn reserve_slot(&self) -> Option<&mut ItemType> {
227 self.log_queue.leak_slot()
228 .map(|(slot_ref, _slot_id)| slot_ref)
229 }
230
231 #[inline(always)]
232 fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool {
233 self.log_queue.publish_leaked_ref(reserved_slot).is_some()
234 }
235
236 #[inline(always)]
237 fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool {
238 self.log_queue.unleak_slot_ref(reserved_slot);
239 todo!("Complete this implementation");
240 }
241}
242
243
244impl<'a, ItemType: 'a + Send + Sync + Debug,
245 const MAX_STREAMS: usize>
246ChannelConsumer<'a, &'static ItemType>
247for MmapLog<'a, ItemType, MAX_STREAMS> {
248
249 #[inline(always)]
250 fn consume(&self, stream_id: u32) -> Option<&'static ItemType> {
251 let subscriber = unsafe { self.subscribers.get_unchecked(stream_id as usize) };
252 match subscriber {
253
254 MMapMetaSubscriber::Dynamic(subscriber) => {
256 subscriber.consume(|slot| unsafe {&*(slot as *const ItemType)},
257 || false,
258 |_len_after| {})
259 },
260
261 MMapMetaSubscriber::Fixed(subscriber) => {
263 subscriber.consume(|slot| unsafe {&*(slot as *const ItemType)},
264 || {
265 self.streams_manager.cancel_stream(stream_id);
266 false
267 },
268 |_len_after| {})
269 },
270
271 }
272
273 }
274
275 #[inline(always)]
276 fn keep_stream_running(&self, stream_id: u32) -> bool {
277 self.streams_manager.keep_stream_running(stream_id)
278 }
279
280 #[inline(always)]
281 fn register_stream_waker(&self, stream_id: u32, waker: &Waker) {
282 self.streams_manager.register_stream_waker(stream_id, waker)
283 }
284
285 #[inline(always)]
286 fn drop_resources(&self, stream_id: u32) {
287 self.streams_manager.report_stream_dropped(stream_id);
288 }
289}
290
291
292impl<'a, ItemType: Send + Sync + Debug + 'a,
293 const MAX_STREAMS: usize>
294Drop for
295MmapLog<'a, ItemType, MAX_STREAMS> {
296 fn drop(&mut self) {
297 self.streams_manager.cancel_all_streams();
298 }
299}
300
301
302impl <ItemType: 'static + Debug + Send + Sync,
303 const MAX_STREAMS: usize>
304FullDuplexMultiChannel for
305MmapLog<'static, ItemType, MAX_STREAMS> {
306
307 const MAX_STREAMS: usize = MAX_STREAMS;
308 const BUFFER_SIZE: usize = BUFFER_SIZE;
309 type ItemType = ItemType;
310 type DerivedItemType = &'static ItemType;
311}