//! Crate `spacetimedb_commitlog` — the SpacetimeDB commitlog (crate root, `lib.rs`).

1use std::{
2    io,
3    num::{NonZeroU16, NonZeroU64},
4    sync::RwLock,
5};
6
7use log::trace;
8use repo::Repo;
9use spacetimedb_paths::server::CommitLogDir;
10
11pub mod commit;
12pub mod commitlog;
13mod index;
14pub mod repo;
15pub mod segment;
16mod varchar;
17mod varint;
18
19pub use crate::{
20    commit::{Commit, StoredCommit},
21    payload::{Decoder, Encode},
22    segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
23    varchar::Varchar,
24};
25pub mod error;
26pub mod payload;
27
28#[cfg(feature = "streaming")]
29pub mod stream;
30
31#[cfg(any(test, feature = "test"))]
32pub mod tests;
33
/// [`Commitlog`] options.
///
/// All fields have serde defaults (when the `serde` feature is enabled), so a
/// partially-specified configuration deserializes with the remaining fields
/// set to the crate defaults. Field names are serialized in kebab-case.
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "kebab-case")
)]
pub struct Options {
    /// Set the log format version to write, and the maximum supported version.
    ///
    /// Choosing a payload format `T` of [`Commitlog`] should usually result in
    /// updating the [`DEFAULT_LOG_FORMAT_VERSION`] of this crate. Sometimes it
    /// may however be useful to set the version at runtime, e.g. to experiment
    /// with new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    #[cfg_attr(feature = "serde", serde(default = "Options::default_log_format_version"))]
    pub log_format_version: u8,
    /// The maximum size in bytes to which log segments should be allowed to
    /// grow.
    ///
    /// Default: 1GiB
    #[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
    pub max_segment_size: u64,
    /// The maximum number of records in a commit.
    ///
    /// If this number is exceeded, the commit is flushed to disk even without
    /// explicitly calling [`Commitlog::flush`].
    ///
    /// Default: 65,535
    #[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
    pub max_records_in_commit: NonZeroU16,
    /// Whenever at least this many bytes have been written to the currently
    /// active segment, an entry is added to its offset index.
    ///
    /// Default: 4096
    #[cfg_attr(feature = "serde", serde(default = "Options::default_offset_index_interval_bytes"))]
    pub offset_index_interval_bytes: NonZeroU64,
    /// If `true`, require that the segment must be synced to disk before an
    /// index entry is added.
    ///
    /// Setting this to `false` (the default) will update the index every
    /// `offset_index_interval_bytes`, even if the commitlog wasn't synced.
    /// This means that the index could contain non-existent entries in the
    /// event of a crash.
    ///
    /// Setting it to `true` will update the index when the commitlog is synced,
    /// and `offset_index_interval_bytes` have been written.
    /// This means that the index could contain fewer index entries than
    /// strictly every `offset_index_interval_bytes`.
    ///
    /// Default: false
    #[cfg_attr(
        feature = "serde",
        serde(default = "Options::default_offset_index_require_segment_fsync")
    )]
    pub offset_index_require_segment_fsync: bool,
}
92
93impl Default for Options {
94    fn default() -> Self {
95        Self::DEFAULT
96    }
97}
98
99impl Options {
100    pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
101    pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::MAX;
102    pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
103    pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
104
105    pub const DEFAULT: Self = Self {
106        log_format_version: DEFAULT_LOG_FORMAT_VERSION,
107        max_segment_size: Self::default_max_segment_size(),
108        max_records_in_commit: Self::default_max_records_in_commit(),
109        offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
110        offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
111    };
112
113    pub const fn default_log_format_version() -> u8 {
114        DEFAULT_LOG_FORMAT_VERSION
115    }
116
117    pub const fn default_max_segment_size() -> u64 {
118        Self::DEFAULT_MAX_SEGMENT_SIZE
119    }
120
121    pub const fn default_max_records_in_commit() -> NonZeroU16 {
122        Self::DEFAULT_MAX_RECORDS_IN_COMMIT
123    }
124
125    pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
126        Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
127    }
128
129    pub const fn default_offset_index_require_segment_fsync() -> bool {
130        Self::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC
131    }
132
133    /// Compute the length in bytes of an offset index based on the settings in
134    /// `self`.
135    pub fn offset_index_len(&self) -> u64 {
136        self.max_segment_size / self.offset_index_interval_bytes
137    }
138}
139
/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
/// [`payload::Txdata`].
pub struct Commitlog<T> {
    // Generic commitlog state guarded by a reader-writer lock: read-only
    // accessors (offsets, iteration) take the read lock, while mutating
    // operations (append, flush, sync, epoch updates) take the write lock.
    inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}
147
148impl<T> Commitlog<T> {
149    /// Open the log at root directory `root` with [`Options`].
150    ///
151    /// The root directory must already exist.
152    ///
153    /// Note that opening a commitlog involves I/O: some consistency checks are
154    /// performed, and the next writing position is determined.
155    ///
156    /// This is only necessary when opening the commitlog for writing. See the
157    /// free-standing functions in this module for how to traverse a read-only
158    /// commitlog.
159    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
160        let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;
161
162        Ok(Self {
163            inner: RwLock::new(inner),
164        })
165    }
166
167    /// Determine the maximum transaction offset considered durable.
168    ///
169    /// The offset is `None` if the log hasn't been flushed to disk yet.
170    pub fn max_committed_offset(&self) -> Option<u64> {
171        self.inner.read().unwrap().max_committed_offset()
172    }
173
174    /// Determine the minimum transaction offset in the log.
175    ///
176    /// The offset is `None` if the log hasn't been flushed to disk yet.
177    pub fn min_committed_offset(&self) -> Option<u64> {
178        self.inner.read().unwrap().min_committed_offset()
179    }
180
181    /// Get the current epoch.
182    ///
183    /// See also: [`Commit::epoch`].
184    pub fn epoch(&self) -> u64 {
185        self.inner.read().unwrap().epoch()
186    }
187
188    /// Update the current epoch.
189    ///
190    /// Does nothing if the given `epoch` is equal to the current epoch.
191    /// Otherwise flushes outstanding transactions to disk (equivalent to
192    /// [`Self::flush`]) before updating the epoch.
193    ///
194    /// Returns the maximum transaction offset written to disk. The offset is
195    /// `None` if the log is empty and no data was pending to be flushed.
196    ///
197    /// # Errors
198    ///
199    /// If `epoch` is smaller than the current epoch, an error of kind
200    /// [`io::ErrorKind::InvalidInput`] is returned.
201    ///
202    /// Errors from the implicit flush are propagated.
203    pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
204        let mut inner = self.inner.write().unwrap();
205        inner.set_epoch(epoch)?;
206
207        Ok(inner.max_committed_offset())
208    }
209
210    /// Sync all OS-buffered writes to disk.
211    ///
212    /// Note that this does **not** write outstanding records to disk.
213    /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
214    /// method to ensure all data is on disk.
215    ///
216    /// Returns the maximum transaction offset which is considered durable after
217    /// this method returns successfully. The offset is `None` if the log hasn't
218    /// been flushed to disk yet.
219    ///
220    /// # Panics
221    ///
222    /// This method panics if syncing fails irrecoverably.
223    pub fn sync(&self) -> Option<u64> {
224        let mut inner = self.inner.write().unwrap();
225        trace!("sync commitlog");
226        inner.sync();
227
228        inner.max_committed_offset()
229    }
230
231    /// Write all outstanding transaction records to disk.
232    ///
233    /// Note that this does **not** force the OS to sync the data to disk.
234    /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
235    /// to ensure all data is on disk.
236    ///
237    /// Returns the maximum transaction offset written to disk. The offset is
238    /// `None` if the log is empty and no data was pending to be flushed.
239    ///
240    /// Repeatedly calling this method may return the same value.
241    pub fn flush(&self) -> io::Result<Option<u64>> {
242        let mut inner = self.inner.write().unwrap();
243        trace!("flush commitlog");
244        inner.commit()?;
245
246        Ok(inner.max_committed_offset())
247    }
248
249    /// Write all outstanding transaction records to disk and flush OS buffers.
250    ///
251    /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
252    /// without releasing the write lock in between.
253    ///
254    /// # Errors
255    ///
256    /// An error is returned if writing to disk fails due to an I/O error.
257    ///
258    /// # Panics
259    ///
260    /// This method panics if syncing fails irrecoverably.
261    pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
262        let mut inner = self.inner.write().unwrap();
263        trace!("flush and sync commitlog");
264        inner.commit()?;
265        inner.sync();
266
267        Ok(inner.max_committed_offset())
268    }
269
270    /// Obtain an iterator which traverses the log from the start, yielding
271    /// [`StoredCommit`]s.
272    ///
273    /// The returned iterator is not aware of segment rotation. That is, if a
274    /// new segment is created after this method returns, the iterator will not
275    /// traverse it.
276    ///
277    /// Commits appended to the log while it is being traversed are generally
278    /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
279    /// however, a new iterator should be created using [`Self::commits_from`]
280    /// with the last transaction offset yielded.
281    ///
282    /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
283    /// (e.g. due to a partial write to disk), but a subsequent `append` will
284    /// bring the log into a consistent state.
285    ///
286    /// This means that, when this iterator yields an `Err` value, the consumer
287    /// may want to check if the iterator is exhausted (by calling `next()`)
288    /// before treating the `Err` value as an application error.
289    pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
290        self.commits_from(0)
291    }
292
293    /// Obtain an iterator starting from transaction offset `offset`, yielding
294    /// [`StoredCommit`]s.
295    ///
296    /// Similar to [`Self::commits`] but will skip until the offset is contained
297    /// in the next [`StoredCommit`] to yield.
298    ///
299    /// Note that the first [`StoredCommit`] yielded is the first commit
300    /// containing the given transaction offset, i.e. its `min_tx_offset` may be
301    /// smaller than `offset`.
302    pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
303        self.inner.read().unwrap().commits_from(offset)
304    }
305
306    /// Get a list of segment offsets, sorted in ascending order.
307    pub fn existing_segment_offsets(&self) -> io::Result<Vec<u64>> {
308        self.inner.read().unwrap().repo.existing_offsets()
309    }
310
311    /// Compress the segments at the offsets provided, marking them as immutable.
312    pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
313        // even though `compress_segment` takes &self, we take an
314        // exclusive lock to avoid any weirdness happening.
315        #[allow(clippy::readonly_write_lock)]
316        let inner = self.inner.write().unwrap();
317        assert!(!offsets.contains(&inner.head.min_tx_offset()));
318        // TODO: parallelize, maybe
319        offsets
320            .iter()
321            .try_for_each(|&offset| inner.repo.compress_segment(offset))
322    }
323
324    /// Remove all data from the log and reopen it.
325    ///
326    /// Log segments are deleted starting from the newest. As multiple segments
327    /// cannot be deleted atomically, the log may not be completely empty if
328    /// the method returns an error.
329    ///
330    /// Note that the method consumes `self` to ensure the log is not modified
331    /// while resetting.
332    pub fn reset(self) -> io::Result<Self> {
333        let inner = self.inner.into_inner().unwrap().reset()?;
334        Ok(Self {
335            inner: RwLock::new(inner),
336        })
337    }
338
339    /// Remove all data past the given transaction `offset` from the log and
340    /// reopen it.
341    ///
342    /// Like with [`Self::reset`], it may happen that not all segments newer
343    /// than `offset` can be deleted.
344    ///
345    /// If the method returns successfully, the most recent [`Commit`] in the
346    /// log will contain the transaction at `offset`.
347    ///
348    /// Note that the method consumes `self` to ensure the log is not modified
349    /// while resetting.
350    pub fn reset_to(self, offset: u64) -> io::Result<Self> {
351        let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
352        Ok(Self {
353            inner: RwLock::new(inner),
354        })
355    }
356
357    /// Determine the size on disk of this commitlog.
358    pub fn size_on_disk(&self) -> io::Result<u64> {
359        let inner = self.inner.read().unwrap();
360        inner.repo.size_on_disk()
361    }
362}
363
364impl<T: Encode> Commitlog<T> {
365    /// Append the record `txdata` to the log.
366    ///
367    /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
368    /// argument is returned in an `Err`. The caller should [`Self::flush`] the
369    /// log and try again.
370    ///
371    /// In case the log is appended to from multiple threads, this may result in
372    /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
373    /// [`Self::append_maybe_flush`] is preferable.
374    pub fn append(&self, txdata: T) -> Result<(), T> {
375        let mut inner = self.inner.write().unwrap();
376        inner.append(txdata)
377    }
378
379    /// Append the record `txdata` to the log.
380    ///
381    /// The `txdata` payload is buffered in memory until either:
382    ///
383    /// - [`Self::flush`] is called explicitly, or
384    /// - [`Options::max_records_in_commit`] is exceeded
385    ///
386    /// In the latter case, [`Self::append`] flushes implicitly, _before_
387    /// appending the `txdata` argument.
388    ///
389    /// I.e. the argument is not guaranteed to be flushed after the method
390    /// returns. If that is desired, [`Self::flush`] must be called explicitly.
391    ///
392    /// # Errors
393    ///
394    /// If the log needs to be flushed, but an I/O error occurs, ownership of
395    /// `txdata` is returned back to the caller alongside the [`io::Error`].
396    ///
397    /// The value can then be used to retry appending.
398    pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
399        let mut inner = self.inner.write().unwrap();
400
401        if let Err(txdata) = inner.append(txdata) {
402            if let Err(source) = inner.commit() {
403                return Err(error::Append { txdata, source });
404            }
405            // `inner.commit.n` must be zero at this point
406            let res = inner.append(txdata);
407            debug_assert!(res.is_ok(), "failed to append while holding write lock");
408        }
409
410        Ok(())
411    }
412
413    /// Obtain an iterator which traverses the log from the start, yielding
414    /// [`Transaction`]s.
415    ///
416    /// The provided `decoder`'s [`Decoder::decode_record`] method will be
417    /// called [`Commit::n`] times per [`Commit`] to obtain the individual
418    /// transaction payloads.
419    ///
420    /// Like [`Self::commits`], the iterator is not aware of segment rotation.
421    /// That is, if a new segment is created after this method returns, the
422    /// iterator will not traverse it.
423    ///
424    /// Transactions appended to the log while it is being traversed are
425    /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
426    /// however, a new iterator should be created using [`Self::transactions_from`]
427    /// with the last transaction offset yielded.
428    ///
429    /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
430    /// due to a partial write to disk), but a subsequent `append` will bring
431    /// the log into a consistent state.
432    ///
433    /// This means that, when this iterator yields an `Err` value, the consumer
434    /// may want to check if the iterator is exhausted (by calling `next()`)
435    /// before treating the `Err` value as an application error.
436    pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
437    where
438        D: Decoder<Record = T>,
439        D::Error: From<error::Traversal>,
440        T: 'a,
441    {
442        self.transactions_from(0, de)
443    }
444
445    /// Obtain an iterator starting from transaction offset `offset`, yielding
446    /// [`Transaction`]s.
447    ///
448    /// Similar to [`Self::transactions`] but will skip until the provided
449    /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
450    /// with offset `offset`.
451    pub fn transactions_from<'a, D>(
452        &self,
453        offset: u64,
454        de: &'a D,
455    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
456    where
457        D: Decoder<Record = T>,
458        D::Error: From<error::Traversal>,
459        T: 'a,
460    {
461        self.inner.read().unwrap().transactions_from(offset, de)
462    }
463
464    /// Traverse the log from the start and "fold" its transactions into the
465    /// provided [`Decoder`].
466    ///
467    /// A [`Decoder`] is a stateful object due to the requirement to store
468    /// schema information in the log itself. That is, a [`Decoder`] may need to
469    /// be able to resolve transaction schema information dynamically while
470    /// traversing the log.
471    ///
472    /// This is equivalent to "replaying" a log into a database state. In this
473    /// scenario, it is not interesting to consume the [`Transaction`] payload
474    /// as an iterator.
475    ///
476    /// This method allows the use of a [`Decoder`] which returns zero-sized
477    /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
478    /// payload into a struct.
479    ///
480    /// Note that, unlike [`Self::transactions`], this method will ignore a
481    /// corrupt commit at the very end of the traversed log.
482    pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
483    where
484        D: Decoder,
485        D::Error: From<error::Traversal>,
486    {
487        self.fold_transactions_from(0, de)
488    }
489
490    /// Traverse the log from the given transaction offset and "fold" its
491    /// transactions into the provided [`Decoder`].
492    ///
493    /// Similar to [`Self::fold_transactions`] but will skip until the provided
494    /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
495    /// will be equal to `offset`.
496    pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
497    where
498        D: Decoder,
499        D::Error: From<error::Traversal>,
500    {
501        self.inner.read().unwrap().fold_transactions_from(offset, de)
502    }
503}
504
505/// Extract the most recently written [`segment::Metadata`] from the commitlog
506/// in `repo`.
507///
508/// Returns `None` if the commitlog is empty.
509///
510/// Note that this function validates the most recent segment, which entails
511/// traversing it from the start.
512///
513/// The function can be used instead of the pattern:
514///
515/// ```ignore
516/// let log = Commitlog::open(..)?;
517/// let max_offset = log.max_committed_offset();
518/// ```
519///
520/// like so:
521///
522/// ```ignore
523/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
524/// ```
525///
526/// Unlike `open`, no segment will be created in an empty `repo`.
527pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
528    commitlog::committed_meta(repo::Fs::new(root)?)
529}
530
531/// Obtain an iterator which traverses the commitlog located at the `root`
532/// directory from the start, yielding [`StoredCommit`]s.
533///
534/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
535/// See [`Commitlog::commits`] for more information.
536pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
537    commits_from(root, 0)
538}
539
540/// Obtain an iterator which traverses the commitlog located at the `root`
541/// directory starting from `offset` and yielding [`StoredCommit`]s.
542///
543/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
544/// See [`Commitlog::commits_from`] for more information.
545pub fn commits_from(
546    root: CommitLogDir,
547    offset: u64,
548) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
549    commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
550}
551
552/// Obtain an iterator which traverses the commitlog located at the `root`
553/// directory from the start, yielding [`Transaction`]s.
554///
555/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
556/// See [`Commitlog::transactions`] for more information.
557pub fn transactions<'a, D, T>(
558    root: CommitLogDir,
559    de: &'a D,
560) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
561where
562    D: Decoder<Record = T>,
563    D::Error: From<error::Traversal>,
564    T: 'a,
565{
566    transactions_from(root, 0, de)
567}
568
569/// Obtain an iterator which traverses the commitlog located at the `root`
570/// directory starting from `offset` and yielding [`Transaction`]s.
571///
572/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
573/// See [`Commitlog::transactions_from`] for more information.
574pub fn transactions_from<'a, D, T>(
575    root: CommitLogDir,
576    offset: u64,
577    de: &'a D,
578) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
579where
580    D: Decoder<Record = T>,
581    D::Error: From<error::Traversal>,
582    T: 'a,
583{
584    commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
585}
586
587/// Traverse the commitlog located at the `root` directory from the start and
588/// "fold" its transactions into the provided [`Decoder`].
589///
590/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
591/// See [`Commitlog::fold_transactions`] for more information.
592pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
593where
594    D: Decoder,
595    D::Error: From<error::Traversal> + From<io::Error>,
596{
597    fold_transactions_from(root, 0, de)
598}
599
600/// Traverse the commitlog located at the `root` directory starting from `offset`
601/// and "fold" its transactions into the provided [`Decoder`].
602///
603/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
604/// See [`Commitlog::fold_transactions_from`] for more information.
605pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
606where
607    D: Decoder,
608    D::Error: From<error::Traversal> + From<io::Error>,
609{
610    commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
611}