seq_data_file/
lib.rs

//! Seq Data is a simple file format that contains multiple chunks of data, each prefixed by its length.
2use std::fs::{File, OpenOptions};
3use std::io::{BufReader, Read, Seek, Write};
4use std::marker::PhantomData;
5use std::path::Path;
6
7mod format;
8mod ioutils;
9
10#[cfg(feature = "async")]
11pub mod nonblocking;
12
13pub use format::{NoMagicNoHeader, SeqDataFormat};
14use ioutils::optional_read_exact;
15pub use ioutils::truncate_at;
16
17/// Writer for a new SeqData
18pub struct SeqDataWriter<Format: SeqDataFormat> {
19    file: File,
20    pos: u64,
21    phantom: PhantomData<Format>,
22}
23
24impl<Format: SeqDataFormat> SeqDataWriter<Format> {
25    /// Create a new SeqData File at the location specified
26    ///
27    /// If the file already exists, this call will fail
28    ///
29    /// The header need to fits the size of Format::HEADER_SIZE
30    pub fn create<P: AsRef<Path>>(path: P, header: &[u8]) -> std::io::Result<Self> {
31        if Format::HEADER_SIZE != header.len() {
32            return Err(std::io::Error::new(
33                std::io::ErrorKind::Other,
34                format!(
35                    "header has invalid size, expecting {} but got {}",
36                    Format::HEADER_SIZE,
37                    header.len()
38                ),
39            ));
40        }
41
42        let mut file = OpenOptions::new()
43            .read(false)
44            .write(true)
45            .create_new(true)
46            .append(true)
47            .open(path)?;
48        file.write_all(&Format::MAGIC)?;
49        file.write_all(header)?;
50        let pos = (Format::MAGIC.len() + header.len()) as u64;
51        Ok(SeqDataWriter {
52            file,
53            pos,
54            phantom: PhantomData,
55        })
56    }
57
58    /// Open a SeqData File at the location specified
59    ///
60    /// If the file already exists, this call will fail
61    ///
62    /// The header need to fits the size of Format::HEADER_SIZE
63    pub fn open<P: AsRef<Path>>(path: P, header: &[u8]) -> std::io::Result<(Self, Vec<u8>)> {
64        if Format::HEADER_SIZE != header.len() {
65            return Err(std::io::Error::new(
66                std::io::ErrorKind::Other,
67                format!(
68                    "header has invalid size, expecting {} but got {}",
69                    Format::HEADER_SIZE,
70                    header.len()
71                ),
72            ));
73        }
74
75        let mut file = OpenOptions::new()
76            .read(true)
77            .write(true)
78            .create_new(false)
79            .append(true)
80            .open(path)?;
81
82        file.seek(std::io::SeekFrom::Start(0))?;
83        let header = read_magic_and_header(PhantomData::<Format>, &mut file)?;
84        let pos = file.seek(std::io::SeekFrom::End(0))?;
85
86        Ok((
87            SeqDataWriter {
88                file,
89                pos,
90                phantom: PhantomData,
91            },
92            header,
93        ))
94    }
95
96    pub fn position(&self) -> u64 {
97        self.pos
98    }
99
100    /// Append a new data chunk to this file
101    pub fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
102        let len = size_of::<PrefixLength>() + data.len();
103        write_chunk(&mut self.file, data)?;
104        self.pos += len as u64;
105        Ok(())
106    }
107}
108
109/// Reader for SeqData
110pub struct SeqDataReader<Format: SeqDataFormat> {
111    buf_reader: BufReader<File>,
112    pos: u64,
113    len: u64,
114    phantom: PhantomData<Format>,
115}
116
117fn read_magic_and_header<Format: SeqDataFormat>(
118    _format: PhantomData<Format>,
119    file: &mut File,
120) -> std::io::Result<Vec<u8>> {
121    // try to read the magic
122    const MAGIC_READ_BUF_SIZE: usize = 16;
123    let mut magic_read_buf = [0u8; MAGIC_READ_BUF_SIZE];
124    let mut magic_slice = Format::MAGIC;
125    while !magic_slice.is_empty() {
126        let sz = Format::MAGIC.len().min(MAGIC_READ_BUF_SIZE);
127        let rd = file.read(&mut magic_read_buf[0..sz])?;
128        if rd == 0 {
129            return Err(std::io::Error::new(
130                std::io::ErrorKind::UnexpectedEof,
131                "unexpected EOF in magic reading",
132            ));
133        }
134        if magic_slice[0..rd] != magic_read_buf[0..rd] {
135            return Err(std::io::Error::new(
136                std::io::ErrorKind::Other,
137                "magic do not match expected value",
138            ));
139        }
140        magic_slice = &magic_slice[rd..];
141    }
142
143    let mut header = vec![0u8; Format::HEADER_SIZE];
144    file.read_exact(&mut header)?;
145    Ok(header)
146}
147
148impl<Format: SeqDataFormat> SeqDataReader<Format> {
149    /// Open a SeqData for reading
150    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<(Self, Vec<u8>)> {
151        let mut file = File::open(path)?;
152
153        let phantom = PhantomData;
154        let len = get_file_length(phantom, &mut file)?;
155        let header = read_magic_and_header(phantom, &mut file)?;
156
157        let buf_reader = BufReader::with_capacity(1024 * 1024, file);
158        Ok((
159            SeqDataReader {
160                buf_reader,
161                pos: 0,
162                len,
163                phantom,
164            },
165            header,
166        ))
167    }
168
169    pub fn len(&self) -> u64 {
170        self.len
171    }
172
173    pub fn position(&self) -> u64 {
174        self.pos
175    }
176
177    /// Return the next block along with the current offset if it exists, or None if
178    /// reached the end of file.
179    pub fn next(&mut self) -> Option<std::io::Result<(u64, Vec<u8>)>> {
180        match read_chunk(&mut self.buf_reader) {
181            None => None,
182            Some(Err(e)) => Some(Err(e)),
183            Some(Ok(buf)) => {
184                let current_pos = self.pos;
185                self.pos += size_of::<PrefixLength>() as u64 + buf.len() as u64;
186                Some(Ok((current_pos, buf)))
187            }
188        }
189    }
190}
191
192/// Seq Data Reader with seek
193pub struct SeqDataReaderSeek<Format: SeqDataFormat> {
194    handle: File,
195    phantom: PhantomData<Format>,
196    start: u64,
197    len: u64,
198}
199
200impl<Format: SeqDataFormat> SeqDataReaderSeek<Format> {
201    /// Open a new Seq Data seeker
202    pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<(Self, Vec<u8>)> {
203        let mut handle = File::open(path)?;
204
205        let phantom = PhantomData;
206        let len = get_file_length(phantom, &mut handle)?;
207        let header = read_magic_and_header(phantom, &mut handle)?;
208
209        let start = handle.seek(std::io::SeekFrom::Current(0))?;
210
211        Ok((
212            Self {
213                handle,
214                phantom,
215                len,
216                start,
217            },
218            header,
219        ))
220    }
221
222    /// Return the next block along with the current offset if it exists, or None if
223    /// reached the end of file.
224    pub fn next(&mut self) -> std::io::Result<Vec<u8>> {
225        read_chunk(&mut self.handle).unwrap()
226    }
227
228    /// Return the next block at the offset specified
229    ///
230    /// Note that if the position specified is not a valid boundary,
231    /// then arbitrary invalid stuff might be returns, or some Err
232    /// related to reading data
233    pub fn next_at(&mut self, pos: u64) -> std::io::Result<Vec<u8>> {
234        if pos >= self.len {
235            return Err(std::io::Error::new(
236                std::io::ErrorKind::Other,
237                format!(
238                    "trying to access data at {} but data length {}",
239                    pos, self.len
240                ),
241            ));
242        }
243
244        let seek = self.start + pos;
245        self.handle.seek(std::io::SeekFrom::Start(seek))?;
246        self.next()
247    }
248}
249
250type PrefixLength = u32;
251
252fn read_chunk<R: Read>(file: &mut R) -> Option<std::io::Result<Vec<u8>>> {
253    let mut lenbuf = [0; size_of::<PrefixLength>()];
254    // try to read the length, if the length return a none, we just expect
255    // having reached the end of the stream then
256    match optional_read_exact(file, &mut lenbuf) {
257        None => None,
258        Some(Err(e)) => Some(Err(e)),
259        Some(Ok(())) => {
260            let len = PrefixLength::from_le_bytes(lenbuf);
261
262            // create a buffer of the prefix length 'len' and read all data
263            let mut out = vec![0; len as usize];
264            match file.read_exact(&mut out) {
265                Err(e) => Some(Err(e)),
266                Ok(()) => Some(Ok(out)),
267            }
268        }
269    }
270}
271
272fn write_chunk(file: &mut File, data: &[u8]) -> std::io::Result<()> {
273    let max = PrefixLength::MAX as usize;
274    assert!(data.len() <= max);
275    let len: u32 = data.len() as PrefixLength;
276    let header = len.to_le_bytes();
277    file.write_all(&header)?;
278    file.write_all(data)?;
279    Ok(())
280}
281
282fn get_file_length<Format: SeqDataFormat>(
283    _phantom: PhantomData<Format>,
284    file: &mut File,
285) -> std::io::Result<u64> {
286    let meta = file.metadata()?;
287    let total_len = meta.len();
288
289    let minimum_size = Format::MAGIC.len() as u64 + Format::HEADER_SIZE as u64;
290    if total_len < minimum_size {
291        return Err(std::io::Error::new(
292            std::io::ErrorKind::Other,
293            "file not contains enough bytes for magic and header",
294        ));
295    }
296    Ok(total_len - minimum_size)
297}