1#[cfg(not(loom))]
2#[cfg(test)]
3mod test;
4
5use crate::sync::{Ordering};
6use crate::sync::{Mutex, Arc};
7use crate::sync::{SpinMutex};
8
9use std::ptr::{null_mut, null, NonNull};
10use crate::event_reader::EventReader;
11use std::ops::ControlFlow;
12use std::ops::ControlFlow::{Continue, Break};
13use std::marker::PhantomPinned;
14use std::pin::Pin;
15use crate::cursor::Cursor;
16use crate::dynamic_chunk::{DynamicChunk};
17#[cfg(feature = "double_buffering")]
18use crate::dynamic_chunk::{DynamicChunkRecycled};
19use crate::{StartPositionEpoch};
20
21#[derive(PartialEq)]
24pub enum CleanupMode{
25 OnChunkRead,
31 OnNewChunk,
33 Never
35}
36
37pub trait Settings{
38 const MIN_CHUNK_SIZE : u32;
39 const MAX_CHUNK_SIZE : u32;
40 const CLEANUP : CleanupMode;
41
42 const LOCK_ON_NEW_CHUNK_CLEANUP: bool;
45 const CLEANUP_IN_UNSUBSCRIBE: bool;
47}
48
49pub struct List<T, S: Settings>{
50 first: *mut DynamicChunk<T, S>,
51 last : *mut DynamicChunk<T, S>,
52 chunk_id_counter: usize,
53 total_capacity: usize,
54
55 readers_count: u32,
56
57 penult_chunk_size: u32,
59
60 #[cfg(feature = "double_buffering")]
61 free_chunk: Option<DynamicChunkRecycled<T, S>>,
63}
64
65pub struct EventQueue<T, S: Settings>{
66 pub(crate) list : Mutex<List<T, S>>,
67
68 pub(crate) start_position: SpinMutex<Option<Cursor<T, S>>>,
73
74 _pinned: PhantomPinned,
75}
76
77impl<T, S: Settings> EventQueue<T, S>
81{
82 pub fn with_capacity(new_capacity: u32) -> Pin<Arc<Self>>{
83 assert!(S::MIN_CHUNK_SIZE <= new_capacity && new_capacity <= S::MAX_CHUNK_SIZE);
84
85 let this = Arc::new(Self{
86 list: Mutex::new(List{
87 first: null_mut(),
88 last: null_mut(),
89 chunk_id_counter: 0,
90 readers_count:0,
91 total_capacity:new_capacity as usize,
92 penult_chunk_size : 0,
93
94 #[cfg(feature = "double_buffering")]
95 free_chunk: None,
96 }),
97 start_position: SpinMutex::new(None),
98 _pinned: PhantomPinned,
99 });
100
101 let node = DynamicChunk::<T, S>::construct(
102 0, StartPositionEpoch::zero(), &*this, new_capacity as usize);
103
104 {
105 let mut list = this.list.lock();
106 list.first = node;
107 list.last = node;
108 }
109
110 unsafe{ Pin::new_unchecked(this) }
111 }
112
113 #[inline]
114 fn add_chunk_sized(&self, list: &mut List<T, S>, size: usize) -> &mut DynamicChunk<T, S>{
115 let node = unsafe{&mut *list.last};
116 let epoch = node.chunk_state(Ordering::Relaxed).epoch();
117
118 list.chunk_id_counter += 1;
120
121 #[cfg(not(feature = "double_buffering"))]
122 let new_node = DynamicChunk::<T, S>::construct(list.chunk_id_counter, epoch, self, size);
123
124 #[cfg(feature = "double_buffering")]
125 let new_node = {
126 let mut new_node: *mut DynamicChunk<T, S> = null_mut();
127
128 if let Some(recycled_chunk) = &list.free_chunk {
129 if recycled_chunk.capacity() == size {
131 new_node =
133 match list.free_chunk.take() {
134 Some(recycled_chunk) => {
135 unsafe { DynamicChunk::from_recycled(
136 recycled_chunk,
137 list.chunk_id_counter,
138 epoch) }
139 }, None => unsafe { std::hint::unreachable_unchecked() },
140 }
141 } else {
142 list.free_chunk = None;
144 }
145 }
146
147 if new_node.is_null(){
148 new_node = DynamicChunk::<T, S>::construct(list.chunk_id_counter, epoch, self, size);
149 }
150 new_node
151 };
152
153 node.set_next(new_node, Ordering::Release);
155 list.last = new_node;
156 list.penult_chunk_size = node.capacity() as u32;
157 list.total_capacity += size;
158
159 unsafe{&mut *new_node}
160 }
161
162 #[inline]
163 fn on_new_chunk_cleanup(&self, list: &mut List<T, S>){
164 if S::CLEANUP == CleanupMode::OnNewChunk{
165 if S::LOCK_ON_NEW_CHUNK_CLEANUP{
167 let _lock = self.list.lock();
168 self.cleanup_impl(list);
169 } else {
170 self.cleanup_impl(list);
171 }
172 }
173 }
174
175 #[inline]
176 fn add_chunk(&self, list: &mut List<T, S>) -> &mut DynamicChunk<T, S>{
177 let node = unsafe{&*list.last};
178
179 self.on_new_chunk_cleanup(list);
180
181 let new_size: usize = {
183 if list.penult_chunk_size as usize == node.capacity(){
184 std::cmp::min(node.capacity() * 2, S::MAX_CHUNK_SIZE as usize)
185 } else {
186 node.capacity()
187 }
188 };
189
190 self.add_chunk_sized(list, new_size)
191 }
192
193 #[inline]
195 pub fn push(&self, list: &mut List<T, S>, value: T){
196 let mut node = unsafe{&mut *list.last};
197
198 let chunk_state = node.chunk_state(Ordering::Relaxed);
200 let mut storage_len = chunk_state.len();
201
202 if storage_len == node.capacity() as u32{
203 node = self.add_chunk(&mut *list);
204 storage_len = 0;
205 }
206
207 unsafe { node.push_at(value, storage_len, chunk_state, Ordering::Release); }
208 }
209
210#[inline]
226 pub fn extend<I>(&self, list: &mut List<T, S>, iter: I)
227 where I: IntoIterator<Item = T>
228 {
229 let mut node = unsafe{&mut *list.last};
230
231 let mut iter = iter.into_iter();
232
233 while node.extend(&mut iter, Ordering::Release).is_err(){
234 match iter.next() {
235 None => {return;}
236 Some(value) => {
237 node = self.add_chunk(&mut *list);
239 unsafe{ node.push_unchecked(value, Ordering::Relaxed); }
240 }
241 };
242 }
243 }
244
245 pub fn subscribe(&self, list: &mut List<T, S>) -> EventReader<T, S>{
248 if list.readers_count == 0{
249 unsafe { Arc::increment_strong_count(self); }
251 }
252 list.readers_count += 1;
253
254 let last_chunk = unsafe{&*list.last};
255 let chunk_state = last_chunk.chunk_state(Ordering::Relaxed);
256
257 last_chunk.readers_entered().fetch_add(1, Ordering::AcqRel);
259
260 EventReader{
261 position: Cursor{chunk: last_chunk, index: chunk_state.len() as usize},
262 start_position_epoch: chunk_state.epoch()
263 }
264 }
265
266 pub(crate) fn unsubscribe(this_ptr: NonNull<Self>, event_reader: &EventReader<T, S>){
271 let this = unsafe { this_ptr.as_ref() };
272 let mut list = this.list.lock();
273
274 unsafe{&*event_reader.position.chunk}.read_completely_times().fetch_add(1, Ordering::AcqRel);
276
277 if S::CLEANUP_IN_UNSUBSCRIBE && S::CLEANUP != CleanupMode::Never{
278 if list.first as *const _ == event_reader.position.chunk {
279 this.cleanup_impl(&mut *list);
280 }
281 }
282
283 list.readers_count -= 1;
284 if list.readers_count == 0{
285 drop(list);
286 unsafe { Arc::decrement_strong_count(this_ptr.as_ptr()); }
288 }
289 }
290
291 unsafe fn free_chunk<const LOCK_ON_WRITE_START_POSITION: bool>(
292 &self,
293 chunk: *mut DynamicChunk<T, S>,
294 list: &mut List<T, S>)
295 {
296 if let Some(start_position) = *self.start_position.as_mut_ptr(){
297 if start_position.chunk == chunk{
298 if LOCK_ON_WRITE_START_POSITION{
299 *self.start_position.lock() = None;
300 } else {
301 *self.start_position.as_mut_ptr() = None;
302 }
303 }
304 }
305
306 list.total_capacity -= (*chunk).capacity();
307
308 #[cfg(not(feature = "double_buffering"))]
309 {
310 DynamicChunk::destruct(chunk);
311 std::mem::drop(list); }
313
314 #[cfg(feature = "double_buffering")]
315 {
316 if let Some(free_chunk) = &list.free_chunk {
317 if free_chunk.capacity() >= (*chunk).capacity() {
318 DynamicChunk::destruct(chunk);
320 return;
321 }
322 }
323 list.free_chunk = Some(DynamicChunk::recycle(chunk));
325 }
326 }
327
328 fn cleanup_impl(&self, list: &mut List<T, S>){
329 unsafe {
330 foreach_chunk_ptr_mut(
334 list.first,
335 list.last,
336 Ordering::Relaxed, |chunk_ptr| {
338 let chunk = &mut *chunk_ptr;
340 let chunk_readers = chunk.readers_entered().load(Ordering::Acquire);
341 let chunk_read_times = chunk.read_completely_times().load(Ordering::Acquire);
342 if chunk_readers != chunk_read_times {
344 return Break(());
345 }
346
347 let next_chunk_ptr = chunk.next(Ordering::Relaxed);
348 debug_assert!(!next_chunk_ptr.is_null());
349
350 debug_assert!(std::ptr::eq(chunk, list.first));
351 self.free_chunk::<true>(chunk, list);
354 list.first = next_chunk_ptr;
355
356 Continue(())
357 }
358 );
359 }
360 if list.first == list.last{
361 list.penult_chunk_size = 0;
362 }
363 }
364
365 fn force_cleanup_impl(&self, list: &mut List<T, S>){
368 self.cleanup_impl(list);
369
370 let start_position = self.start_position.lock();
373 let terminal_chunk = match &*start_position{
374 None => { return; }
375 Some(cursor) => {cursor.chunk}
376 };
377 if list.first as *const _ == terminal_chunk{
378 return;
379 }
380 unsafe {
381 let mut prev_chunk = list.first;
383 foreach_chunk_ptr_mut(
387 (*list.first).next(Ordering::Relaxed),
388 terminal_chunk,
389 Ordering::Relaxed, |chunk| {
391 let lock = (*prev_chunk).chunk_switch_mutex().write();
394 let chunk_readers = (*chunk).readers_entered().load(Ordering::Acquire);
395 let chunk_read_times = (*chunk).read_completely_times().load(Ordering::Acquire);
396 if chunk_readers != chunk_read_times {
397 prev_chunk = chunk;
398 return Continue(());
399 }
400
401 let next_chunk_ptr = (*chunk).next(Ordering::Relaxed);
402 debug_assert!(!next_chunk_ptr.is_null());
403
404 (*prev_chunk).set_next(next_chunk_ptr, Ordering::Release);
405 drop(lock);
406
407 self.free_chunk::<false>(chunk, list);
408 Continue(())
409 }
410 );
411 }
412 }
413
414 pub fn cleanup(&self){
415 self.cleanup_impl(&mut *self.list.lock());
416 }
417
418 #[inline]
419 fn set_start_position(
420 &self,
421 list: &mut List<T, S>,
422 new_start_position: Cursor<T, S>)
423 {
424 *self.start_position.lock() = Some(new_start_position);
425
426 let first_chunk = unsafe{&mut *list.first};
428 let new_epoch = first_chunk.chunk_state(Ordering::Relaxed).epoch().increment();
429 unsafe {
430 foreach_chunk_mut(
431 first_chunk,
432 null(),
433 Ordering::Relaxed, |chunk| {
435 chunk.set_epoch(new_epoch, Ordering::Relaxed, Ordering::Release);
436 Continue(())
437 }
438 );
439 }
440 }
441
442 pub fn clear(&self, list: &mut List<T, S>){
443 let last_chunk = unsafe{ &*list.last };
444 let last_chunk_len = last_chunk.chunk_state(Ordering::Relaxed).len() as usize;
445
446 self.set_start_position(list, Cursor {
447 chunk: last_chunk,
448 index: last_chunk_len
449 });
450
451 self.force_cleanup_impl(list);
452 }
453
454 pub fn truncate_front(&self, list: &mut List<T, S>, len: usize) {
455 let mut chunks : [*const DynamicChunk<T, S>; 128] = [null(); 128];
462 let chunks_count=
463 unsafe {
464 let mut i = 0;
465 foreach_chunk(
466 list.first,
467 null(),
468 Ordering::Relaxed, |chunk| {
470 chunks[i] = chunk;
471 i+=1;
472 Continue(())
473 }
474 );
475 i
476 };
477
478 let mut total_len = 0;
479 for i in (0..chunks_count).rev(){
480 let chunk = unsafe{ &*chunks[i] };
481 let chunk_len = chunk.chunk_state(Ordering::Relaxed).len() as usize;
482 total_len += chunk_len;
483 if total_len >= len{
484 let new_start_position = Cursor {
485 chunk: chunks[i],
486 index: total_len - len
487 };
488 if let Some(start_position) = unsafe{*self.start_position.as_mut_ptr()}{
490 if start_position >= new_start_position{
491 return;
492 }
493 }
494
495 self.set_start_position(list, new_start_position);
496 self.force_cleanup_impl(list);
497 return;
498 }
499 }
500
501 }
504
505 pub fn change_chunk_capacity(&self, list: &mut List<T, S>, new_capacity: u32){
506 assert!(S::MIN_CHUNK_SIZE <= new_capacity && new_capacity <= S::MAX_CHUNK_SIZE);
507 self.on_new_chunk_cleanup(list);
508 self.add_chunk_sized(&mut *list, new_capacity as usize);
509 }
510
511 pub fn total_capacity(&self, list: &List<T, S>) -> usize {
512 list.total_capacity
513 }
514
515 pub fn chunk_capacity(&self, list: &List<T, S>) -> usize {
516 unsafe { (*list.last).capacity() }
517 }
518
519}
528
529impl<T, S: Settings> Drop for EventQueue<T, S>{
530 fn drop(&mut self) {
531 let list = self.list.get_mut();
532 debug_assert!(list.readers_count == 0);
533 unsafe{
534 let mut node_ptr = list.first;
535 while node_ptr != null_mut() {
536 let node = &mut *node_ptr;
537 node_ptr = node.next(Ordering::Relaxed);
538 DynamicChunk::destruct(node);
539 }
540 }
541 }
542}
543
544#[inline(always)]
545pub(super) unsafe fn foreach_chunk<T, F, S: Settings>
546(
547 start_chunk_ptr : *const DynamicChunk<T, S>,
548 end_chunk_ptr : *const DynamicChunk<T, S>,
549 load_ordering : Ordering,
550 mut func : F
551)
552 where F: FnMut(&DynamicChunk<T, S>) -> ControlFlow<()>
553{
554 foreach_chunk_mut(
555 start_chunk_ptr as *mut _,
556 end_chunk_ptr,
557 load_ordering,
558 |mut_chunk| func(mut_chunk)
559 );
560}
561
562#[inline(always)]
564pub(super) unsafe fn foreach_chunk_mut<T, F, S: Settings>
565(
566 start_chunk_ptr : *mut DynamicChunk<T, S>,
567 end_chunk_ptr : *const DynamicChunk<T, S>,
568 load_ordering : Ordering,
569 mut func : F
570)
571 where F: FnMut(&mut DynamicChunk<T, S>) -> ControlFlow<()>
572{
573 foreach_chunk_ptr_mut(
574 start_chunk_ptr,
575 end_chunk_ptr,
576 load_ordering,
577 |mut_chunk_ptr| func(&mut *mut_chunk_ptr)
578 );
579}
580
581#[inline(always)]
583pub(super) unsafe fn foreach_chunk_ptr_mut<T, F, S: Settings>
584(
585 start_chunk_ptr : *mut DynamicChunk<T, S>,
586 end_chunk_ptr : *const DynamicChunk<T, S>,
587 load_ordering : Ordering,
588 mut func : F
589)
590 where F: FnMut(*mut DynamicChunk<T, S>) -> ControlFlow<()>
591{
592 debug_assert!(!start_chunk_ptr.is_null());
593 debug_assert!(
594 end_chunk_ptr.is_null()
595 ||
596 std::ptr::eq((*start_chunk_ptr).event(), (*end_chunk_ptr).event())
597 );
598 debug_assert!(
599 end_chunk_ptr.is_null()
600 ||
601 (*start_chunk_ptr).id() <= (*end_chunk_ptr).id()
602 );
603
604 let mut chunk_ptr = start_chunk_ptr;
605 while !chunk_ptr.is_null(){
606 if chunk_ptr as *const _ == end_chunk_ptr {
607 break;
608 }
609
610 let next_chunk_ptr = (*chunk_ptr).next(load_ordering);
612
613 let proceed = func(chunk_ptr);
614 if proceed == Break(()) {
615 break;
616 }
617
618 chunk_ptr = next_chunk_ptr;
619 }
620}