disk_ringbuffer/
ringbuf.rs

1use crate::qpage::{self, PopResult, PushResult, QPage};
2use mmap_wrapper::MmapMutWrapper;
3use static_assertions::const_assert;
4use std::marker::PhantomData;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::RwLock;
8
/// Default internal buffer size (4096).
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably it is a byte count used by a buffered wrapper defined elsewhere
/// in the crate — confirm.
pub const DEFAULT_INTERNAL_BUF_SIZE: usize = 4096;
// Compile-time guard: the default buffer size must stay strictly below the
// queue page's default maximum message size.
const_assert!(DEFAULT_INTERNAL_BUF_SIZE < qpage::DEFAULT_MAX_MSG_SIZE);
11
/// Errors returned by the ring buffer functions and methods in this module.
#[derive(thiserror::Error, Debug)]
pub enum RingbufError {
    /// Displayed as "invalid read". Not constructed anywhere in the visible
    /// part of this file.
    #[error("invalid read")]
    ReadError,
    /// An error coming from the queue page layer; created automatically by `?`
    /// through the generated `From` impl and displayed as the inner error.
    #[error(transparent)]
    QError(#[from] crate::qpage::Error),
    /// An I/O error (opening, resizing, mapping or deleting a file); created
    /// automatically by `?` through the generated `From` impl and displayed as
    /// the inner error.
    #[error(transparent)]
    IoError(#[from] std::io::Error),
}
21
/// Extension given to page files: page number `N` lives in `N.page.bin`
/// inside the ring buffer directory.
const PAGE_EXT: &str = "page.bin";
/// Name, inside the ring buffer directory, of the file that backs the
/// memory-mapped `DiskRingInfo`.
const INFO_NAME: &str = ".info";
24
/// Marker type: `DiskRing<Sender>` is the writing handle (it provides `push`).
#[derive(Clone)]
pub struct Sender {}
/// Marker type: `DiskRing<Receiver>` is the reading handle (it provides `pop`
/// and implements `Iterator`).
#[derive(Clone)]
pub struct Receiver {}
29
/// A handle on a ring buffer stored as numbered page files in a directory.
///
/// The type parameter is a marker (`Sender` or `Receiver`) that selects which
/// set of methods is available. Each handle keeps its own position
/// (`qpage_no`, `read_byte`).
#[derive(Clone)]
pub struct DiskRing<T> {
    // Zero-sized marker carrying the `Sender` / `Receiver` role.
    _kind: PhantomData<T>,
    // Directory that contains the page files and the `.info` file.
    path: PathBuf,
    // Offset handed to `try_pop` on the current page. A receiver advances it
    // past every message it pops and resets it to 0 on a page flip; the sender
    // methods in this file never touch it.
    read_byte: usize,
    // Number of the page currently held in `qpage`.
    qpage_no: usize,
    // The page this handle is currently reading from or writing to.
    qpage: MmapMutWrapper<QPage>,
    // View of the directory's `.info` file (page count and page limit).
    diskring_info: MmapMutWrapper<DiskRingInfo>,
}
39
/// Bookkeeping for one ring buffer directory.
///
/// Values of this type are not built field by field; `DiskRingInfo::new` maps
/// the directory's `.info` file and views its bytes as this struct.
///
/// NOTE(review): `std::sync::RwLock` documents neither its in-memory layout nor
/// any guarantee that it works when placed in file-backed memory or shared
/// between processes — confirm this is sound on the supported platforms.
#[repr(C)]
pub struct DiskRingInfo {
    // Upper bound on retained page files; 0 means unbounded. When it is
    // non-zero, the sender deletes page `qpage_count - max_qpages` each time it
    // increments `qpage_count` to a value >= `max_qpages`.
    max_qpages: AtomicUsize,
    // Number of the newest page. A sender increments it (under the write lock)
    // when it moves past the newest page; new handles start on this page.
    qpage_count: RwLock<usize>,
}
45
46impl DiskRingInfo {
47    fn new<P: AsRef<Path>>(path: P) -> Result<MmapMutWrapper<DiskRingInfo>, RingbufError> {
48        // fails when disk is full
49        // or when parent directories don't exist
50        let f = std::fs::File::options()
51            .read(true)
52            .write(true)
53            .create(true)
54            .truncate(false)
55            .open(path)?;
56
57        let _ = f.set_len(std::mem::size_of::<Self>() as u64);
58        let m = unsafe { memmap2::MmapMut::map_mut(&f)? };
59
60        Ok(unsafe { MmapMutWrapper::<Self>::new(m) })
61    }
62}
63
64pub fn get_or_update_max_qpage<P: AsRef<Path>>(path: P, val: usize) -> Result<usize, RingbufError> {
65    let mut diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
66
67    let curr_max_qpages = diskring_info.get_inner().max_qpages.load(Ordering::Relaxed);
68
69    if val == curr_max_qpages {
70        return Ok(val);
71    }
72
73    set_max_qpage(path, val)
74}
75
76pub fn set_max_qpage<P: AsRef<Path>>(path: P, val: usize) -> Result<usize, RingbufError> {
77    let mut diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
78
79    let _qpage_count_lock = diskring_info
80        .get_inner()
81        .qpage_count
82        .write()
83        .expect("unpoisoned lock");
84
85    Ok(diskring_info
86        .get_inner()
87        .max_qpages
88        .swap(val, Ordering::Relaxed))
89}
90
91pub fn new<P: AsRef<Path>>(
92    path: P,
93) -> Result<(DiskRing<Sender>, DiskRing<Receiver>), RingbufError> {
94    std::fs::create_dir_all(path.as_ref())?;
95
96    let qpage_no = get_qpage_count_static(&path);
97    let qpage = QPage::new(
98        path.as_ref()
99            .join(qpage_no.to_string())
100            .with_extension(PAGE_EXT),
101    )?;
102
103    let diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
104
105    Ok((
106        DiskRing {
107            _kind: PhantomData,
108            path: path.as_ref().into(),
109            read_byte: 0,
110            diskring_info: diskring_info.clone(),
111            qpage: qpage.clone(),
112            qpage_no,
113        },
114        DiskRing {
115            _kind: PhantomData,
116            path: path.as_ref().into(),
117            read_byte: 0,
118            diskring_info,
119            qpage,
120            qpage_no,
121        },
122    ))
123}
124
125impl Iterator for DiskRing<Receiver> {
126    type Item = Result<Option<String>, RingbufError>;
127
128    fn next(&mut self) -> Option<Self::Item> {
129        Some(self.pop())
130    }
131}
132
133impl DiskRing<Receiver> {
134    pub fn new<P: AsRef<Path>>(path: P) -> Result<DiskRing<Receiver>, RingbufError> {
135        let qpage_no = get_qpage_count_static(&path);
136        let qpage = QPage::new(
137            path.as_ref()
138                .join(qpage_no.to_string())
139                .with_extension(PAGE_EXT),
140        )?;
141
142        let diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
143
144        Ok(DiskRing {
145            _kind: PhantomData,
146            path: path.as_ref().into(),
147            read_byte: 0,
148            diskring_info: diskring_info.clone(),
149            qpage: qpage.clone(),
150            qpage_no,
151        })
152    }
153
154    fn page_flip(&mut self) -> Result<(), RingbufError> {
155        let max_qpages = self
156            .diskring_info
157            .get_inner()
158            .max_qpages
159            .load(Ordering::Relaxed);
160
161        if max_qpages > 0 {
162            let qpage_count = self
163                .diskring_info
164                .get_inner()
165                .qpage_count
166                .read()
167                .expect("unpoisoned lock");
168
169            self.qpage_no =
170                std::cmp::max(self.qpage_no + 1, qpage_count.saturating_sub(max_qpages));
171        } else {
172            self.qpage_no += 1;
173        }
174
175        self.read_byte = 0;
176        self.qpage = QPage::new(
177            self.path
178                .join(self.qpage_no.to_string())
179                .with_extension(PAGE_EXT),
180        )?;
181
182        Ok(())
183    }
184
185    pub fn pop(&mut self) -> Result<Option<String>, RingbufError> {
186        loop {
187            match self.qpage.get_inner().try_pop(self.read_byte)? {
188                PopResult::Msg(m) => {
189                    self.read_byte += m.len() + size_of::<qpage::MsgLengthType>();
190                    return Ok(Some(String::from_utf8_lossy(m).to_string()));
191                }
192                PopResult::NoNewMsgs => return Ok(None),
193                PopResult::PageDone => {}
194            };
195
196            self.page_flip()?;
197        }
198    }
199}
200
201impl DiskRing<Sender> {
202    pub fn new<P: AsRef<Path>>(path: P) -> Result<DiskRing<Sender>, RingbufError> {
203        let qpage_no = get_qpage_count_static(&path);
204        let qpage = QPage::new(
205            path.as_ref()
206                .join(qpage_no.to_string())
207                .with_extension(PAGE_EXT),
208        )?;
209
210        let diskring_info = DiskRingInfo::new(path.as_ref().join(INFO_NAME))?;
211
212        Ok(DiskRing {
213            _kind: PhantomData,
214            path: path.as_ref().into(),
215            read_byte: 0,
216            diskring_info: diskring_info.clone(),
217            qpage: qpage.clone(),
218            qpage_no,
219        })
220    }
221
222    fn page_flip(&mut self) -> Result<(), std::io::Error> {
223        let qpage_count = self
224            .diskring_info
225            .get_inner()
226            .qpage_count
227            .read()
228            .expect("unpoisoned lock");
229
230        if self.qpage_no < *qpage_count {
231            self.qpage_no += 1;
232            return Ok(());
233        }
234
235        if self.qpage_no == *qpage_count {
236            drop(qpage_count);
237
238            let mut qpage_count = self
239                .diskring_info
240                .get_inner()
241                .qpage_count
242                .write()
243                .expect("unpoisoned lock");
244
245            if self.qpage_no < *qpage_count {
246                self.qpage_no += 1;
247                return Ok(());
248            }
249
250            *qpage_count += 1;
251            self.qpage_no += 1;
252
253            let max_qpages = self
254                .diskring_info
255                .get_inner()
256                .max_qpages
257                .load(Ordering::Relaxed);
258
259            // setting max_total_pages to zero implies an unbounded ringbuf / queue
260            if max_qpages == 0 {
261                return Ok(());
262            }
263
264            if *qpage_count >= max_qpages {
265                std::fs::remove_file(
266                    self.path
267                        .join((*qpage_count - max_qpages).to_string())
268                        .with_extension(PAGE_EXT),
269                )?;
270            }
271        }
272
273        Ok(())
274    }
275
276    pub fn push<T: AsRef<[u8]>>(&mut self, input: T) -> Result<usize, RingbufError> {
277        loop {
278            match self.qpage.get_inner().try_push(input.as_ref())? {
279                PushResult::BytesWritten(x) => return Ok(x),
280                PushResult::PageFull => {}
281            }
282
283            self.page_flip()?;
284
285            self.qpage = QPage::new(
286                self.path
287                    .join(self.qpage_no.to_string())
288                    .with_extension(PAGE_EXT),
289            )?;
290        }
291    }
292}
293
294fn get_qpage_count_static<P: AsRef<Path>>(path: P) -> usize {
295    let Ok(mut diskring_info) = DiskRingInfo::new(path.as_ref().join(INFO_NAME)) else {
296        return 0;
297    };
298
299    let qpage_count = diskring_info
300        .get_inner()
301        .qpage_count
302        .read()
303        .expect("unpoisoned lock");
304
305    *qpage_count
306}
307
/// Pushes 50 million numbered messages from one thread, then pops them all
/// back and checks that they arrive in order.
#[test]
fn seq_test() {
    let test_dir_path = "test-seq";
    // A run that panicked leaves its directory behind, and its pages would be
    // read back as stale messages. Start clean; a missing directory is fine.
    let _ = std::fs::remove_dir_all(test_dir_path);
    let (mut tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();
    for i in 0..50_000_000 {
        tx.push(i.to_string()).unwrap();
    }

    for i in 0..50_000_000 {
        let m = rx.pop().unwrap();
        assert_eq!(m, Some(i.to_string()));
    }

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}
327
/// Pushes 50 million numbered messages from one thread, then pops them all
/// back and checks that they arrive in order.
///
/// NOTE(review): apart from the directory name this is identical to
/// `seq_test` and uses no buffered API — presumably it was meant to exercise
/// one; confirm or remove.
#[test]
fn seq_buffered_test() {
    let test_dir_path = "test-seq-buf";
    // A run that panicked leaves its directory behind, and its pages would be
    // read back as stale messages. Start clean; a missing directory is fine.
    let _ = std::fs::remove_dir_all(test_dir_path);
    let (mut tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();
    for i in 0..50_000_000 {
        tx.push(i.to_string()).unwrap();
    }

    for i in 0..50_000_000 {
        let m = rx.pop().unwrap();
        assert_eq!(m, Some(i.to_string()));
    }

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}
347
/// One producer thread pushes 50 million numbered messages while the test
/// thread pops them concurrently and checks that they arrive in order.
#[test]
fn spsc_test() {
    let test_dir_path = "test-spsc";
    // A run that panicked leaves its directory behind, and its pages would be
    // read back as stale messages. Start clean; a missing directory is fine.
    let _ = std::fs::remove_dir_all(test_dir_path);
    let (mut tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();
    let t = std::thread::spawn(move || {
        for i in 0..50_000_000 {
            tx.push(i.to_string()).unwrap();
        }
    });

    let mut i = 0;
    while i < 50_000_000 {
        // `None` only means the producer has not caught up yet: poll again.
        let Some(m) = rx.pop().unwrap() else {
            continue;
        };

        assert_eq!(m, i.to_string());
        i += 1;
    }

    t.join().unwrap();

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}
381
/// Four producer threads, each with its own clone of the sender, push 50
/// million messages in total while the test thread pops until it has counted
/// that many. Only the count is checked, not the order or the content.
#[test]
fn mpsc_test() {
    let test_dir_path = "test-mpsc";
    // The receive loop below only terminates because 50_000_000 is divisible
    // by this number.
    let num_threads = 4;
    let mut threads = Vec::new();

    // A run that panicked leaves its directory behind, and its pages would be
    // counted as stale messages. Start clean; a missing directory is fine.
    let _ = std::fs::remove_dir_all(test_dir_path);
    let (tx, mut rx) = new(test_dir_path).unwrap();

    let now = std::time::Instant::now();

    for _ in 0..num_threads {
        let mut tx_clone = tx.clone();
        threads.push(std::thread::spawn(move || {
            for i in 0..50_000_000 / num_threads {
                tx_clone.push(i.to_string()).unwrap();
            }
        }));
    }

    drop(tx);

    let mut i = 0;
    while i < 50_000_000 {
        // `None` only means the producers have not caught up yet: poll again.
        if rx.pop().unwrap().is_some() {
            i += 1;
        }
    }

    for t in threads {
        t.join().unwrap();
    }

    eprintln!("took {} ms", now.elapsed().as_millis());

    std::fs::remove_dir_all(test_dir_path).unwrap();
}