disk_ringbuffer/
ringbuf.rs1use crate::qpage::{self, PopResult, PushResult, QPage};
2use mmap_wrapper::MmapMutWrapper;
3use static_assertions::const_assert;
4use std::marker::PhantomData;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::RwLock;
8
9pub const DEFAULT_INTERNAL_BUF_SIZE: usize = 4096;
10const_assert!(DEFAULT_INTERNAL_BUF_SIZE < qpage::DEFAULT_MAX_MSG_SIZE);
11
12#[derive(thiserror::Error, Debug)]
13pub enum RingbufError {
14 #[error("invalid read")]
15 ReadError,
16 #[error(transparent)]
17 QError(#[from] crate::qpage::Error),
18 #[error(transparent)]
19 IoError(#[from] std::io::Error),
20}
21
/// Extension applied to every page file, so page `n` is stored as `n.page.bin`.
const PAGE_EXT: &str = "page.bin";
/// Name of the file, inside the ring directory, that backs the mapped `DiskRingInfo`.
const INFO_NAME: &str = ".info";
24
/// Marker type selecting the writing half of a `DiskRing`.
///
/// It carries no data; `Clone` is derived because `DiskRing<T>` derives `Clone`,
/// which requires `T: Clone`.
#[derive(Clone)]
pub struct Sender {}
/// Marker type selecting the reading half of a `DiskRing`.
///
/// It carries no data; `Clone` is derived because `DiskRing<T>` derives `Clone`,
/// which requires `T: Clone`.
#[derive(Clone)]
pub struct Receiver {}
29
30#[derive(Clone)]
31pub struct DiskRing<T> {
32 _kind: PhantomData<T>,
33 path: PathBuf,
34 read_byte: usize,
35 qpage_no: usize,
36 qpage: MmapMutWrapper<QPage>,
37 diskring_info: MmapMutWrapper<DiskRingInfo>,
38}
39
/// Ring metadata kept in the `.info` file and accessed through a memory map.
///
/// The layout is `repr(C)` and the field order must not change: the struct is
/// read straight out of the mapped file.
///
/// NOTE(review): `std::sync::RwLock` stored in a file-backed mapping relies on
/// zeroed bytes being a valid unlocked lock and is presumably only meaningful
/// within a single process — confirm this is the intended usage.
#[repr(C)]
pub struct DiskRingInfo {
    /// Maximum number of pages to keep; `0` disables page eviction.
    max_qpages: AtomicUsize,
    /// Number of the newest page. A sender increments it when it starts a new
    /// page, and freshly created endpoints begin on this page.
    qpage_count: RwLock<usize>,
}
45
46impl DiskRingInfo {
47 fn new<P: AsRef<Path>>(path: P) -> Result<MmapMutWrapper<DiskRingInfo>, RingbufError> {
48 let f = std::fs::File::options()
51 .read(true)
52 .write(true)
53 .create(true)
54 .truncate(false)
55 .open(path)?;
56
57 let _ = f.set_len(std::mem::size_of::<Self>() as u64);
58 let m = unsafe { memmap2::MmapMut::map_mut(&f)? };
59
60 Ok(unsafe { MmapMutWrapper::<Self>::new(m) })
61 }
62}
63
64pub fn get_or_update_max_qpage<P: AsRef<Path>>(path: P, val: usize) -> Result<usize, RingbufError> {
65 let mut diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
66
67 let curr_max_qpages = diskring_info.get_inner().max_qpages.load(Ordering::Relaxed);
68
69 if val == curr_max_qpages {
70 return Ok(val);
71 }
72
73 set_max_qpage(path, val)
74}
75
76pub fn set_max_qpage<P: AsRef<Path>>(path: P, val: usize) -> Result<usize, RingbufError> {
77 let mut diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
78
79 let _qpage_count_lock = diskring_info
80 .get_inner()
81 .qpage_count
82 .write()
83 .expect("unpoisoned lock");
84
85 Ok(diskring_info
86 .get_inner()
87 .max_qpages
88 .swap(val, Ordering::Relaxed))
89}
90
91pub fn new<P: AsRef<Path>>(
92 path: P,
93) -> Result<(DiskRing<Sender>, DiskRing<Receiver>), RingbufError> {
94 std::fs::create_dir_all(path.as_ref())?;
95
96 let qpage_no = get_qpage_count_static(&path);
97 let qpage = QPage::new(
98 path.as_ref()
99 .join(qpage_no.to_string())
100 .with_extension(PAGE_EXT),
101 )?;
102
103 let diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
104
105 Ok((
106 DiskRing {
107 _kind: PhantomData,
108 path: path.as_ref().into(),
109 read_byte: 0,
110 diskring_info: diskring_info.clone(),
111 qpage: qpage.clone(),
112 qpage_no,
113 },
114 DiskRing {
115 _kind: PhantomData,
116 path: path.as_ref().into(),
117 read_byte: 0,
118 diskring_info,
119 qpage,
120 qpage_no,
121 },
122 ))
123}
124
125impl Iterator for DiskRing<Receiver> {
126 type Item = Result<Option<String>, RingbufError>;
127
128 fn next(&mut self) -> Option<Self::Item> {
129 Some(self.pop())
130 }
131}
132
133impl DiskRing<Receiver> {
134 pub fn new<P: AsRef<Path>>(path: P) -> Result<DiskRing<Receiver>, RingbufError> {
135 let qpage_no = get_qpage_count_static(&path);
136 let qpage = QPage::new(
137 path.as_ref()
138 .join(qpage_no.to_string())
139 .with_extension(PAGE_EXT),
140 )?;
141
142 let diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
143
144 Ok(DiskRing {
145 _kind: PhantomData,
146 path: path.as_ref().into(),
147 read_byte: 0,
148 diskring_info: diskring_info.clone(),
149 qpage: qpage.clone(),
150 qpage_no,
151 })
152 }
153
154 fn page_flip(&mut self) -> Result<(), RingbufError> {
155 let max_qpages = self
156 .diskring_info
157 .get_inner()
158 .max_qpages
159 .load(Ordering::Relaxed);
160
161 if max_qpages > 0 {
162 let qpage_count = self
163 .diskring_info
164 .get_inner()
165 .qpage_count
166 .read()
167 .expect("unpoisoned lock");
168
169 self.qpage_no =
170 std::cmp::max(self.qpage_no + 1, qpage_count.saturating_sub(max_qpages));
171 } else {
172 self.qpage_no += 1;
173 }
174
175 self.read_byte = 0;
176 self.qpage = QPage::new(
177 self.path
178 .join(self.qpage_no.to_string())
179 .with_extension(PAGE_EXT),
180 )?;
181
182 Ok(())
183 }
184
185 pub fn pop(&mut self) -> Result<Option<String>, RingbufError> {
186 loop {
187 match self.qpage.get_inner().try_pop(self.read_byte)? {
188 PopResult::Msg(m) => {
189 self.read_byte += m.len() + size_of::<qpage::MsgLengthType>();
190 return Ok(Some(String::from_utf8_lossy(m).to_string()));
191 }
192 PopResult::NoNewMsgs => return Ok(None),
193 PopResult::PageDone => {}
194 };
195
196 self.page_flip()?;
197 }
198 }
199}
200
201impl DiskRing<Sender> {
202 pub fn new<P: AsRef<Path>>(path: P) -> Result<DiskRing<Sender>, RingbufError> {
203 let qpage_no = get_qpage_count_static(&path);
204 let qpage = QPage::new(
205 path.as_ref()
206 .join(qpage_no.to_string())
207 .with_extension(PAGE_EXT),
208 )?;
209
210 let diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
211
212 Ok(DiskRing {
213 _kind: PhantomData,
214 path: path.as_ref().into(),
215 read_byte: 0,
216 diskring_info: diskring_info.clone(),
217 qpage: qpage.clone(),
218 qpage_no,
219 })
220 }
221
222 fn page_flip(&mut self) -> Result<(), std::io::Error> {
223 let qpage_count = self
224 .diskring_info
225 .get_inner()
226 .qpage_count
227 .read()
228 .expect("unpoisoned lock");
229
230 if self.qpage_no < *qpage_count {
231 self.qpage_no += 1;
232 return Ok(());
233 }
234
235 if self.qpage_no == *qpage_count {
236 drop(qpage_count);
237
238 let mut qpage_count = self
239 .diskring_info
240 .get_inner()
241 .qpage_count
242 .write()
243 .expect("unpoisoned lock");
244
245 if self.qpage_no < *qpage_count {
246 self.qpage_no += 1;
247 return Ok(());
248 }
249
250 *qpage_count += 1;
251 self.qpage_no += 1;
252
253 let max_qpages = self
254 .diskring_info
255 .get_inner()
256 .max_qpages
257 .load(Ordering::Relaxed);
258
259 if max_qpages == 0 {
261 return Ok(());
262 }
263
264 if *qpage_count >= max_qpages {
265 std::fs::remove_file(
266 self.path
267 .join((*qpage_count - max_qpages).to_string())
268 .with_extension(PAGE_EXT),
269 )?;
270 }
271 }
272
273 Ok(())
274 }
275
276 pub fn push<T: AsRef<[u8]>>(&mut self, input: T) -> Result<usize, RingbufError> {
277 loop {
278 match self.qpage.get_inner().try_push(input.as_ref())? {
279 PushResult::BytesWritten(x) => return Ok(x),
280 PushResult::PageFull => {}
281 }
282
283 self.page_flip()?;
284
285 self.qpage = QPage::new(
286 self.path
287 .join(self.qpage_no.to_string())
288 .with_extension(PAGE_EXT),
289 )?;
290 }
291 }
292}
293
294fn get_qpage_count_static<P: AsRef<Path>>(path: P) -> usize {
295 let Ok(mut diskring_info) = DiskRingInfo::new(path.as_ref().join(INFO_NAME)) else {
296 return 0;
297 };
298
299 let qpage_count = diskring_info
300 .get_inner()
301 .qpage_count
302 .read()
303 .expect("unpoisoned lock");
304
305 *qpage_count
306}
307
/// Pushes fifty million messages, then pops them all back on the same thread
/// and checks that they come out in order. Slow: this is as much a benchmark
/// as a test, and it prints the elapsed time.
#[test]
fn seq_test() {
    const MSG_COUNT: i32 = 50_000_000;

    let test_dir_path = "test-seq";
    let (mut tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();

    for n in 0..MSG_COUNT {
        tx.push(n.to_string()).unwrap();
    }

    for expected in (0..MSG_COUNT).map(|n| n.to_string()) {
        let m = rx.pop().unwrap();
        assert_eq!(m, Some(expected));
    }

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}
327
/// Same sequential push-then-pop round trip as `seq_test`, run against its own
/// directory (`test-seq-buf`). Slow: fifty million messages are written and
/// read back, and the elapsed time is printed.
#[test]
fn seq_buffered_test() {
    const MSG_COUNT: i32 = 50_000_000;

    let test_dir_path = "test-seq-buf";
    let (mut tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();

    for n in 0..MSG_COUNT {
        tx.push(n.to_string()).unwrap();
    }

    for expected in (0..MSG_COUNT).map(|n| n.to_string()) {
        let m = rx.pop().unwrap();
        assert_eq!(m, Some(expected));
    }

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}
347
/// Single producer, single consumer: one thread pushes fifty million messages
/// while the test thread pops them concurrently and checks their order.
/// `Ok(None)` from `pop` means "nothing yet" and is simply retried.
#[test]
fn spsc_test() {
    const MSG_COUNT: i32 = 50_000_000;

    let test_dir_path = "test-spsc";
    let (mut tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();
    let producer = std::thread::spawn(move || {
        for n in 0..MSG_COUNT {
            tx.push(n.to_string()).unwrap();
        }
    });

    let mut received = 0;
    while received != MSG_COUNT {
        if let Some(m) = rx.pop().unwrap() {
            assert_eq!(m, received.to_string());
            received += 1;
        }
    }

    producer.join().unwrap();

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}
381
/// Multiple producers, single consumer: four threads each push a quarter of
/// fifty million messages through clones of the sender while the test thread
/// counts the messages it pops. Only the total count is checked, not the order.
#[test]
fn mpsc_test() {
    let test_dir_path = "test-mpsc";
    let num_threads = 4;

    let (tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();

    let threads: Vec<_> = (0..num_threads)
        .map(|_| {
            let mut tx_clone = tx.clone();
            std::thread::spawn(move || {
                for i in 0..50_000_000 / num_threads {
                    tx_clone.push(i.to_string()).unwrap();
                }
            })
        })
        .collect();

    // The original handle is no longer needed once every producer has its clone.
    drop(tx);

    let mut received = 0;
    while received != 50_000_000 {
        if rx.pop().unwrap().is_some() {
            received += 1;
        }
    }

    for handle in threads {
        handle.join().unwrap();
    }

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}