spacetimedb_commitlog/
lib.rs

1use std::{
2    io,
3    num::{NonZeroU16, NonZeroU64},
4    sync::RwLock,
5};
6
7use log::trace;
8use repo::Repo;
9use spacetimedb_paths::server::CommitLogDir;
10
11pub mod commit;
12pub mod commitlog;
13mod index;
14pub mod repo;
15pub mod segment;
16mod varchar;
17mod varint;
18
19pub use crate::{
20    commit::{Commit, StoredCommit},
21    payload::{Decoder, Encode},
22    segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
23    varchar::Varchar,
24};
25pub mod error;
26pub mod payload;
27
28#[cfg(feature = "streaming")]
29pub mod stream;
30
31#[cfg(any(test, feature = "test"))]
32pub mod tests;
33
/// Tuning knobs accepted by [`Commitlog::open`].
///
/// Use [`Options::default`] for the recommended settings and override
/// individual fields with struct update syntax.
#[derive(Debug, Clone, Copy)]
pub struct Options {
    /// The log format version that is written, which is also the highest
    /// version this log will accept when reading.
    ///
    /// Normally, picking a payload format `T` for [`Commitlog`] goes hand in
    /// hand with bumping this crate's [`DEFAULT_LOG_FORMAT_VERSION`]. Setting
    /// the version at runtime is occasionally handy, e.g. when experimenting
    /// with brand-new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    pub log_format_version: u8,
    /// Upper bound, in bytes, on how large a single log segment may grow.
    ///
    /// Default: 1GiB
    pub max_segment_size: u64,
    /// Upper bound on the number of records buffered in one commit.
    ///
    /// Once this is exceeded, the commit is written to disk even if
    /// [`Commitlog::flush`] was not called explicitly.
    ///
    /// Default: 65,535
    pub max_records_in_commit: NonZeroU16,
    /// An offset index entry is added to the active segment's index each
    /// time at least this many bytes have been written to that segment.
    ///
    /// Default: 4096
    pub offset_index_interval_bytes: NonZeroU64,
    /// Whether an offset index entry may only be added after the segment has
    /// been synced to disk.
    ///
    /// With `false` (the default), the index is updated every
    /// `offset_index_interval_bytes` regardless of whether the commitlog was
    /// synced. After a crash, the index may therefore reference entries that
    /// do not exist.
    ///
    /// With `true`, the index is updated once the commitlog has been synced
    /// *and* `offset_index_interval_bytes` have been written. The index may
    /// then hold fewer entries than one per `offset_index_interval_bytes`.
    ///
    /// Default: false
    pub offset_index_require_segment_fsync: bool,
}
79
80impl Default for Options {
81    fn default() -> Self {
82        Self {
83            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
84            max_segment_size: 1024 * 1024 * 1024,
85            max_records_in_commit: NonZeroU16::MAX,
86            offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(),
87            offset_index_require_segment_fsync: false,
88        }
89    }
90}
91
92impl Options {
93    /// Compute the length in bytes of an offset index based on the settings in
94    /// `self`.
95    pub fn offset_index_len(&self) -> u64 {
96        self.max_segment_size / self.offset_index_interval_bytes
97    }
98}
99
100/// The canonical commitlog, backed by on-disk log files.
101///
102/// Records in the log are of type `T`, which canonically is instantiated to
103/// [`payload::Txdata`].
104pub struct Commitlog<T> {
105    inner: RwLock<commitlog::Generic<repo::Fs, T>>,
106}
107
108impl<T> Commitlog<T> {
109    /// Open the log at root directory `root` with [`Options`].
110    ///
111    /// The root directory must already exist.
112    ///
113    /// Note that opening a commitlog involves I/O: some consistency checks are
114    /// performed, and the next writing position is determined.
115    ///
116    /// This is only necessary when opening the commitlog for writing. See the
117    /// free-standing functions in this module for how to traverse a read-only
118    /// commitlog.
119    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
120        let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;
121
122        Ok(Self {
123            inner: RwLock::new(inner),
124        })
125    }
126
127    /// Determine the maximum transaction offset considered durable.
128    ///
129    /// The offset is `None` if the log hasn't been flushed to disk yet.
130    pub fn max_committed_offset(&self) -> Option<u64> {
131        self.inner.read().unwrap().max_committed_offset()
132    }
133
134    /// Get the current epoch.
135    ///
136    /// See also: [`Commit::epoch`].
137    pub fn epoch(&self) -> u64 {
138        self.inner.read().unwrap().epoch()
139    }
140
141    /// Update the current epoch.
142    ///
143    /// Does nothing if the given `epoch` is equal to the current epoch.
144    /// Otherwise flushes outstanding transactions to disk (equivalent to
145    /// [`Self::flush`]) before updating the epoch.
146    ///
147    /// Returns the maximum transaction offset written to disk. The offset is
148    /// `None` if the log is empty and no data was pending to be flushed.
149    ///
150    /// # Errors
151    ///
152    /// If `epoch` is smaller than the current epoch, an error of kind
153    /// [`io::ErrorKind::InvalidInput`] is returned.
154    ///
155    /// Errors from the implicit flush are propagated.
156    pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
157        let mut inner = self.inner.write().unwrap();
158        inner.set_epoch(epoch)?;
159
160        Ok(inner.max_committed_offset())
161    }
162
163    /// Sync all OS-buffered writes to disk.
164    ///
165    /// Note that this does **not** write outstanding records to disk.
166    /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
167    /// method to ensure all data is on disk.
168    ///
169    /// Returns the maximum transaction offset which is considered durable after
170    /// this method returns successfully. The offset is `None` if the log hasn't
171    /// been flushed to disk yet.
172    ///
173    /// # Panics
174    ///
175    /// This method panics if syncing fails irrecoverably.
176    pub fn sync(&self) -> Option<u64> {
177        let mut inner = self.inner.write().unwrap();
178        trace!("sync commitlog");
179        inner.sync();
180
181        inner.max_committed_offset()
182    }
183
184    /// Write all outstanding transaction records to disk.
185    ///
186    /// Note that this does **not** force the OS to sync the data to disk.
187    /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
188    /// to ensure all data is on disk.
189    ///
190    /// Returns the maximum transaction offset written to disk. The offset is
191    /// `None` if the log is empty and no data was pending to be flushed.
192    ///
193    /// Repeatedly calling this method may return the same value.
194    pub fn flush(&self) -> io::Result<Option<u64>> {
195        let mut inner = self.inner.write().unwrap();
196        trace!("flush commitlog");
197        inner.commit()?;
198
199        Ok(inner.max_committed_offset())
200    }
201
202    /// Write all outstanding transaction records to disk and flush OS buffers.
203    ///
204    /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
205    /// without releasing the write lock in between.
206    ///
207    /// # Errors
208    ///
209    /// An error is returned if writing to disk fails due to an I/O error.
210    ///
211    /// # Panics
212    ///
213    /// This method panics if syncing fails irrecoverably.
214    pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
215        let mut inner = self.inner.write().unwrap();
216        trace!("flush and sync commitlog");
217        inner.commit()?;
218        inner.sync();
219
220        Ok(inner.max_committed_offset())
221    }
222
223    /// Obtain an iterator which traverses the log from the start, yielding
224    /// [`StoredCommit`]s.
225    ///
226    /// The returned iterator is not aware of segment rotation. That is, if a
227    /// new segment is created after this method returns, the iterator will not
228    /// traverse it.
229    ///
230    /// Commits appended to the log while it is being traversed are generally
231    /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
232    /// however, a new iterator should be created using [`Self::commits_from`]
233    /// with the last transaction offset yielded.
234    ///
235    /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
236    /// (e.g. due to a partial write to disk), but a subsequent `append` will
237    /// bring the log into a consistent state.
238    ///
239    /// This means that, when this iterator yields an `Err` value, the consumer
240    /// may want to check if the iterator is exhausted (by calling `next()`)
241    /// before treating the `Err` value as an application error.
242    pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
243        self.commits_from(0)
244    }
245
246    /// Obtain an iterator starting from transaction offset `offset`, yielding
247    /// [`StoredCommit`]s.
248    ///
249    /// Similar to [`Self::commits`] but will skip until the offset is contained
250    /// in the next [`StoredCommit`] to yield.
251    ///
252    /// Note that the first [`StoredCommit`] yielded is the first commit
253    /// containing the given transaction offset, i.e. its `min_tx_offset` may be
254    /// smaller than `offset`.
255    pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
256        self.inner.read().unwrap().commits_from(offset)
257    }
258
259    /// Get a list of segment offsets, sorted in ascending order.
260    pub fn existing_segment_offsets(&self) -> io::Result<Vec<u64>> {
261        self.inner.read().unwrap().repo.existing_offsets()
262    }
263
264    /// Compress the segments at the offsets provided, marking them as immutable.
265    pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
266        // even though `compress_segment` takes &self, we take an
267        // exclusive lock to avoid any weirdness happening.
268        #[allow(clippy::readonly_write_lock)]
269        let inner = self.inner.write().unwrap();
270        assert!(!offsets.contains(&inner.head.min_tx_offset()));
271        // TODO: parallelize, maybe
272        offsets
273            .iter()
274            .try_for_each(|&offset| inner.repo.compress_segment(offset))
275    }
276
277    /// Remove all data from the log and reopen it.
278    ///
279    /// Log segments are deleted starting from the newest. As multiple segments
280    /// cannot be deleted atomically, the log may not be completely empty if
281    /// the method returns an error.
282    ///
283    /// Note that the method consumes `self` to ensure the log is not modified
284    /// while resetting.
285    pub fn reset(self) -> io::Result<Self> {
286        let inner = self.inner.into_inner().unwrap().reset()?;
287        Ok(Self {
288            inner: RwLock::new(inner),
289        })
290    }
291
292    /// Remove all data past the given transaction `offset` from the log and
293    /// reopen it.
294    ///
295    /// Like with [`Self::reset`], it may happen that not all segments newer
296    /// than `offset` can be deleted.
297    ///
298    /// If the method returns successfully, the most recent [`Commit`] in the
299    /// log will contain the transaction at `offset`.
300    ///
301    /// Note that the method consumes `self` to ensure the log is not modified
302    /// while resetting.
303    pub fn reset_to(self, offset: u64) -> io::Result<Self> {
304        let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
305        Ok(Self {
306            inner: RwLock::new(inner),
307        })
308    }
309
310    /// Determine the size on disk of this commitlog.
311    pub fn size_on_disk(&self) -> io::Result<u64> {
312        let inner = self.inner.read().unwrap();
313        inner.repo.size_on_disk()
314    }
315}
316
317impl<T: Encode> Commitlog<T> {
318    /// Append the record `txdata` to the log.
319    ///
320    /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
321    /// argument is returned in an `Err`. The caller should [`Self::flush`] the
322    /// log and try again.
323    ///
324    /// In case the log is appended to from multiple threads, this may result in
325    /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
326    /// [`Self::append_maybe_flush`] is preferable.
327    pub fn append(&self, txdata: T) -> Result<(), T> {
328        let mut inner = self.inner.write().unwrap();
329        inner.append(txdata)
330    }
331
332    /// Append the record `txdata` to the log.
333    ///
334    /// The `txdata` payload is buffered in memory until either:
335    ///
336    /// - [`Self::flush`] is called explicitly, or
337    /// - [`Options::max_records_in_commit`] is exceeded
338    ///
339    /// In the latter case, [`Self::append`] flushes implicitly, _before_
340    /// appending the `txdata` argument.
341    ///
342    /// I.e. the argument is not guaranteed to be flushed after the method
343    /// returns. If that is desired, [`Self::flush`] must be called explicitly.
344    ///
345    /// # Errors
346    ///
347    /// If the log needs to be flushed, but an I/O error occurs, ownership of
348    /// `txdata` is returned back to the caller alongside the [`io::Error`].
349    ///
350    /// The value can then be used to retry appending.
351    pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
352        let mut inner = self.inner.write().unwrap();
353
354        if let Err(txdata) = inner.append(txdata) {
355            if let Err(source) = inner.commit() {
356                return Err(error::Append { txdata, source });
357            }
358            // `inner.commit.n` must be zero at this point
359            let res = inner.append(txdata);
360            debug_assert!(res.is_ok(), "failed to append while holding write lock");
361        }
362
363        Ok(())
364    }
365
366    /// Obtain an iterator which traverses the log from the start, yielding
367    /// [`Transaction`]s.
368    ///
369    /// The provided `decoder`'s [`Decoder::decode_record`] method will be
370    /// called [`Commit::n`] times per [`Commit`] to obtain the individual
371    /// transaction payloads.
372    ///
373    /// Like [`Self::commits`], the iterator is not aware of segment rotation.
374    /// That is, if a new segment is created after this method returns, the
375    /// iterator will not traverse it.
376    ///
377    /// Transactions appended to the log while it is being traversed are
378    /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
379    /// however, a new iterator should be created using [`Self::transactions_from`]
380    /// with the last transaction offset yielded.
381    ///
382    /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
383    /// due to a partial write to disk), but a subsequent `append` will bring
384    /// the log into a consistent state.
385    ///
386    /// This means that, when this iterator yields an `Err` value, the consumer
387    /// may want to check if the iterator is exhausted (by calling `next()`)
388    /// before treating the `Err` value as an application error.
389    pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
390    where
391        D: Decoder<Record = T>,
392        D::Error: From<error::Traversal>,
393        T: 'a,
394    {
395        self.transactions_from(0, de)
396    }
397
398    /// Obtain an iterator starting from transaction offset `offset`, yielding
399    /// [`Transaction`]s.
400    ///
401    /// Similar to [`Self::transactions`] but will skip until the provided
402    /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
403    /// with offset `offset`.
404    pub fn transactions_from<'a, D>(
405        &self,
406        offset: u64,
407        de: &'a D,
408    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
409    where
410        D: Decoder<Record = T>,
411        D::Error: From<error::Traversal>,
412        T: 'a,
413    {
414        self.inner.read().unwrap().transactions_from(offset, de)
415    }
416
417    /// Traverse the log from the start and "fold" its transactions into the
418    /// provided [`Decoder`].
419    ///
420    /// A [`Decoder`] is a stateful object due to the requirement to store
421    /// schema information in the log itself. That is, a [`Decoder`] may need to
422    /// be able to resolve transaction schema information dynamically while
423    /// traversing the log.
424    ///
425    /// This is equivalent to "replaying" a log into a database state. In this
426    /// scenario, it is not interesting to consume the [`Transaction`] payload
427    /// as an iterator.
428    ///
429    /// This method allows the use of a [`Decoder`] which returns zero-sized
430    /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
431    /// payload into a struct.
432    ///
433    /// Note that, unlike [`Self::transactions`], this method will ignore a
434    /// corrupt commit at the very end of the traversed log.
435    pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
436    where
437        D: Decoder,
438        D::Error: From<error::Traversal>,
439    {
440        self.fold_transactions_from(0, de)
441    }
442
443    /// Traverse the log from the given transaction offset and "fold" its
444    /// transactions into the provided [`Decoder`].
445    ///
446    /// Similar to [`Self::fold_transactions`] but will skip until the provided
447    /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
448    /// will be equal to `offset`.
449    pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
450    where
451        D: Decoder,
452        D::Error: From<error::Traversal>,
453    {
454        self.inner.read().unwrap().fold_transactions_from(offset, de)
455    }
456}
457
458/// Extract the most recently written [`segment::Metadata`] from the commitlog
459/// in `repo`.
460///
461/// Returns `None` if the commitlog is empty.
462///
463/// Note that this function validates the most recent segment, which entails
464/// traversing it from the start.
465///
466/// The function can be used instead of the pattern:
467///
468/// ```ignore
469/// let log = Commitlog::open(..)?;
470/// let max_offset = log.max_committed_offset();
471/// ```
472///
473/// like so:
474///
475/// ```ignore
476/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
477/// ```
478///
479/// Unlike `open`, no segment will be created in an empty `repo`.
480pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
481    commitlog::committed_meta(repo::Fs::new(root)?)
482}
483
484/// Obtain an iterator which traverses the commitlog located at the `root`
485/// directory from the start, yielding [`StoredCommit`]s.
486///
487/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
488/// See [`Commitlog::commits`] for more information.
489pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
490    commits_from(root, 0)
491}
492
493/// Obtain an iterator which traverses the commitlog located at the `root`
494/// directory starting from `offset` and yielding [`StoredCommit`]s.
495///
496/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
497/// See [`Commitlog::commits_from`] for more information.
498pub fn commits_from(
499    root: CommitLogDir,
500    offset: u64,
501) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
502    commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
503}
504
505/// Obtain an iterator which traverses the commitlog located at the `root`
506/// directory from the start, yielding [`Transaction`]s.
507///
508/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
509/// See [`Commitlog::transactions`] for more information.
510pub fn transactions<'a, D, T>(
511    root: CommitLogDir,
512    de: &'a D,
513) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
514where
515    D: Decoder<Record = T>,
516    D::Error: From<error::Traversal>,
517    T: 'a,
518{
519    transactions_from(root, 0, de)
520}
521
522/// Obtain an iterator which traverses the commitlog located at the `root`
523/// directory starting from `offset` and yielding [`Transaction`]s.
524///
525/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
526/// See [`Commitlog::transactions_from`] for more information.
527pub fn transactions_from<'a, D, T>(
528    root: CommitLogDir,
529    offset: u64,
530    de: &'a D,
531) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
532where
533    D: Decoder<Record = T>,
534    D::Error: From<error::Traversal>,
535    T: 'a,
536{
537    commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
538}
539
540/// Traverse the commitlog located at the `root` directory from the start and
541/// "fold" its transactions into the provided [`Decoder`].
542///
543/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
544/// See [`Commitlog::fold_transactions`] for more information.
545pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
546where
547    D: Decoder,
548    D::Error: From<error::Traversal> + From<io::Error>,
549{
550    fold_transactions_from(root, 0, de)
551}
552
553/// Traverse the commitlog located at the `root` directory starting from `offset`
554/// and "fold" its transactions into the provided [`Decoder`].
555///
556/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
557/// See [`Commitlog::fold_transactions_from`] for more information.
558pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
559where
560    D: Decoder,
561    D::Error: From<error::Traversal> + From<io::Error>,
562{
563    commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
564}