// cloud_mmr/storage/types.rs

1// Copyright 2021 The Grin Developers
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Common storage-related types
15use crate::ser::{
16    self, BinWriter, DeserializationMode, ProtocolVersion, Readable, Reader, StreamingReader,
17    Writeable, Writer,
18};
19use std::fmt::Debug;
20use std::fs::{self, File, OpenOptions};
21use std::io::{self, BufReader, BufWriter, Seek, SeekFrom, Write};
22use std::marker;
23use std::path::{Path, PathBuf};
24use tempfile::tempfile;
25
/// Represents a single entry in the size_file.
/// Offset (in bytes) and size (in bytes) of a variable sized entry
/// in the corresponding data_file.
/// i.e. To read a single entry from the data_file at position p, read
/// the entry in the size_file to obtain the offset (and size) and then
/// read those bytes from the data_file.
/// Serialized as a u64 offset followed by a u16 size (see SizeEntry::LEN).
#[derive(Clone, Debug)]
pub struct SizeEntry {
    /// Offset (bytes) in the corresponding data_file.
    pub offset: u64,
    /// Size (bytes) in the corresponding data_file.
    pub size: u16,
}
39
impl SizeEntry {
    /// Serialized length of a size entry in bytes, for convenience:
    /// 8 bytes for the u64 offset + 2 bytes for the u16 size.
    pub const LEN: u16 = 8 + 2;
}
44
45impl Readable for SizeEntry {
46    fn read<R: Reader>(reader: &mut R) -> Result<SizeEntry, ser::Error> {
47        Ok(SizeEntry {
48            offset: reader.read_u64()?,
49            size: reader.read_u16()?,
50        })
51    }
52}
53
54impl Writeable for SizeEntry {
55    fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
56        writer.write_u64(self.offset)?;
57        writer.write_u16(self.size)?;
58        Ok(())
59    }
60}
61
/// Are we dealing with "fixed size" data or "variable size" data in a data file?
pub enum SizeInfo {
    /// Fixed size data: every element occupies exactly this many bytes, so
    /// offsets can be computed directly and no size_file is needed.
    FixedSize(u16),
    /// Variable size data: per-element offsets and sizes are tracked in an
    /// associated size_file (itself an append-only file of SizeEntry records).
    VariableSize(Box<AppendOnlyFile<SizeEntry>>),
}
69
/// Data file (MMR) wrapper around an append-only file.
/// Translates between the 1-indexed positions used by the PMMR API and the
/// 0-indexed storage of the underlying append-only file.
pub struct DataFile<T> {
    // The underlying append-only file holding the serialized elements.
    file: AppendOnlyFile<T>,
}
74
75impl<T> DataFile<T>
76where
77    T: Readable + Writeable + Debug,
78{
79    /// Open (or create) a file at the provided path on disk.
80    pub fn open<P>(
81        path: P,
82        size_info: SizeInfo,
83        version: ProtocolVersion,
84    ) -> io::Result<DataFile<T>>
85    where
86        P: AsRef<Path> + Debug,
87    {
88        Ok(DataFile {
89            file: AppendOnlyFile::open(path, size_info, version)?,
90        })
91    }
92
93    /// Append an element to the file.
94    /// Will not be written to disk until flush() is subsequently called.
95    /// Alternatively discard() may be called to discard any pending changes.
96    pub fn append(&mut self, data: &T) -> io::Result<u64> {
97        self.file.append_elmt(data)?;
98        Ok(self.size_unsync())
99    }
100
101    /// Append a slice of multiple elements to the file.
102    /// Will not be written to disk until flush() is subsequently called.
103    /// Alternatively discard() may be called to discard any pending changes.
104    pub fn extend_from_slice(&mut self, data: &[T]) -> io::Result<u64> {
105        self.file.append_elmts(data)?;
106        Ok(self.size_unsync())
107    }
108
109    /// Read an element from the file by position.
110    /// Assumes we have already "shifted" the position to account for pruned data.
111    /// Note: PMMR API is 1-indexed, but backend storage is 0-indexed.
112    ///
113    /// Makes no assumptions about the size of the elements in bytes.
114    /// Elements can be of variable size (handled internally in the append-only file impl).
115    ///
116    pub fn read(&self, position: u64) -> Option<T> {
117        self.file.read_as_elmt(position - 1).ok()
118    }
119
120    /// Rewind the backend file to the specified position.
121    pub fn rewind(&mut self, position: u64) {
122        self.file.rewind(position)
123    }
124
125    /// Flush unsynced changes to the file to disk.
126    pub fn flush(&mut self) -> io::Result<()> {
127        self.file.flush()
128    }
129
130    /// Discard any unsynced changes to the file.
131    pub fn discard(&mut self) {
132        self.file.discard()
133    }
134
135    /// Size of the file in number of elements (not bytes).
136    pub fn size(&self) -> u64 {
137        self.file.size_in_elmts().unwrap_or(0)
138    }
139
140    /// Size of the unsync'd file, in elements (not bytes).
141    fn size_unsync(&self) -> u64 {
142        self.file.size_unsync_in_elmts().unwrap_or(0)
143    }
144
145    /// Path of the underlying file
146    pub fn path(&self) -> &Path {
147        self.file.path()
148    }
149
150    /// Drop underlying file handles
151    pub fn release(&mut self) {
152        self.file.release();
153    }
154
155    /// Write the file out to disk, pruning removed elements.
156    pub fn write_tmp_pruned(&self, prune_pos: &[u64]) -> io::Result<()> {
157        // Need to convert from 1-index to 0-index (don't ask).
158        let prune_idx: Vec<_> = prune_pos.iter().map(|x| x - 1).collect();
159        self.file.write_tmp_pruned(prune_idx.as_slice())
160    }
161
162    /// Replace with file at tmp path.
163    /// Rebuild and initialize from new file.
164    pub fn replace_with_tmp(&mut self) -> io::Result<()> {
165        self.file.replace_with_tmp()
166    }
167}
168
/// Wrapper for a file that can be read at any position (random read) but for
/// which writes are append only. Reads are backed by a memory map (mmap(2)),
/// relying on the operating system for fast access and caching. The memory
/// map is reallocated to expand it when new writes are flushed.
///
/// Despite being append-only, the file can still be pruned and truncated. The
/// former simply happens by rewriting it, ignoring some of the data. The
/// latter by truncating the underlying file and re-creating the mmap.
pub struct AppendOnlyFile<T> {
    // Path of the underlying file on disk.
    path: PathBuf,
    // Open handle to the underlying file (None after release()).
    file: Option<File>,
    // Fixed element size, or the associated size_file for variable sized data.
    size_info: SizeInfo,
    // Protocol version used when (de)serializing elements.
    version: ProtocolVersion,
    // Memory map over the flushed portion of the file (None while the file is empty).
    mmap: Option<memmap::Mmap>,

    // Buffer of unsync'd bytes. These bytes will be appended to the file when flushed.
    buffer: Vec<u8>,
    // Element position (not byte offset) at which the buffer begins.
    buffer_start_pos: u64,
    // Backup of buffer_start_pos taken on rewind(); 0 acts as the
    // "no rewind pending" sentinel.
    buffer_start_pos_bak: u64,
    // No T is stored directly; the marker ties the element type to the file
    // for (de)serialization purposes.
    _marker: marker::PhantomData<T>,
}
190
191impl AppendOnlyFile<SizeEntry> {
192    fn sum_sizes(&self) -> io::Result<u64> {
193        let mut sum = 0;
194        for pos in 0..self.buffer_start_pos {
195            let entry = self.read_as_elmt(pos)?;
196            sum += entry.size as u64;
197        }
198        Ok(sum)
199    }
200}
201
impl<T> AppendOnlyFile<T>
where
    T: Debug + Readable + Writeable,
{
    /// Open a file (existing or not) as append-only, backed by a mmap.
    pub fn open<P>(
        path: P,
        size_info: SizeInfo,
        version: ProtocolVersion,
    ) -> io::Result<AppendOnlyFile<T>>
    where
        P: AsRef<Path> + Debug,
    {
        let mut aof = AppendOnlyFile {
            file: None,
            path: path.as_ref().to_path_buf(),
            size_info,
            version,
            mmap: None,
            buffer: vec![],
            buffer_start_pos: 0,
            buffer_start_pos_bak: 0,
            _marker: marker::PhantomData,
        };
        aof.init()?;

        // (Re)build the size file if inconsistent with the data file.
        // This will occur during "fast sync" as we do not sync the size_file
        // and must build it locally.
        // And we can *only* do this after init() the data file (so we know sizes).
        // Consistency check: the sizes recorded in the size_file must sum to
        // the byte length of the data file.
        let expected_size = aof.size()?;
        if let SizeInfo::VariableSize(ref mut size_file) = &mut aof.size_info {
            if size_file.sum_sizes()? != expected_size {
                aof.rebuild_size_file()?;

                // (Re)init the entire file as we just rebuilt the size_file
                // and things may have changed.
                aof.init()?;
            }
        }

        Ok(aof)
    }

    /// (Re)init the underlying file and its associated memmap.
    /// Re-inits the associated size_file first (if any) so element counts
    /// and sizes are available when positioning the buffer.
    pub fn init(&mut self) -> io::Result<()> {
        if let SizeInfo::VariableSize(ref mut size_file) = self.size_info {
            size_file.init()?;
        }

        // Open (creating if necessary) in append mode; reads go via the mmap.
        self.file = Some(
            OpenOptions::new()
                .read(true)
                .append(true)
                .create(true)
                .open(self.path.clone())?,
        );

        // If we have a non-empty file then mmap it.
        // (An empty file cannot be memory mapped.)
        if self.size()? == 0 {
            self.buffer_start_pos = 0;
        } else {
            self.mmap = Some(unsafe { memmap::Mmap::map(&self.file.as_ref().unwrap())? });
            self.buffer_start_pos = self.size_in_elmts()?;
        }

        Ok(())
    }

    // Flushed size of the file in elements (not bytes).
    // Fixed size: file length in bytes / element size.
    // Variable size: number of entries in the associated size_file.
    fn size_in_elmts(&self) -> io::Result<u64> {
        match self.size_info {
            SizeInfo::FixedSize(elmt_size) => Ok(self.size()? / elmt_size as u64),
            SizeInfo::VariableSize(ref size_file) => size_file.size_in_elmts(),
        }
    }

    // Size in elements including anything still sitting in the unsynced buffer.
    fn size_unsync_in_elmts(&self) -> io::Result<u64> {
        match self.size_info {
            SizeInfo::FixedSize(elmt_size) => {
                Ok(self.buffer_start_pos + (self.buffer.len() as u64 / elmt_size as u64))
            }
            SizeInfo::VariableSize(ref size_file) => size_file.size_unsync_in_elmts(),
        }
    }

    /// Append element to append-only file by serializing it to bytes and appending the bytes.
    fn append_elmt(&mut self, data: &T) -> io::Result<()> {
        let mut bytes = ser::ser_vec(data, self.version)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        self.append(&mut bytes)?;
        Ok(())
    }

    /// Iterate over the slice and append each element.
    fn append_elmts(&mut self, data: &[T]) -> io::Result<()> {
        for x in data {
            self.append_elmt(x)?;
        }
        Ok(())
    }

    /// Append data to the file. Until the append-only file is synced, data is
    /// only written to memory.
    pub fn append(&mut self, bytes: &mut [u8]) -> io::Result<()> {
        if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
            // For variable sized data, record an entry in the size_file:
            // the new entry starts where the previous one ended
            // (offset + size), or at 0 if this is the very first entry.
            let next_pos = size_file.size_unsync_in_elmts()?;
            let offset = if next_pos == 0 {
                0
            } else {
                let prev_entry = size_file.read_as_elmt(next_pos - 1)?;
                prev_entry.offset + prev_entry.size as u64
            };
            size_file.append_elmt(&SizeEntry {
                offset,
                size: bytes.len() as u16,
            })?;
        }

        self.buffer.extend_from_slice(bytes);
        Ok(())
    }

    // Returns the offset and size of bytes to read.
    // If pos is in the buffer then caller needs to remember to account for this
    // when reading from the buffer.
    // Note: offsets here are absolute (from the start of the data), whether
    // the element lives in the flushed file or in the unsynced buffer.
    fn offset_and_size(&self, pos: u64) -> io::Result<(u64, u16)> {
        match self.size_info {
            SizeInfo::FixedSize(elmt_size) => Ok((pos * elmt_size as u64, elmt_size)),
            SizeInfo::VariableSize(ref size_file) => {
                // Otherwise we need to calculate offset and size from entries in the size_file.
                let entry = size_file.read_as_elmt(pos)?;
                Ok((entry.offset, entry.size))
            }
        }
    }

    /// Rewinds the data file back to a previous position.
    /// We simply "rewind" the buffer_start_pos to the specified position.
    /// Note: We do not currently support rewinding within the buffer itself.
    /// The actual file truncation is deferred until the next flush().
    pub fn rewind(&mut self, pos: u64) {
        if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
            size_file.rewind(pos);
        }

        // Remember the pre-rewind position (only for the first rewind since
        // the last flush/discard) so flush() knows truncation is required.
        if self.buffer_start_pos_bak == 0 {
            self.buffer_start_pos_bak = self.buffer_start_pos;
        }
        self.buffer_start_pos = pos;
    }

    /// Syncs all writes (fsync), reallocating the memory map to make the newly
    /// written data accessible.
    pub fn flush(&mut self) -> io::Result<()> {
        if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
            // Flush the associated size_file if we have one.
            size_file.flush()?
        }

        if self.buffer_start_pos_bak > 0 {
            // Flushing a rewound state, we need to truncate via set_len() before applying.
            // Drop and recreate, or windows throws an access error
            // (the mmap and append-mode handle must be released before
            // set_len() — do not reorder).
            self.mmap = None;
            self.file = None;
            {
                let file = OpenOptions::new()
                    .read(true)
                    .create(true)
                    .write(true)
                    .open(&self.path)?;

                // Set length of the file to truncate it as necessary.
                // New length = end byte of the last element we are keeping.
                if self.buffer_start_pos == 0 {
                    file.set_len(0)?;
                } else {
                    let (offset, size) = self.offset_and_size(self.buffer_start_pos - 1)?;
                    file.set_len(offset + size as u64)?;
                };
            }
        }

        {
            // Reopen in append mode for writing the buffered bytes.
            let file = OpenOptions::new()
                .read(true)
                .create(true)
                .append(true)
                .open(&self.path)?;
            self.file = Some(file);
            // Rewind (if any) has now been applied to the file; clear the sentinel.
            self.buffer_start_pos_bak = 0;
        }

        self.file.as_mut().unwrap().write_all(&self.buffer[..])?;
        self.file.as_mut().unwrap().sync_all()?;

        self.buffer.clear();
        self.buffer_start_pos = self.size_in_elmts()?;

        // Note: file must be non-empty to memory map it
        if self.file.as_ref().unwrap().metadata()?.len() == 0 {
            self.mmap = None;
        } else {
            self.mmap = Some(unsafe { memmap::Mmap::map(&self.file.as_ref().unwrap())? });
        }

        Ok(())
    }

    /// Discard the current non-flushed data.
    pub fn discard(&mut self) {
        if self.buffer_start_pos_bak > 0 {
            // discarding a rewound state, restore the buffer start
            self.buffer_start_pos = self.buffer_start_pos_bak;
            self.buffer_start_pos_bak = 0;
        }

        // Discarding the data file will discard the associated size file if we have one.
        if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
            size_file.discard();
        }

        self.buffer = vec![];
    }

    /// Read the bytes representing the element at the given position (0-indexed).
    /// Uses the size_info to determine the offset to read from and the size
    /// in bytes to actually read.
    /// Leverages the memory map.
    /// Returns an empty slice (not an error) for an out-of-range position or
    /// when the requested bytes are unavailable.
    pub fn read(&self, pos: u64) -> io::Result<&[u8]> {
        if pos >= self.size_unsync_in_elmts()? {
            return Ok(<&[u8]>::default());
        }
        let (offset, length) = self.offset_and_size(pos)?;
        let res = if pos < self.buffer_start_pos {
            // Flushed data: read via the memory map.
            self.read_from_mmap(offset, length)
        } else {
            // Unsynced data: translate the absolute offset into a
            // buffer-relative offset using the offset of the first buffered element.
            let (buffer_offset, _) = self.offset_and_size(self.buffer_start_pos)?;
            self.read_from_buffer(offset.saturating_sub(buffer_offset), length)
        };
        Ok(res)
    }

    // Read and deserialize the element at the given position (0-indexed).
    // Note: an out-of-range pos yields an empty slice from read(), which then
    // surfaces here as a deserialization error.
    fn read_as_elmt(&self, pos: u64) -> io::Result<T> {
        let data = self.read(pos)?;
        ser::deserialize(&mut &data[..], self.version, DeserializationMode::default())
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    // Read length bytes starting at offset from the buffer.
    // Return an empty slice if we do not have enough bytes in the buffer to
    // read the full length bytes.
    fn read_from_buffer(&self, offset: u64, length: u16) -> &[u8] {
        if self.buffer.len() < (offset as usize + length as usize) {
            <&[u8]>::default()
        } else {
            &self.buffer[(offset as usize)..(offset as usize + length as usize)]
        }
    }

    // Read length bytes starting at offset from the mmap.
    // Return an empty slice if we do not have enough bytes in the mmap to
    // read the full length bytes.
    // Return an empty slice if we have no mmap currently.
    fn read_from_mmap(&self, offset: u64, length: u16) -> &[u8] {
        if let Some(mmap) = &self.mmap {
            if mmap.len() < (offset as usize + length as usize) {
                <&[u8]>::default()
            } else {
                &mmap[(offset as usize)..(offset as usize + length as usize)]
            }
        } else {
            <&[u8]>::default()
        }
    }

    /// Create a new tempfile containing the contents of this append only file.
    /// This allows callers to see a consistent view of the data without
    /// locking the append only file.
    /// Note: only flushed data is copied — the unsynced buffer is not included.
    pub fn as_temp_file(&self) -> io::Result<File> {
        let mut reader = BufReader::new(File::open(&self.path)?);
        let mut writer = BufWriter::new(tempfile()?);
        io::copy(&mut reader, &mut writer)?;

        // Remember to seek back to start of the file as the caller is likely
        // to read this file directly without reopening it.
        writer.seek(SeekFrom::Start(0))?;

        let file = writer.into_inner()?;
        Ok(file)
    }

    // Path used for the temporary file during prune/rebuild operations,
    // e.g. pmmr_data.bin -> pmmr_data.tmp.
    fn tmp_path(&self) -> PathBuf {
        self.path.with_extension("tmp")
    }

    /// Saves a copy of the current file content, skipping data at the provided
    /// prune positions. prune_pos must be ordered.
    /// Writes to the tmp path; the original file is left untouched.
    pub fn write_tmp_pruned(&self, prune_pos: &[u64]) -> io::Result<()> {
        let reader = File::open(&self.path)?;
        let mut buf_reader = BufReader::new(reader);
        let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);

        let mut buf_writer = BufWriter::new(File::create(&self.tmp_path())?);
        let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);

        let mut current_pos = 0;
        let mut prune_pos = prune_pos;
        // Stream elements one at a time, copying all but the pruned positions.
        while let Ok(elmt) = T::read(&mut streaming_reader) {
            if prune_pos.contains(&current_pos) {
                // Pruned pos, moving on.
                // (Relies on prune_pos being ordered: the matched position is
                // assumed to be the first remaining entry.)
                prune_pos = &prune_pos[1..];
            } else {
                // Not pruned, write to file.
                elmt.write(&mut bin_writer)
                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
            }
            current_pos += 1;
        }
        buf_writer.flush()?;
        Ok(())
    }

    /// Replace the underlying file with the file at tmp path.
    /// Rebuild and initialize from the new file.
    pub fn replace_with_tmp(&mut self) -> io::Result<()> {
        // Replace the underlying file -
        // pmmr_data.tmp -> pmmr_data.bin
        self.replace(&self.tmp_path())?;

        // Now rebuild our size file to reflect the pruned data file.
        // This will replace the underlying file internally.
        if let SizeInfo::VariableSize(_) = &self.size_info {
            self.rebuild_size_file()?;
        }

        // Now (re)init the file and associated size_file so everything is consistent.
        self.init()?;

        Ok(())
    }

    // Rebuild the associated size_file from scratch by streaming through the
    // data file and recording (offset, size) for each element read.
    // No-op for fixed size data.
    fn rebuild_size_file(&mut self) -> io::Result<()> {
        if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
            // Note: Reading from data file and writing sizes to the associated (tmp) size_file.
            let tmp_path = size_file.path.with_extension("tmp");
            debug!("rebuild_size_file: {:?}", tmp_path);

            // Scope the reader and writer to within the block so we can safely replace files later on.
            {
                let reader = File::open(&self.path)?;
                let mut buf_reader = BufReader::new(reader);
                let mut streaming_reader = StreamingReader::new(&mut buf_reader, self.version);

                let mut buf_writer = BufWriter::new(File::create(&tmp_path)?);
                let mut bin_writer = BinWriter::new(&mut buf_writer, self.version);

                let mut current_offset = 0;
                while let Ok(_) = T::read(&mut streaming_reader) {
                    // Element size = bytes consumed by this read (total read
                    // so far minus the running offset).
                    let size = streaming_reader
                        .total_bytes_read()
                        .saturating_sub(current_offset) as u16;
                    let entry = SizeEntry {
                        offset: current_offset,
                        size,
                    };

                    // Not pruned, write to file.
                    entry
                        .write(&mut bin_writer)
                        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

                    current_offset += size as u64;
                }
                buf_writer.flush()?;
            }

            // Replace the underlying file for our size_file -
            // pmmr_size.tmp -> pmmr_size.bin
            size_file.replace(&tmp_path)?;
        }

        Ok(())
    }

    /// Replace the underlying file with another file, deleting the original.
    /// Releases all handles/mmaps first so the rename succeeds on all platforms.
    fn replace<P>(&mut self, with: P) -> io::Result<()>
    where
        P: AsRef<Path> + Debug,
    {
        self.release();
        fs::remove_file(&self.path)?;
        fs::rename(with, &self.path)?;
        Ok(())
    }

    /// Release underlying file handles.
    pub fn release(&mut self) {
        self.mmap = None;
        self.file = None;

        // Remember to release the size_file as well if we have one.
        if let SizeInfo::VariableSize(ref mut size_file) = &mut self.size_info {
            size_file.release();
        }
    }

    /// Current size of the file in bytes.
    pub fn size(&self) -> io::Result<u64> {
        fs::metadata(&self.path).map(|md| md.len())
    }

    /// Path of the underlying file
    pub fn path(&self) -> &Path {
        &self.path
    }
}