use super::{SeqRead, SeqWrite};
use crate::os::{map_ring, unmap_ring};
use crate::{Result, Size};

use std::cmp;
use std::io::{self, BufRead, Read, Write};
use std::ops::Deref;

/// Fixed-size reliable read/write buffer with sequential address mapping.
///
/// This uses a circular address mapping scheme. That is, for any buffer of
/// size `N`, the pointer address range of `0..N` maps to the same physical
/// memory as the range `N..2*N`. This guarantees that the entire read or
/// write range may be addressed as a single sequence of bytes.
///
/// Unlike the [`InfiniteRing`], this type otherwise acts as a "normal" buffer.
/// Writes fill up the buffer, and when full, no further writes may be
/// performed until a read occurs. The writable length is the capacity of the
/// buffer, less any pending readable bytes.
///
/// # Examples
///
/// ```
/// use vmap::io::{Ring, SeqWrite};
/// use std::io::{BufRead, Read, Write};
///
/// # fn main() -> std::io::Result<()> {
/// let mut buf = Ring::new(4000).unwrap();
/// let mut i = 1;
///
/// // Fill up the buffer with lines.
/// while buf.write_len() > 20 {
///     write!(&mut buf, "this is test line {}\n", i)?;
///     i += 1;
/// }
///
/// // No more space is available.
/// assert!(write!(&mut buf, "this is test line {}\n", i).is_err());
///
/// let mut line = String::new();
///
/// // Read the first line written.
/// buf.read_line(&mut line)?;
/// assert_eq!(line, "this is test line 1\n");
///
/// line.clear();
///
/// // Read the second line written.
/// buf.read_line(&mut line)?;
/// assert_eq!(line, "this is test line 2\n");
///
/// // Now there is enough space to write more.
/// write!(&mut buf, "this is test line {}\n", i)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Ring {
    ptr: *mut u8,
    len: usize,
    rpos: u64,
    wpos: u64,
}

impl Ring {
    /// Constructs a new buffer instance.
    ///
    /// The hint is a minimum size for the buffer. This size will be rounded up
    /// to the nearest page size for the actual capacity. The allocation will
    /// occupy double the space in the virtual memory table, but the physical
    /// memory usage will remain at the desired capacity.
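    ///
    /// # Examples
    ///
    /// A minimal sketch of the rounding behavior; the exact capacity depends
    /// on the platform's allocation granularity, so only a lower bound is
    /// asserted here:
    ///
    /// ```
    /// use vmap::io::{Ring, SeqWrite};
    ///
    /// let buf = Ring::new(1000).unwrap();
    /// // The capacity is at least the hint, rounded up to a whole allocation.
    /// assert!(buf.write_capacity() >= 1000);
    /// // An empty ring is entirely writable.
    /// assert_eq!(buf.write_len(), buf.write_capacity());
    /// ```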
    pub fn new(hint: usize) -> Result<Self> {
        let len = Size::alloc().round(hint);
        let ptr = map_ring(len)?;
        Ok(Self {
            ptr,
            len,
            rpos: 0,
            wpos: 0,
        })
    }

    /// Clears the buffer, resetting the filled region to empty.
    ///
    /// This resets the read and write positions to the beginning of the
    /// buffer. The contents of the underlying memory are not modified.
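    ///
    /// # Examples
    ///
    /// A minimal sketch showing that pending readable bytes are discarded:
    ///
    /// ```
    /// use vmap::io::{Ring, SeqRead, SeqWrite};
    /// use std::io::Write;
    ///
    /// let mut buf = Ring::new(4000).unwrap();
    /// buf.write_all(b"hello").unwrap();
    /// assert_eq!(buf.read_len(), 5);
    ///
    /// buf.clear();
    /// assert_eq!(buf.read_len(), 0);
    /// assert_eq!(buf.write_len(), buf.write_capacity());
    /// ```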
    pub fn clear(&mut self) {
        self.rpos = 0;
        self.wpos = 0;
    }
}

impl Drop for Ring {
    fn drop(&mut self) {
        unsafe { unmap_ring(self.ptr, self.write_capacity()) }.unwrap_or_default();
    }
}

impl SeqRead for Ring {
    fn as_read_ptr(&self) -> *const u8 {
        self.ptr
    }

    fn read_offset(&self) -> usize {
        self.rpos as usize % self.len
    }

    fn read_len(&self) -> usize {
        (self.wpos - self.rpos) as usize
    }
}

impl SeqWrite for Ring {
    fn as_write_ptr(&mut self) -> *mut u8 {
        self.ptr
    }

    fn write_offset(&self) -> usize {
        self.wpos as usize % self.len
    }

    fn write_len(&self) -> usize {
        self.write_capacity() - self.read_len()
    }

    fn write_capacity(&self) -> usize {
        self.len
    }

    fn feed(&mut self, len: usize) {
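        // Clamp so the write head never advances past the available space.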
        self.wpos += cmp::min(len, self.write_len()) as u64;
    }
}

impl BufRead for Ring {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        Ok(self.as_read_slice(usize::MAX))
    }

    fn consume(&mut self, len: usize) {
        self.rpos += cmp::min(len, self.read_len()) as u64;
    }
}

impl Read for Ring {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_from(buf)
    }
}

impl Write for Ring {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_into(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Deref for Ring {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_read_slice(usize::MAX)
    }
}

impl AsRef<[u8]> for Ring
where
    <Ring as Deref>::Target: AsRef<[u8]>,
{
    fn as_ref(&self) -> &[u8] {
        self.deref()
    }
}

/// Fixed-size lossy read/write buffer with sequential address mapping.
///
/// This uses a circular address mapping scheme. That is, for any buffer of
/// size `N`, the pointer address range of `0..N` maps to the same physical
/// memory as the range `N..2*N`. This guarantees that the entire read or
/// write range may be addressed as a single sequence of bytes.
///
/// Unlike the [`Ring`], writes to this type may evict bytes from the read side
/// of the queue. The writeable size is always equal to the overall capacity of
/// the buffer.
///
/// # Examples
///
/// ```
/// use vmap::io::{InfiniteRing, SeqRead, SeqWrite};
/// use std::io::{BufRead, Read, Write};
///
/// # fn main() -> std::io::Result<()> {
/// let mut buf = InfiniteRing::new(4000).unwrap();
/// let mut i = 1;
/// let mut total = 0;
/// while total < buf.write_capacity() {
///     let tmp = format!("this is test line {}\n", i);
/// write!(buf, "{}", tmp)?;
///     total += tmp.len();
///     i += 1;
/// }
///
/// // skip over the overwritten tail
/// buf.consume(20 - buf.read_offset());
///
/// // read the next line
/// let mut line = String::new();
/// let len = buf.read_line(&mut line)?;
///
/// assert_eq!(len, 20);
/// assert_eq!(&line[line.len()-20..], "this is test line 2\n");
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct InfiniteRing {
    ptr: *mut u8,
    len: usize,
    rlen: u64,
    wpos: u64,
}

impl InfiniteRing {
    /// Constructs a new ring buffer instance.
    ///
    /// The hint is a minimum size for the buffer. This size will be rounded up
    /// to the nearest page size for the actual capacity. The allocation will
    /// occupy double the space in the virtual memory table, but the physical
    /// memory usage will remain at the desired capacity.
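    ///
    /// # Examples
    ///
    /// A minimal sketch; unlike [`Ring`], the writable length stays equal to
    /// the capacity even while unread bytes are pending:
    ///
    /// ```
    /// use vmap::io::{InfiniteRing, SeqRead, SeqWrite};
    /// use std::io::Write;
    ///
    /// let mut buf = InfiniteRing::new(4000).unwrap();
    /// buf.write_all(b"hello").unwrap();
    /// assert_eq!(buf.read_len(), 5);
    /// assert_eq!(buf.write_len(), buf.write_capacity());
    /// ```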
    pub fn new(hint: usize) -> Result<Self> {
        let len = Size::alloc().round(hint);
        let ptr = map_ring(len)?;
        Ok(Self {
            ptr,
            len,
            rlen: 0,
            wpos: 0,
        })
    }
}

impl Drop for InfiniteRing {
    fn drop(&mut self) {
        unsafe { unmap_ring(self.ptr, self.write_capacity()) }.unwrap_or_default()
    }
}

impl SeqRead for InfiniteRing {
    fn as_read_ptr(&self) -> *const u8 {
        self.ptr
    }
    fn read_offset(&self) -> usize {
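        // The read head trails the write head by the number of unread bytes.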
        (self.wpos - self.rlen) as usize % self.len
    }
    fn read_len(&self) -> usize {
        self.rlen as usize
    }
}

impl SeqWrite for InfiniteRing {
    fn as_write_ptr(&mut self) -> *mut u8 {
        self.ptr
    }
    fn write_offset(&self) -> usize {
        self.wpos as usize % self.len
    }
    fn write_len(&self) -> usize {
        self.write_capacity()
    }
    fn write_capacity(&self) -> usize {
        self.len
    }
    fn feed(&mut self, len: usize) {
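        // Advance the write head; the readable length grows along with it but
        // saturates at the capacity, so the oldest unread bytes are dropped.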
        self.wpos += cmp::min(len, self.write_len()) as u64;
        self.rlen = cmp::min(self.rlen + len as u64, self.len as u64);
    }
}

impl BufRead for InfiniteRing {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        Ok(self.as_read_slice(usize::MAX))
    }

    fn consume(&mut self, len: usize) {
        self.rlen -= cmp::min(len, self.read_len()) as u64;
    }
}

impl Read for InfiniteRing {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_from(buf)
    }
}

impl Write for InfiniteRing {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_into(buf)
    }

    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
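        // Lossy write: copy only as many bytes of `buf` as fit in the write
        // window, keeping the newest bytes and dropping the leading excess.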
        let len = {
            let dst = self.as_write_slice(buf.len());
            let len = dst.len();
            let tail = buf.len() - len;
            dst.copy_from_slice(&buf[tail..]);
            len
        };
        self.feed(len);
        Ok(())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Deref for InfiniteRing {
    type Target = [u8];

    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_read_slice(usize::MAX)
    }
}

impl AsRef<[u8]> for InfiniteRing
where
    <InfiniteRing as Deref>::Target: AsRef<[u8]>,
{
    fn as_ref(&self) -> &[u8] {
        self.deref()
    }
}