seq_data_file/
lib.rs

1//! Seq Data is a simple file format that contains multiple chunks of data prefixed by a length
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, Read, Seek, Write};
4use std::marker::PhantomData;
5use std::path::Path;
6
7mod format;
8mod ioutils;
9
10#[cfg(feature = "async")]
11pub mod nonblocking;
12
13pub use format::{NoMagicNoHeader, SeqDataFormat};
14use ioutils::optional_read_exact;
15pub use ioutils::truncate_at;
16
17/// Writer for a new SeqData
18pub struct SeqDataWriter<Format: SeqDataFormat> {
19    file: File,
20    phantom: PhantomData<Format>,
21}
22
23impl<Format: SeqDataFormat> SeqDataWriter<Format> {
24    /// Create a new SeqData File at the location specified
25    ///
26    /// If the file already exists, this call will fail
27    ///
28    /// The header need to fits the size of Format::HEADER_SIZE
29    pub fn create<P: AsRef<Path>>(path: P, header: &[u8]) -> std::io::Result<Self> {
30        if Format::HEADER_SIZE != header.len() {
31            return Err(std::io::Error::new(
32                std::io::ErrorKind::Other,
33                format!(
34                    "header has invalid size, expecting {} but got {}",
35                    Format::HEADER_SIZE,
36                    header.len()
37                ),
38            ));
39        }
40
41        let mut file = OpenOptions::new()
42            .read(false)
43            .write(true)
44            .create_new(true)
45            .append(true)
46            .open(path)?;
47        file.write_all(&Format::MAGIC)?;
48        file.write_all(header)?;
49        Ok(SeqDataWriter {
50            file,
51            phantom: PhantomData,
52        })
53    }
54
55    /// Open a SeqData File at the location specified
56    ///
57    /// If the file already exists, this call will fail
58    ///
59    /// The header need to fits the size of Format::HEADER_SIZE
60    pub fn open<P: AsRef<Path>>(path: P, header: &[u8]) -> std::io::Result<(Self, Vec<u8>)> {
61        if Format::HEADER_SIZE != header.len() {
62            return Err(std::io::Error::new(
63                std::io::ErrorKind::Other,
64                format!(
65                    "header has invalid size, expecting {} but got {}",
66                    Format::HEADER_SIZE,
67                    header.len()
68                ),
69            ));
70        }
71
72        let mut file = OpenOptions::new()
73            .read(true)
74            .write(true)
75            .create_new(false)
76            .append(true)
77            .open(path)?;
78
79        file.seek(std::io::SeekFrom::Start(0))?;
80        let header = read_magic_and_header(PhantomData::<Format>, &mut file)?;
81        file.seek(std::io::SeekFrom::End(0))?;
82
83        Ok((
84            SeqDataWriter {
85                file,
86                phantom: PhantomData,
87            },
88            header,
89        ))
90    }
91
92    /// Append a new data chunk to this file
93    pub fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
94        write_chunk(&mut self.file, data)
95    }
96}
97
98/// Reader for SeqData
99pub struct SeqDataReader<Format: SeqDataFormat> {
100    buf_reader: BufReader<File>,
101    pos: u64,
102    len: u64,
103    phantom: PhantomData<Format>,
104}
105
106fn read_magic_and_header<Format: SeqDataFormat>(
107    _format: PhantomData<Format>,
108    file: &mut File,
109) -> std::io::Result<Vec<u8>> {
110    // try to read the magic
111    const MAGIC_READ_BUF_SIZE: usize = 16;
112    let mut magic_read_buf = [0u8; MAGIC_READ_BUF_SIZE];
113    let mut magic_slice = Format::MAGIC;
114    while !magic_slice.is_empty() {
115        let sz = Format::MAGIC.len().min(MAGIC_READ_BUF_SIZE);
116        let rd = file.read(&mut magic_read_buf[0..sz])?;
117        if rd == 0 {
118            return Err(std::io::Error::new(
119                std::io::ErrorKind::UnexpectedEof,
120                "unexpected EOF in magic reading",
121            ));
122        }
123        if magic_slice[0..rd] != magic_read_buf[0..rd] {
124            return Err(std::io::Error::new(
125                std::io::ErrorKind::Other,
126                "magic do not match expected value",
127            ));
128        }
129        magic_slice = &magic_slice[rd..];
130    }
131
132    let mut header = vec![0u8; Format::HEADER_SIZE];
133    file.read_exact(&mut header)?;
134    Ok(header)
135}
136
137impl<Format: SeqDataFormat> SeqDataReader<Format> {
138    /// Open a SeqData for reading
139    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<(Self, Vec<u8>)> {
140        let mut file = File::open(path)?;
141
142        let phantom = PhantomData;
143        let len = get_file_length(phantom, &mut file)?;
144        let header = read_magic_and_header(phantom, &mut file)?;
145
146        let buf_reader = BufReader::with_capacity(1024 * 1024, file);
147        Ok((
148            SeqDataReader {
149                buf_reader,
150                pos: 0,
151                len,
152                phantom,
153            },
154            header,
155        ))
156    }
157
158    pub fn len(&self) -> u64 {
159        self.len
160    }
161
162    pub fn position(&self) -> u64 {
163        self.pos
164    }
165
166    /// Return the next block along with the current offset if it exists, or None if
167    /// reached the end of file.
168    pub fn next(&mut self) -> Option<std::io::Result<(u64, Vec<u8>)>> {
169        match read_chunk(&mut self.buf_reader) {
170            None => None,
171            Some(Err(e)) => Some(Err(e)),
172            Some(Ok(buf)) => {
173                let current_pos = self.pos;
174                self.pos += size_of::<PrefixLength>() as u64 + buf.len() as u64;
175                Some(Ok((current_pos, buf)))
176            }
177        }
178    }
179}
180
181/// Seq Data Reader with seek
182pub struct SeqDataReaderSeek<Format: SeqDataFormat> {
183    handle: File,
184    phantom: PhantomData<Format>,
185    start: u64,
186    len: u64,
187}
188
189impl<Format: SeqDataFormat> SeqDataReaderSeek<Format> {
190    /// Open a new Seq Data seeker
191    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<(Self, Vec<u8>)> {
192        let mut handle = File::open(path)?;
193
194        let phantom = PhantomData;
195        let len = get_file_length(phantom, &mut handle)?;
196        let header = read_magic_and_header(phantom, &mut handle)?;
197
198        let start = handle.seek(std::io::SeekFrom::Current(0))?;
199
200        Ok((
201            Self {
202                handle,
203                phantom,
204                len,
205                start,
206            },
207            header,
208        ))
209    }
210
211    /// Return the next block along with the current offset if it exists, or None if
212    /// reached the end of file.
213    pub fn next(&mut self) -> std::io::Result<Vec<u8>> {
214        read_chunk(&mut self.handle).unwrap()
215    }
216
217    /// Return the next block at the offset specified
218    ///
219    /// Note that if the position specified is not a valid boundary,
220    /// then arbitrary invalid stuff might be returns, or some Err
221    /// related to reading data
222    pub fn next_at(&mut self, pos: u64) -> std::io::Result<Vec<u8>> {
223        if pos >= self.len {
224            return Err(std::io::Error::new(
225                std::io::ErrorKind::Other,
226                format!(
227                    "trying to access data at {} but data length {}",
228                    pos, self.len
229                ),
230            ));
231        }
232
233        let seek = self.start + pos;
234        self.handle.seek(std::io::SeekFrom::Start(seek))?;
235        self.next()
236    }
237}
238
/// Integer type of the length prefix stored in front of every chunk.
///
/// The prefix is encoded in little-endian byte order when chunks are read
/// and written, so a single chunk holds at most `PrefixLength::MAX` bytes.
type PrefixLength = u32;
240
241fn read_chunk<R: Read>(file: &mut R) -> Option<std::io::Result<Vec<u8>>> {
242    let mut lenbuf = [0; size_of::<PrefixLength>()];
243    // try to read the length, if the length return a none, we just expect
244    // having reached the end of the stream then
245    match optional_read_exact(file, &mut lenbuf) {
246        None => None,
247        Some(Err(e)) => Some(Err(e)),
248        Some(Ok(())) => {
249            let len = PrefixLength::from_le_bytes(lenbuf);
250
251            // create a buffer of the prefix length 'len' and read all data
252            let mut out = vec![0; len as usize];
253            match file.read_exact(&mut out) {
254                Err(e) => Some(Err(e)),
255                Ok(()) => Some(Ok(out)),
256            }
257        }
258    }
259}
260
261fn write_chunk(file: &mut File, data: &[u8]) -> std::io::Result<()> {
262    let max = PrefixLength::MAX as usize;
263    assert!(data.len() <= max);
264    let len: u32 = data.len() as PrefixLength;
265    let header = len.to_le_bytes();
266    file.write_all(&header)?;
267    file.write_all(data)?;
268    Ok(())
269}
270
271fn get_file_length<Format: SeqDataFormat>(
272    _phantom: PhantomData<Format>,
273    file: &mut File,
274) -> std::io::Result<u64> {
275    let meta = file.metadata()?;
276    let total_len = meta.len();
277
278    let minimum_size = Format::MAGIC.len() as u64 + Format::HEADER_SIZE as u64;
279    if total_len < minimum_size {
280        return Err(std::io::Error::new(
281            std::io::ErrorKind::Other,
282            "file not contains enough bytes for magic and header",
283        ));
284    }
285    Ok(total_len - minimum_size)
286}