1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#![forbid(unsafe_code)]
use crate::utils::*;
use event_listener::Event;
use std::{collections::VecDeque, marker::Unpin};
/// A simple event / revision queue
#[derive(Debug)]
#[must_use = "Queue does nothing unless you call .next() or some variation of it"]
pub struct Queue<T> {
// the $next field is partially shared, e.g. all queues derived from the same
// original queue can find the current $next value, but may be a bit behind
// (e.g. have unconsumed revisions,
// which should be iterated to get the current value)
next: NextRevision<T>,
// currently pending revisions
pub pending: VecDeque<T>,
// waiting next... calls
next_ops: Arc<Event>,
}
impl<T> Clone for Queue<T> {
#[inline]
fn clone(&self) -> Self {
Queue {
next: Arc::clone(&self.next),
pending: Default::default(),
next_ops: Arc::clone(&self.next_ops),
}
}
}
impl<T> Default for Queue<T> {
#[inline]
fn default() -> Self {
Queue {
next: Arc::new(AtomSetOnce::empty()),
pending: Default::default(),
next_ops: Arc::new(Default::default()),
}
}
}
impl<T: Unpin> Unpin for Queue<T> {}
impl<T: Send + 'static> Iterator for Queue<T> {
type Item = RevisionRef<T>;
fn next(&mut self) -> Option<RevisionRef<T>> {
fn intern_<T: Send + 'static>(this: &mut Queue<T>) -> Option<RevisionRef<T>> {
while let Some(data) = this.pending.pop_front() {
// 1. prepare revision
let latest = Arc::new(AtomSetOnce::empty());
let revnode = Box::new(RevisionNode {
data,
next: Arc::clone(&latest),
});
// 2. try to publish revision
// e.g. append to the first 'None' ptr in the 'latest' chain
if let Some((old, new)) =
RevisionRef::new_cas(&mut this.next, revnode)
{
// this publishing failed
this.pending.push_front((*new).data);
// we discovered a new revision, return that
return Some(old);
}
// publishing succeeded
// RevisionRef::new_cas doesn't update this.next in that case
this.next = latest;
// continue publishing until another thread interrupts us
}
RevisionRef::new(&this.next).map(|x| {
this.next = RevisionRef::next(&x);
x
})
}
let orig_pending_len = self.pending.len();
let ret = intern_(self);
// may have published something
if orig_pending_len != self.pending.len() {
self.next_ops.notify(usize::MAX);
}
ret
}
}
impl<T: Send + 'static> Queue<T> {
/// Similiar to [`Queue::next_blocking`], but `async`.
/// Only returns `None` if no other reference to the queue
/// exists anymore, thus, otherwise nothing could wake this up.
pub async fn next_async(&mut self) -> Option<RevisionRef<T>> {
loop {
// put ourselves into the waiting list
let listener = self.next_ops.listen();
if let ret @ Some(_) = self.next() {
// we got something, return
return ret;
} else if Arc::get_mut(&mut self.next_ops).is_some() {
// cancel if no one is listening
// skip publishing + notifying phase bc no one is listening
// we need to re-check to catch a race-condition between
// the call to $self.next and the check of $self.next_ops
// in between other queue instances may have been destroyed,
// but messages are still in the queue.
return RevisionRef::new(&self.next).map(|x| {
self.next = RevisionRef::next(&x);
x
});
} else {
listener.await;
}
}
}
/// Similiar to [`next`](Iterator::next), but
/// waits for an event on the WokeQueue, until at least one event
/// (or event block) got ready.
/// Only returns None if no other reference to the queue exists anymore,
/// to prevent a deadlock, because nothing could wake this up.
#[inline(always)]
pub fn next_blocking(&mut self) -> Option<RevisionRef<T>> {
futures_lite::future::block_on(self.next_async())
}
/// This method enqueues the pending revision for publishing.
/// The iterator **must** be "collected"/"polled"
/// (calling [`Iterator::next`] until it returns None) to publish them.
#[inline(always)]
pub fn enqueue(&mut self, pending: T) {
self.pending.push_back(pending);
}
/// Discards all newly published revisions and enforces publishing
/// of our pending revisions.
#[inline(always)]
pub fn skip_and_publish(&mut self) {
while self.next().is_some() {}
}
}
impl<T> Queue<T> {
#[inline(always)]
pub fn new() -> Self {
Default::default()
}
}
impl<T: std::fmt::Debug> Queue<T> {
/// Helper function, prints all unprocessed, newly published revisions
#[cold]
pub fn print_debug<W: std::io::Write>(
&self,
mut writer: W,
prefix: &str,
) -> std::io::Result<()> {
print_queue(&mut writer, Arc::clone(&self.next), prefix)?;
writeln!(writer, "{} pending = {:?}", prefix, &self.pending)?;
writeln!(
writer,
"{} next_ops = {:?} x{}",
prefix,
&self.next_ops,
Arc::strong_count(&self.next_ops)
)?;
Ok(())
}
}