d4_framefile/
stream.rs

1use crate::randfile::RandFile;
2
3use std::io::{Read, Result, Seek, Write};
4use std::num::NonZeroI64;
5
/// Header stored at the start of every frame in the file.
///
/// `repr(packed)` removes padding, so the struct's raw bytes (see `as_bytes`) are exactly the
/// 16-byte on-disk header: the link offset followed by the linked frame's size. Headers written
/// through `FrameHeader::new` hold both values in little-endian byte order.
#[repr(packed)]
#[derive(Default, Clone, Copy)]
pub(crate) struct FrameHeader {
    /// Offset of the next frame relative to this frame's own offset; `None` (stored as 0) when
    /// this is the last frame of the chain.
    pub(crate) linked_frame: Option<NonZeroI64>,
    /// Size in bytes of the next frame, header included.
    pub(crate) linked_frame_size: u64,
}
12
13impl FrameHeader {
14    fn new(relative_offset: i64, frame_size: u64) -> Self {
15        Self {
16            linked_frame: NonZeroI64::new(relative_offset.to_le()),
17            linked_frame_size: frame_size.to_le(),
18        }
19    }
20    fn from_bytes(data: &[u8]) -> FrameHeader {
21        assert!(data.len() >= std::mem::size_of::<Self>());
22        let data = *unsafe { std::mem::transmute::<_, &FrameHeader>(data.as_ptr()) };
23        let offset = data.linked_frame.map_or(0, |x| x.get().to_le());
24        let size = data.linked_frame_size;
25        Self::new(offset, size)
26    }
27
28    fn as_bytes(&self) -> &[u8] {
29        unsafe {
30            std::slice::from_raw_parts(
31                self as *const Self as *const u8,
32                std::mem::size_of::<Self>(),
33            )
34        }
35    }
36}
37
38/// The frame is a consecutive data block in any variant length stream.
39/// Note this is the in-memory representation of a stream
40/// It also have mapped form, please see crate::mapped::MappedStream
41#[derive(Default)]
42struct Frame {
43    header: FrameHeader,
44    /// The offset from the beginning of the file to the head of this frame. If this frame haven't flushed to disk yet, this field is None
45    /// Once this frame is flushed, this field should be updated to the absolute offset of this frame
46    offset: Option<u64>,
47    /// The absolute offset for it's parent frame, if this is the first frame of stream, this should be None
48    parent_frame: Option<(u64, usize)>,
49    /// The size of the current frame
50    current_frame_size: usize,
51    /// The flag indicates if the buffer is dirty
52    dirty: bool,
53    /// The offset from the start of the frame to the payload
54    payload_offset: usize,
55    /// The actual data block
56    data: Vec<u8>,
57}
58
59impl Frame {
60    fn update_frame_link<W: Seek + Write>(
61        &mut self,
62        file: &mut RandFile<W>,
63        offset: u64,
64        size: usize,
65    ) -> Result<()> {
66        if let Some((parent_frame, _parent_size)) = self.parent_frame {
67            let new_header = FrameHeader::new(offset as i64 - parent_frame as i64, size as u64);
68            file.update_block(parent_frame, new_header.as_bytes())
69        } else {
70            Ok(())
71        }
72    }
73
74    fn sync_current_frame<W: Seek + Write>(&mut self, file: &mut RandFile<W>) -> Result<()> {
75        if !self.dirty {
76            return Ok(());
77        }
78
79        if let Some(offset) = self.offset {
80            file.update_block(offset, &self.data)?;
81        } else {
82            let offset = file.append_block(&self.data)?;
83            self.offset = Some(offset);
84            self.update_frame_link(file, offset, self.current_frame_size)?;
85        };
86        Ok(())
87    }
88
89    fn reserve_frame<W: Write + Seek>(
90        &mut self,
91        file: &mut RandFile<W>,
92        size: usize,
93    ) -> Result<()> {
94        if self.offset.is_some() {
95            return Err(std::io::Error::new(
96                std::io::ErrorKind::Other,
97                "Invalid reservation",
98            ));
99        }
100
101        let offset = file.reserve_block(size)?;
102        self.offset = Some(offset);
103        self.current_frame_size = size;
104        self.update_frame_link(file, offset, size)?;
105        Ok(())
106    }
107
108    fn zero_frame(&mut self) {
109        self.data[self.payload_offset..]
110            .iter_mut()
111            .for_each(|m| *m = 0);
112        self.data.resize(self.current_frame_size, 0);
113    }
114
115    fn alloc_new_frame<W: Seek + Write>(
116        this: Option<Self>,
117        file: &mut RandFile<W>,
118        reserve: usize,
119    ) -> Result<Self> {
120        let (mut ret, parent) = if let Some(mut current) = this {
121            current.sync_current_frame(file)?;
122            let parent = current.offset.map(|ofs| (ofs, current.current_frame_size));
123            (current, parent)
124        } else {
125            (Self::default(), None)
126        };
127        ret.header.linked_frame = None;
128        ret.header.linked_frame_size = 0;
129        ret.parent_frame = parent;
130        ret.offset = None;
131        ret.current_frame_size = std::mem::size_of::<FrameHeader>();
132        ret.dirty = true;
133        ret.payload_offset = std::mem::size_of::<FrameHeader>();
134        ret.data.resize(std::mem::size_of::<FrameHeader>(), 0);
135        if reserve > 0 {
136            ret.reserve_frame(file, reserve)?;
137        }
138        Ok(ret)
139    }
140
141    fn load_from_file<R: Seek + Read>(
142        file: &mut RandFile<R>,
143        offset: u64,
144        size: usize,
145        read_payload: bool,
146        buf: Option<Self>,
147        backward: bool,
148    ) -> Result<Self> {
149        let bytes_to_read = if !read_payload {
150            std::mem::size_of::<FrameHeader>()
151        } else {
152            size
153        };
154
155        let mut ret = if let Some(buf) = buf {
156            buf
157        } else {
158            Self::default()
159        };
160
161        ret.data.resize(bytes_to_read, 0);
162        if ret.data.len() != file.read_block(offset, &mut ret.data[..])? {
163            return Err(std::io::Error::new(
164                std::io::ErrorKind::Other,
165                "Invalid frame size",
166            ));
167        }
168
169        ret.header = FrameHeader::from_bytes(&ret.data);
170
171        ret.dirty = false;
172        ret.payload_offset = std::mem::size_of::<FrameHeader>();
173        ret.current_frame_size = size;
174        if !backward {
175            ret.parent_frame = ret.offset.map(|offset| (offset, ret.current_frame_size));
176        } else {
177            ret.parent_frame = None;
178        }
179        ret.offset = Some(offset);
180
181        Ok(ret)
182    }
183
184    fn load_next_frame<R: Seek + Read>(
185        self,
186        file: &mut RandFile<R>,
187        read_payload: bool,
188    ) -> Result<Option<Self>> {
189        if let Some(offset) = self.offset {
190            if let Some(rel_addr) = self.header.linked_frame.map(i64::from) {
191                let size = self.header.linked_frame_size as usize;
192                let addr = (offset as i64 + rel_addr) as u64;
193                return Self::load_from_file(file, addr, size, read_payload, Some(self), false)
194                    .map(Some);
195            }
196        }
197        Ok(None)
198    }
199
200    fn load_previous_frame<R: Seek + Read>(
201        self,
202        file: &mut RandFile<R>,
203        read_payload: bool,
204    ) -> Result<Option<Self>> {
205        if let Some((parent_ofs, parent_size)) = self.parent_frame {
206            return Self::load_from_file(
207                file,
208                parent_ofs,
209                parent_size,
210                read_payload,
211                Some(self),
212                true,
213            )
214            .map(Some);
215        }
216        Ok(None)
217    }
218}
219
/// A variable-length byte stream stored as a chain of linked frames inside a [`RandFile`].
///
/// Writes go into the current in-memory frame. When it is full, or on `flush`, the frame is
/// synced to the file and a new frame linked after it is started. Reads follow the chain of
/// frame links.
pub struct Stream<T> {
    /// The file holding the frames.
    file: RandFile<T>,
    /// Frame currently being read or written; `None` once reading has run past the last frame.
    current_frame: Option<Frame>,
    /// Position within the current frame's payload.
    cursor: usize,
    /// Size in bytes (header included) used for newly allocated frames. With 0, a frame that
    /// is not yet in the file may grow to fit whatever is written.
    frame_size: usize,
    /// When set (and `frame_size > 0`), a write that allocates a new frame immediately reserves
    /// `frame_size` bytes for it in the file.
    pre_alloc: bool,
    /// Callback invoked from `Drop` (e.g. to flush pending data).
    on_drop: Box<dyn FnOnce(&mut Self) + Send + Sync>,
}
228impl<T> Stream<T> {
229    pub fn set_frame_size(&mut self, size: usize) {
230        self.frame_size = size;
231    }
232    pub fn double_frame_size(&mut self, limit: usize) {
233        if self.frame_size * 2 > limit {
234            self.frame_size = limit;
235            return;
236        }
237        self.frame_size *= 2;
238    }
239    pub(crate) fn clone_underlying_file<'b>(&'b self) -> RandFile<T> {
240        self.file.clone()
241    }
242
243    pub(crate) fn get_frame_offset(&self) -> Option<u64> {
244        self.current_frame.as_ref().and_then(|frame| frame.offset)
245    }
246
247    pub(crate) fn get_frame_size(&self) -> Option<usize> {
248        self.current_frame
249            .as_ref()
250            .map(|frame| frame.current_frame_size)
251    }
252
253    pub fn get_frame_capacity(&self) -> usize {
254        self.frame_size - std::mem::size_of::<FrameHeader>()
255    }
256}
257impl<T: Read + Write + Seek> Stream<T> {
258    pub fn flush(&mut self) -> Result<()> {
259        let current_frame = std::mem::replace(&mut self.current_frame, None);
260        self.current_frame = Some(Frame::alloc_new_frame(current_frame, &mut self.file, 0)?);
261        self.cursor = 0;
262        Ok(())
263    }
264
265    pub fn write(&mut self, buffer: &[u8]) -> Result<usize> {
266        self.write_with_alloc_callback(buffer, |_| ())
267    }
268
269    pub fn disable_pre_alloc(&mut self) {
270        self.pre_alloc = false;
271    }
272
273    pub fn write_frame(&mut self, buffer: &[u8]) -> Result<()> {
274        self.flush()?;
275        if let Some(frame) = self.current_frame.as_mut() {
276            frame.data.extend_from_slice(buffer);
277            frame.current_frame_size = frame.data.len();
278        }
279        Ok(())
280    }
281
282    /// Append data to the given stream. Similar to write, but this allows to inject a callback function,
283    /// which you can modify the configuration of the stream before the stream actually synced to the file.
284    pub fn write_with_alloc_callback<R: FnMut(&mut Self)>(
285        &mut self,
286        buffer: &[u8],
287        mut callback: R,
288    ) -> Result<usize> {
289        let mut ret = 0;
290        let mut ptr = buffer;
291        while !ptr.is_empty() {
292            // First, let's determine the size we can write for this iteration
293            let bytes_can_write = if self
294                .current_frame
295                .as_ref()
296                .map_or(false, |s| s.offset.is_some())
297            {
298                // If we are actually writing some block that is backed in the target file,
299                // we are limited by the size of current frame
300                self.current_frame
301                    .as_ref()
302                    .map_or(0, |f| f.current_frame_size - f.payload_offset - self.cursor)
303                    .min(ptr.len())
304            } else {
305                // Otherwise we are free to extend the frame size if the frame size limit is unspecified
306                if self.frame_size > 0 {
307                    self.current_frame
308                        .as_ref()
309                        .map_or(0, |f| self.frame_size - f.payload_offset - self.cursor)
310                        .min(ptr.len())
311                } else {
312                    ptr.len()
313                }
314            };
315
316            if bytes_can_write == 0 {
317                if let Some(Some(_)) = self.current_frame.as_ref().map(|f| f.header.linked_frame) {
318                    let mut current_frame = self.current_frame.take().unwrap();
319                    current_frame.sync_current_frame(&mut self.file)?;
320                    self.current_frame = current_frame.load_next_frame(&mut self.file, true)?;
321                } else {
322                    callback(self);
323                    let current_frame = self.current_frame.take();
324                    self.current_frame =
325                        Some(Frame::alloc_new_frame(current_frame, &mut self.file, 0)?);
326                    if self.frame_size > 0 && self.pre_alloc {
327                        let frame = self.current_frame.as_mut().unwrap();
328                        frame.reserve_frame(&mut self.file, self.frame_size)?;
329                        frame.zero_frame();
330                    }
331                }
332                self.cursor = 0;
333                continue;
334            }
335
336            let cursor = self.cursor;
337            if let Some(ref mut frame) = self.current_frame {
338                let start = frame.payload_offset + cursor;
339                let end = start + bytes_can_write;
340                if frame.data.len() < end {
341                    frame.data.resize(end, 0);
342                }
343                frame.data[start..end].copy_from_slice(&ptr[..bytes_can_write]);
344                frame.current_frame_size = frame.current_frame_size.max(end);
345                frame.dirty = true;
346            }
347            ptr = &ptr[bytes_can_write..];
348            self.cursor += bytes_can_write;
349            ret += bytes_can_write;
350        }
351        Ok(ret)
352    }
353}
354impl<T: Read + Seek> Read for Stream<T> {
355    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
356        Stream::read(self, buf)
357    }
358}
359impl<T: Read + Write + Seek> Write for Stream<T> {
360    fn write(&mut self, buf: &[u8]) -> Result<usize> {
361        Stream::write(self, buf)
362    }
363    fn flush(&mut self) -> Result<()> {
364        self.flush()
365    }
366}
367impl<T: Read + Seek> AsRef<[u8]> for Stream<T> {
368    fn as_ref(&self) -> &[u8] {
369        if let Some(r) = self.read_current_frame() {
370            r
371        } else {
372            &[]
373        }
374    }
375}
376impl<T: Read + Seek> Stream<T> {
377    pub fn load_next_frame(&mut self) -> Result<()> {
378        if let Some(this_frame) = self.current_frame.take() {
379            self.cursor = 0;
380            self.current_frame = this_frame.load_next_frame(&mut self.file, true)?;
381        }
382        Ok(())
383    }
384    pub fn read_current_frame(&self) -> Option<&[u8]> {
385        if let Some(this_frame) = self.current_frame.as_ref() {
386            return Some(&this_frame.data[this_frame.payload_offset..]);
387        }
388        None
389    }
390    pub fn copy_current_frame_data(&self, buf: &mut Vec<u8>) {
391        buf.clear();
392        buf.extend_from_slice(self.read_current_frame().unwrap_or(&[]));
393    }
394    pub fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
395        let mut ret = 0;
396        let mut ptr = buffer;
397        while self.current_frame.is_some() && !ptr.is_empty() {
398            let bytes_read = {
399                let can_read = self
400                    .current_frame
401                    .as_ref()
402                    .map_or(0, |f| f.data.len() - f.payload_offset)
403                    .max(self.cursor)
404                    - self.cursor;
405                if can_read == 0 {
406                    let this_frame = std::mem::replace(&mut self.current_frame, None);
407                    self.current_frame =
408                        this_frame.unwrap().load_next_frame(&mut self.file, true)?;
409                    self.cursor = 0;
410                    continue;
411                }
412                can_read
413            }
414            .min(ptr.len());
415            ptr[..bytes_read].copy_from_slice(
416                self.current_frame
417                    .as_ref()
418                    .map(|f| {
419                        let start = f.payload_offset + self.cursor;
420                        let end = start + bytes_read;
421                        &f.data[start..end]
422                    })
423                    .unwrap(),
424            );
425            ret += bytes_read;
426            ptr = &mut ptr[bytes_read..];
427            self.cursor += bytes_read;
428        }
429        Ok(ret)
430    }
431    pub(crate) fn open_for_read(file: RandFile<T>, primary_frame: (u64, usize)) -> Result<Self> {
432        Self::open_with_ondrop(file, primary_frame, |_| {})
433    }
434    pub(crate) fn open_for_update(file: RandFile<T>, primary_frame: (u64, usize)) -> Result<Self>
435    where
436        T: Write,
437    {
438        Self::open_with_ondrop(file, primary_frame, |s| s.flush().unwrap())
439    }
440    pub(crate) fn open_with_ondrop<D: FnOnce(&mut Self) + Send + Sync + 'static>(
441        mut file: RandFile<T>,
442        primary_frame: (u64, usize),
443        on_drop: D,
444    ) -> Result<Self> {
445        let primary_frame = Frame::load_from_file(
446            &mut file,
447            primary_frame.0,
448            primary_frame.1,
449            true,
450            None,
451            false,
452        )?;
453        let frame_size = primary_frame.current_frame_size;
454        let current_frame = Some(primary_frame);
455        Ok(Self {
456            file,
457            current_frame,
458            cursor: 0,
459            frame_size,
460            on_drop: Box::new(on_drop),
461            pre_alloc: true,
462        })
463    }
464}
465
466impl<'a, T: Read + Write + Seek> Stream<T> {
467    pub fn update_current_byte(&mut self, byte: u8) -> Result<usize> {
468        if let Some(current_frame) = &self.current_frame {
469            if self.cursor > 0 {
470                self.cursor -= 1;
471            } else {
472                if current_frame.parent_frame.is_some() {
473                    let this_frame = self.current_frame.take().unwrap();
474                    this_frame.load_previous_frame(&mut self.file, true)?;
475                }
476                // Otherwise this is the begining of the stream, thus no need to go back
477            }
478            self.write(&[byte])?;
479            return Ok(1);
480        }
481        Ok(0)
482    }
483    pub(crate) fn create(mut file: RandFile<T>, frame_size: usize) -> Result<Self> {
484        let current_frame = Some(Frame::alloc_new_frame(None, &mut file, frame_size)?);
485        Ok(Self {
486            file,
487            current_frame,
488            cursor: 0,
489            frame_size,
490            on_drop: Box::new(|this| {
491                this.flush().unwrap();
492            }),
493            pre_alloc: true,
494        })
495    }
496}
497
498impl<T> Drop for Stream<T> {
499    fn drop(&mut self) {
500        let drop_callback = std::mem::replace(&mut self.on_drop, Box::new(|_| {}));
501        drop_callback(self);
502    }
503}
504
#[cfg(test)]
mod test {
    use super::Stream;
    use crate::randfile::RandFile;
    use std::io::Cursor;
    type TestResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
    /// Compile-time check that `Stream` can be sent across threads.
    #[test]
    fn test_stream_send() {
        fn check_send<T: Send>() {}
        check_send::<Stream<std::fs::File>>();
    }
    /// Two streams interleave frames in one file. Reading the first stream back must follow its
    /// own frame links and skip the other stream's frames.
    #[test]
    fn test_compose_stream() -> TestResult<()> {
        let mut buffer = vec![];
        {
            let fp = Cursor::new(&mut buffer);
            let file = RandFile::new(fp);

            let mut stream = Stream::create(file.clone(), 0)?;
            let mut stream2 = Stream::create(file, 0)?;

            stream.write(b"This is a test frame")?;
            stream2.write(b"This is another stream")?;

            stream.flush()?;

            stream.write(b"This is the second block")?;
            stream2.write(b"This is another stream - 2")?;
            stream2.flush()?;
            stream.flush()?;
        }

        {
            let fp = Cursor::new(&mut buffer);
            let file = RandFile::new(fp);
            // NOTE(review): the primary frame is opened with size 30, although it was presumably
            // written as 16 header bytes + 20 payload bytes (assuming `append_block` places it at
            // offset 0). Only 14 payload bytes of it would then be read, plus 24 from the second
            // block = 38. Confirm the truncation is intended.
            let mut stream = Stream::open_for_read(file, (0, 30))?;
            let mut data = [0; 100];
            assert_eq!(38, stream.read(&mut data)?);
        }

        Ok(())
    }
    /// Hand-built file: a 19-byte frame (16-byte header + 3 payload bytes) linked, at relative
    /// offset 19, to a 20-byte frame whose header and 4 payload bytes are all zero.
    /// Reading the whole stream yields 3 + 4 = 7 bytes.
    #[test]
    fn test_traverse_file() -> TestResult<()> {
        let test_blob: Vec<_> = vec![
            19, 0, 0, 0, 0, 0, 0, 0, //Linked Frame
            20, 0, 0, 0, 0, 0, 0, 0, // Linked Frame size
            0xdd, 0xdd, 0xdd, // Frame data
            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        ];
        let reader = Cursor::new(test_blob);
        let file = RandFile::new(reader);

        let mut stream = Stream::open_for_read(file, (0, 19))?;

        let mut buffer = vec![0; 100];

        assert_eq!(7, stream.read(&mut buffer)?);

        Ok(())
    }

    /// Read up to the first zero byte (file offset 18, the last payload byte of frame one) and
    /// overwrite it in place. Then append two bytes, which spill into the linked frame's payload
    /// at file offsets 35 and 36.
    #[test]
    fn test_modify_stream() -> TestResult<()> {
        let test_blob: Vec<_> = vec![
            19, 0, 0, 0, 0, 0, 0, 0, //Linked Frame
            20, 0, 0, 0, 0, 0, 0, 0, // Linked Frame size
            0xdd, 0xdd, 0x00, // Frame data
            0, 0, 0, 0, 0, 0, 0, 0, // Linked Frame
            0, 0, 0, 0, 0, 0, 0, 0, // Linked Frame size
            0, 0, 0, 0,
        ];
        let reader = Cursor::new(test_blob);
        let file = RandFile::new(reader);

        let mut stream = Stream::open_for_read(file, (0, 19))?;

        loop {
            let mut buf = [0; 1];
            stream.read(&mut buf[..]).unwrap();
            if buf[0] == 0 {
                break;
            }
        }
        assert_eq!(stream.update_current_byte(0xdd).unwrap(), 1);
        stream.write(&[0xdd]).unwrap();
        stream.write(&[0xdd]).unwrap();
        stream.flush().unwrap();

        let test_blob = stream
            .clone_underlying_file()
            .clone_inner()
            .unwrap()
            .into_inner();

        assert_eq!(test_blob[18], 0xdd);
        assert_eq!(test_blob[35], 0xdd);
        assert_eq!(test_blob[36], 0xdd);

        Ok(())
    }
}