1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
//! Fillable buffer
//!
//! Buffers used for loading streams.
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};
use memmap2::MmapMut;
/// Fillable buffer
///
/// This struct encapsulates a memory-mapped buffer that can be simultaneously written to and read
/// from. Writes can only be appends, and reads can only happen to the portion that has already
/// been written.
pub(crate) struct Buffer {
    /// The underlying anonymous memory map. Held only to keep the mapping (and thus `data_ptr`)
    /// alive for the lifetime of the buffer; all access goes through `data_ptr`.
    _mmap: MmapMut,
    /// Raw pointer to the start of the data in `_mmap`.
    data_ptr: *mut u8,
    /// The underlying memory map's capacity, in bytes.
    capacity: usize,
    /// How much of the buffer has been filled. Reads are permitted in the range `0..filled`.
    /// Writes are permitted in the range `filled..`.
    filled: AtomicUsize,
    /// Lock for write access, to ensure only one writer can write at a time.
    lock: Mutex<()>,
}
// SAFETY: `Buffer` is not auto-Send/Sync because of `data_ptr: *mut u8`, but that pointer only
// aliases memory owned by `_mmap`, which moves with the struct. Cross-thread mutation is
// serialized by `lock`, and readers only touch the published `0..filled` prefix.
unsafe impl Send for Buffer {}
// SAFETY: see above — shared access is coordinated via `lock` and the `filled` atomic.
unsafe impl Sync for Buffer {}
/// Exclusive handle to the writable (unfilled) portion of a [`Buffer`].
///
/// Obtained from [`Buffer::write`]. Dereferencing yields the `filled..capacity` region; call
/// [`BufferWrite::written`] to publish the bytes that were written so readers can see them.
pub(crate) struct BufferWrite<'buffer> {
    /// The buffer this write is for.
    buffer: &'buffer Buffer,
    /// Lock guard for write access; held for the lifetime of this handle so only one writer
    /// exists at a time.
    _guard: MutexGuard<'buffer, ()>,
}
impl Buffer {
    /// Creates a new buffer backed by an anonymous memory map of `capacity` bytes.
    ///
    /// # Panics
    ///
    /// Panics if the anonymous memory map cannot be created (for example, `capacity` is 0 or
    /// address space is exhausted).
    pub(crate) fn new(capacity: usize) -> Buffer {
        let mut mmap = MmapMut::map_anon(capacity)
            .expect("failed to create anonymous memory map for Buffer");
        let data_ptr = mmap.as_mut_ptr();
        Buffer {
            _mmap: mmap,
            data_ptr,
            capacity,
            filled: AtomicUsize::new(0),
            lock: Mutex::new(()),
        }
    }

    /// Returns the writable portion of the buffer, blocking until any other writer has finished.
    ///
    /// # Panics
    ///
    /// Panics if the write lock has been poisoned by a panicking writer.
    pub(crate) fn write(&self) -> BufferWrite<'_> {
        BufferWrite {
            buffer: self,
            _guard: self.lock.lock().expect("Buffer write lock poisoned"),
        }
    }

    /// Returns the readable portion of the buffer: everything published so far via
    /// `BufferWrite::written`.
    pub(crate) fn read(&self) -> &[u8] {
        let end = self.filled.load(Ordering::SeqCst);
        // Safety: `BufferWrite::written()` checks that `end <= capacity`, and `filled` only
        // ever grows, so `data_ptr..data_ptr + end` is always valid, initialized memory.
        unsafe { std::slice::from_raw_parts(self.data_ptr, end) }
    }

    #[cfg(feature = "load_file")]
    /// Returns the size of the readable portion of the buffer.
    pub(crate) fn available(&self) -> usize {
        self.filled.load(Ordering::SeqCst)
    }
}
impl<'buffer> BufferWrite<'buffer> {
    /// Completes the write operation for `len` bytes to the buffer. After calling `written`, the
    /// data is made available to callers to `read`.
    ///
    /// # Panics
    ///
    /// Panics if advancing the fill position by `len` would overflow or exceed the buffer's
    /// capacity — both indicate a caller bug.
    pub(crate) fn written(self, len: usize) {
        // `filled` can only be advanced while holding `_guard`, so this load/store pair does not
        // race with other writers; readers only ever see the old or the new value.
        let filled = self.buffer.filled.load(Ordering::SeqCst);
        // Report overflow explicitly instead of saturating/clamping it away: a huge `len` is a
        // bug that should surface as such, not as a confusing capacity assertion.
        let new_filled = filled
            .checked_add(len)
            .expect("BufferWrite::written: fill position overflowed");
        assert!(
            new_filled <= self.buffer.capacity,
            "BufferWrite::written: {} bytes would exceed buffer capacity",
            len
        );
        self.buffer.filled.store(new_filled, Ordering::SeqCst);
    }
}
impl<'buffer> Deref for BufferWrite<'buffer> {
    type Target = [u8];

    /// Borrows the unfilled tail of the buffer (`filled..capacity`) for inspection.
    fn deref(&self) -> &[u8] {
        let filled = self.buffer.filled.load(Ordering::SeqCst);
        let remaining = self.buffer.capacity - filled;
        // Safety:
        // * `BufferWrite::written()` enforces that `filled` is within capacity. It starts at 0
        //   and never shrinks, so both the offset and the length are in bounds.
        // * `_guard` ensures no concurrent mutable reference exists.
        // * This slice (`filled..capacity`) never overlaps the ones handed out by
        //   `Buffer::read` (`0..filled`); once `filled` advances, the old write slice is gone
        //   because `written()` consumes `self`.
        unsafe {
            let tail = self.buffer.data_ptr.add(filled);
            std::slice::from_raw_parts(tail, remaining)
        }
    }
}
impl<'buffer> DerefMut for BufferWrite<'buffer> {
    /// Borrows the unfilled tail of the buffer (`filled..capacity`) for writing.
    fn deref_mut(&mut self) -> &mut [u8] {
        let filled = self.buffer.filled.load(Ordering::SeqCst);
        let remaining = self.buffer.capacity - filled;
        // Safety:
        // * `BufferWrite::written()` enforces that `filled` is within capacity. It starts at 0
        //   and never shrinks, so both the offset and the length are in bounds.
        // * `_guard` ensures no concurrent mutable reference exists.
        // * This slice (`filled..capacity`) never overlaps the ones handed out by
        //   `Buffer::read` (`0..filled`); once `filled` advances, the old write slice is gone
        //   because `written()` consumes `self`.
        unsafe {
            let tail = self.buffer.data_ptr.add(filled);
            std::slice::from_raw_parts_mut(tail, remaining)
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// End-to-end exercise of the append-only buffer: sequential writes, two concurrent
    /// single-byte writes serialized by the write lock, and a final multi-byte write.
    #[test]
    fn test_buffer_write() {
        let buffer = Arc::new(Buffer::new(20));
        let for_t1 = buffer.clone();
        let for_t2 = buffer.clone();

        // First write from the main thread; dropping the handle via `written` releases the lock.
        {
            let mut writer = buffer.write();
            writer[0] = 42;
            writer.written(1);
        }

        // Two concurrent single-byte writes; the internal lock serializes them, so each lands
        // at the then-current fill position.
        let t1 = thread::spawn(move || {
            let mut writer = for_t1.write();
            writer[0] = 64;
            writer.written(1);
        });
        let t2 = thread::spawn(move || {
            let mut writer = for_t2.write();
            writer[0] = 81;
            writer.written(1);
        });
        t1.join().unwrap();
        t2.join().unwrap();

        // Final two-byte write from the main thread.
        let mut writer = buffer.write();
        writer[0] = 101;
        writer[1] = 99;
        writer.written(2);

        let data = buffer.read();
        assert_eq!(data.len(), 5);
        assert_eq!(data[0], 42);
        // The threaded writes could have landed in either order.
        assert_eq!(data[1] + data[2], 64 + 81);
        assert_eq!(data[3], 101);
        assert_eq!(data[4], 99);
    }
}