spacetimedb_commitlog/repo/
mod.rs

1use std::io;
2
3use log::{debug, warn};
4
5use crate::{
6    commit::Commit,
7    error,
8    index::{IndexFile, IndexFileMut},
9    segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer},
10    Options,
11};
12
13pub(crate) mod fs;
14#[cfg(any(test, feature = "test"))]
15pub mod mem;
16
17pub use fs::Fs;
18#[cfg(any(test, feature = "test"))]
19pub use mem::Memory;
20
21pub type TxOffset = u64;
22pub type TxOffsetIndexMut = IndexFileMut<TxOffset>;
23pub type TxOffsetIndex = IndexFile<TxOffset>;
24
/// Streams whose total length in bytes can be found by seeking.
pub trait SegmentLen: io::Seek {
    /// Report the total size of the segment in bytes.
    ///
    /// The size is obtained by seeking to the end of the stream, so it does
    /// not rely on metadata `fsync`. Up to three `seek` operations may be
    /// issued: one to learn the current position, one to jump to the end, and
    /// (only if needed) one to go back.
    ///
    /// On success, the stream is left at the position it had before the call.
    /// On error, the seek position is unspecified.
    //
    // TODO: Remove trait and replace with `Seek::stream_len` if / when stabilized:
    // https://github.com/rust-lang/rust/issues/59359
    fn segment_len(&mut self) -> io::Result<u64> {
        let origin = self.stream_position()?;
        let end = self.seek(io::SeekFrom::End(0))?;

        // Already at the end of the stream: there is nothing to restore, and
        // a comparison is usually far cheaper than another seek.
        if origin == end {
            return Ok(end);
        }

        self.seek(io::SeekFrom::Start(origin))?;
        Ok(end)
    }
}
50
51pub trait SegmentReader: io::BufRead + SegmentLen + Send + Sync {}
52impl<T: io::BufRead + SegmentLen + Send + Sync> SegmentReader for T {}
53
54pub trait SegmentWriter: FileLike + io::Read + io::Write + SegmentLen + Send + Sync {}
55impl<T: FileLike + io::Read + io::Write + SegmentLen + Send + Sync> SegmentWriter for T {}
56
57/// A repository of log segments.
58///
59/// This is mainly an internal trait to allow testing against an in-memory
60/// representation.
61pub trait Repo: Clone {
62    /// The type of log segments managed by this repo, which must behave like a file.
63    type SegmentWriter: SegmentWriter + 'static;
64    type SegmentReader: SegmentReader + 'static;
65
66    /// Create a new segment with the minimum transaction offset `offset`.
67    ///
68    /// This **must** create the segment atomically, and return
69    /// [`io::ErrorKind::AlreadyExists`] if the segment already exists.
70    ///
71    /// It is permissible, however, to successfully return the new segment if
72    /// it is completely empty (i.e. [`create_segment_writer`] did not previously
73    /// succeed in writing the segment header).
74    fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter>;
75
76    /// Open an existing segment at the minimum transaction offset `offset`.
77    ///
78    /// Must return [`io::ErrorKind::NotFound`] if a segment with the given
79    /// `offset` does not exist.
80    ///
81    /// The method does not guarantee that the segment is non-empty -- this case
82    /// will be caught by [`open_segment_reader`].
83    fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader>;
84
85    /// Open an existing segment at the minimum transaction offset `offset`.
86    ///
87    /// Must return [`io::ErrorKind::NotFound`] if a segment with the given
88    /// `offset` does not exist.
89    ///
90    /// The method does not guarantee that the segment is non-empty -- this case
91    /// will be caught by [`resume_segment_writer`].
92    fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter>;
93
94    /// Remove the segment at the minimum transaction offset `offset`.
95    ///
96    /// Return [`io::ErrorKind::NotFound`] if no such segment exists.
97    fn remove_segment(&self, offset: u64) -> io::Result<()>;
98
99    /// Compress a segment in storage, marking it as immutable.
100    fn compress_segment(&self, offset: u64) -> io::Result<()>;
101
102    /// Traverse all segments in this repository and return list of their
103    /// offsets, sorted in ascending order.
104    fn existing_offsets(&self) -> io::Result<Vec<u64>>;
105
106    /// Create [`TxOffsetIndexMut`] for the given `offset` or open it if already exist.
107    /// The `cap` parameter is the maximum number of entries in the index.
108    fn create_offset_index(&self, _offset: TxOffset, _cap: u64) -> io::Result<TxOffsetIndexMut> {
109        Err(io::Error::other("not implemented"))
110    }
111
112    /// Remove [`TxOffsetIndexMut`] named with `offset`.
113    fn remove_offset_index(&self, _offset: TxOffset) -> io::Result<()> {
114        Err(io::Error::other("not implemented"))
115    }
116
117    /// Get [`TxOffsetIndex`] for the given `offset`.
118    fn get_offset_index(&self, _offset: TxOffset) -> io::Result<TxOffsetIndex> {
119        Err(io::Error::other("not implemented"))
120    }
121}
122
123impl<T: Repo> Repo for &T {
124    type SegmentWriter = T::SegmentWriter;
125    type SegmentReader = T::SegmentReader;
126
127    fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
128        T::create_segment(self, offset)
129    }
130
131    fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
132        T::open_segment_reader(self, offset)
133    }
134
135    fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
136        T::open_segment_writer(self, offset)
137    }
138
139    fn remove_segment(&self, offset: u64) -> io::Result<()> {
140        T::remove_segment(self, offset)
141    }
142
143    fn compress_segment(&self, offset: u64) -> io::Result<()> {
144        T::compress_segment(self, offset)
145    }
146
147    fn existing_offsets(&self) -> io::Result<Vec<u64>> {
148        T::existing_offsets(self)
149    }
150
151    fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
152        T::create_offset_index(self, offset, cap)
153    }
154
155    /// Remove [`TxOffsetIndexMut`] named with `offset`.
156    fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
157        T::remove_offset_index(self, offset)
158    }
159
160    /// Get [`TxOffsetIndex`] for the given `offset`.
161    fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
162        T::get_offset_index(self, offset)
163    }
164}
165
166impl<T: SegmentLen> SegmentLen for io::BufReader<T> {
167    fn segment_len(&mut self) -> io::Result<u64> {
168        SegmentLen::segment_len(self.get_mut())
169    }
170}
171
172pub(crate) fn create_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) -> Option<OffsetIndexWriter> {
173    repo.create_offset_index(offset, opts.offset_index_len())
174        .map(|index| OffsetIndexWriter::new(index, opts))
175        .map_err(|e| {
176            warn!("failed to get offset index for segment {offset}: {e}");
177        })
178        .ok()
179}
180
181/// Create a new segment [`Writer`] with `offset`.
182///
183/// Immediately attempts to write the segment header with the supplied
184/// `log_format_version`.
185///
186/// If the segment already exists, [`io::ErrorKind::AlreadyExists`] is returned.
187pub fn create_segment_writer<R: Repo>(
188    repo: &R,
189    opts: Options,
190    epoch: u64,
191    offset: u64,
192) -> io::Result<Writer<R::SegmentWriter>> {
193    let mut storage = repo.create_segment(offset)?;
194    Header {
195        log_format_version: opts.log_format_version,
196        checksum_algorithm: Commit::CHECKSUM_ALGORITHM,
197    }
198    .write(&mut storage)?;
199    storage.fsync()?;
200
201    Ok(Writer {
202        commit: Commit {
203            min_tx_offset: offset,
204            n: 0,
205            records: Vec::new(),
206            epoch,
207        },
208        inner: io::BufWriter::new(storage),
209
210        min_tx_offset: offset,
211        bytes_written: Header::LEN as u64,
212
213        max_records_in_commit: opts.max_records_in_commit,
214
215        offset_index_head: create_offset_index_writer(repo, offset, opts),
216    })
217}
218
219/// Open the existing segment at `offset` for writing.
220///
221/// This will traverse the segment in order to find the offset of the next
222/// commit to write to it, which may fail for various reasons.
223///
224/// If the traversal is successful, the segment header is checked against the
225/// `max_log_format_version`, and [`io::ErrorKind::InvalidData`] is returned if
226/// the segment's log format version is greater than the given value. Likewise
227/// if the checksum algorithm stored in the segment header cannot be handled
228/// by this crate.
229///
230/// If only a (non-empty) prefix of the segment could be read due to a failure
231/// to decode a [`Commit`], the segment [`Metadata`] read up to the faulty
232/// commit is returned in an `Err`. In this case, a new segment should be
233/// created for writing.
234pub fn resume_segment_writer<R: Repo>(
235    repo: &R,
236    opts: Options,
237    offset: u64,
238) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
239    let mut storage = repo.open_segment_writer(offset)?;
240    let offset_index = repo.get_offset_index(offset).ok();
241    let Metadata {
242        header,
243        tx_range,
244        size_in_bytes,
245        max_epoch,
246        max_commit_offset: _,
247    } = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
248        Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
249            warn!("invalid commit in segment {offset}: {source}");
250            debug!("sofar={sofar:?}");
251            return Ok(Err(sofar));
252        }
253        Err(error::SegmentMetadata::Io(e)) => return Err(e),
254        Ok(meta) => meta,
255    };
256    header
257        .ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM)
258        .map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
259    // When resuming, the log format version must be equal.
260    if header.log_format_version != opts.log_format_version {
261        return Err(io::Error::new(
262            io::ErrorKind::InvalidData,
263            format!(
264                "log format version mismatch: current={} segment={}",
265                opts.log_format_version, header.log_format_version
266            ),
267        ));
268    }
269
270    Ok(Ok(Writer {
271        commit: Commit {
272            min_tx_offset: tx_range.end,
273            n: 0,
274            records: Vec::new(),
275            epoch: max_epoch,
276        },
277        inner: io::BufWriter::new(storage),
278
279        min_tx_offset: tx_range.start,
280        bytes_written: size_in_bytes,
281
282        max_records_in_commit: opts.max_records_in_commit,
283
284        offset_index_head: create_offset_index_writer(repo, offset, opts),
285    }))
286}
287
288/// Open the existing segment at `offset` for reading.
289///
290/// Unlike [`resume_segment_writer`], this does not traverse the segment. It
291/// does, however, attempt to read the segment header and checks that the log
292/// format version and checksum algorithm are compatible.
293pub fn open_segment_reader<R: Repo>(
294    repo: &R,
295    max_log_format_version: u8,
296    offset: u64,
297) -> io::Result<Reader<R::SegmentReader>> {
298    debug!("open segment reader at {offset}");
299    let storage = repo.open_segment_reader(offset)?;
300    Reader::new(max_log_format_version, offset, storage)
301}