use std::{
cell::UnsafeCell,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use io_uring::squeue::Entry;
use crate::{
FOUR_KB_BLOCK,
quik_io::{FlushableBuffer, SubmitQueueEntry},
state::State,
};
use crate::{
Buffer, BufferError, FLUSH_IN_PROGRESS_BIT, FlushBuffer, OFFSET_ONE, ONE_MEGABYTE_BLOCK,
SEALED_BIT, WRITER_ONE, state_flush_in_progress, state_offset, state_sealed, state_writers,
};
impl FlushBuffer {
/// Create a new `FlushBuffer` at ring position `buffer_number`, backed by an
/// aligned allocation of `size` bytes.
///
/// The LSS address slot starts at zero; the ring assigns this buffer its
/// address range via [`set_new_address_space_range`](Self::set_new_address_space_range)
/// each time the buffer is reused.
pub fn new_buffer(buffer_number: usize, size: usize) -> FlushBuffer {
Self {
state: AtomicUsize::new(0),
buf: Arc::new(Buffer::new_aligned(size)),
pos: buffer_number,
local_address: AtomicUsize::new(0),
sqe: UnsafeCell::new(None),
}
}
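/// Convenience constructor: a `FlushBuffer` at ring position `buf_pos`, backed by
/// the default [`ONE_MEGABYTE_BLOCK`]-sized aligned allocation.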
pub fn numbered_buffer(buf_pos: usize) -> FlushBuffer {
Self {
state: AtomicUsize::new(0),
buf: Arc::new(Buffer::new_aligned(ONE_MEGABYTE_BLOCK)),
pos: buf_pos,
local_address: AtomicUsize::new(0),
sqe: UnsafeCell::new(None),
}
}
/// Return `true` if this buffer is open to new writers.
///
/// A buffer is available when neither the sealed bit nor the
/// flush-in-progress bit is set and no bytes have been written yet
/// (the packed offset is zero).
pub fn is_available(&self) -> bool {
let state = self.state.load(Ordering::Acquire);
(state & (SEALED_BIT | FLUSH_IN_PROGRESS_BIT)) == 0 && state_offset(state) == 0
}
/// Attempt to atomically reserve `payload_size` bytes in this buffer.
///
/// On success returns the byte offset at which the caller should write its
/// payload. The caller **must** call [`decrement_writers`](Self::decrement_writers)
/// once the write is complete.
///
/// # Errors
///
/// * [`BufferError::EncounteredSealedBuffer`] — the buffer is sealed or a
/// flush is in progress; the caller should ask the ring to rotate.
/// * [`BufferError::InsufficientSpace`] — `payload_size` bytes would exceed
/// [`ONE_MEGABYTE_BLOCK`]; the caller should seal the buffer and retry on the
/// next one.
/// * [`BufferError::FailedReservation`] — the CAS failed due to contention;
/// the caller should retry immediately.
///
/// # Panics
///
/// Panics if `payload_size > ONE_MEGABYTE_BLOCK`.
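///
/// # Example
///
/// A minimal sketch of the reserve / write / release cycle (the constructor
/// arguments and payload are illustrative only):
///
/// ```ignore
/// let buf = FlushBuffer::new_buffer(0, ONE_MEGABYTE_BLOCK);
/// let payload = b"hello";
/// match buf.reserve_space(payload.len()) {
///     Ok(offset) => {
///         // The reserved region [offset, offset + len) belongs to this writer.
///         buf.write(offset, payload);
///         // Every successful reservation must be paired with a decrement.
///         buf.decrement_writers();
///     }
///     // Sealed, full, or contended: rotate, seal, or retry respectively.
///     Err(_) => {}
/// }
/// ```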
pub fn reserve_space(&self, payload_size: usize) -> Result<usize, BufferError> {
assert!(
payload_size <= ONE_MEGABYTE_BLOCK,
"payload larger than buffer"
);
let state = self.state.load(Ordering::Acquire);
if state & (SEALED_BIT | FLUSH_IN_PROGRESS_BIT) != 0 {
return Err(BufferError::EncounteredSealedBuffer);
}
let offset = state_offset(state);
if offset + payload_size > ONE_MEGABYTE_BLOCK {
return Err(BufferError::InsufficientSpace);
}
// Bump the packed offset by `payload_size` and the writer count by one in a
// single CAS, analogous to the increment_writers() method.
let new = state
.wrapping_add(payload_size * OFFSET_ONE)
.wrapping_add(WRITER_ONE);
match self
.state
.compare_exchange(state, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => Ok(offset),
Err(_) => Err(BufferError::FailedReservation),
}
}
/// Helper that advances the write offset without the CAS protection used by
/// [`reserve_space`](Self::reserve_space).
///
/// Use only in single-threaded scenarios where no other thread can touch this
/// buffer concurrently.
pub fn increment_offset(&self, payload_size: usize) -> Result<usize, BufferError> {
let state = self.state.load(Ordering::Acquire);
let offset = state_offset(state);
if offset + payload_size > ONE_MEGABYTE_BLOCK {
return Err(BufferError::InsufficientSpace);
}
// Same offset arithmetic as reserve_space(), but a plain store suffices since
// there is no concurrent access.
let new = state.wrapping_add(payload_size * OFFSET_ONE);
self.state.store(new, Ordering::Release);
Ok(offset)
}
/// Decrement the active-writer count by one.
///
/// Should be called by every thread that previously succeeded at
/// [`reserve_space`](Self::reserve_space) once it has finished copying its
/// payload. Returns the **previous** state word value.
#[inline]
pub fn decrement_writers(&self) -> usize {
self.state.fetch_sub(WRITER_ONE, Ordering::AcqRel)
}
/// Increment the active-writer count by one.
///
/// Returns the **previous** state word value.
#[inline]
pub fn increment_writers(&self) -> usize {
self.state.fetch_add(WRITER_ONE, Ordering::AcqRel)
}
/// Set the flush-in-progress bit.
///
/// Returns the **previous** state word value. The caller should check
/// whether the bit was already set in the returned value — only the thread
/// that observes the bit transitioning from `0` to `1` owns the flush.
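///
/// A minimal ownership sketch (flush submission itself elided):
///
/// ```ignore
/// let prev = buf.set_flush_in_progress();
/// if !state_flush_in_progress(prev) {
///     // This thread saw the bit go 0 -> 1, so it owns the flush.
///     // ... submit the buffer for I/O, then release ownership:
///     buf.clear_flush_in_progress();
/// }
/// ```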
#[inline]
pub fn set_flush_in_progress(&self) -> usize {
self.state.fetch_or(FLUSH_IN_PROGRESS_BIT, Ordering::AcqRel)
}
/// Clear the flush-in-progress bit.
///
/// Returns the **previous** state word value.
#[inline]
pub fn clear_flush_in_progress(&self) -> usize {
self.state
.fetch_and(!FLUSH_IN_PROGRESS_BIT, Ordering::AcqRel)
}
/// Copy `payload` into the buffer at `offset`.
///
/// # Safety
///
/// The caller must have obtained `offset` from a successful
/// [`reserve_space`](Self::reserve_space) call and must not alias the same
/// region from another thread.
pub fn write(&self, offset: usize, payload: &[u8]) {
unsafe {
let dst = (*self.buf.buffer.get()).add(offset);
std::ptr::copy_nonoverlapping(payload.as_ptr(), dst, payload.len());
}
}
/// Returns `true` if the buffer is currently sealed.
pub fn is_sealed(&self) -> bool {
state_sealed(self.state.load(Ordering::Acquire))
}
/// Set the sealed bit, preventing any further reservations.
///
/// # Errors
///
/// Returns [`BufferError::EncounteredSealedBufferDuringCOMPEX`] if the
/// buffer was already sealed before this call.
pub fn seal(&self) -> Result<(), BufferError> {
let prev = self.state.fetch_or(SEALED_BIT, Ordering::AcqRel);
if state_sealed(prev) {
Err(BufferError::EncounteredSealedBufferDuringCOMPEX)
} else {
Ok(())
}
}
/// Clear the sealed bit, re-opening the buffer to new writers.
///
/// Only succeeds when there are no active writers and no flush is in
/// progress.
///
/// # Errors
///
/// * [`BufferError::ActiveUsers`] — writers or a flush are still active.
/// * [`BufferError::EncounteredUnSealedBufferDuringCOMPEX`] — the buffer
/// was not sealed to begin with.
/// * [`BufferError::FailedUnsealed`] — the CAS failed; retry.
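///
/// A rough rotation sketch (writer draining and the flush itself elided):
///
/// ```ignore
/// buf.seal()?;       // stop new reservations
/// // ... wait for state_writers(..) to reach zero and flush the contents ...
/// buf.un_sealed()?;  // reopen the buffer for reuse
/// ```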
#[allow(unused)]
pub fn un_sealed(&self) -> Result<(), BufferError> {
let current = self.state.load(Ordering::Acquire);
if state_writers(current) != 0 || state_flush_in_progress(current) {
return Err(BufferError::ActiveUsers);
}
if !state_sealed(current) {
return Err(BufferError::EncounteredUnSealedBufferDuringCOMPEX);
}
match self.state.compare_exchange(
current,
current & !SEALED_BIT,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => Ok(()),
Err(_) => Err(BufferError::FailedUnsealed),
}
}
/// Reset the write offset to zero, leaving all flag bits intact.
///
/// Intended for use in tests only. In production code the ring resets
/// buffers through [`FlushBufferRing::reset_buffer`].
pub fn reset_offset(&self) {
loop {
let current = self.state.load(Ordering::Acquire);
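// Zero the packed offset field (the high half of the word) while leaving the
// flag bits intact.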
let zeroed = current & 0x0000_0000_FFFF_FFFF;
if self
.state
.compare_exchange(current, zeroed, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break;
}
}
}
/// Returns the number of bytes written to the shared buffer.
pub fn size(&self) -> usize {
state_offset(self.state.load(Ordering::Acquire))
}
/// Return a raw snapshot of the packed state word.
///
/// Intended primarily for tests. Use the `state_offset`, `state_writers`,
/// `state_sealed`, and `state_flush_in_progress` helpers to decode the
/// individual fields of the raw word.
pub fn state(&self) -> State {
State::from(self.state.load(Ordering::Acquire))
}
/// Returns the local LSS address slot assigned to this buffer.
pub fn local_address(&self, order: Ordering) -> usize {
self.local_address.load(order)
}
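/// Atomically replace the LSS address slot with `address_space`.
///
/// Returns the previous value on success, or the concurrently observed value
/// if another thread updated the slot between the load and the exchange.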
pub fn set_address(&self, address_space: usize) -> Result<usize, usize> {
let range = self.local_address.load(Ordering::Relaxed);
self.local_address.compare_exchange(
range,
address_space,
Ordering::Acquire,
Ordering::Relaxed,
)
}
/// Returns a reference to the submit queue entry storage.
pub fn sqe(&self) -> &UnsafeCell<Option<Entry>> {
&self.sqe
}
/// Returns the position of this buffer within the parent BufferRing.
pub fn buffer_position(&self) -> usize {
self.pos
}
}
impl FlushableBuffer for FlushBuffer {
fn buffer_data(&self) -> &[u8] {
unsafe {
let ptr = *self.buf.buffer.get();
let len = state_offset(self.state.load(Ordering::Acquire));
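// Expose the written prefix rounded up to a multiple of FOUR_KB_BLOCK so the
// flushed length stays block-aligned.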
&*std::ptr::slice_from_raw_parts(ptr, len.next_multiple_of(FOUR_KB_BLOCK))
}
}
fn offset(&self) -> u64 {
self.local_address.load(Ordering::Acquire) as u64
}
fn user_data(&self) -> u64 {
self as *const FlushBuffer as u64
}
fn submit_entry(&self) -> &SubmitQueueEntry {
&self.sqe
}
}
impl Default for FlushBuffer {
fn default() -> Self {
Self {
state: AtomicUsize::new(0),
buf: Arc::new(Buffer::new_aligned(ONE_MEGABYTE_BLOCK)),
pos: 0,
local_address: AtomicUsize::new(0),
sqe: UnsafeCell::new(None),
}
}
}