1use std::fs::{File, OpenOptions};
3use std::io::{BufReader, Read, Seek, Write};
4use std::marker::PhantomData;
5use std::path::Path;
6
7mod format;
8mod ioutils;
9
10#[cfg(feature = "async")]
11pub mod nonblocking;
12
13pub use format::{NoMagicNoHeader, SeqDataFormat};
14use ioutils::optional_read_exact;
15pub use ioutils::truncate_at;
16
17pub struct SeqDataWriter<Format: SeqDataFormat> {
19 file: File,
20 pos: u64,
21 phantom: PhantomData<Format>,
22}
23
24impl<Format: SeqDataFormat> SeqDataWriter<Format> {
25 pub fn create<P: AsRef<Path>>(path: P, header: &[u8]) -> std::io::Result<Self> {
31 if Format::HEADER_SIZE != header.len() {
32 return Err(std::io::Error::new(
33 std::io::ErrorKind::Other,
34 format!(
35 "header has invalid size, expecting {} but got {}",
36 Format::HEADER_SIZE,
37 header.len()
38 ),
39 ));
40 }
41
42 let mut file = OpenOptions::new()
43 .read(false)
44 .write(true)
45 .create_new(true)
46 .append(true)
47 .open(path)?;
48 file.write_all(&Format::MAGIC)?;
49 file.write_all(header)?;
50 let pos = (Format::MAGIC.len() + header.len()) as u64;
51 Ok(SeqDataWriter {
52 file,
53 pos,
54 phantom: PhantomData,
55 })
56 }
57
58 pub fn open<P: AsRef<Path>>(path: P, header: &[u8]) -> std::io::Result<(Self, Vec<u8>)> {
64 if Format::HEADER_SIZE != header.len() {
65 return Err(std::io::Error::new(
66 std::io::ErrorKind::Other,
67 format!(
68 "header has invalid size, expecting {} but got {}",
69 Format::HEADER_SIZE,
70 header.len()
71 ),
72 ));
73 }
74
75 let mut file = OpenOptions::new()
76 .read(true)
77 .write(true)
78 .create_new(false)
79 .append(true)
80 .open(path)?;
81
82 file.seek(std::io::SeekFrom::Start(0))?;
83 let header = read_magic_and_header(PhantomData::<Format>, &mut file)?;
84 let pos = file.seek(std::io::SeekFrom::End(0))?;
85
86 Ok((
87 SeqDataWriter {
88 file,
89 pos,
90 phantom: PhantomData,
91 },
92 header,
93 ))
94 }
95
96 pub fn position(&self) -> u64 {
97 self.pos
98 }
99
100 pub fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
102 let len = size_of::<PrefixLength>() + data.len();
103 write_chunk(&mut self.file, data)?;
104 self.pos += len as u64;
105 Ok(())
106 }
107}
108
109pub struct SeqDataReader<Format: SeqDataFormat> {
111 buf_reader: BufReader<File>,
112 pos: u64,
113 len: u64,
114 phantom: PhantomData<Format>,
115}
116
117fn read_magic_and_header<Format: SeqDataFormat>(
118 _format: PhantomData<Format>,
119 file: &mut File,
120) -> std::io::Result<Vec<u8>> {
121 const MAGIC_READ_BUF_SIZE: usize = 16;
123 let mut magic_read_buf = [0u8; MAGIC_READ_BUF_SIZE];
124 let mut magic_slice = Format::MAGIC;
125 while !magic_slice.is_empty() {
126 let sz = Format::MAGIC.len().min(MAGIC_READ_BUF_SIZE);
127 let rd = file.read(&mut magic_read_buf[0..sz])?;
128 if rd == 0 {
129 return Err(std::io::Error::new(
130 std::io::ErrorKind::UnexpectedEof,
131 "unexpected EOF in magic reading",
132 ));
133 }
134 if magic_slice[0..rd] != magic_read_buf[0..rd] {
135 return Err(std::io::Error::new(
136 std::io::ErrorKind::Other,
137 "magic do not match expected value",
138 ));
139 }
140 magic_slice = &magic_slice[rd..];
141 }
142
143 let mut header = vec![0u8; Format::HEADER_SIZE];
144 file.read_exact(&mut header)?;
145 Ok(header)
146}
147
148impl<Format: SeqDataFormat> SeqDataReader<Format> {
149 pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<(Self, Vec<u8>)> {
151 let mut file = File::open(path)?;
152
153 let phantom = PhantomData;
154 let len = get_file_length(phantom, &mut file)?;
155 let header = read_magic_and_header(phantom, &mut file)?;
156
157 let buf_reader = BufReader::with_capacity(1024 * 1024, file);
158 Ok((
159 SeqDataReader {
160 buf_reader,
161 pos: 0,
162 len,
163 phantom,
164 },
165 header,
166 ))
167 }
168
169 pub fn len(&self) -> u64 {
170 self.len
171 }
172
173 pub fn position(&self) -> u64 {
174 self.pos
175 }
176
177 pub fn next(&mut self) -> Option<std::io::Result<(u64, Vec<u8>)>> {
180 match read_chunk(&mut self.buf_reader) {
181 None => None,
182 Some(Err(e)) => Some(Err(e)),
183 Some(Ok(buf)) => {
184 let current_pos = self.pos;
185 self.pos += size_of::<PrefixLength>() as u64 + buf.len() as u64;
186 Some(Ok((current_pos, buf)))
187 }
188 }
189 }
190}
191
192pub struct SeqDataReaderSeek<Format: SeqDataFormat> {
194 handle: File,
195 phantom: PhantomData<Format>,
196 start: u64,
197 len: u64,
198}
199
200impl<Format: SeqDataFormat> SeqDataReaderSeek<Format> {
201 pub fn open<P: AsRef<Path>>(path: P) -> std::io::Result<(Self, Vec<u8>)> {
203 let mut handle = File::open(path)?;
204
205 let phantom = PhantomData;
206 let len = get_file_length(phantom, &mut handle)?;
207 let header = read_magic_and_header(phantom, &mut handle)?;
208
209 let start = handle.seek(std::io::SeekFrom::Current(0))?;
210
211 Ok((
212 Self {
213 handle,
214 phantom,
215 len,
216 start,
217 },
218 header,
219 ))
220 }
221
222 pub fn next(&mut self) -> std::io::Result<Vec<u8>> {
225 read_chunk(&mut self.handle).unwrap()
226 }
227
228 pub fn next_at(&mut self, pos: u64) -> std::io::Result<Vec<u8>> {
234 if pos >= self.len {
235 return Err(std::io::Error::new(
236 std::io::ErrorKind::Other,
237 format!(
238 "trying to access data at {} but data length {}",
239 pos, self.len
240 ),
241 ));
242 }
243
244 let seek = self.start + pos;
245 self.handle.seek(std::io::SeekFrom::Start(seek))?;
246 self.next()
247 }
248}
249
250type PrefixLength = u32;
251
252fn read_chunk<R: Read>(file: &mut R) -> Option<std::io::Result<Vec<u8>>> {
253 let mut lenbuf = [0; size_of::<PrefixLength>()];
254 match optional_read_exact(file, &mut lenbuf) {
257 None => None,
258 Some(Err(e)) => Some(Err(e)),
259 Some(Ok(())) => {
260 let len = PrefixLength::from_le_bytes(lenbuf);
261
262 let mut out = vec![0; len as usize];
264 match file.read_exact(&mut out) {
265 Err(e) => Some(Err(e)),
266 Ok(()) => Some(Ok(out)),
267 }
268 }
269 }
270}
271
272fn write_chunk(file: &mut File, data: &[u8]) -> std::io::Result<()> {
273 let max = PrefixLength::MAX as usize;
274 assert!(data.len() <= max);
275 let len: u32 = data.len() as PrefixLength;
276 let header = len.to_le_bytes();
277 file.write_all(&header)?;
278 file.write_all(data)?;
279 Ok(())
280}
281
282fn get_file_length<Format: SeqDataFormat>(
283 _phantom: PhantomData<Format>,
284 file: &mut File,
285) -> std::io::Result<u64> {
286 let meta = file.metadata()?;
287 let total_len = meta.len();
288
289 let minimum_size = Format::MAGIC.len() as u64 + Format::HEADER_SIZE as u64;
290 if total_len < minimum_size {
291 return Err(std::io::Error::new(
292 std::io::ErrorKind::Other,
293 "file not contains enough bytes for magic and header",
294 ));
295 }
296 Ok(total_len - minimum_size)
297}