spacetimedb_commitlog/lib.rs
1use std::{
2 io,
3 num::{NonZeroU16, NonZeroU64},
4 sync::RwLock,
5};
6
7use log::trace;
8use repo::Repo;
9use spacetimedb_paths::server::CommitLogDir;
10
11pub mod commit;
12pub mod commitlog;
13mod index;
14pub mod repo;
15pub mod segment;
16mod varchar;
17mod varint;
18
19pub use crate::{
20 commit::{Commit, StoredCommit},
21 payload::{Decoder, Encode},
22 segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
23 varchar::Varchar,
24};
25pub mod error;
26pub mod payload;
27
28#[cfg(feature = "streaming")]
29pub mod stream;
30
31#[cfg(any(test, feature = "test"))]
32pub mod tests;
33
/// Tunable settings of a [`Commitlog`].
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "kebab-case")
)]
pub struct Options {
    /// The log format version used for writing, which doubles as the highest
    /// version that is supported.
    ///
    /// Picking a payload format `T` for [`Commitlog`] should normally go hand
    /// in hand with updating this crate's [`DEFAULT_LOG_FORMAT_VERSION`].
    /// Overriding the version at runtime can nevertheless be handy, e.g. when
    /// experimenting with new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    #[cfg_attr(feature = "serde", serde(default = "Options::default_log_format_version"))]
    pub log_format_version: u8,
    /// Upper bound, in bytes, on how large a log segment should be allowed
    /// to grow.
    ///
    /// Default: 1GiB
    #[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
    pub max_segment_size: u64,
    /// Upper bound on the number of records in a single commit.
    ///
    /// Once this number is exceeded, the commit is flushed to disk, even if
    /// [`Commitlog::flush`] was not called explicitly.
    ///
    /// Default: 65,535
    #[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
    pub max_records_in_commit: NonZeroU16,
    /// An entry is added to the offset index of the currently active segment
    /// whenever at least this many bytes have been written to that segment.
    ///
    /// Default: 4096
    #[cfg_attr(feature = "serde", serde(default = "Options::default_offset_index_interval_bytes"))]
    pub offset_index_interval_bytes: NonZeroU64,
    /// Whether the segment must have been synced to disk before an index
    /// entry may be added.
    ///
    /// With `false` (the default), the index is updated every
    /// `offset_index_interval_bytes`, regardless of whether the commitlog was
    /// synced. After a crash, the index may therefore contain entries which
    /// do not exist in the log.
    ///
    /// With `true`, the index is updated when the commitlog is synced and
    /// `offset_index_interval_bytes` have been written. The index may
    /// therefore contain fewer entries than strictly one per
    /// `offset_index_interval_bytes`.
    ///
    /// Default: false
    #[cfg_attr(
        feature = "serde",
        serde(default = "Options::default_offset_index_require_segment_fsync")
    )]
    pub offset_index_require_segment_fsync: bool,
}
92
93impl Default for Options {
94 fn default() -> Self {
95 Self::DEFAULT
96 }
97}
98
99impl Options {
100 pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
101 pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::MAX;
102 pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
103 pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
104
105 pub const DEFAULT: Self = Self {
106 log_format_version: DEFAULT_LOG_FORMAT_VERSION,
107 max_segment_size: Self::default_max_segment_size(),
108 max_records_in_commit: Self::default_max_records_in_commit(),
109 offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
110 offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
111 };
112
113 pub const fn default_log_format_version() -> u8 {
114 DEFAULT_LOG_FORMAT_VERSION
115 }
116
117 pub const fn default_max_segment_size() -> u64 {
118 Self::DEFAULT_MAX_SEGMENT_SIZE
119 }
120
121 pub const fn default_max_records_in_commit() -> NonZeroU16 {
122 Self::DEFAULT_MAX_RECORDS_IN_COMMIT
123 }
124
125 pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
126 Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
127 }
128
129 pub const fn default_offset_index_require_segment_fsync() -> bool {
130 Self::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC
131 }
132
133 /// Compute the length in bytes of an offset index based on the settings in
134 /// `self`.
135 pub fn offset_index_len(&self) -> u64 {
136 self.max_segment_size / self.offset_index_interval_bytes
137 }
138}
139
140/// The canonical commitlog, backed by on-disk log files.
141///
142/// Records in the log are of type `T`, which canonically is instantiated to
143/// [`payload::Txdata`].
144pub struct Commitlog<T> {
145 inner: RwLock<commitlog::Generic<repo::Fs, T>>,
146}
147
148impl<T> Commitlog<T> {
149 /// Open the log at root directory `root` with [`Options`].
150 ///
151 /// The root directory must already exist.
152 ///
153 /// Note that opening a commitlog involves I/O: some consistency checks are
154 /// performed, and the next writing position is determined.
155 ///
156 /// This is only necessary when opening the commitlog for writing. See the
157 /// free-standing functions in this module for how to traverse a read-only
158 /// commitlog.
159 pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
160 let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;
161
162 Ok(Self {
163 inner: RwLock::new(inner),
164 })
165 }
166
167 /// Determine the maximum transaction offset considered durable.
168 ///
169 /// The offset is `None` if the log hasn't been flushed to disk yet.
170 pub fn max_committed_offset(&self) -> Option<u64> {
171 self.inner.read().unwrap().max_committed_offset()
172 }
173
174 /// Determine the minimum transaction offset in the log.
175 ///
176 /// The offset is `None` if the log hasn't been flushed to disk yet.
177 pub fn min_committed_offset(&self) -> Option<u64> {
178 self.inner.read().unwrap().min_committed_offset()
179 }
180
181 /// Get the current epoch.
182 ///
183 /// See also: [`Commit::epoch`].
184 pub fn epoch(&self) -> u64 {
185 self.inner.read().unwrap().epoch()
186 }
187
188 /// Update the current epoch.
189 ///
190 /// Does nothing if the given `epoch` is equal to the current epoch.
191 /// Otherwise flushes outstanding transactions to disk (equivalent to
192 /// [`Self::flush`]) before updating the epoch.
193 ///
194 /// Returns the maximum transaction offset written to disk. The offset is
195 /// `None` if the log is empty and no data was pending to be flushed.
196 ///
197 /// # Errors
198 ///
199 /// If `epoch` is smaller than the current epoch, an error of kind
200 /// [`io::ErrorKind::InvalidInput`] is returned.
201 ///
202 /// Errors from the implicit flush are propagated.
203 pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
204 let mut inner = self.inner.write().unwrap();
205 inner.set_epoch(epoch)?;
206
207 Ok(inner.max_committed_offset())
208 }
209
210 /// Sync all OS-buffered writes to disk.
211 ///
212 /// Note that this does **not** write outstanding records to disk.
213 /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
214 /// method to ensure all data is on disk.
215 ///
216 /// Returns the maximum transaction offset which is considered durable after
217 /// this method returns successfully. The offset is `None` if the log hasn't
218 /// been flushed to disk yet.
219 ///
220 /// # Panics
221 ///
222 /// This method panics if syncing fails irrecoverably.
223 pub fn sync(&self) -> Option<u64> {
224 let mut inner = self.inner.write().unwrap();
225 trace!("sync commitlog");
226 inner.sync();
227
228 inner.max_committed_offset()
229 }
230
231 /// Write all outstanding transaction records to disk.
232 ///
233 /// Note that this does **not** force the OS to sync the data to disk.
234 /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
235 /// to ensure all data is on disk.
236 ///
237 /// Returns the maximum transaction offset written to disk. The offset is
238 /// `None` if the log is empty and no data was pending to be flushed.
239 ///
240 /// Repeatedly calling this method may return the same value.
241 pub fn flush(&self) -> io::Result<Option<u64>> {
242 let mut inner = self.inner.write().unwrap();
243 trace!("flush commitlog");
244 inner.commit()?;
245
246 Ok(inner.max_committed_offset())
247 }
248
249 /// Write all outstanding transaction records to disk and flush OS buffers.
250 ///
251 /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
252 /// without releasing the write lock in between.
253 ///
254 /// # Errors
255 ///
256 /// An error is returned if writing to disk fails due to an I/O error.
257 ///
258 /// # Panics
259 ///
260 /// This method panics if syncing fails irrecoverably.
261 pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
262 let mut inner = self.inner.write().unwrap();
263 trace!("flush and sync commitlog");
264 inner.commit()?;
265 inner.sync();
266
267 Ok(inner.max_committed_offset())
268 }
269
270 /// Obtain an iterator which traverses the log from the start, yielding
271 /// [`StoredCommit`]s.
272 ///
273 /// The returned iterator is not aware of segment rotation. That is, if a
274 /// new segment is created after this method returns, the iterator will not
275 /// traverse it.
276 ///
277 /// Commits appended to the log while it is being traversed are generally
278 /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
279 /// however, a new iterator should be created using [`Self::commits_from`]
280 /// with the last transaction offset yielded.
281 ///
282 /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
283 /// (e.g. due to a partial write to disk), but a subsequent `append` will
284 /// bring the log into a consistent state.
285 ///
286 /// This means that, when this iterator yields an `Err` value, the consumer
287 /// may want to check if the iterator is exhausted (by calling `next()`)
288 /// before treating the `Err` value as an application error.
289 pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
290 self.commits_from(0)
291 }
292
293 /// Obtain an iterator starting from transaction offset `offset`, yielding
294 /// [`StoredCommit`]s.
295 ///
296 /// Similar to [`Self::commits`] but will skip until the offset is contained
297 /// in the next [`StoredCommit`] to yield.
298 ///
299 /// Note that the first [`StoredCommit`] yielded is the first commit
300 /// containing the given transaction offset, i.e. its `min_tx_offset` may be
301 /// smaller than `offset`.
302 pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
303 self.inner.read().unwrap().commits_from(offset)
304 }
305
306 /// Get a list of segment offsets, sorted in ascending order.
307 pub fn existing_segment_offsets(&self) -> io::Result<Vec<u64>> {
308 self.inner.read().unwrap().repo.existing_offsets()
309 }
310
311 /// Compress the segments at the offsets provided, marking them as immutable.
312 pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
313 // even though `compress_segment` takes &self, we take an
314 // exclusive lock to avoid any weirdness happening.
315 #[allow(clippy::readonly_write_lock)]
316 let inner = self.inner.write().unwrap();
317 assert!(!offsets.contains(&inner.head.min_tx_offset()));
318 // TODO: parallelize, maybe
319 offsets
320 .iter()
321 .try_for_each(|&offset| inner.repo.compress_segment(offset))
322 }
323
324 /// Remove all data from the log and reopen it.
325 ///
326 /// Log segments are deleted starting from the newest. As multiple segments
327 /// cannot be deleted atomically, the log may not be completely empty if
328 /// the method returns an error.
329 ///
330 /// Note that the method consumes `self` to ensure the log is not modified
331 /// while resetting.
332 pub fn reset(self) -> io::Result<Self> {
333 let inner = self.inner.into_inner().unwrap().reset()?;
334 Ok(Self {
335 inner: RwLock::new(inner),
336 })
337 }
338
339 /// Remove all data past the given transaction `offset` from the log and
340 /// reopen it.
341 ///
342 /// Like with [`Self::reset`], it may happen that not all segments newer
343 /// than `offset` can be deleted.
344 ///
345 /// If the method returns successfully, the most recent [`Commit`] in the
346 /// log will contain the transaction at `offset`.
347 ///
348 /// Note that the method consumes `self` to ensure the log is not modified
349 /// while resetting.
350 pub fn reset_to(self, offset: u64) -> io::Result<Self> {
351 let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
352 Ok(Self {
353 inner: RwLock::new(inner),
354 })
355 }
356
357 /// Determine the size on disk of this commitlog.
358 pub fn size_on_disk(&self) -> io::Result<u64> {
359 let inner = self.inner.read().unwrap();
360 inner.repo.size_on_disk()
361 }
362}
363
364impl<T: Encode> Commitlog<T> {
365 /// Append the record `txdata` to the log.
366 ///
367 /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
368 /// argument is returned in an `Err`. The caller should [`Self::flush`] the
369 /// log and try again.
370 ///
371 /// In case the log is appended to from multiple threads, this may result in
372 /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
373 /// [`Self::append_maybe_flush`] is preferable.
374 pub fn append(&self, txdata: T) -> Result<(), T> {
375 let mut inner = self.inner.write().unwrap();
376 inner.append(txdata)
377 }
378
379 /// Append the record `txdata` to the log.
380 ///
381 /// The `txdata` payload is buffered in memory until either:
382 ///
383 /// - [`Self::flush`] is called explicitly, or
384 /// - [`Options::max_records_in_commit`] is exceeded
385 ///
386 /// In the latter case, [`Self::append`] flushes implicitly, _before_
387 /// appending the `txdata` argument.
388 ///
389 /// I.e. the argument is not guaranteed to be flushed after the method
390 /// returns. If that is desired, [`Self::flush`] must be called explicitly.
391 ///
392 /// # Errors
393 ///
394 /// If the log needs to be flushed, but an I/O error occurs, ownership of
395 /// `txdata` is returned back to the caller alongside the [`io::Error`].
396 ///
397 /// The value can then be used to retry appending.
398 pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
399 let mut inner = self.inner.write().unwrap();
400
401 if let Err(txdata) = inner.append(txdata) {
402 if let Err(source) = inner.commit() {
403 return Err(error::Append { txdata, source });
404 }
405 // `inner.commit.n` must be zero at this point
406 let res = inner.append(txdata);
407 debug_assert!(res.is_ok(), "failed to append while holding write lock");
408 }
409
410 Ok(())
411 }
412
413 /// Obtain an iterator which traverses the log from the start, yielding
414 /// [`Transaction`]s.
415 ///
416 /// The provided `decoder`'s [`Decoder::decode_record`] method will be
417 /// called [`Commit::n`] times per [`Commit`] to obtain the individual
418 /// transaction payloads.
419 ///
420 /// Like [`Self::commits`], the iterator is not aware of segment rotation.
421 /// That is, if a new segment is created after this method returns, the
422 /// iterator will not traverse it.
423 ///
424 /// Transactions appended to the log while it is being traversed are
425 /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
426 /// however, a new iterator should be created using [`Self::transactions_from`]
427 /// with the last transaction offset yielded.
428 ///
429 /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
430 /// due to a partial write to disk), but a subsequent `append` will bring
431 /// the log into a consistent state.
432 ///
433 /// This means that, when this iterator yields an `Err` value, the consumer
434 /// may want to check if the iterator is exhausted (by calling `next()`)
435 /// before treating the `Err` value as an application error.
436 pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
437 where
438 D: Decoder<Record = T>,
439 D::Error: From<error::Traversal>,
440 T: 'a,
441 {
442 self.transactions_from(0, de)
443 }
444
445 /// Obtain an iterator starting from transaction offset `offset`, yielding
446 /// [`Transaction`]s.
447 ///
448 /// Similar to [`Self::transactions`] but will skip until the provided
449 /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
450 /// with offset `offset`.
451 pub fn transactions_from<'a, D>(
452 &self,
453 offset: u64,
454 de: &'a D,
455 ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
456 where
457 D: Decoder<Record = T>,
458 D::Error: From<error::Traversal>,
459 T: 'a,
460 {
461 self.inner.read().unwrap().transactions_from(offset, de)
462 }
463
464 /// Traverse the log from the start and "fold" its transactions into the
465 /// provided [`Decoder`].
466 ///
467 /// A [`Decoder`] is a stateful object due to the requirement to store
468 /// schema information in the log itself. That is, a [`Decoder`] may need to
469 /// be able to resolve transaction schema information dynamically while
470 /// traversing the log.
471 ///
472 /// This is equivalent to "replaying" a log into a database state. In this
473 /// scenario, it is not interesting to consume the [`Transaction`] payload
474 /// as an iterator.
475 ///
476 /// This method allows the use of a [`Decoder`] which returns zero-sized
477 /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
478 /// payload into a struct.
479 ///
480 /// Note that, unlike [`Self::transactions`], this method will ignore a
481 /// corrupt commit at the very end of the traversed log.
482 pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
483 where
484 D: Decoder,
485 D::Error: From<error::Traversal>,
486 {
487 self.fold_transactions_from(0, de)
488 }
489
490 /// Traverse the log from the given transaction offset and "fold" its
491 /// transactions into the provided [`Decoder`].
492 ///
493 /// Similar to [`Self::fold_transactions`] but will skip until the provided
494 /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
495 /// will be equal to `offset`.
496 pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
497 where
498 D: Decoder,
499 D::Error: From<error::Traversal>,
500 {
501 self.inner.read().unwrap().fold_transactions_from(offset, de)
502 }
503}
504
505/// Extract the most recently written [`segment::Metadata`] from the commitlog
506/// in `repo`.
507///
508/// Returns `None` if the commitlog is empty.
509///
510/// Note that this function validates the most recent segment, which entails
511/// traversing it from the start.
512///
513/// The function can be used instead of the pattern:
514///
515/// ```ignore
516/// let log = Commitlog::open(..)?;
517/// let max_offset = log.max_committed_offset();
518/// ```
519///
520/// like so:
521///
522/// ```ignore
523/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
524/// ```
525///
526/// Unlike `open`, no segment will be created in an empty `repo`.
527pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
528 commitlog::committed_meta(repo::Fs::new(root)?)
529}
530
531/// Obtain an iterator which traverses the commitlog located at the `root`
532/// directory from the start, yielding [`StoredCommit`]s.
533///
534/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
535/// See [`Commitlog::commits`] for more information.
536pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
537 commits_from(root, 0)
538}
539
540/// Obtain an iterator which traverses the commitlog located at the `root`
541/// directory starting from `offset` and yielding [`StoredCommit`]s.
542///
543/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
544/// See [`Commitlog::commits_from`] for more information.
545pub fn commits_from(
546 root: CommitLogDir,
547 offset: u64,
548) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
549 commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
550}
551
552/// Obtain an iterator which traverses the commitlog located at the `root`
553/// directory from the start, yielding [`Transaction`]s.
554///
555/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
556/// See [`Commitlog::transactions`] for more information.
557pub fn transactions<'a, D, T>(
558 root: CommitLogDir,
559 de: &'a D,
560) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
561where
562 D: Decoder<Record = T>,
563 D::Error: From<error::Traversal>,
564 T: 'a,
565{
566 transactions_from(root, 0, de)
567}
568
569/// Obtain an iterator which traverses the commitlog located at the `root`
570/// directory starting from `offset` and yielding [`Transaction`]s.
571///
572/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
573/// See [`Commitlog::transactions_from`] for more information.
574pub fn transactions_from<'a, D, T>(
575 root: CommitLogDir,
576 offset: u64,
577 de: &'a D,
578) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
579where
580 D: Decoder<Record = T>,
581 D::Error: From<error::Traversal>,
582 T: 'a,
583{
584 commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
585}
586
587/// Traverse the commitlog located at the `root` directory from the start and
588/// "fold" its transactions into the provided [`Decoder`].
589///
590/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
591/// See [`Commitlog::fold_transactions`] for more information.
592pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
593where
594 D: Decoder,
595 D::Error: From<error::Traversal> + From<io::Error>,
596{
597 fold_transactions_from(root, 0, de)
598}
599
600/// Traverse the commitlog located at the `root` directory starting from `offset`
601/// and "fold" its transactions into the provided [`Decoder`].
602///
603/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
604/// See [`Commitlog::fold_transactions_from`] for more information.
605pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
606where
607 D: Decoder,
608 D::Error: From<error::Traversal> + From<io::Error>,
609{
610 commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
611}