spacetimedb_commitlog/lib.rs
use std::{
    io,
    num::{NonZeroU16, NonZeroU64},
    sync::RwLock,
};

use log::trace;
use repo::Repo;
use spacetimedb_paths::server::CommitLogDir;

pub mod commit;
pub mod commitlog;
mod index;
pub mod repo;
pub mod segment;
mod varchar;
mod varint;

pub use crate::{
    commit::{Commit, StoredCommit},
    payload::{Decoder, Encode},
    segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
    varchar::Varchar,
};
pub mod error;
pub mod payload;

#[cfg(feature = "streaming")]
pub mod stream;

#[cfg(any(test, feature = "test"))]
pub mod tests;

/// [`Commitlog`] options.
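///
/// A minimal construction sketch (the override value is made up for
/// illustration):
///
/// ```ignore
/// use spacetimedb_commitlog::Options;
///
/// let opts = Options {
///     // Roll over to a new segment after 64 MiB instead of the default 1 GiB.
///     max_segment_size: 64 * 1024 * 1024,
///     ..Options::default()
/// };
/// ```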
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// Set the log format version to write, and the maximum supported version.
    ///
    /// Changing the payload format `T` of [`Commitlog`] should usually be
    /// accompanied by updating the [`DEFAULT_LOG_FORMAT_VERSION`] of this
    /// crate. Sometimes it may, however, be useful to set the version at
    /// runtime, e.g. to experiment with new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    pub log_format_version: u8,
    /// The maximum size in bytes to which log segments should be allowed to
    /// grow.
    ///
    /// Default: 1 GiB
    pub max_segment_size: u64,
    /// The maximum number of records in a commit.
    ///
    /// If this number is exceeded, the commit is flushed to disk even without
    /// explicitly calling [`Commitlog::flush`].
    ///
    /// Default: 65,535
    pub max_records_in_commit: NonZeroU16,
    /// Whenever at least this many bytes have been written to the currently
    /// active segment, an entry is added to its offset index.
    ///
    /// Default: 4096
    pub offset_index_interval_bytes: NonZeroU64,
    /// If `true`, require that the segment has been synced to disk before an
    /// index entry is added.
    ///
    /// Setting this to `false` (the default) will update the index every
    /// `offset_index_interval_bytes`, even if the commitlog wasn't synced.
    /// This means that, in the event of a crash, the index could contain
    /// entries which don't exist on disk.
    ///
    /// Setting it to `true` will update the index only when the commitlog has
    /// been synced and at least `offset_index_interval_bytes` have been
    /// written. This means that the index may contain fewer entries than one
    /// per `offset_index_interval_bytes`.
    ///
    /// Default: false
    pub offset_index_require_segment_fsync: bool,
}

impl Default for Options {
    fn default() -> Self {
        Self {
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
            max_segment_size: 1024 * 1024 * 1024,
            max_records_in_commit: NonZeroU16::MAX,
            offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(),
            offset_index_require_segment_fsync: false,
        }
    }
}

impl Options {
    /// Compute the maximum number of entries in a segment's offset index,
    /// based on the settings in `self`.
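    ///
    /// For example, with the default [`Options`] this is
    /// `(1024 * 1024 * 1024) / 4096` entries:
    ///
    /// ```ignore
    /// assert_eq!(Options::default().offset_index_len(), 262_144);
    /// ```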
    pub fn offset_index_len(&self) -> u64 {
        self.max_segment_size / self.offset_index_interval_bytes
    }
}

/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
/// [`payload::Txdata`].
pub struct Commitlog<T> {
    inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}

impl<T> Commitlog<T> {
    /// Open the log at root directory `root` with [`Options`].
    ///
    /// The root directory must already exist.
    ///
    /// Note that opening a commitlog involves I/O: some consistency checks are
    /// performed, and the next writing position is determined.
    ///
    /// This is only necessary when opening the commitlog for writing. See the
    /// free-standing functions in this module for how to traverse a read-only
    /// commitlog.
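    ///
    /// A usage sketch, where `root` is a [`CommitLogDir`] and `Txdata` stands
    /// for whatever [`Encode`]-able record type the caller uses:
    ///
    /// ```ignore
    /// let log: Commitlog<Txdata> = Commitlog::open(root, Options::default())?;
    /// ```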
    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
        let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;

        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Determine the maximum transaction offset considered durable.
    ///
    /// The offset is `None` if the log hasn't been flushed to disk yet.
    pub fn max_committed_offset(&self) -> Option<u64> {
        self.inner.read().unwrap().max_committed_offset()
    }

    /// Determine the minimum transaction offset in the log.
    ///
    /// The offset is `None` if the log hasn't been flushed to disk yet.
    pub fn min_committed_offset(&self) -> Option<u64> {
        self.inner.read().unwrap().min_committed_offset()
    }

    /// Get the current epoch.
    ///
    /// See also: [`Commit::epoch`].
    pub fn epoch(&self) -> u64 {
        self.inner.read().unwrap().epoch()
    }

    /// Update the current epoch.
    ///
    /// Does nothing if the given `epoch` is equal to the current epoch.
    /// Otherwise flushes outstanding transactions to disk (equivalent to
    /// [`Self::flush`]) before updating the epoch.
    ///
    /// Returns the maximum transaction offset written to disk. The offset is
    /// `None` if the log is empty and no data was pending to be flushed.
    ///
    /// # Errors
    ///
    /// If `epoch` is smaller than the current epoch, an error of kind
    /// [`io::ErrorKind::InvalidInput`] is returned.
    ///
    /// Errors from the implicit flush are propagated.
    pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        inner.set_epoch(epoch)?;

        Ok(inner.max_committed_offset())
    }

    /// Sync all OS-buffered writes to disk.
    ///
    /// Note that this does **not** write outstanding records to disk.
    /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
    /// method to ensure all data is on disk.
    ///
    /// Returns the maximum transaction offset which is considered durable after
    /// this method returns successfully. The offset is `None` if the log hasn't
    /// been flushed to disk yet.
    ///
    /// # Panics
    ///
    /// This method panics if syncing fails irrecoverably.
    pub fn sync(&self) -> Option<u64> {
        let mut inner = self.inner.write().unwrap();
        trace!("sync commitlog");
        inner.sync();

        inner.max_committed_offset()
    }

    /// Write all outstanding transaction records to disk.
    ///
    /// Note that this does **not** force the OS to sync the data to disk.
    /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
    /// to ensure all data is on disk.
    ///
    /// Returns the maximum transaction offset written to disk. The offset is
    /// `None` if the log is empty and no data was pending to be flushed.
    ///
    /// Repeatedly calling this method may return the same value.
    pub fn flush(&self) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        trace!("flush commitlog");
        inner.commit()?;

        Ok(inner.max_committed_offset())
    }

    /// Write all outstanding transaction records to disk and flush OS buffers.
    ///
    /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
    /// without releasing the write lock in between.
    ///
    /// # Errors
    ///
    /// An error is returned if writing to disk fails due to an I/O error.
    ///
    /// # Panics
    ///
    /// This method panics if syncing fails irrecoverably.
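    ///
    /// A sketch of the durability handshake (`txdata` is a made-up record):
    ///
    /// ```ignore
    /// if log.append(txdata).is_ok() {
    ///     // `durable` is the highest transaction offset known to be on disk.
    ///     let durable = log.flush_and_sync()?;
    /// }
    /// ```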
    pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        trace!("flush and sync commitlog");
        inner.commit()?;
        inner.sync();

        Ok(inner.max_committed_offset())
    }

    /// Obtain an iterator which traverses the log from the start, yielding
    /// [`StoredCommit`]s.
    ///
    /// The returned iterator is not aware of segment rotation. That is, if a
    /// new segment is created after this method returns, the iterator will not
    /// traverse it.
    ///
    /// Commits appended to the log while it is being traversed are generally
    /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
    /// however, a new iterator should be created using [`Self::commits_from`]
    /// with the last transaction offset yielded.
    ///
    /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
    /// (e.g. due to a partial write to disk), but a subsequent `append` will
    /// bring the log into a consistent state.
    ///
    /// This means that, when this iterator yields an `Err` value, the consumer
    /// may want to check if the iterator is exhausted (by calling `next()`)
    /// before treating the `Err` value as an application error.
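    ///
    /// A sketch of that pattern (`process` is a stand-in):
    ///
    /// ```ignore
    /// let mut commits = log.commits().peekable();
    /// while let Some(commit) = commits.next() {
    ///     match commit {
    ///         Ok(commit) => process(commit),
    ///         // An `Err` at the very end of the log is likely a torn write
    ///         // at the tail, which a subsequent `append` will repair.
    ///         Err(_) if commits.peek().is_none() => break,
    ///         Err(e) => return Err(e.into()),
    ///     }
    /// }
    /// ```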
    pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        self.commits_from(0)
    }

    /// Obtain an iterator starting from transaction offset `offset`, yielding
    /// [`StoredCommit`]s.
    ///
    /// Similar to [`Self::commits`], but skips ahead until `offset` is
    /// contained in the next [`StoredCommit`] to yield.
    ///
    /// Note that the first [`StoredCommit`] yielded is the first commit
    /// containing the given transaction offset, i.e. its `min_tx_offset` may be
    /// smaller than `offset`.
    pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        self.inner.read().unwrap().commits_from(offset)
    }

    /// Get a list of segment offsets, sorted in ascending order.
    pub fn existing_segment_offsets(&self) -> io::Result<Vec<u64>> {
        self.inner.read().unwrap().repo.existing_offsets()
    }

    /// Compress the segments at the offsets provided, marking them as immutable.
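    ///
    /// The currently active (head) segment must not be included, or this
    /// method panics. A sketch compressing all sealed segments:
    ///
    /// ```ignore
    /// let mut offsets = log.existing_segment_offsets()?;
    /// // Offsets are ascending, so the last one belongs to the head segment.
    /// offsets.pop();
    /// log.compress_segments(&offsets)?;
    /// ```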
    pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
        // Even though `compress_segment` takes `&self`, take an exclusive
        // lock so the log cannot be modified while segments are compressed.
        #[allow(clippy::readonly_write_lock)]
        let inner = self.inner.write().unwrap();
        // The active (head) segment must never be compressed.
        assert!(!offsets.contains(&inner.head.min_tx_offset()));
        // TODO: parallelize, maybe
        offsets
            .iter()
            .try_for_each(|&offset| inner.repo.compress_segment(offset))
    }

    /// Remove all data from the log and reopen it.
    ///
    /// Log segments are deleted starting from the newest. As multiple segments
    /// cannot be deleted atomically, the log may not be completely empty if
    /// the method returns an error.
    ///
    /// Note that the method consumes `self` to ensure the log is not modified
    /// while resetting.
    pub fn reset(self) -> io::Result<Self> {
        let inner = self.inner.into_inner().unwrap().reset()?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Remove all data past the given transaction `offset` from the log and
    /// reopen it.
    ///
    /// Like with [`Self::reset`], it may happen that not all segments newer
    /// than `offset` can be deleted.
    ///
    /// If the method returns successfully, the most recent [`Commit`] in the
    /// log will contain the transaction at `offset`.
    ///
    /// Note that the method consumes `self` to ensure the log is not modified
    /// while resetting.
    pub fn reset_to(self, offset: u64) -> io::Result<Self> {
        let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Determine the size on disk of this commitlog.
    pub fn size_on_disk(&self) -> io::Result<u64> {
        let inner = self.inner.read().unwrap();
        inner.repo.size_on_disk()
    }
}

impl<T: Encode> Commitlog<T> {
    /// Append the record `txdata` to the log.
    ///
    /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
    /// argument is returned in an `Err`. The caller should [`Self::flush`] the
    /// log and try again.
    ///
    /// In case the log is appended to from multiple threads, this may result in
    /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
    /// [`Self::append_maybe_flush`] is preferable.
    pub fn append(&self, txdata: T) -> Result<(), T> {
        let mut inner = self.inner.write().unwrap();
        inner.append(txdata)
    }

    /// Append the record `txdata` to the log.
    ///
    /// The `txdata` payload is buffered in memory until either:
    ///
    /// - [`Self::flush`] is called explicitly, or
    /// - [`Options::max_records_in_commit`] is exceeded
    ///
    /// In the latter case, this method flushes implicitly, _before_ appending
    /// the `txdata` argument.
    ///
    /// That is, the argument is not guaranteed to be flushed after the method
    /// returns. If that is desired, [`Self::flush`] must be called explicitly.
    ///
    /// # Errors
    ///
    /// If the log needs to be flushed, but an I/O error occurs, ownership of
    /// `txdata` is returned to the caller alongside the [`io::Error`].
    ///
    /// The value can then be used to retry appending.
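    ///
    /// A retry-loop sketch (`mk_txdata` is a made-up constructor):
    ///
    /// ```ignore
    /// let mut txdata = mk_txdata();
    /// while let Err(e) = log.append_maybe_flush(txdata) {
    ///     log::warn!("flush failed, retrying: {}", e.source);
    ///     txdata = e.txdata;
    /// }
    /// ```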
    pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
        let mut inner = self.inner.write().unwrap();

        if let Err(txdata) = inner.append(txdata) {
            if let Err(source) = inner.commit() {
                return Err(error::Append { txdata, source });
            }
            // `inner.commit.n` must be zero at this point
            let res = inner.append(txdata);
            debug_assert!(res.is_ok(), "failed to append while holding write lock");
        }

        Ok(())
    }

    /// Obtain an iterator which traverses the log from the start, yielding
    /// [`Transaction`]s.
    ///
    /// The provided `decoder`'s [`Decoder::decode_record`] method will be
    /// called [`Commit::n`] times per [`Commit`] to obtain the individual
    /// transaction payloads.
    ///
    /// Like [`Self::commits`], the iterator is not aware of segment rotation.
    /// That is, if a new segment is created after this method returns, the
    /// iterator will not traverse it.
    ///
    /// Transactions appended to the log while it is being traversed are
    /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
    /// however, a new iterator should be created using [`Self::transactions_from`]
    /// with the last transaction offset yielded.
    ///
    /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
    /// due to a partial write to disk), but a subsequent `append` will bring
    /// the log into a consistent state.
    ///
    /// This means that, when this iterator yields an `Err` value, the consumer
    /// may want to check if the iterator is exhausted (by calling `next()`)
    /// before treating the `Err` value as an application error.
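    ///
    /// A usage sketch, where `de` is some [`Decoder`] implementation and the
    /// field names are illustrative:
    ///
    /// ```ignore
    /// for tx in log.transactions(&de) {
    ///     let tx = tx?;
    ///     handle(tx.offset, tx.txdata); // `handle` is a stand-in
    /// }
    /// ```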
    pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        T: 'a,
    {
        self.transactions_from(0, de)
    }

    /// Obtain an iterator starting from transaction offset `offset`, yielding
    /// [`Transaction`]s.
    ///
    /// Similar to [`Self::transactions`] but will skip until the provided
    /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
    /// with offset `offset`.
    pub fn transactions_from<'a, D>(
        &self,
        offset: u64,
        de: &'a D,
    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        T: 'a,
    {
        self.inner.read().unwrap().transactions_from(offset, de)
    }

    /// Traverse the log from the start and "fold" its transactions into the
    /// provided [`Decoder`].
    ///
    /// A [`Decoder`] is a stateful object due to the requirement to store
    /// schema information in the log itself. That is, a [`Decoder`] may need
    /// to resolve transaction schema information dynamically while traversing
    /// the log.
    ///
    /// This is equivalent to "replaying" a log into a database state. In this
    /// scenario, it is not interesting to consume the [`Transaction`] payload
    /// as an iterator.
    ///
    /// This method allows the use of a [`Decoder`] which returns zero-sized
    /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
    /// payload into a struct.
    ///
    /// Note that, unlike [`Self::transactions`], this method will ignore a
    /// corrupt commit at the very end of the traversed log.
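    ///
    /// A sketch, where `ReplayDecoder` is a hypothetical [`Decoder`] that
    /// applies each record to some database state:
    ///
    /// ```ignore
    /// log.fold_transactions(ReplayDecoder::new(&mut state))?;
    /// ```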
    pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        self.fold_transactions_from(0, de)
    }

    /// Traverse the log from the given transaction offset and "fold" its
    /// transactions into the provided [`Decoder`].
    ///
    /// Similar to [`Self::fold_transactions`] but will skip until the provided
    /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
    /// will be equal to `offset`.
    pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        self.inner.read().unwrap().fold_transactions_from(offset, de)
    }
}

/// Extract the most recently written [`segment::Metadata`] from the commitlog
/// at the `root` directory.
///
/// Returns `None` if the commitlog is empty.
///
/// Note that this function validates the most recent segment, which entails
/// traversing it from the start.
///
/// The function can be used instead of the pattern:
///
/// ```ignore
/// let log = Commitlog::open(..)?;
/// let max_offset = log.max_committed_offset();
/// ```
///
/// like so:
///
/// ```ignore
/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
/// ```
///
/// Unlike [`Commitlog::open`], this does not create a segment if the log is
/// empty.
pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
    commitlog::committed_meta(repo::Fs::new(root)?)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory from the start, yielding [`StoredCommit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits`] for more information.
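///
/// A read-only sketch, counting the commits currently on disk:
///
/// ```ignore
/// let num_commits = spacetimedb_commitlog::commits(root)?.count();
/// ```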
pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
    commits_from(root, 0)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory starting from `offset` and yielding [`StoredCommit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits_from`] for more information.
pub fn commits_from(
    root: CommitLogDir,
    offset: u64,
) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
    commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory from the start, yielding [`Transaction`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::transactions`] for more information.
pub fn transactions<'a, D, T>(
    root: CommitLogDir,
    de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    transactions_from(root, 0, de)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory starting from `offset` and yielding [`Transaction`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::transactions_from`] for more information.
pub fn transactions_from<'a, D, T>(
    root: CommitLogDir,
    offset: u64,
    de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
}

/// Traverse the commitlog located at the `root` directory from the start and
/// "fold" its transactions into the provided [`Decoder`].
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::fold_transactions`] for more information.
pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
where
    D: Decoder,
    D::Error: From<error::Traversal> + From<io::Error>,
{
    fold_transactions_from(root, 0, de)
}

/// Traverse the commitlog located at the `root` directory starting from `offset`
/// and "fold" its transactions into the provided [`Decoder`].
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::fold_transactions_from`] for more information.
pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
where
    D: Decoder,
    D::Error: From<error::Traversal> + From<io::Error>,
{
    commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
}