1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use super::*;
use std::path::PathBuf;
use std::{borrow::Cow, slice, str};

/// The outcome of a successful read from a page.
///
/// A read either yields the next message, or tells the reader that the current
/// page has been fully consumed and the next page should be opened.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadResult<'a> {
    /// The message that was read. It is a `Cow` because the bytes are converted
    /// lossily to UTF-8: valid input is borrowed, invalid input is re-allocated.
    Msg(Cow<'a, str>),
    /// The reader finished this page and should continue on the next one.
    Continue,
}

/// a convenience wrapper around a raw `RawQPage` pointer that also keeps track of the
/// path the page was created with
pub struct Page {
    /// raw page handle returned by `raw_qpage_new_rs`; handed to the `raw_qpage_*`
    /// functions for every push/pop and to `raw_qpage_drop` when the `Page` is dropped
    raw: *mut RawQPage,
    /// the path passed to `Page::new`; `Clone` uses it to open a new page on the same path
    path: PathBuf,
}

// `Page` contains a raw pointer, so the compiler does not make it `Send` automatically;
// this impl opts in by hand.
// NOTE(review): soundness depends on the `raw_qpage_*` implementation tolerating the
// handle being used from a different thread than the one that created it — that code is
// not visible here, confirm before relying on it.
unsafe impl Send for Page {}

impl Drop for Page {
    /// Releases the underlying raw page by passing the handle to `raw_qpage_drop`.
    fn drop(&mut self) {
        let handle = self.raw;

        // SAFETY (assumed, confirm against the raw_qpage implementation): `handle` is the
        // pointer stored by `Page::new`, and `drop` runs at most once per `Page`.
        unsafe {
            raw_qpage_drop(handle);
        }
    }
}

impl Clone for Page {
    /// Creates another `Page` by calling `Page::new` with this page's stored path.
    ///
    /// The clone gets its own raw handle; the pointer held by `self` is not copied.
    ///
    /// # Panics
    ///
    /// Panics if the stored path is not valid unicode.
    fn clone(&self) -> Page {
        let location = self.path.to_str().expect("not unicode path");
        Page::new(location)
    }
}

impl Page {
    pub fn new<P: AsRef<str>>(path: P) -> Self {
        let path = path.as_ref();
        let c_page = unsafe { raw_qpage_new_rs(path.as_ptr(), path.len()) };

        Page {
            raw: c_page,
            path: path.into(),
        }
    }

    /// attemps to push a message into the page and returns the number of bytes written.
    /// a return value of zero implies that the page is full and that the writer should try
    /// again on a new page
    #[cfg(feature = "fast-read")]
    pub fn try_push<T: AsRef<[u8]>>(&self, input: T) -> Result<usize, i64> {
        let input = input.as_ref();

        unsafe {
            match raw_qpage_push_fast_read(self.raw, input.as_ptr(), input.len()) {
                i @ ..=-1 => Err(i),
                // a return value of 0 implies the page is full
                i @ 0.. => Ok(i as usize),
            }
        }
    }

    /// attemps to push a message into the page and returns the number of bytes written.
    /// a return value of zero implies that the page is full and that the writer should try
    /// again on a new page
    #[cfg(not(feature = "fast-read"))]
    pub fn try_push<T: AsRef<[u8]>>(&self, input: T) -> Result<usize, i64> {
        let input = input.as_ref();

        unsafe {
            match raw_qpage_push(self.raw, input.as_ptr(), input.len()) {
                i @ ..=-1 => Err(i),
                // a return value of 0 implies the page is full
                i @ 0.. => Ok(i as usize),
            }
        }
    }

    /// attempts to pop a message from the page and returns an optional ReadResult. a
    /// None value implies that there are no new messages to read
    #[cfg(feature = "fast-read")]
    pub fn try_pop(&self, start_byte: usize) -> Result<Option<ReadResult>, i64> {
        let slice = unsafe {
            let cs = raw_qpage_pop_fast_read(self.raw, start_byte);

            let cs = match cs.read_status {
                i @ ..=-1 => return Err(i),
                0 => cs,
                1 => return Ok(Some(ReadResult::Continue)),
                2 => return Ok(None),
                _ => unreachable!(),
            };

            slice::from_raw_parts(cs.ptr, cs.len)
        };

        Ok(Some(ReadResult::Msg(String::from_utf8_lossy(slice))))
    }

    /// attempts to pop a message from the page and returns an optional ReadResult. a
    /// None value implies that there are no new messages to read
    #[cfg(not(feature = "fast-read"))]
    pub fn try_pop(&self, start_byte: usize) -> Result<Option<ReadResult>, i64> {
        let slice = unsafe {
            let cs = raw_qpage_pop(self.raw, start_byte);

            let cs = match cs.read_status {
                i @ ..=-1 => return Err(i),
                0 => cs,
                1 => return Ok(Some(ReadResult::Continue)),
                2 => return Ok(None),
                _ => unreachable!(),
            };

            slice::from_raw_parts(cs.ptr, cs.len)
        };

        Ok(Some(ReadResult::Msg(String::from_utf8_lossy(slice))))
    }
}

/// Pushes `NUM` consecutive integers (as decimal strings) into a single page, pops them
/// back in order, and checks each popped message against the value that was written.
#[test]
fn sequential_test() {
    const NUM: usize = 5_000_000;
    const FILE: &str = "testing_sequential_test";

    let started = std::time::Instant::now();
    let page = Page::new(FILE);

    // write phase: every push is expected to succeed
    for n in 0..NUM {
        let text = n.to_string();
        page.try_push(&text).unwrap();
    }

    // read phase: the next read position is the previous one plus the message length plus one
    let mut offset = 0;
    for expected in (0..NUM).map(|n| n.to_string()) {
        let message = match page.try_pop(offset).unwrap().unwrap() {
            ReadResult::Msg(m) => m,
            ReadResult::Continue => panic!("todo"),
        };

        offset += message.len() + 1;
        assert_eq!(expected, message);
    }

    eprintln!("took {} ms", started.elapsed().as_millis());

    std::fs::remove_file(FILE).unwrap();
}