rc_event_queue/event_queue.rs

#[cfg(not(loom))]
#[cfg(test)]
mod test;

use crate::sync::{Ordering};
use crate::sync::{Mutex, Arc};
use crate::sync::{SpinMutex};

use std::ptr::{null_mut, null, NonNull};
use crate::event_reader::EventReader;
use std::ops::ControlFlow;
use std::ops::ControlFlow::{Continue, Break};
use std::marker::PhantomPinned;
use std::pin::Pin;
use crate::cursor::Cursor;
use crate::dynamic_chunk::{DynamicChunk};
#[cfg(feature = "double_buffering")]
use crate::dynamic_chunk::{DynamicChunkRecycled};
use crate::{StartPositionEpoch};

/// Controls when a chunk's memory deallocation happens.
/// _In addition, some operations may cause deallocations as well._
#[derive(PartialEq)]
pub enum CleanupMode{
    /// Cleanup will be called when a chunk is fully read.
    ///
    /// In this mode memory is freed ASAP - right at the end of a reader's consumption session.
    ///
    /// !! Not allowed for spmc !!
    OnChunkRead,
    /// Cleanup will be called when a new chunk is created.
    OnNewChunk,
    /// Cleanup will never be called. You should call `EventQueue::cleanup` manually.
    Never
}

pub trait Settings{
    const MIN_CHUNK_SIZE : u32;
    const MAX_CHUNK_SIZE : u32;
    const CLEANUP        : CleanupMode;

    // for spmc/mpmc
    /// Lock on the new-chunk cleanup event. Will deadlock if already locked.
    const LOCK_ON_NEW_CHUNK_CLEANUP: bool;
    /// Call cleanup on unsubscribe?
    const CLEANUP_IN_UNSUBSCRIBE: bool;
}
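
// A minimal sketch of a `Settings` implementation (the name and values below
// are illustrative only, not part of the crate):
//
//     pub struct DefaultSettings;
//     impl Settings for DefaultSettings{
//         const MIN_CHUNK_SIZE : u32 = 4;
//         const MAX_CHUNK_SIZE : u32 = 4096;
//         const CLEANUP        : CleanupMode = CleanupMode::OnNewChunk;
//         const LOCK_ON_NEW_CHUNK_CLEANUP: bool = false;
//         const CLEANUP_IN_UNSUBSCRIBE: bool = true;
//     }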

pub struct List<T, S: Settings>{
    first: *mut DynamicChunk<T, S>,
    last : *mut DynamicChunk<T, S>,
    chunk_id_counter: usize,
    total_capacity: usize,

    readers_count: u32,

    /// 0 means there is no penult chunk
    penult_chunk_size: u32,

    #[cfg(feature = "double_buffering")]
    /// Biggest freed chunk
    free_chunk: Option<DynamicChunkRecycled<T, S>>,
}

pub struct EventQueue<T, S: Settings>{
    pub(crate) list  : Mutex<List<T, S>>,

    /// Keeping this under a lock separate from `list` is safe, because the
    /// start-position epoch is also encoded in each chunk's atomic len+epoch.
    // TODO: Make RWLock? Bench.
    // TODO: Optioned
    pub(crate) start_position: SpinMutex<Option<Cursor<T, S>>>,

    _pinned: PhantomPinned,
}

//unsafe impl<T, S: Settings> Send for EventQueue<T, S>{}
//unsafe impl<T, S: Settings> Sync for EventQueue<T, S>{}

impl<T, S: Settings> EventQueue<T, S>
{
    pub fn with_capacity(new_capacity: u32) -> Pin<Arc<Self>>{
        assert!(S::MIN_CHUNK_SIZE <= new_capacity && new_capacity <= S::MAX_CHUNK_SIZE);

        let this = Arc::new(Self{
            list: Mutex::new(List{
                first: null_mut(),
                last: null_mut(),
                chunk_id_counter: 0,
                readers_count: 0,
                total_capacity: new_capacity as usize,
                penult_chunk_size : 0,

                #[cfg(feature = "double_buffering")]
                free_chunk: None,
            }),
            start_position: SpinMutex::new(None),
            _pinned: PhantomPinned,
        });

        let node = DynamicChunk::<T, S>::construct(
            0, StartPositionEpoch::zero(), &*this, new_capacity as usize);

        {
            let mut list = this.list.lock();
            list.first = node;
            list.last  = node;
        }

        unsafe{ Pin::new_unchecked(this) }
    }
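
    // A construction sketch, assuming the illustrative `DefaultSettings` above:
    //
    //     let queue = EventQueue::<u32, DefaultSettings>::with_capacity(4);
    //     queue.push(&mut queue.list.lock(), 1);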

    #[inline]
    fn add_chunk_sized(&self, list: &mut List<T, S>, size: usize) -> &mut DynamicChunk<T, S>{
        let node = unsafe{&mut *list.last};
        let epoch = node.chunk_state(Ordering::Relaxed).epoch();

        // make new node
        list.chunk_id_counter += 1;

        #[cfg(not(feature = "double_buffering"))]
        let new_node = DynamicChunk::<T, S>::construct(list.chunk_id_counter, epoch, self, size);

        #[cfg(feature = "double_buffering")]
        let new_node = {
            let mut new_node: *mut DynamicChunk<T, S> = null_mut();

            if let Some(recycled_chunk) = &list.free_chunk {
                // Check if recycled_chunk has the exact capacity.
                if recycled_chunk.capacity() == size {
                    // Effectively unwrap_unchecked()
                    new_node =
                    match list.free_chunk.take() {
                        Some(recycled_chunk) => {
                            unsafe { DynamicChunk::from_recycled(
                                recycled_chunk,
                                list.chunk_id_counter,
                                epoch) }
                        }, None => unsafe { std::hint::unreachable_unchecked() },
                    }
                } else {
                    // TODO: try free in cleanup somehow
                    list.free_chunk = None;
                }
            }

            if new_node.is_null(){
                new_node = DynamicChunk::<T, S>::construct(list.chunk_id_counter, epoch, self, size);
            }
            new_node
        };

        // connect
        node.set_next(new_node, Ordering::Release);
        list.last = new_node;
        list.penult_chunk_size = node.capacity() as u32;
        list.total_capacity += size;

        unsafe{&mut *new_node}
    }

    #[inline]
    fn on_new_chunk_cleanup(&self, list: &mut List<T, S>){
        if S::CLEANUP == CleanupMode::OnNewChunk{
            // This should act as a compile-time if.
            if S::LOCK_ON_NEW_CHUNK_CLEANUP{
                let _lock = self.list.lock();
                self.cleanup_impl(list);
            } else {
                self.cleanup_impl(list);
            }
        }
    }

    #[inline]
    fn add_chunk(&self, list: &mut List<T, S>) -> &mut DynamicChunk<T, S>{
        let node = unsafe{&*list.last};

        self.on_new_chunk_cleanup(list);

        // Size pattern: 4,4,8,8,16,16,...
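        // Each size is used for two consecutive chunks before doubling,
        // capped at MAX_CHUNK_SIZE.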
        let new_size: usize = {
            if list.penult_chunk_size as usize == node.capacity(){
                std::cmp::min(node.capacity() * 2, S::MAX_CHUNK_SIZE as usize)
            } else {
                node.capacity()
            }
        };

        self.add_chunk_sized(list, new_size)
    }

    // Has ~10% better performance. Observable in spmc.
    #[inline]
    pub fn push(&self, list: &mut List<T, S>, value: T){
        let mut node = unsafe{&mut *list.last};

        // Relaxed, because we update only under lock.
        let chunk_state = node.chunk_state(Ordering::Relaxed);
        let mut storage_len = chunk_state.len();

        if /*unlikely*/ storage_len == node.capacity() as u32{
            node = self.add_chunk(&mut *list);
            storage_len = 0;
        }

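        // Store the value and publish the updated chunk state with Release.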
        unsafe { node.push_at(value, storage_len, chunk_state, Ordering::Release); }
    }

/*
    #[inline]
    pub fn push(&self, list: &mut List<T, S>, value: T){
        let node = unsafe{&mut *list.last};

        if let Err(err) = node.try_push(value, Ordering::Release){
            unsafe {
                self.add_chunk(&mut *list)
                    .push_unchecked(err.value, Ordering::Release);
            }
        }
    }
*/

    // Not an Extend trait, because Extend::extend takes &mut self.
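    // Fills the current chunk, then keeps adding new chunks until the iterator
    // is exhausted.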
    #[inline]
    pub fn extend<I>(&self, list: &mut List<T, S>, iter: I)
        where I: IntoIterator<Item = T>
    {
        let mut node = unsafe{&mut *list.last};

        let mut iter = iter.into_iter();

        while node.extend(&mut iter, Ordering::Release).is_err(){
            match iter.next() {
                None => {return;}
                Some(value) => {
                    // add chunk and push value there
                    node = self.add_chunk(&mut *list);
                    unsafe{ node.push_unchecked(value, Ordering::Relaxed); }
                }
            };
        }
    }

    /// EventReader will start receiving events from NOW.
    /// It will not see events that were pushed BEFORE subscription.
    pub fn subscribe(&self, list: &mut List<T, S>) -> EventReader<T, S>{
        if list.readers_count == 0{
            // Keep alive. Decremented in unsubscribe.
            unsafe { Arc::increment_strong_count(self); }
        }
        list.readers_count += 1;

        let last_chunk = unsafe{&*list.last};
        let chunk_state = last_chunk.chunk_state(Ordering::Relaxed);

        // Enter chunk
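        // (cleanup frees a chunk only after read_completely_times catches up
        //  with readers_entered)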
        last_chunk.readers_entered().fetch_add(1, Ordering::AcqRel);

        EventReader{
            position: Cursor{chunk: last_chunk, index: chunk_state.len() as usize},
            start_position_epoch: chunk_state.epoch()
        }
    }

    // Called from EventReader Drop.
    //
    // `this_ptr` instead of `&self`, because a `&self` reference must remain valid
    // for the whole function call - and we sometimes drop `self` inside, through
    // `Arc::decrement_strong_count`.
    pub(crate) fn unsubscribe(this_ptr: NonNull<Self>, event_reader: &EventReader<T, S>){
        let this = unsafe { this_ptr.as_ref() };
        let mut list = this.list.lock();

        // Exit chunk
        unsafe{&*event_reader.position.chunk}.read_completely_times().fetch_add(1, Ordering::AcqRel);

        if S::CLEANUP_IN_UNSUBSCRIBE && S::CLEANUP != CleanupMode::Never{
            if list.first as *const _ == event_reader.position.chunk {
                this.cleanup_impl(&mut *list);
            }
        }

        list.readers_count -= 1;
        if list.readers_count == 0{
            drop(list);
            // Safe to self-destruct
            unsafe { Arc::decrement_strong_count(this_ptr.as_ptr()); }
        }
    }

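    /// `LOCK_ON_WRITE_START_POSITION = false` is used when the caller already
    /// holds the `start_position` lock (see `force_cleanup_impl`).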
    unsafe fn free_chunk<const LOCK_ON_WRITE_START_POSITION: bool>(
        &self,
        chunk: *mut DynamicChunk<T, S>,
        list: &mut List<T, S>)
    {
        if let Some(start_position) = *self.start_position.as_mut_ptr(){
            if start_position.chunk == chunk{
                if LOCK_ON_WRITE_START_POSITION{
                    *self.start_position.lock() = None;
                } else {
                    *self.start_position.as_mut_ptr() = None;
                }
            }
        }

        list.total_capacity -= (*chunk).capacity();

        #[cfg(not(feature = "double_buffering"))]
        {
            DynamicChunk::destruct(chunk);
            std::mem::drop(list);   // just to "use" it
        }

        #[cfg(feature = "double_buffering")]
        {
            if let Some(free_chunk) = &list.free_chunk {
                if free_chunk.capacity() >= (*chunk).capacity() {
                    // Discard - the recycled chunk is bigger than ours.
                    DynamicChunk::destruct(chunk);
                    return;
                }
            }
            // Replace free_chunk with ours.
            list.free_chunk = Some(DynamicChunk::recycle(chunk));
        }
    }

    fn cleanup_impl(&self, list: &mut List<T, S>){
        unsafe {
            // Using the _ptr version, because with &chunk the reference would have to stay
            // valid for the whole closure call (according to miri and Rust's borrowing rules),
            // and we actually drop that chunk inside.
            foreach_chunk_ptr_mut(
                list.first,
                list.last,
                Ordering::Relaxed,      // we're under mutex
                |chunk_ptr| {
                    // Do not lock prev_chunk.chunk_switch_mutex, because we traverse in order.
                    let chunk = &mut *chunk_ptr;
                    let chunk_readers = chunk.readers_entered().load(Ordering::Acquire);
                    let chunk_read_times = chunk.read_completely_times().load(Ordering::Acquire);
                    // Cleanup only in order
                    if chunk_readers != chunk_read_times {
                        return Break(());
                    }

                    let next_chunk_ptr = chunk.next(Ordering::Relaxed);
                    debug_assert!(!next_chunk_ptr.is_null());

                    debug_assert!(std::ptr::eq(chunk, list.first));
                    // Do not lock start_position permanently, because a reader will
                    // never enter a chunk before list.first.
                    self.free_chunk::<true>(chunk, list);
                    list.first = next_chunk_ptr;

                    Continue(())
                }
            );
        }
        if list.first == list.last{
            list.penult_chunk_size = 0;
        }
    }

    /// Traverses up to the start_position and frees all unoccupied chunks (out-of-order cleanup).
    /// This is slower than cleanup_impl.
    fn force_cleanup_impl(&self, list: &mut List<T, S>){
        self.cleanup_impl(list);

        // Lock start_position permanently, due to out-of-order chunk destruction.
        // A reader can try to enter a chunk in the middle of force_cleanup execution.
        let start_position = self.start_position.lock();
        let terminal_chunk = match &*start_position{
            None => { return; }
            Some(cursor) => {cursor.chunk}
        };
        if list.first as *const _ == terminal_chunk{
            return;
        }
        unsafe {
            // cleanup_impl already dealt with the first chunk. Omit it.
            let mut prev_chunk = list.first;
            // Using the _ptr version, because with &chunk the reference would have to stay
            // valid for the whole closure call (according to miri and Rust's borrowing rules),
            // and we actually drop that chunk inside.
            foreach_chunk_ptr_mut(
                (*list.first).next(Ordering::Relaxed),
                terminal_chunk,
                Ordering::Relaxed,      // we're under mutex
                |chunk| {
                    // We need to lock only `prev_chunk`, because it is impossible
                    // to get into `chunk` without going through chunk.readers_entered+1.
                    let lock = (*prev_chunk).chunk_switch_mutex().write();
                        let chunk_readers = (*chunk).readers_entered().load(Ordering::Acquire);
                        let chunk_read_times = (*chunk).read_completely_times().load(Ordering::Acquire);
                        if chunk_readers != chunk_read_times {
                            prev_chunk = chunk;
                            return Continue(());
                        }

                        let next_chunk_ptr = (*chunk).next(Ordering::Relaxed);
                        debug_assert!(!next_chunk_ptr.is_null());

                        (*prev_chunk).set_next(next_chunk_ptr, Ordering::Release);
                    drop(lock);

                    self.free_chunk::<false>(chunk, list);
                    Continue(())
                }
            );
        }
    }

    pub fn cleanup(&self){
        self.cleanup_impl(&mut *self.list.lock());
    }

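    /// Moves the queue's logical start to `new_start_position` and bumps the
    /// start-position epoch in every chunk, so readers can detect that their
    /// position may have become outdated.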
    #[inline]
    fn set_start_position(
        &self,
        list: &mut List<T, S>,
        new_start_position: Cursor<T, S>)
    {
        *self.start_position.lock() = Some(new_start_position);

        // update len_and_start_position_epoch in each chunk
        let first_chunk = unsafe{&mut *list.first};
        let new_epoch = first_chunk.chunk_state(Ordering::Relaxed).epoch().increment();
        unsafe {
            foreach_chunk_mut(
                first_chunk,
                null(),
                Ordering::Relaxed,      // we're under mutex
                |chunk| {
                    chunk.set_epoch(new_epoch, Ordering::Relaxed, Ordering::Release);
                    Continue(())
                }
            );
        }
    }

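    /// Logically clears the queue: moves the start position past the last
    /// stored element, then tries to free the now-unreachable chunks.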
    pub fn clear(&self, list: &mut List<T, S>){
        let last_chunk = unsafe{ &*list.last };
        let last_chunk_len = last_chunk.chunk_state(Ordering::Relaxed).len() as usize;

        self.set_start_position(list, Cursor {
            chunk: last_chunk,
            index: last_chunk_len
        });

        self.force_cleanup_impl(list);
    }

    pub fn truncate_front(&self, list: &mut List<T, S>, len: usize) {
        // make a chunks* array

        // TODO: subtract from total_capacity
        // TODO: use small_vec
        // TODO: loop if > 128?
        // There is no way we can have enough memory to hold > 2^64 bytes.
        let mut chunks : [*const DynamicChunk<T, S>; 128] = [null(); 128];
        let chunks_count=
            unsafe {
                let mut i = 0;
                foreach_chunk(
                    list.first,
                    null(),
                    Ordering::Relaxed,      // we're under mutex
                    |chunk| {
                        chunks[i] = chunk;
                        i+=1;
                        Continue(())
                    }
                );
                i
            };

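        // Walk chunks from the back, accumulating their lengths, until at least
        // `len` elements are kept; everything before the resulting cursor is cut
        // off. E.g. with chunk lens [4, 4, 8] and len = 10, the cursor lands in
        // the middle chunk at index 2 (12 - 10).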
        let mut total_len = 0;
        for i in (0..chunks_count).rev(){
            let chunk = unsafe{ &*chunks[i] };
            let chunk_len = chunk.chunk_state(Ordering::Relaxed).len() as usize;
            total_len += chunk_len;
            if total_len >= len{
                let new_start_position = Cursor {
                    chunk: chunks[i],
                    index: total_len - len
                };
                // Do we actually need to truncate?
                if let Some(start_position) = unsafe{*self.start_position.as_mut_ptr()}{
                    if start_position >= new_start_position{
                        return;
                    }
                }

                self.set_start_position(list, new_start_position);
                self.force_cleanup_impl(list);
                return;
            }
        }

        // len is bigger than total_len.
        // Do nothing.
    }

    pub fn change_chunk_capacity(&self, list: &mut List<T, S>, new_capacity: u32){
        assert!(S::MIN_CHUNK_SIZE <= new_capacity && new_capacity <= S::MAX_CHUNK_SIZE);
        self.on_new_chunk_cleanup(list);
        self.add_chunk_sized(&mut *list, new_capacity as usize);
    }

    pub fn total_capacity(&self, list: &List<T, S>) -> usize {
        list.total_capacity
    }

    pub fn chunk_capacity(&self, list: &List<T, S>) -> usize {
        unsafe { (*list.last).capacity() }
    }

/*
    // chunks_count can be atomic. But is that needed?
    pub fn chunks_count(&self) -> usize {
        let list = self.list.lock();
        unsafe{
            list.chunk_id_counter/*(*list.last).id*/ - (*list.first).id() + 1
        }
    }*/
}

impl<T, S: Settings> Drop for EventQueue<T, S>{
    fn drop(&mut self) {
        let list = self.list.get_mut();
        debug_assert!(list.readers_count == 0);
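        // All readers are gone; walk the chunk list and destruct every chunk.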
        unsafe{
            let mut node_ptr = list.first;
            while node_ptr != null_mut() {
                let node = &mut *node_ptr;
                node_ptr = node.next(Ordering::Relaxed);
                DynamicChunk::destruct(node);
            }
        }
    }
}

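/// Iterates chunks from `start_chunk_ptr` up to (but not including) `end_chunk_ptr`.
/// `end_chunk_ptr` may be null to traverse to the end of the list.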
#[inline(always)]
pub(super) unsafe fn foreach_chunk<T, F, S: Settings>
(
    start_chunk_ptr : *const DynamicChunk<T, S>,
    end_chunk_ptr   : *const DynamicChunk<T, S>,
    load_ordering   : Ordering,
    mut func : F
)
    where F: FnMut(&DynamicChunk<T, S>) -> ControlFlow<()>
{
    foreach_chunk_mut(
        start_chunk_ptr as *mut _,
        end_chunk_ptr,
        load_ordering,
        |mut_chunk| func(mut_chunk)
    );
}

/// end_chunk_ptr may be null
#[inline(always)]
pub(super) unsafe fn foreach_chunk_mut<T, F, S: Settings>
(
    start_chunk_ptr : *mut DynamicChunk<T, S>,
    end_chunk_ptr   : *const DynamicChunk<T, S>,
    load_ordering   : Ordering,
    mut func : F
)
    where F: FnMut(&mut DynamicChunk<T, S>) -> ControlFlow<()>
{
    foreach_chunk_ptr_mut(
        start_chunk_ptr,
        end_chunk_ptr,
        load_ordering,
        |mut_chunk_ptr| func(&mut *mut_chunk_ptr)
    );
}

/// end_chunk_ptr may be null
#[inline(always)]
pub(super) unsafe fn foreach_chunk_ptr_mut<T, F, S: Settings>
(
    start_chunk_ptr : *mut DynamicChunk<T, S>,
    end_chunk_ptr   : *const DynamicChunk<T, S>,
    load_ordering   : Ordering,
    mut func : F
)
    where F: FnMut(*mut DynamicChunk<T, S>) -> ControlFlow<()>
{
    debug_assert!(!start_chunk_ptr.is_null());
    debug_assert!(
        end_chunk_ptr.is_null()
            ||
        std::ptr::eq((*start_chunk_ptr).event(), (*end_chunk_ptr).event())
    );
    debug_assert!(
        end_chunk_ptr.is_null()
            ||
        (*start_chunk_ptr).id() <= (*end_chunk_ptr).id()
    );

    let mut chunk_ptr = start_chunk_ptr;
    while !chunk_ptr.is_null(){
        if chunk_ptr as *const _ == end_chunk_ptr {
            break;
        }

        // chunk can be dropped inside `func`, so fetch `next` beforehand
        let next_chunk_ptr = (*chunk_ptr).next(load_ordering);

        let proceed = func(chunk_ptr);
        if proceed == Break(()) {
            break;
        }

        chunk_ptr = next_chunk_ptr;
    }
}