1use crate::ser::{
16 self, BinWriter, DeserializationMode, ProtocolVersion, Readable, Reader, StreamingReader,
17 Writeable, Writer,
18};
19use std::fmt::Debug;
20use std::fs::{self, File, OpenOptions};
21use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write};
22use std::marker;
23use std::path::{Path, PathBuf};
24use tempfile::tempfile;
25
/// One record of a variable-size file's companion size file: where an element
/// starts in the data file and how many bytes it occupies there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SizeEntry {
    /// Byte offset of the element within the data file.
    pub offset: u64,
    /// Length of the element in bytes.
    pub size: u16,
}

impl SizeEntry {
    /// Serialized length of an entry in bytes: a `u64` offset (8) plus a `u16` size (2).
    pub const LEN: u16 = 8 + 2;
}
44
45impl Readable for SizeEntry {
46 fn read<R: Reader>(reader: &mut R) -> Result<SizeEntry, ser::Error> {
47 Ok(SizeEntry {
48 offset: reader.read_u64()?,
49 size: reader.read_u16()?,
50 })
51 }
52}
53
54impl Writeable for SizeEntry {
55 fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
56 writer.write_u64(self.offset)?;
57 writer.write_u16(self.size)?;
58 Ok(())
59 }
60}
61
62pub enum SizeInfo {
64 FixedSize(u16),
66 VariableSize(Box<AppendOnlyFile<SizeEntry>>),
68}
69
70pub struct DataFile<T> {
72 file: AppendOnlyFile<T>,
73}
74
75impl<T> DataFile<T>
76where
77 T: Readable + Writeable + Debug,
78{
79 pub fn open<P>(
81 path: P,
82 size_info: SizeInfo,
83 version: ProtocolVersion,
84 ) -> io::Result<DataFile<T>>
85 where
86 P: AsRef<Path> + Debug,
87 {
88 Ok(DataFile {
89 file: AppendOnlyFile::open(path, size_info, version)?,
90 })
91 }
92
93 pub fn append(&mut self, data: &T) -> io::Result<u64> {
97 self.file.append_elmt(data)?;
98 Ok(self.size_unsync())
99 }
100
101 pub fn extend_from_slice(&mut self, data: &[T]) -> io::Result<u64> {
105 self.file.append_elmts(data)?;
106 Ok(self.size_unsync())
107 }
108
109 pub fn read(&self, position: u64) -> Option<T> {
117 self.file.read_as_elmt(position - 1).ok()
118 }
119
120 pub fn rewind(&mut self, position: u64) {
122 self.file.rewind(position)
123 }
124
125 pub fn flush(&mut self) -> io::Result<()> {
127 self.file.flush()
128 }
129
130 pub fn discard(&mut self) {
132 self.file.discard()
133 }
134
135 pub fn size(&self) -> u64 {
137 self.file.size_in_elmts().unwrap_or(0)
138 }
139
140 fn size_unsync(&self) -> u64 {
142 self.file.size_unsync_in_elmts().unwrap_or(0)
143 }
144
145 pub fn path(&self) -> &Path {
147 self.file.path()
148 }
149
150 pub fn release(&mut self) {
152 self.file.release();
153 }
154
155 pub fn write_tmp_pruned(&self, prune_pos: &[u64]) -> io::Result<()> {
157 let prune_idx: Vec<_> = prune_pos.iter().map(|x| x - 1).collect();
159 self.file.write_tmp_pruned(prune_idx.as_slice())
160 }
161
162 pub fn replace_with_tmp(&mut self) -> io::Result<()> {
165 self.file.replace_with_tmp()
166 }
167}
168
/// A file that supports reads at any element position but only appends as writes.
///
/// Persisted bytes are read through a memory map (`mmap`). Appended bytes
/// accumulate in `buffer` until `flush` writes them out. A `rewind` is only
/// recorded in memory; it is applied on disk (by truncation) at the next
/// `flush`, or cancelled by `discard`.
///
/// NOTE(review): field declaration order is also drop order (`file` is dropped
/// before `mmap`); keep that in mind before reordering fields.
pub struct AppendOnlyFile<T> {
    /// Location of the data file on disk.
    path: PathBuf,
    /// Open handle (read + append); `None` after `release`.
    file: Option<File>,
    /// How element boundaries are determined (fixed width or a size file).
    size_info: SizeInfo,
    /// Protocol version used to (de)serialize elements.
    version: ProtocolVersion,
    /// Read-only mapping of the persisted bytes; `None` when the file is empty or released.
    mmap: Option<memmap::Mmap>,

    /// Appended bytes not yet written to disk.
    buffer: Vec<u8>,
    /// Element position at which `buffer` begins, i.e. the count of persisted
    /// elements visible to readers (lowered by `rewind`).
    buffer_start_pos: u64,
    /// `buffer_start_pos` as saved by the first `rewind` since the last
    /// `flush`/`discard`; 0 means no rewind is pending.
    buffer_start_pos_bak: u64,
    /// Ties the file to its element type `T` without storing one.
    _marker: marker::PhantomData<T>,
}
190
191impl AppendOnlyFile<SizeEntry> {
192 fn sum_sizes(&self) -> io::Result<u64> {
193 let mut sum = 0;
194 for pos in 0..self.buffer_start_pos {
195 let entry = self.read_as_elmt(pos)?;
196 sum += entry.size as u64;
197 }
198 Ok(sum)
199 }
200}
201
202impl<T> AppendOnlyFile<T>
203where
204 T: Debug + Readable + Writeable,
205{
206 pub fn open<P>(
208 path: P,
209 size_info: SizeInfo,
210 version: ProtocolVersion,
211 ) -> io::Result<AppendOnlyFile<T>>
212 where
213 P: AsRef<Path> + Debug,
214 {
215 let mut aof = AppendOnlyFile {
216 file: None,
217 path: path.as_ref().to_path_buf(),
218 size_info,
219 version,
220 mmap: None,
221 buffer: vec![],
222 buffer_start_pos: 0,
223 buffer_start_pos_bak: 0,
224 _marker: marker::PhantomData,
225 };
226 aof.init()?;
227
228 let expected_size = aof.size()?;
233 if let SizeInfo::VariableSize(ref mut size_file) = &mut aof.size_info {
234 if size_file.sum_sizes()? != expected_size {
235 aof.rebuild_size_file()?;
236
237 aof.init()?;
240 }
241 }
242
243 Ok(aof)
244 }
245
246 pub fn init(&mut self) -> io::Result<()> {
249 if let SizeInfo::VariableSize(ref mut size_file) = self.size_info {
250 size_file.init()?;
251 }
252
253 self.file = Some(
254 OpenOptions::new()
255 .read(true)
256 .append(true)
257 .create(true)
258 .open(self.path.clone())?,
259 );
260
261 if self.size()? == 0 {
263 self.buffer_start_pos = 0;
264 } else {
265 self.mmap = Some(unsafe { memmap::Mmap::map(&self.file.as_ref().unwrap())? });
266 self.buffer_start_pos = self.size_in_elmts()?;
267 }
268
269 Ok(())
270 }
271
272 fn size_in_elmts(&self) -> io::Result<u64> {
273 match self.size_info {
274 SizeInfo::FixedSize(elmt_size) => Ok(self.size()? / elmt_size as u64),
275 SizeInfo::VariableSize(ref size_file) => size_file.size_in_elmts(),
276 }
277 }
278
279 fn size_unsync_in_elmts(&self) -> io::Result<u64> {
280 match self.size_info {
281 SizeInfo::FixedSize(elmt_size) => {
282 Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64))
283 }
284 SizeInfo::VariableSize(ref size_file) => size_file.size_unsync_in_elmts(),
285 }
286 }
287
288 fn append_elmt(&mut self, data: &T) -> io::Result<()> {
290 let mut bytes = ser::ser_vec(data, self.version)
291 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
292 self.append(&mut bytes)?;
293 Ok(())
294 }
295
296 fn append_elmts(&mut self, data: &[T]) -> io::Result<()> {
298 for x in data {
299 self.append_elmt(x)?;
300 }
301 Ok(())
302 }
303
304 pub fn append(&mut self, bytes: &mut [u8]) -> io::Result<()> {
307 if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
308 let next_pos = size_file.size_unsync_in_elmts()?;
309 let offset = if next_pos == 0 {
310 0
311 } else {
312 let prev_entry = size_file.read_as_elmt(next_pos - 1)?;
313 prev_entry.offset + prev_entry.size as u64
314 };
315 size_file.append_elmt(&SizeEntry {
316 offset,
317 size: bytes.len() as u16,
318 })?;
319 }
320
321 self.buffer.extend_from_slice(bytes);
322 Ok(())
323 }
324
325 fn offset_and_size(&self, pos: u64) -> io::Result<(u64, u16)> {
329 match self.size_info {
330 SizeInfo::FixedSize(elmt_size) => Ok((pos * elmt_size as u64, elmt_size)),
331 SizeInfo::VariableSize(ref size_file) => {
332 let entry = size_file.read_as_elmt(pos)?;
334 Ok((entry.offset, entry.size))
335 }
336 }
337 }
338
339 pub fn rewind(&mut self, pos: u64) {
343 if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
344 size_file.rewind(pos);
345 }
346
347 if self.buffer_start_pos_bak == 0 {
348 self.buffer_start_pos_bak = self.buffer_start_pos;
349 }
350 self.buffer_start_pos = pos;
351 }
352
353 pub fn flush(&mut self) -> io::Result<()> {
356 if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
357 size_file.flush()?
359 }
360
361 if self.buffer_start_pos_bak > 0 {
362 self.mmap = None;
365 self.file = None;
366 {
367 let file = OpenOptions::new()
368 .read(true)
369 .create(true)
370 .write(true)
371 .open(&self.path)?;
372
373 if self.buffer_start_pos == 0 {
375 file.set_len(0)?;
376 } else {
377 let (offset, size) = self.offset_and_size(self.buffer_start_pos - 1)?;
378 file.set_len(offset + size as u64)?;
379 };
380 }
381 }
382
383 {
384 let file = OpenOptions::new()
385 .read(true)
386 .create(true)
387 .append(true)
388 .open(&self.path)?;
389 self.file = Some(file);
390 self.buffer_start_pos_bak = 0;
391 }
392
393 self.file.as_mut().unwrap().write_all(&self.buffer[..])?;
394 self.file.as_mut().unwrap().sync_all()?;
395
396 self.buffer.clear();
397 self.buffer_start_pos = self.size_in_elmts()?;
398
399 if self.file.as_ref().unwrap().metadata()?.len() == 0 {
401 self.mmap = None;
402 } else {
403 self.mmap = Some(unsafe { memmap::Mmap::map(&self.file.as_ref().unwrap())? });
404 }
405
406 Ok(())
407 }
408
409 pub fn discard(&mut self) {
411 if self.buffer_start_pos_bak > 0 {
412 self.buffer_start_pos = self.buffer_start_pos_bak;
414 self.buffer_start_pos_bak = 0;
415 }
416
417 if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
419 size_file.discard();
420 }
421
422 self.buffer = vec![];
423 }
424
425 pub fn read(&self, pos: u64) -> io::Result<&[u8]> {
430 if pos >= self.size_unsync_in_elmts()? {
431 return Ok(<&[u8]>::default());
432 }
433 let (offset, length) = self.offset_and_size(pos)?;
434 let res = if pos < self.buffer_start_pos {
435 self.read_from_mmap(offset, length)
436 } else {
437 let (buffer_offset, _) = self.offset_and_size(self.buffer_start_pos)?;
438 self.read_from_buffer(offset.saturating_sub(buffer_offset), length)
439 };
440 Ok(res)
441 }
442
443 fn read_as_elmt(&self, pos: u64) -> io::Result<T> {
444 let data = self.read(pos)?;
445 ser::deserialize(&mut &data[..], self.version, DeserializationMode::default())
446 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
447 }
448
449 fn read_from_buffer(&self, offset: u64, length: u16) -> &[u8] {
453 if self.buffer.len() < (offset as usize + length as usize) {
454 <&[u8]>::default()
455 } else {
456 &self.buffer[(offset as usize)..(offset as usize + length as usize)]
457 }
458 }
459
460 fn read_from_mmap(&self, offset: u64, length: u16) -> &[u8] {
465 if let Some(mmap) = &self.mmap {
466 if mmap.len() < (offset as usize + length as usize) {
467 <&[u8]>::default()
468 } else {
469 &mmap[(offset as usize)..(offset as usize + length as usize)]
470 }
471 } else {
472 <&[u8]>::default()
473 }
474 }
475
476 pub fn as_temp_file(&self) -> io::Result<File> {
480 let mut reader = BufReader::new(File::open(&self.path)?);
481 let mut writer = BufWriter::new(tempfile()?);
482 io::copy(&mut reader, &mut writer)?;
483
484 writer.seek(SeekFrom::Start(0))?;
487
488 let file = writer.into_inner()?;
489 Ok(file)
490 }
491
492 fn tmp_path(&self) -> PathBuf {
493 self.path.with_extension("tmp")
494 }
495
496 pub fn write_tmp_pruned(&self, prune_pos: &[u64]) -> io::Result<()> {
499 let reader = File::open(&self.path)?;
500 let mut buf_reader = BufReader::new(reader);
501 let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);
502
503 let mut buf_writer = BufWriter::new(File::create(&self.tmp_path())?);
504 let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
505
506 let mut current_pos = 0;
507 let mut prune_pos = prune_pos;
508 while let Ok(elmt) = T::read(&mut streaming_reader) {
509 if prune_pos.contains(¤t_pos) {
510 prune_pos = &prune_pos[1..];
512 } else {
513 elmt.write(&mut bin_writer)
515 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
516 }
517 current_pos += 1;
518 }
519 buf_writer.flush()?;
520 Ok(())
521 }
522
523 pub fn replace_with_tmp(&mut self) -> io::Result<()> {
526 self.replace(&self.tmp_path())?;
529
530 if let SizeInfo::VariableSize(_) = &self.size_info {
533 self.rebuild_size_file()?;
534 }
535
536 self.init()?;
538
539 Ok(())
540 }
541
542 fn rebuild_size_file(&mut self) -> io::Result<()> {
543 if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
544 let tmp_path = size_file.path.with_extension("tmp");
546 debug!("rebuild_size_file: {:?}", tmp_path);
547
548 {
550 let reader = File::open(&self.path)?;
551 let mut buf_reader = BufReader::new(reader);
552 let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);
553
554 let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
555 let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);
556
557 let mut current_offset = 0;
558 while let Ok(_) = T::read(&mut streaming_reader) {
559 let size = streaming_reader
560 .total_bytes_read()
561 .saturating_sub(current_offset) as u16;
562 let entry = SizeEntry {
563 offset: current_offset,
564 size,
565 };
566
567 entry
569 .write(&mut bin_writer)
570 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
571
572 current_offset += size as u64;
573 }
574 buf_writer.flush()?;
575 }
576
577 size_file.replace(&tmp_path)?;
580 }
581
582 Ok(())
583 }
584
585 fn replace<P>(&mut self, with: P) -> io::Result<()>
588 where
589 P: AsRef<Path> + Debug,
590 {
591 self.release();
592 fs::remove_file(&self.path)?;
593 fs::rename(with, &self.path)?;
594 Ok(())
595 }
596
597 pub fn release(&mut self) {
599 self.mmap = None;
600 self.file = None;
601
602 if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
604 size_file.release();
605 }
606 }
607
608 pub fn size(&self) -> io::Result<u64> {
610 fs::metadata(&self.path).map(|md| md.len())
611 }
612
613 pub fn path(&self) -> &Path {
615 &self.path
616 }
617}