// spacetimedb_commitlog/lib.rs

1use std::{
2    io,
3    num::{NonZeroU16, NonZeroU64},
4    sync::RwLock,
5};
6
7use log::trace;
8use spacetimedb_paths::server::CommitLogDir;
9
10pub mod commit;
11pub mod commitlog;
12mod index;
13pub mod repo;
14pub mod segment;
15mod varchar;
16mod varint;
17
18pub use crate::{
19    commit::{Commit, StoredCommit},
20    payload::{Decoder, Encode},
21    segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
22    varchar::Varchar,
23};
24pub mod error;
25pub mod payload;
26
27#[cfg(any(test, feature = "test"))]
28pub mod tests;
29
/// Tunable [`Commitlog`] options.
///
/// Use [`Options::default`] for the defaults listed on each field.
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// Set the log format version to write, and the maximum supported version.
    ///
    /// Choosing a payload format `T` of [`Commitlog`] should usually result in
    /// updating the [`DEFAULT_LOG_FORMAT_VERSION`] of this crate. Sometimes it
    /// may however be useful to set the version at runtime, e.g. to experiment
    /// with new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    pub log_format_version: u8,
    /// The maximum size in bytes to which log segments should be allowed to
    /// grow.
    ///
    /// Default: 1GiB
    pub max_segment_size: u64,
    /// The maximum number of records in a commit.
    ///
    /// If this number is exceeded, the commit is flushed to disk even without
    /// explicitly calling [`Commitlog::flush`].
    ///
    /// Default: 65,535 (i.e. [`NonZeroU16::MAX`])
    pub max_records_in_commit: NonZeroU16,
    /// Whenever at least this many bytes have been written to the currently
    /// active segment, an entry is added to its offset index.
    ///
    /// Default: 4096
    pub offset_index_interval_bytes: NonZeroU64,
    /// If `true`, require that the segment must be synced to disk before an
    /// index entry is added.
    ///
    /// Setting this to `false` (the default) will update the index every
    /// `offset_index_interval_bytes`, even if the commitlog wasn't synced.
    /// This means that the index could contain non-existent entries in the
    /// event of a crash.
    ///
    /// Setting it to `true` will update the index when the commitlog is synced,
    /// and `offset_index_interval_bytes` have been written.
    /// This means that the index could contain fewer index entries than
    /// strictly every `offset_index_interval_bytes`.
    ///
    /// Default: false
    pub offset_index_require_segment_fsync: bool,
}
75
76impl Default for Options {
77    fn default() -> Self {
78        Self {
79            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
80            max_segment_size: 1024 * 1024 * 1024,
81            max_records_in_commit: NonZeroU16::MAX,
82            offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(),
83            offset_index_require_segment_fsync: false,
84        }
85    }
86}
87
impl Options {
    /// Compute the capacity of a segment's offset index implied by the
    /// settings in `self`: the number of index entries a segment can
    /// accumulate when it grows to [`Options::max_segment_size`] and one
    /// entry is added every [`Options::offset_index_interval_bytes`].
    ///
    /// NOTE(review): the value is a *count of entries*, not a size in bytes
    /// (as an earlier comment suggested) — the expression divides bytes by
    /// bytes-per-entry. Confirm against how the `index` module consumes it.
    pub fn offset_index_len(&self) -> u64 {
        self.max_segment_size / self.offset_index_interval_bytes
    }
}
95
/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
/// [`payload::Txdata`].
///
/// All shared access goes through the inner [`RwLock`]: mutating operations
/// (append, flush, sync, epoch updates) take the write lock, while traversal
/// and inspection methods take the read lock.
pub struct Commitlog<T> {
    // Generic commitlog implementation over the filesystem repo, guarded for
    // concurrent use.
    inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}
103
impl<T> Commitlog<T> {
    /// Open the log at root directory `root` with [`Options`].
    ///
    /// The root directory must already exist.
    ///
    /// Note that opening a commitlog involves I/O: some consistency checks are
    /// performed, and the next writing position is determined.
    ///
    /// This is only necessary when opening the commitlog for writing. See the
    /// free-standing functions in this module for how to traverse a read-only
    /// commitlog.
    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
        let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;

        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Determine the maximum transaction offset considered durable.
    ///
    /// The offset is `None` if the log hasn't been flushed to disk yet.
    pub fn max_committed_offset(&self) -> Option<u64> {
        self.inner.read().unwrap().max_committed_offset()
    }

    /// Get the current epoch.
    ///
    /// See also: [`Commit::epoch`].
    pub fn epoch(&self) -> u64 {
        self.inner.read().unwrap().epoch()
    }

    /// Update the current epoch.
    ///
    /// Does nothing if the given `epoch` is equal to the current epoch.
    /// Otherwise flushes outstanding transactions to disk (equivalent to
    /// [`Self::flush`]) before updating the epoch.
    ///
    /// Returns the maximum transaction offset written to disk. The offset is
    /// `None` if the log is empty and no data was pending to be flushed.
    ///
    /// # Errors
    ///
    /// If `epoch` is smaller than the current epoch, an error of kind
    /// [`io::ErrorKind::InvalidInput`] is returned.
    ///
    /// Errors from the implicit flush are propagated.
    pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
        // Hold the write lock across the update and the offset read, so the
        // returned offset reflects any flush `set_epoch` performed.
        let mut inner = self.inner.write().unwrap();
        inner.set_epoch(epoch)?;

        Ok(inner.max_committed_offset())
    }

    /// Sync all OS-buffered writes to disk.
    ///
    /// Note that this does **not** write outstanding records to disk.
    /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
    /// method to ensure all data is on disk.
    ///
    /// Returns the maximum transaction offset which is considered durable after
    /// this method returns successfully. The offset is `None` if the log hasn't
    /// been flushed to disk yet.
    ///
    /// # Panics
    ///
    /// This method panics if syncing fails irrecoverably.
    pub fn sync(&self) -> Option<u64> {
        // Syncing mutates internal state, so the write lock is required.
        let mut inner = self.inner.write().unwrap();
        trace!("sync commitlog");
        inner.sync();

        inner.max_committed_offset()
    }

    /// Write all outstanding transaction records to disk.
    ///
    /// Note that this does **not** force the OS to sync the data to disk.
    /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
    /// to ensure all data is on disk.
    ///
    /// Returns the maximum transaction offset written to disk. The offset is
    /// `None` if the log is empty and no data was pending to be flushed.
    ///
    /// Repeatedly calling this method may return the same value.
    pub fn flush(&self) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        trace!("flush commitlog");
        inner.commit()?;

        Ok(inner.max_committed_offset())
    }

    /// Write all outstanding transaction records to disk and flush OS buffers.
    ///
    /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
    /// without releasing the write lock in between.
    ///
    /// # Errors
    ///
    /// An error is returned if writing to disk fails due to an I/O error.
    ///
    /// # Panics
    ///
    /// This method panics if syncing fails irrecoverably.
    pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
        // Single write-lock scope: no other writer can interleave between the
        // commit and the sync.
        let mut inner = self.inner.write().unwrap();
        trace!("flush and sync commitlog");
        inner.commit()?;
        inner.sync();

        Ok(inner.max_committed_offset())
    }

    /// Obtain an iterator which traverses the log from the start, yielding
    /// [`StoredCommit`]s.
    ///
    /// The returned iterator is not aware of segment rotation. That is, if a
    /// new segment is created after this method returns, the iterator will not
    /// traverse it.
    ///
    /// Commits appended to the log while it is being traversed are generally
    /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
    /// however, a new iterator should be created using [`Self::commits_from`]
    /// with the last transaction offset yielded.
    ///
    /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
    /// (e.g. due to a partial write to disk), but a subsequent `append` will
    /// bring the log into a consistent state.
    ///
    /// This means that, when this iterator yields an `Err` value, the consumer
    /// may want to check if the iterator is exhausted (by calling `next()`)
    /// before treating the `Err` value as an application error.
    pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        self.commits_from(0)
    }

    /// Obtain an iterator starting from transaction offset `offset`, yielding
    /// [`StoredCommit`]s.
    ///
    /// Similar to [`Self::commits`] but will skip until the offset is contained
    /// in the next [`StoredCommit`] to yield.
    ///
    /// Note that the first [`StoredCommit`] yielded is the first commit
    /// containing the given transaction offset, i.e. its `min_tx_offset` may be
    /// smaller than `offset`.
    pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        // The read guard is a temporary and is released when this call
        // returns; the returned iterator does not hold the lock.
        self.inner.read().unwrap().commits_from(offset)
    }

    /// Remove all data from the log and reopen it.
    ///
    /// Log segments are deleted starting from the newest. As multiple segments
    /// cannot be deleted atomically, the log may not be completely empty if
    /// the method returns an error.
    ///
    /// Note that the method consumes `self` to ensure the log is not modified
    /// while resetting.
    pub fn reset(self) -> io::Result<Self> {
        // `into_inner` cannot block; the `unwrap` only panics if the lock was
        // poisoned by a previous panic.
        let inner = self.inner.into_inner().unwrap().reset()?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Remove all data past the given transaction `offset` from the log and
    /// reopen it.
    ///
    /// Like with [`Self::reset`], it may happen that not all segments newer
    /// than `offset` can be deleted.
    ///
    /// If the method returns successfully, the most recent [`Commit`] in the
    /// log will contain the transaction at `offset`.
    ///
    /// Note that the method consumes `self` to ensure the log is not modified
    /// while resetting.
    pub fn reset_to(self, offset: u64) -> io::Result<Self> {
        let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Determine the size on disk of this commitlog.
    pub fn size_on_disk(&self) -> io::Result<u64> {
        let inner = self.inner.read().unwrap();
        inner.repo.size_on_disk()
    }
}
294
impl<T: Encode> Commitlog<T> {
    /// Append the record `txdata` to the log.
    ///
    /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
    /// argument is returned in an `Err`. The caller should [`Self::flush`] the
    /// log and try again.
    ///
    /// In case the log is appended to from multiple threads, this may result in
    /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
    /// [`Self::append_maybe_flush`] is preferable.
    pub fn append(&self, txdata: T) -> Result<(), T> {
        let mut inner = self.inner.write().unwrap();
        // On a full buffer, ownership of `txdata` is handed back to the
        // caller via the `Err` variant.
        inner.append(txdata)
    }

    /// Append the record `txdata` to the log.
    ///
    /// The `txdata` payload is buffered in memory until either:
    ///
    /// - [`Self::flush`] is called explicitly, or
    /// - [`Options::max_records_in_commit`] is exceeded
    ///
    /// In the latter case, [`Self::append`] flushes implicitly, _before_
    /// appending the `txdata` argument.
    ///
    /// I.e. the argument is not guaranteed to be flushed after the method
    /// returns. If that is desired, [`Self::flush`] must be called explicitly.
    ///
    /// # Errors
    ///
    /// If the log needs to be flushed, but an I/O error occurs, ownership of
    /// `txdata` is returned back to the caller alongside the [`io::Error`].
    ///
    /// The value can then be used to retry appending.
    pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
        let mut inner = self.inner.write().unwrap();

        if let Err(txdata) = inner.append(txdata) {
            // Buffer is full: flush the pending commit to make room.
            if let Err(source) = inner.commit() {
                return Err(error::Append { txdata, source });
            }
            // `inner.commit.n` must be zero at this point
            let res = inner.append(txdata);
            debug_assert!(res.is_ok(), "failed to append while holding write lock");
        }

        Ok(())
    }

    /// Obtain an iterator which traverses the log from the start, yielding
    /// [`Transaction`]s.
    ///
    /// The provided `decoder`'s [`Decoder::decode_record`] method will be
    /// called [`Commit::n`] times per [`Commit`] to obtain the individual
    /// transaction payloads.
    ///
    /// Like [`Self::commits`], the iterator is not aware of segment rotation.
    /// That is, if a new segment is created after this method returns, the
    /// iterator will not traverse it.
    ///
    /// Transactions appended to the log while it is being traversed are
    /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
    /// however, a new iterator should be created using [`Self::transactions_from`]
    /// with the last transaction offset yielded.
    ///
    /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
    /// due to a partial write to disk), but a subsequent `append` will bring
    /// the log into a consistent state.
    ///
    /// This means that, when this iterator yields an `Err` value, the consumer
    /// may want to check if the iterator is exhausted (by calling `next()`)
    /// before treating the `Err` value as an application error.
    pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        T: 'a,
    {
        self.transactions_from(0, de)
    }

    /// Obtain an iterator starting from transaction offset `offset`, yielding
    /// [`Transaction`]s.
    ///
    /// Similar to [`Self::transactions`] but will skip until the provided
    /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
    /// with offset `offset`.
    pub fn transactions_from<'a, D>(
        &self,
        offset: u64,
        de: &'a D,
    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        T: 'a,
    {
        // Read guard is released on return; traversal itself is lock-free.
        self.inner.read().unwrap().transactions_from(offset, de)
    }

    /// Traverse the log from the start and "fold" its transactions into the
    /// provided [`Decoder`].
    ///
    /// A [`Decoder`] is a stateful object due to the requirement to store
    /// schema information in the log itself. That is, a [`Decoder`] may need to
    /// be able to resolve transaction schema information dynamically while
    /// traversing the log.
    ///
    /// This is equivalent to "replaying" a log into a database state. In this
    /// scenario, it is not interesting to consume the [`Transaction`] payload
    /// as an iterator.
    ///
    /// This method allows the use of a [`Decoder`] which returns zero-sized
    /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
    /// payload into a struct.
    ///
    /// Note that, unlike [`Self::transactions`], this method will ignore a
    /// corrupt commit at the very end of the traversed log.
    pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        self.fold_transactions_from(0, de)
    }

    /// Traverse the log from the given transaction offset and "fold" its
    /// transactions into the provided [`Decoder`].
    ///
    /// Similar to [`Self::fold_transactions`] but will skip until the provided
    /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
    /// will be equal to `offset`.
    pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        self.inner.read().unwrap().fold_transactions_from(offset, de)
    }
}
435
436/// Obtain an iterator which traverses the commitlog located at the `root`
437/// directory from the start, yielding [`StoredCommit`]s.
438///
439/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
440/// See [`Commitlog::commits`] for more information.
441pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
442    commits_from(root, 0)
443}
444
445/// Obtain an iterator which traverses the commitlog located at the `root`
446/// directory starting from `offset` and yielding [`StoredCommit`]s.
447///
448/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
449/// See [`Commitlog::commits_from`] for more information.
450pub fn commits_from(
451    root: CommitLogDir,
452    offset: u64,
453) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
454    commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
455}
456
457/// Obtain an iterator which traverses the commitlog located at the `root`
458/// directory from the start, yielding [`Transaction`]s.
459///
460/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
461/// See [`Commitlog::transactions`] for more information.
462pub fn transactions<'a, D, T>(
463    root: CommitLogDir,
464    de: &'a D,
465) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
466where
467    D: Decoder<Record = T>,
468    D::Error: From<error::Traversal>,
469    T: 'a,
470{
471    transactions_from(root, 0, de)
472}
473
474/// Obtain an iterator which traverses the commitlog located at the `root`
475/// directory starting from `offset` and yielding [`Transaction`]s.
476///
477/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
478/// See [`Commitlog::transactions_from`] for more information.
479pub fn transactions_from<'a, D, T>(
480    root: CommitLogDir,
481    offset: u64,
482    de: &'a D,
483) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
484where
485    D: Decoder<Record = T>,
486    D::Error: From<error::Traversal>,
487    T: 'a,
488{
489    commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
490}
491
492/// Traverse the commitlog located at the `root` directory from the start and
493/// "fold" its transactions into the provided [`Decoder`].
494///
495/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
496/// See [`Commitlog::fold_transactions`] for more information.
497pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
498where
499    D: Decoder,
500    D::Error: From<error::Traversal> + From<io::Error>,
501{
502    fold_transactions_from(root, 0, de)
503}
504
505/// Traverse the commitlog located at the `root` directory starting from `offset`
506/// and "fold" its transactions into the provided [`Decoder`].
507///
508/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
509/// See [`Commitlog::fold_transactions_from`] for more information.
510pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
511where
512    D: Decoder,
513    D::Error: From<error::Traversal> + From<io::Error>,
514{
515    commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
516}