// spacetimedb_commitlog — lib.rs

1use std::{
2    io,
3    num::{NonZeroU16, NonZeroU64},
4    sync::RwLock,
5};
6
7use log::trace;
8use repo::Repo;
9use spacetimedb_paths::server::CommitLogDir;
10
11pub mod commit;
12pub mod commitlog;
13mod index;
14pub mod repo;
15pub mod segment;
16mod varchar;
17mod varint;
18
19pub use crate::{
20    commit::{Commit, StoredCommit},
21    payload::{Decoder, Encode},
22    segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
23    varchar::Varchar,
24};
25pub mod error;
26pub mod payload;
27
28#[cfg(feature = "streaming")]
29pub mod stream;
30
31#[cfg(any(test, feature = "test"))]
32pub mod tests;
33
/// [`Commitlog`] options.
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// Set the log format version to write, and the maximum supported version.
    ///
    /// Choosing a payload format `T` of [`Commitlog`] should usually result in
    /// updating the [`DEFAULT_LOG_FORMAT_VERSION`] of this crate. Sometimes it
    /// may however be useful to set the version at runtime, e.g. to experiment
    /// with new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    pub log_format_version: u8,
    /// The maximum size in bytes to which log segments should be allowed to
    /// grow.
    ///
    /// Default: 1GiB
    pub max_segment_size: u64,
    /// The maximum number of records in a commit.
    ///
    /// If this number is exceeded, the commit is flushed to disk even without
    /// explicitly calling [`Commitlog::flush`].
    ///
    /// Default: 65,535
    pub max_records_in_commit: NonZeroU16,
    /// Whenever at least this many bytes have been written to the currently
    /// active segment, an entry is added to its offset index.
    ///
    /// Together with [`Options::max_segment_size`], this determines the number
    /// of entries in a segment's offset index, see [`Options::offset_index_len`].
    ///
    /// Default: 4096
    pub offset_index_interval_bytes: NonZeroU64,
    /// If `true`, require that the segment must be synced to disk before an
    /// index entry is added.
    ///
    /// Setting this to `false` (the default) will update the index every
    /// `offset_index_interval_bytes`, even if the commitlog wasn't synced.
    /// This means that the index could contain non-existent entries in the
    /// event of a crash.
    ///
    /// Setting it to `true` will update the index when the commitlog is synced,
    /// and `offset_index_interval_bytes` have been written.
    /// This means that the index could contain fewer index entries than
    /// strictly every `offset_index_interval_bytes`.
    ///
    /// Default: false
    pub offset_index_require_segment_fsync: bool,
}
79
80impl Default for Options {
81    fn default() -> Self {
82        Self {
83            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
84            max_segment_size: 1024 * 1024 * 1024,
85            max_records_in_commit: NonZeroU16::MAX,
86            offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(),
87            offset_index_require_segment_fsync: false,
88        }
89    }
90}
91
92impl Options {
93    /// Compute the length in bytes of an offset index based on the settings in
94    /// `self`.
95    pub fn offset_index_len(&self) -> u64 {
96        self.max_segment_size / self.offset_index_interval_bytes
97    }
98}
99
/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
/// [`payload::Txdata`].
pub struct Commitlog<T> {
    // All access goes through this lock: read-only queries and traversals take
    // a shared lock, while appends, flushes, resets and epoch updates take an
    // exclusive one.
    inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}
107
108impl<T> Commitlog<T> {
109    /// Open the log at root directory `root` with [`Options`].
110    ///
111    /// The root directory must already exist.
112    ///
113    /// Note that opening a commitlog involves I/O: some consistency checks are
114    /// performed, and the next writing position is determined.
115    ///
116    /// This is only necessary when opening the commitlog for writing. See the
117    /// free-standing functions in this module for how to traverse a read-only
118    /// commitlog.
119    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
120        let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;
121
122        Ok(Self {
123            inner: RwLock::new(inner),
124        })
125    }
126
127    /// Determine the maximum transaction offset considered durable.
128    ///
129    /// The offset is `None` if the log hasn't been flushed to disk yet.
130    pub fn max_committed_offset(&self) -> Option<u64> {
131        self.inner.read().unwrap().max_committed_offset()
132    }
133
134    /// Determine the minimum transaction offset in the log.
135    ///
136    /// The offset is `None` if the log hasn't been flushed to disk yet.
137    pub fn min_committed_offset(&self) -> Option<u64> {
138        self.inner.read().unwrap().min_committed_offset()
139    }
140
141    /// Get the current epoch.
142    ///
143    /// See also: [`Commit::epoch`].
144    pub fn epoch(&self) -> u64 {
145        self.inner.read().unwrap().epoch()
146    }
147
148    /// Update the current epoch.
149    ///
150    /// Does nothing if the given `epoch` is equal to the current epoch.
151    /// Otherwise flushes outstanding transactions to disk (equivalent to
152    /// [`Self::flush`]) before updating the epoch.
153    ///
154    /// Returns the maximum transaction offset written to disk. The offset is
155    /// `None` if the log is empty and no data was pending to be flushed.
156    ///
157    /// # Errors
158    ///
159    /// If `epoch` is smaller than the current epoch, an error of kind
160    /// [`io::ErrorKind::InvalidInput`] is returned.
161    ///
162    /// Errors from the implicit flush are propagated.
163    pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
164        let mut inner = self.inner.write().unwrap();
165        inner.set_epoch(epoch)?;
166
167        Ok(inner.max_committed_offset())
168    }
169
170    /// Sync all OS-buffered writes to disk.
171    ///
172    /// Note that this does **not** write outstanding records to disk.
173    /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
174    /// method to ensure all data is on disk.
175    ///
176    /// Returns the maximum transaction offset which is considered durable after
177    /// this method returns successfully. The offset is `None` if the log hasn't
178    /// been flushed to disk yet.
179    ///
180    /// # Panics
181    ///
182    /// This method panics if syncing fails irrecoverably.
183    pub fn sync(&self) -> Option<u64> {
184        let mut inner = self.inner.write().unwrap();
185        trace!("sync commitlog");
186        inner.sync();
187
188        inner.max_committed_offset()
189    }
190
191    /// Write all outstanding transaction records to disk.
192    ///
193    /// Note that this does **not** force the OS to sync the data to disk.
194    /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
195    /// to ensure all data is on disk.
196    ///
197    /// Returns the maximum transaction offset written to disk. The offset is
198    /// `None` if the log is empty and no data was pending to be flushed.
199    ///
200    /// Repeatedly calling this method may return the same value.
201    pub fn flush(&self) -> io::Result<Option<u64>> {
202        let mut inner = self.inner.write().unwrap();
203        trace!("flush commitlog");
204        inner.commit()?;
205
206        Ok(inner.max_committed_offset())
207    }
208
209    /// Write all outstanding transaction records to disk and flush OS buffers.
210    ///
211    /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
212    /// without releasing the write lock in between.
213    ///
214    /// # Errors
215    ///
216    /// An error is returned if writing to disk fails due to an I/O error.
217    ///
218    /// # Panics
219    ///
220    /// This method panics if syncing fails irrecoverably.
221    pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
222        let mut inner = self.inner.write().unwrap();
223        trace!("flush and sync commitlog");
224        inner.commit()?;
225        inner.sync();
226
227        Ok(inner.max_committed_offset())
228    }
229
230    /// Obtain an iterator which traverses the log from the start, yielding
231    /// [`StoredCommit`]s.
232    ///
233    /// The returned iterator is not aware of segment rotation. That is, if a
234    /// new segment is created after this method returns, the iterator will not
235    /// traverse it.
236    ///
237    /// Commits appended to the log while it is being traversed are generally
238    /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
239    /// however, a new iterator should be created using [`Self::commits_from`]
240    /// with the last transaction offset yielded.
241    ///
242    /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
243    /// (e.g. due to a partial write to disk), but a subsequent `append` will
244    /// bring the log into a consistent state.
245    ///
246    /// This means that, when this iterator yields an `Err` value, the consumer
247    /// may want to check if the iterator is exhausted (by calling `next()`)
248    /// before treating the `Err` value as an application error.
249    pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
250        self.commits_from(0)
251    }
252
253    /// Obtain an iterator starting from transaction offset `offset`, yielding
254    /// [`StoredCommit`]s.
255    ///
256    /// Similar to [`Self::commits`] but will skip until the offset is contained
257    /// in the next [`StoredCommit`] to yield.
258    ///
259    /// Note that the first [`StoredCommit`] yielded is the first commit
260    /// containing the given transaction offset, i.e. its `min_tx_offset` may be
261    /// smaller than `offset`.
262    pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
263        self.inner.read().unwrap().commits_from(offset)
264    }
265
266    /// Get a list of segment offsets, sorted in ascending order.
267    pub fn existing_segment_offsets(&self) -> io::Result<Vec<u64>> {
268        self.inner.read().unwrap().repo.existing_offsets()
269    }
270
271    /// Compress the segments at the offsets provided, marking them as immutable.
272    pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
273        // even though `compress_segment` takes &self, we take an
274        // exclusive lock to avoid any weirdness happening.
275        #[allow(clippy::readonly_write_lock)]
276        let inner = self.inner.write().unwrap();
277        assert!(!offsets.contains(&inner.head.min_tx_offset()));
278        // TODO: parallelize, maybe
279        offsets
280            .iter()
281            .try_for_each(|&offset| inner.repo.compress_segment(offset))
282    }
283
284    /// Remove all data from the log and reopen it.
285    ///
286    /// Log segments are deleted starting from the newest. As multiple segments
287    /// cannot be deleted atomically, the log may not be completely empty if
288    /// the method returns an error.
289    ///
290    /// Note that the method consumes `self` to ensure the log is not modified
291    /// while resetting.
292    pub fn reset(self) -> io::Result<Self> {
293        let inner = self.inner.into_inner().unwrap().reset()?;
294        Ok(Self {
295            inner: RwLock::new(inner),
296        })
297    }
298
299    /// Remove all data past the given transaction `offset` from the log and
300    /// reopen it.
301    ///
302    /// Like with [`Self::reset`], it may happen that not all segments newer
303    /// than `offset` can be deleted.
304    ///
305    /// If the method returns successfully, the most recent [`Commit`] in the
306    /// log will contain the transaction at `offset`.
307    ///
308    /// Note that the method consumes `self` to ensure the log is not modified
309    /// while resetting.
310    pub fn reset_to(self, offset: u64) -> io::Result<Self> {
311        let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
312        Ok(Self {
313            inner: RwLock::new(inner),
314        })
315    }
316
317    /// Determine the size on disk of this commitlog.
318    pub fn size_on_disk(&self) -> io::Result<u64> {
319        let inner = self.inner.read().unwrap();
320        inner.repo.size_on_disk()
321    }
322}
323
324impl<T: Encode> Commitlog<T> {
325    /// Append the record `txdata` to the log.
326    ///
327    /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
328    /// argument is returned in an `Err`. The caller should [`Self::flush`] the
329    /// log and try again.
330    ///
331    /// In case the log is appended to from multiple threads, this may result in
332    /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
333    /// [`Self::append_maybe_flush`] is preferable.
334    pub fn append(&self, txdata: T) -> Result<(), T> {
335        let mut inner = self.inner.write().unwrap();
336        inner.append(txdata)
337    }
338
339    /// Append the record `txdata` to the log.
340    ///
341    /// The `txdata` payload is buffered in memory until either:
342    ///
343    /// - [`Self::flush`] is called explicitly, or
344    /// - [`Options::max_records_in_commit`] is exceeded
345    ///
346    /// In the latter case, [`Self::append`] flushes implicitly, _before_
347    /// appending the `txdata` argument.
348    ///
349    /// I.e. the argument is not guaranteed to be flushed after the method
350    /// returns. If that is desired, [`Self::flush`] must be called explicitly.
351    ///
352    /// # Errors
353    ///
354    /// If the log needs to be flushed, but an I/O error occurs, ownership of
355    /// `txdata` is returned back to the caller alongside the [`io::Error`].
356    ///
357    /// The value can then be used to retry appending.
358    pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
359        let mut inner = self.inner.write().unwrap();
360
361        if let Err(txdata) = inner.append(txdata) {
362            if let Err(source) = inner.commit() {
363                return Err(error::Append { txdata, source });
364            }
365            // `inner.commit.n` must be zero at this point
366            let res = inner.append(txdata);
367            debug_assert!(res.is_ok(), "failed to append while holding write lock");
368        }
369
370        Ok(())
371    }
372
373    /// Obtain an iterator which traverses the log from the start, yielding
374    /// [`Transaction`]s.
375    ///
376    /// The provided `decoder`'s [`Decoder::decode_record`] method will be
377    /// called [`Commit::n`] times per [`Commit`] to obtain the individual
378    /// transaction payloads.
379    ///
380    /// Like [`Self::commits`], the iterator is not aware of segment rotation.
381    /// That is, if a new segment is created after this method returns, the
382    /// iterator will not traverse it.
383    ///
384    /// Transactions appended to the log while it is being traversed are
385    /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
386    /// however, a new iterator should be created using [`Self::transactions_from`]
387    /// with the last transaction offset yielded.
388    ///
389    /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
390    /// due to a partial write to disk), but a subsequent `append` will bring
391    /// the log into a consistent state.
392    ///
393    /// This means that, when this iterator yields an `Err` value, the consumer
394    /// may want to check if the iterator is exhausted (by calling `next()`)
395    /// before treating the `Err` value as an application error.
396    pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
397    where
398        D: Decoder<Record = T>,
399        D::Error: From<error::Traversal>,
400        T: 'a,
401    {
402        self.transactions_from(0, de)
403    }
404
405    /// Obtain an iterator starting from transaction offset `offset`, yielding
406    /// [`Transaction`]s.
407    ///
408    /// Similar to [`Self::transactions`] but will skip until the provided
409    /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
410    /// with offset `offset`.
411    pub fn transactions_from<'a, D>(
412        &self,
413        offset: u64,
414        de: &'a D,
415    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
416    where
417        D: Decoder<Record = T>,
418        D::Error: From<error::Traversal>,
419        T: 'a,
420    {
421        self.inner.read().unwrap().transactions_from(offset, de)
422    }
423
424    /// Traverse the log from the start and "fold" its transactions into the
425    /// provided [`Decoder`].
426    ///
427    /// A [`Decoder`] is a stateful object due to the requirement to store
428    /// schema information in the log itself. That is, a [`Decoder`] may need to
429    /// be able to resolve transaction schema information dynamically while
430    /// traversing the log.
431    ///
432    /// This is equivalent to "replaying" a log into a database state. In this
433    /// scenario, it is not interesting to consume the [`Transaction`] payload
434    /// as an iterator.
435    ///
436    /// This method allows the use of a [`Decoder`] which returns zero-sized
437    /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
438    /// payload into a struct.
439    ///
440    /// Note that, unlike [`Self::transactions`], this method will ignore a
441    /// corrupt commit at the very end of the traversed log.
442    pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
443    where
444        D: Decoder,
445        D::Error: From<error::Traversal>,
446    {
447        self.fold_transactions_from(0, de)
448    }
449
450    /// Traverse the log from the given transaction offset and "fold" its
451    /// transactions into the provided [`Decoder`].
452    ///
453    /// Similar to [`Self::fold_transactions`] but will skip until the provided
454    /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
455    /// will be equal to `offset`.
456    pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
457    where
458        D: Decoder,
459        D::Error: From<error::Traversal>,
460    {
461        self.inner.read().unwrap().fold_transactions_from(offset, de)
462    }
463}
464
465/// Extract the most recently written [`segment::Metadata`] from the commitlog
466/// in `repo`.
467///
468/// Returns `None` if the commitlog is empty.
469///
470/// Note that this function validates the most recent segment, which entails
471/// traversing it from the start.
472///
473/// The function can be used instead of the pattern:
474///
475/// ```ignore
476/// let log = Commitlog::open(..)?;
477/// let max_offset = log.max_committed_offset();
478/// ```
479///
480/// like so:
481///
482/// ```ignore
483/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
484/// ```
485///
486/// Unlike `open`, no segment will be created in an empty `repo`.
487pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
488    commitlog::committed_meta(repo::Fs::new(root)?)
489}
490
491/// Obtain an iterator which traverses the commitlog located at the `root`
492/// directory from the start, yielding [`StoredCommit`]s.
493///
494/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
495/// See [`Commitlog::commits`] for more information.
496pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
497    commits_from(root, 0)
498}
499
500/// Obtain an iterator which traverses the commitlog located at the `root`
501/// directory starting from `offset` and yielding [`StoredCommit`]s.
502///
503/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
504/// See [`Commitlog::commits_from`] for more information.
505pub fn commits_from(
506    root: CommitLogDir,
507    offset: u64,
508) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
509    commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
510}
511
512/// Obtain an iterator which traverses the commitlog located at the `root`
513/// directory from the start, yielding [`Transaction`]s.
514///
515/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
516/// See [`Commitlog::transactions`] for more information.
517pub fn transactions<'a, D, T>(
518    root: CommitLogDir,
519    de: &'a D,
520) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
521where
522    D: Decoder<Record = T>,
523    D::Error: From<error::Traversal>,
524    T: 'a,
525{
526    transactions_from(root, 0, de)
527}
528
529/// Obtain an iterator which traverses the commitlog located at the `root`
530/// directory starting from `offset` and yielding [`Transaction`]s.
531///
532/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
533/// See [`Commitlog::transactions_from`] for more information.
534pub fn transactions_from<'a, D, T>(
535    root: CommitLogDir,
536    offset: u64,
537    de: &'a D,
538) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
539where
540    D: Decoder<Record = T>,
541    D::Error: From<error::Traversal>,
542    T: 'a,
543{
544    commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
545}
546
547/// Traverse the commitlog located at the `root` directory from the start and
548/// "fold" its transactions into the provided [`Decoder`].
549///
550/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
551/// See [`Commitlog::fold_transactions`] for more information.
552pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
553where
554    D: Decoder,
555    D::Error: From<error::Traversal> + From<io::Error>,
556{
557    fold_transactions_from(root, 0, de)
558}
559
560/// Traverse the commitlog located at the `root` directory starting from `offset`
561/// and "fold" its transactions into the provided [`Decoder`].
562///
563/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
564/// See [`Commitlog::fold_transactions_from`] for more information.
565pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
566where
567    D: Decoder,
568    D::Error: From<error::Traversal> + From<io::Error>,
569{
570    commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
571}