spacetimedb_commitlog/lib.rs
use std::{
    io,
    num::{NonZeroU16, NonZeroU64},
    sync::RwLock,
};

use log::trace;
use repo::Repo;
use spacetimedb_paths::server::CommitLogDir;

pub mod commit;
pub mod commitlog;
mod index;
pub mod repo;
pub mod segment;
mod varchar;
mod varint;

pub use crate::{
    commit::{Commit, StoredCommit},
    payload::{Decoder, Encode},
    segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
    varchar::Varchar,
};
pub mod error;
pub mod payload;

#[cfg(feature = "streaming")]
pub mod stream;

#[cfg(any(test, feature = "test"))]
pub mod tests;

/// [`Commitlog`] options.
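///
/// # Example
///
/// A minimal sketch of overriding a single option while keeping the other
/// defaults (the 64 MiB value is illustrative):
///
/// ```ignore
/// let opts = Options {
///     max_segment_size: 64 * 1024 * 1024,
///     ..Options::default()
/// };
/// ```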
#[derive(Clone, Copy, Debug)]
pub struct Options {
    /// Set the log format version to write, and the maximum supported version.
    ///
    /// Changing the payload format `T` of [`Commitlog`] should usually be
    /// accompanied by an update to this crate's [`DEFAULT_LOG_FORMAT_VERSION`].
    /// Sometimes it may, however, be useful to set the version at runtime,
    /// e.g. to experiment with new or very old versions.
    ///
    /// Default: [`DEFAULT_LOG_FORMAT_VERSION`]
    pub log_format_version: u8,
    /// The maximum size in bytes to which log segments should be allowed to
    /// grow.
    ///
    /// Default: 1GiB
    pub max_segment_size: u64,
    /// The maximum number of records in a commit.
    ///
    /// If this number is exceeded, the commit is flushed to disk even without
    /// explicitly calling [`Commitlog::flush`].
    ///
    /// Default: 65,535
    pub max_records_in_commit: NonZeroU16,
    /// Whenever at least this many bytes have been written to the currently
    /// active segment, an entry is added to its offset index.
    ///
    /// Default: 4096
    pub offset_index_interval_bytes: NonZeroU64,
    /// If `true`, require that the segment be synced to disk before an index
    /// entry is added.
    ///
    /// Setting this to `false` (the default) will update the index every
    /// `offset_index_interval_bytes`, even if the commitlog wasn't synced.
    /// This means that the index could contain non-existent entries in the
    /// event of a crash.
    ///
    /// Setting it to `true` will update the index only when the commitlog has
    /// been synced and at least `offset_index_interval_bytes` have been
    /// written. This means that the index could contain fewer entries than
    /// one per `offset_index_interval_bytes`.
    ///
    /// Default: false
    pub offset_index_require_segment_fsync: bool,
}

impl Default for Options {
    fn default() -> Self {
        Self {
            log_format_version: DEFAULT_LOG_FORMAT_VERSION,
            max_segment_size: 1024 * 1024 * 1024,
            max_records_in_commit: NonZeroU16::MAX,
            offset_index_interval_bytes: NonZeroU64::new(4096).unwrap(),
            offset_index_require_segment_fsync: false,
        }
    }
}

impl Options {
    /// Compute the maximum number of entries in a segment's offset index,
    /// based on the settings in `self`.
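    ///
    /// # Example
    ///
    /// A sketch of the arithmetic with the default options (1 GiB segments,
    /// one index entry per 4096 bytes):
    ///
    /// ```ignore
    /// let entries = Options::default().offset_index_len();
    /// assert_eq!(entries, 262_144); // 1 GiB / 4096
    /// ```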
    pub fn offset_index_len(&self) -> u64 {
        self.max_segment_size / self.offset_index_interval_bytes
    }
}

/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
/// [`payload::Txdata`].
pub struct Commitlog<T> {
    inner: RwLock<commitlog::Generic<repo::Fs, T>>,
}

impl<T> Commitlog<T> {
    /// Open the log at root directory `root` with [`Options`].
    ///
    /// The root directory must already exist.
    ///
    /// Note that opening a commitlog involves I/O: some consistency checks are
    /// performed, and the next writing position is determined.
    ///
    /// This is only necessary when opening the commitlog for writing. See the
    /// free-standing functions in this module for how to traverse a read-only
    /// commitlog.
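    ///
    /// # Example
    ///
    /// A minimal sketch; `root` is a [`CommitLogDir`] and `Txdata` stands in
    /// for the concrete payload type:
    ///
    /// ```ignore
    /// let log = Commitlog::<Txdata>::open(root, Options::default())?;
    /// ```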
    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
        let inner = commitlog::Generic::open(repo::Fs::new(root)?, opts)?;

        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Determine the maximum transaction offset considered durable.
    ///
    /// The offset is `None` if the log hasn't been flushed to disk yet.
    pub fn max_committed_offset(&self) -> Option<u64> {
        self.inner.read().unwrap().max_committed_offset()
    }

    /// Get the current epoch.
    ///
    /// See also: [`Commit::epoch`].
    pub fn epoch(&self) -> u64 {
        self.inner.read().unwrap().epoch()
    }

    /// Update the current epoch.
    ///
    /// Does nothing if the given `epoch` is equal to the current epoch.
    /// Otherwise flushes outstanding transactions to disk (equivalent to
    /// [`Self::flush`]) before updating the epoch.
    ///
    /// Returns the maximum transaction offset written to disk. The offset is
    /// `None` if the log is empty and no data was pending to be flushed.
    ///
    /// # Errors
    ///
    /// If `epoch` is smaller than the current epoch, an error of kind
    /// [`io::ErrorKind::InvalidInput`] is returned.
    ///
    /// Errors from the implicit flush are propagated.
    pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        inner.set_epoch(epoch)?;

        Ok(inner.max_committed_offset())
    }

    /// Sync all OS-buffered writes to disk.
    ///
    /// Note that this does **not** write outstanding records to disk.
    /// Use [`Self::flush_and_sync`] or call [`Self::flush`] prior to this
    /// method to ensure all data is on disk.
    ///
    /// Returns the maximum transaction offset which is considered durable after
    /// this method returns successfully. The offset is `None` if the log hasn't
    /// been flushed to disk yet.
    ///
    /// # Panics
    ///
    /// This method panics if syncing fails irrecoverably.
    pub fn sync(&self) -> Option<u64> {
        let mut inner = self.inner.write().unwrap();
        trace!("sync commitlog");
        inner.sync();

        inner.max_committed_offset()
    }

    /// Write all outstanding transaction records to disk.
    ///
    /// Note that this does **not** force the OS to sync the data to disk.
    /// Use [`Self::flush_and_sync`] or call [`Self::sync`] after this method
    /// to ensure all data is on disk.
    ///
    /// Returns the maximum transaction offset written to disk. The offset is
    /// `None` if the log is empty and no data was pending to be flushed.
    ///
    /// Repeatedly calling this method may return the same value.
    pub fn flush(&self) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        trace!("flush commitlog");
        inner.commit()?;

        Ok(inner.max_committed_offset())
    }

    /// Write all outstanding transaction records to disk and flush OS buffers.
    ///
    /// Equivalent to calling [`Self::flush`] followed by [`Self::sync`], but
    /// without releasing the write lock in between.
    ///
    /// # Errors
    ///
    /// An error is returned if writing to disk fails due to an I/O error.
    ///
    /// # Panics
    ///
    /// This method panics if syncing fails irrecoverably.
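    ///
    /// # Example
    ///
    /// A sketch of making appended data durable before acknowledging it:
    ///
    /// ```ignore
    /// log.append_maybe_flush(txdata)?;
    /// // Nothing is acknowledged until both the flush and the fsync succeed.
    /// let durable_offset = log.flush_and_sync()?;
    /// ```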
    pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
        let mut inner = self.inner.write().unwrap();
        trace!("flush and sync commitlog");
        inner.commit()?;
        inner.sync();

        Ok(inner.max_committed_offset())
    }

    /// Obtain an iterator which traverses the log from the start, yielding
    /// [`StoredCommit`]s.
    ///
    /// The returned iterator is not aware of segment rotation. That is, if a
    /// new segment is created after this method returns, the iterator will not
    /// traverse it.
    ///
    /// Commits appended to the log while it is being traversed are generally
    /// visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
    /// however, a new iterator should be created using [`Self::commits_from`]
    /// with the last transaction offset yielded.
    ///
    /// Note that the very last [`StoredCommit`] in a commitlog may be corrupt
    /// (e.g. due to a partial write to disk), but a subsequent `append` will
    /// bring the log into a consistent state.
    ///
    /// This means that, when this iterator yields an `Err` value, the consumer
    /// may want to check if the iterator is exhausted (by calling `next()`)
    /// before treating the `Err` value as an application error.
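    ///
    /// # Example
    ///
    /// A sketch of the check described above, tolerating a torn write at the
    /// very end of the log:
    ///
    /// ```ignore
    /// let mut commits = log.commits().peekable();
    /// while let Some(res) = commits.next() {
    ///     let at_end = commits.peek().is_none();
    ///     match res {
    ///         Ok(commit) => { /* process the commit */ }
    ///         // A trailing `Err` is likely a partial write, which the next
    ///         // `append` will repair; anything else is a real error.
    ///         Err(_) if at_end => break,
    ///         Err(e) => return Err(e.into()),
    ///     }
    /// }
    /// ```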
    pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        self.commits_from(0)
    }

    /// Obtain an iterator starting from transaction offset `offset`, yielding
    /// [`StoredCommit`]s.
    ///
    /// Similar to [`Self::commits`] but will skip until the offset is contained
    /// in the next [`StoredCommit`] to yield.
    ///
    /// Note that the first [`StoredCommit`] yielded is the first commit
    /// containing the given transaction offset, i.e. its `min_tx_offset` may be
    /// smaller than `offset`.
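    ///
    /// # Example
    ///
    /// A sketch of resuming traversal at transaction offset 42:
    ///
    /// ```ignore
    /// for commit in log.commits_from(42) {
    ///     let commit = commit?;
    ///     // The first commit contains tx 42; its `min_tx_offset` may be
    ///     // smaller than 42.
    /// }
    /// ```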
    pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> {
        self.inner.read().unwrap().commits_from(offset)
    }

    /// Get a list of segment offsets, sorted in ascending order.
    pub fn existing_segment_offsets(&self) -> io::Result<Vec<u64>> {
        self.inner.read().unwrap().repo.existing_offsets()
    }

    /// Compress the segments at the offsets provided, marking them as immutable.
    pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
        // Even though `compress_segment` takes `&self`, we take an exclusive
        // lock to avoid any weirdness happening.
        #[allow(clippy::readonly_write_lock)]
        let inner = self.inner.write().unwrap();
        assert!(!offsets.contains(&inner.head.min_tx_offset()));
        // TODO: parallelize, maybe
        offsets
            .iter()
            .try_for_each(|&offset| inner.repo.compress_segment(offset))
    }

    /// Remove all data from the log and reopen it.
    ///
    /// Log segments are deleted starting from the newest. As multiple segments
    /// cannot be deleted atomically, the log may not be completely empty if
    /// the method returns an error.
    ///
    /// Note that the method consumes `self` to ensure the log is not modified
    /// while resetting.
    pub fn reset(self) -> io::Result<Self> {
        let inner = self.inner.into_inner().unwrap().reset()?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Remove all data past the given transaction `offset` from the log and
    /// reopen it.
    ///
    /// Like with [`Self::reset`], it may happen that not all segments newer
    /// than `offset` can be deleted.
    ///
    /// If the method returns successfully, the most recent [`Commit`] in the
    /// log will contain the transaction at `offset`.
    ///
    /// Note that the method consumes `self` to ensure the log is not modified
    /// while resetting.
    pub fn reset_to(self, offset: u64) -> io::Result<Self> {
        let inner = self.inner.into_inner().unwrap().reset_to(offset)?;
        Ok(Self {
            inner: RwLock::new(inner),
        })
    }

    /// Determine the size on disk of this commitlog.
    pub fn size_on_disk(&self) -> io::Result<u64> {
        let inner = self.inner.read().unwrap();
        inner.repo.size_on_disk()
    }
}

impl<T: Encode> Commitlog<T> {
    /// Append the record `txdata` to the log.
    ///
    /// If the internal buffer exceeds [`Options::max_records_in_commit`], the
    /// argument is returned in an `Err`. The caller should [`Self::flush`] the
    /// log and try again.
    ///
    /// In case the log is appended to from multiple threads, this may result in
    /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
    /// [`Self::append_maybe_flush`] is preferable.
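    ///
    /// # Example
    ///
    /// A sketch of the flush-and-retry loop described above:
    ///
    /// ```ignore
    /// // `txdata` is the record to append, declared `mut` for the retry loop.
    /// while let Err(rejected) = log.append(txdata) {
    ///     // The commit buffer is full: flush, then retry with the value
    ///     // handed back in the `Err`.
    ///     log.flush()?;
    ///     txdata = rejected;
    /// }
    /// ```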
    pub fn append(&self, txdata: T) -> Result<(), T> {
        let mut inner = self.inner.write().unwrap();
        inner.append(txdata)
    }

    /// Append the record `txdata` to the log.
    ///
    /// The `txdata` payload is buffered in memory until either:
    ///
    /// - [`Self::flush`] is called explicitly, or
    /// - [`Options::max_records_in_commit`] is exceeded
    ///
    /// In the latter case, this method flushes implicitly, _before_ appending
    /// the `txdata` argument.
    ///
    /// That is, the argument is not guaranteed to be flushed after the method
    /// returns. If that is desired, [`Self::flush`] must be called explicitly.
    ///
    /// # Errors
    ///
    /// If the log needs to be flushed, but an I/O error occurs, ownership of
    /// `txdata` is returned back to the caller alongside the [`io::Error`].
    ///
    /// The value can then be used to retry appending.
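    ///
    /// # Example
    ///
    /// A sketch of recovering the payload when the implicit flush fails
    /// (`error::Append` is destructured as constructed by this method;
    /// `retry_queue` is hypothetical):
    ///
    /// ```ignore
    /// if let Err(error::Append { txdata, source }) = log.append_maybe_flush(txdata) {
    ///     log::warn!("flush failed, retrying later: {source}");
    ///     retry_queue.push(txdata);
    /// }
    /// ```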
    pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
        let mut inner = self.inner.write().unwrap();

        if let Err(txdata) = inner.append(txdata) {
            if let Err(source) = inner.commit() {
                return Err(error::Append { txdata, source });
            }
            // `inner.commit.n` must be zero at this point
            let res = inner.append(txdata);
            debug_assert!(res.is_ok(), "failed to append while holding write lock");
        }

        Ok(())
    }

    /// Obtain an iterator which traverses the log from the start, yielding
    /// [`Transaction`]s.
    ///
    /// The [`Decoder::decode_record`] method of the provided decoder `de` will
    /// be called [`Commit::n`] times per [`Commit`] to obtain the individual
    /// transaction payloads.
    ///
    /// Like [`Self::commits`], the iterator is not aware of segment rotation.
    /// That is, if a new segment is created after this method returns, the
    /// iterator will not traverse it.
    ///
    /// Transactions appended to the log while it is being traversed are
    /// generally visible to the iterator. Upon encountering [`io::ErrorKind::UnexpectedEof`],
    /// however, a new iterator should be created using [`Self::transactions_from`]
    /// with the last transaction offset yielded.
    ///
    /// Note that the very last [`Commit`] in a commitlog may be corrupt (e.g.
    /// due to a partial write to disk), but a subsequent `append` will bring
    /// the log into a consistent state.
    ///
    /// This means that, when this iterator yields an `Err` value, the consumer
    /// may want to check if the iterator is exhausted (by calling `next()`)
    /// before treating the `Err` value as an application error.
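    ///
    /// # Example
    ///
    /// A sketch; `de` is some [`Decoder`] for the payload type, e.g. the one
    /// the log was written with (hypothetical here):
    ///
    /// ```ignore
    /// for tx in log.transactions(&de) {
    ///     let tx = tx?;
    ///     // Process the transaction payload.
    /// }
    /// ```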
    pub fn transactions<'a, D>(&self, de: &'a D) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        T: 'a,
    {
        self.transactions_from(0, de)
    }

    /// Obtain an iterator starting from transaction offset `offset`, yielding
    /// [`Transaction`]s.
    ///
    /// Similar to [`Self::transactions`] but will skip until the provided
    /// `offset`, i.e. the first [`Transaction`] yielded will be the transaction
    /// with offset `offset`.
    pub fn transactions_from<'a, D>(
        &self,
        offset: u64,
        de: &'a D,
    ) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a
    where
        D: Decoder<Record = T>,
        D::Error: From<error::Traversal>,
        T: 'a,
    {
        self.inner.read().unwrap().transactions_from(offset, de)
    }

    /// Traverse the log from the start and "fold" its transactions into the
    /// provided [`Decoder`].
    ///
    /// A [`Decoder`] is a stateful object due to the requirement to store
    /// schema information in the log itself. That is, a [`Decoder`] may need to
    /// be able to resolve transaction schema information dynamically while
    /// traversing the log.
    ///
    /// This is equivalent to "replaying" a log into a database state. In this
    /// scenario, it is not interesting to consume the [`Transaction`] payload
    /// as an iterator.
    ///
    /// This method allows the use of a [`Decoder`] which returns zero-sized
    /// data (e.g. `Decoder<Record = ()>`), as it will not allocate the commit
    /// payload into a struct.
    ///
    /// Note that, unlike [`Self::transactions`], this method will ignore a
    /// corrupt commit at the very end of the traversed log.
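    ///
    /// # Example
    ///
    /// A sketch of a replay; `ReplayDecoder` is a hypothetical
    /// `Decoder<Record = ()>` that applies each record to the database state
    /// as a side effect of decoding:
    ///
    /// ```ignore
    /// let de = ReplayDecoder::new(&mut db);
    /// log.fold_transactions(de)?;
    /// ```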
    pub fn fold_transactions<D>(&self, de: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        self.fold_transactions_from(0, de)
    }

    /// Traverse the log from the given transaction offset and "fold" its
    /// transactions into the provided [`Decoder`].
    ///
    /// Similar to [`Self::fold_transactions`] but will skip until the provided
    /// `offset`, i.e. the first `tx_offset` passed to [`Decoder::decode_record`]
    /// will be equal to `offset`.
    pub fn fold_transactions_from<D>(&self, offset: u64, de: D) -> Result<(), D::Error>
    where
        D: Decoder,
        D::Error: From<error::Traversal>,
    {
        self.inner.read().unwrap().fold_transactions_from(offset, de)
    }
}

/// Extract the most recently written [`segment::Metadata`] from the commitlog
/// in the `root` directory.
///
/// Returns `None` if the commitlog is empty.
///
/// Note that this function validates the most recent segment, which entails
/// traversing it from the start.
///
/// The function can be used instead of the pattern:
///
/// ```ignore
/// let log = Commitlog::open(..)?;
/// let max_offset = log.max_committed_offset();
/// ```
///
/// like so:
///
/// ```ignore
/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
/// ```
///
/// Unlike [`Commitlog::open`], no segment will be created in an empty `root`.
pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
    commitlog::committed_meta(repo::Fs::new(root)?)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory from the start, yielding [`StoredCommit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits`] for more information.
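///
/// # Example
///
/// A sketch of an offline scan that stops at the first unreadable commit:
///
/// ```ignore
/// for commit in spacetimedb_commitlog::commits(root)? {
///     let commit = commit?;
///     // Inspect the stored commit.
/// }
/// ```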
pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
    commits_from(root, 0)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory starting from `offset` and yielding [`StoredCommit`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::commits_from`] for more information.
pub fn commits_from(
    root: CommitLogDir,
    offset: u64,
) -> io::Result<impl Iterator<Item = Result<StoredCommit, error::Traversal>>> {
    commitlog::commits_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory from the start, yielding [`Transaction`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::transactions`] for more information.
pub fn transactions<'a, D, T>(
    root: CommitLogDir,
    de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    transactions_from(root, 0, de)
}

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory starting from `offset` and yielding [`Transaction`]s.
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::transactions_from`] for more information.
pub fn transactions_from<'a, D, T>(
    root: CommitLogDir,
    offset: u64,
    de: &'a D,
) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
where
    D: Decoder<Record = T>,
    D::Error: From<error::Traversal>,
    T: 'a,
{
    commitlog::transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
}

/// Traverse the commitlog located at the `root` directory from the start and
/// "fold" its transactions into the provided [`Decoder`].
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::fold_transactions`] for more information.
pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
where
    D: Decoder,
    D::Error: From<error::Traversal> + From<io::Error>,
{
    fold_transactions_from(root, 0, de)
}

/// Traverse the commitlog located at the `root` directory starting from `offset`
/// and "fold" its transactions into the provided [`Decoder`].
///
/// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
/// See [`Commitlog::fold_transactions_from`] for more information.
pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
where
    D: Decoder,
    D::Error: From<error::Traversal> + From<io::Error>,
{
    commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
}