slatedb 0.12.1

A cloud native embedded storage engine built on object storage.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
//! This module provides a read-only API for inspecting WAL SST files that were
//! flushed by SlateDB writers.
//!
//! The API has the following main types:
//!
//! - [`WalReader`]: opens a WAL namespace and lists WAL files.
//! - [`WalFile`]: one WAL file (`id`) plus accessors for metadata and contents.
//! - [`WalFileMetadata`]: metadata for one WAL file (`last_modified_dt`, `size_bytes`,
//!   `location`).
//! - [`WalFileIterator`]: entry-level iterator over a WAL file.
//!
//! WAL files returned by [`WalReader::list`] are ordered by WAL ID in ascending
//! order. Iterating each file with [`WalFile::iterator`] yields [`RowEntry`]
//! values in the order they are stored in that WAL SST.
//!
//! `WalFileIterator::next` intentionally yields [`RowEntry`] values, not resolved
//! key/value pairs. This keeps the API at [`RowEntry`] level and preserves tombstones
//! and merge rows exactly as written to the WAL.
//!
//! # Listing costs and polling strategy
//!
//! The `list()` API can become expensive when WAL retention is high or GC is not
//! keeping up. If the GC is not running, listings can grow without bound. Even
//! with GC, CDC often needs higher retention. Retaining WAL files for just
//! 1 hour can yield tens of thousands of files, which is expensive to list in
//! both cost and time.
//!
//! If you plan to poll frequently, use `list()` once to establish a baseline,
//! then poll with `WalReader::get(latest_id + 1)` to avoid repeated large
//! listings.
//!
//! # Example
//!
//! ```
//! use slatedb::config::{FlushOptions, FlushType, PutOptions, WriteOptions};
//! use slatedb::object_store::memory::InMemory;
//! use slatedb::{Db, Error, WalReader};
//! use std::sync::Arc;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Error> {
//!     let object_store = Arc::new(InMemory::new());
//!     let path = "/wal_reader_example";
//!     let db = Db::open(path, object_store.clone()).await?;
//!
//!     db.put_with_options(
//!         b"k1",
//!         b"v1",
//!         &PutOptions::default(),
//!         &WriteOptions::default(),
//!     )
//!     .await?;
//!     db.flush_with_options(FlushOptions {
//!         flush_type: FlushType::Wal,
//!     })
//!     .await?;
//!
//!     let reader = WalReader::new(path, object_store);
//!     for wal_file in reader.list(..).await? {
//!         let mut iter = wal_file.iterator().await?;
//!         while let Some(entry) = iter.next().await? {
//!             let _ = entry;
//!         }
//!     }
//!     Ok(())
//! }
//! ```

use std::ops::RangeBounds;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use object_store::path::Path;
use object_store::ObjectStore;

use crate::db_state::SsTableId;
use crate::format::sst::SsTableFormat;
use crate::iter::{EmptyIterator, RowEntryIterator};
use crate::manifest::SsTableView;
use crate::object_stores::ObjectStores;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::tablestore::TableStore;
use crate::types::RowEntry;

/// Iterator over entries in a WAL file.
///
/// Wraps a boxed [`RowEntryIterator`] and hands out its [`RowEntry`] values one at a
/// time through [`WalFileIterator::next`]. Instances are produced by
/// [`WalFile::iterator`].
pub struct WalFileIterator {
    /// The underlying entry source that `next` delegates to.
    iter: Box<dyn RowEntryIterator + 'static>,
}

impl WalFileIterator {
    /// Creates a new WAL file iterator from a boxed `RowEntryIterator`. The iterator
    /// must be initialized before being passed in.
    fn new(iter: Box<dyn RowEntryIterator + 'static>) -> Self {
        Self { iter }
    }

    /// Returns the next entry in the WAL file.
    pub async fn next(&mut self) -> Result<Option<RowEntry>, crate::Error> {
        self.iter.next().await.map_err(Into::into)
    }
}

/// Object-store metadata describing a single WAL file, as returned by
/// [`WalFile::metadata`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFileMetadata {
    /// The time this WAL file was last written to object storage.
    pub last_modified_dt: DateTime<Utc>,

    /// The size of this WAL file in bytes.
    pub size_bytes: u64,

    /// The path of this WAL file in object storage.
    pub location: Path,
}

/// Represents a single WAL file stored in object storage and provides methods
/// to inspect and read its contents.
///
/// A `WalFile` is only a handle: it holds the WAL ID and a shared table store, and
/// every accessor goes back to the table store when it is called.
pub struct WalFile {
    /// The unique identifier for this WAL file. Corresponds to the SST filename without
    /// the extension. For example, file `000123.sst` would have id `123`.
    pub id: u64,

    /// Shared table store used to fetch this file's metadata and open its SST.
    table_store: Arc<TableStore>,
}

impl WalFile {
    /// Returns metadata for this WAL file.
    ///
    /// ## Errors
    ///
    /// Raises an error if the metadata could not be read from object storage. This can
    /// happen if the file was deleted after listing or if there was an issue with the
    /// object store. If the file is missing, a [`crate::Error`] with
    /// [`crate::ErrorKind::Data`] is returned, and its source contains an
    /// `object_store::Error::NotFound`.
    pub async fn metadata(&self) -> Result<WalFileMetadata, crate::Error> {
        let metadata = self.table_store.metadata(&SsTableId::Wal(self.id)).await?;
        Ok(WalFileMetadata {
            last_modified_dt: metadata.last_modified,
            size_bytes: metadata.size,
            location: metadata.location,
        })
    }

    /// Returns an iterator over `RowEntry`s in this WAL file. Raises an error if the
    /// WAL file could not be read.
    ///
    /// ## Errors
    ///
    /// Raises an error if the data could not be read from object storage. This can
    /// happen if the file was deleted after listing or if there was an issue with the
    /// object store. If the file is missing, a [`crate::Error`] with
    /// [`crate::ErrorKind::Data`] is returned, and its source contains an
    /// `object_store::Error::NotFound`.
    pub async fn iterator(&self) -> Result<WalFileIterator, crate::Error> {
        let sst = self.table_store.open_sst(&SsTableId::Wal(self.id)).await?;
        let iter = match SstIterator::new_owned_initialized(
            ..,
            SsTableView::identity(sst),
            Arc::clone(&self.table_store),
            SstIteratorOptions {
                // Optimize for throughput. Go for 256MiB per-fetch (4096 bytes/block default).
                blocks_to_fetch: 65_536,
                ..Default::default()
            },
        )
        .await
        {
            Ok(Some(iter)) => Box::new(iter) as Box<dyn RowEntryIterator + 'static>,
            Ok(None) => Box::new(EmptyIterator::new()) as Box<dyn RowEntryIterator + 'static>,
            Err(err) => return Err(err.into()),
        };
        Ok(WalFileIterator::new(iter))
    }

    /// Returns the WAL ID immediately following this file's ID.
    pub fn next_id(&self) -> u64 {
        self.id + 1
    }

    /// Returns a [`WalFile`] handle for the next WAL file after this one.
    ///
    /// This does not check whether the next WAL file actually exists in object storage.
    pub fn next_file(&self) -> Self {
        Self {
            id: self.next_id(),
            table_store: Arc::clone(&self.table_store),
        }
    }
}

/// Reads WAL files in object storage for a specific database.
///
/// Use [`WalReader::list`] to enumerate existing WAL files, or [`WalReader::get`] to
/// build a handle for a known WAL ID.
pub struct WalReader {
    /// Table store shared with every [`WalFile`] handle this reader creates.
    table_store: Arc<TableStore>,
}

impl WalReader {
    /// Creates a new WAL reader for the database at the given path.
    ///
    /// If the database was configured with a separate WAL object store, pass that
    /// object store here.
    pub fn new<P: Into<Path>>(path: P, object_store: Arc<dyn ObjectStore>) -> Self {
        let sst_format = SsTableFormat::default();
        let table_store = Arc::new(TableStore::new(
            ObjectStores::new(object_store, None),
            sst_format,
            path.into(),
            None,
        ));
        Self { table_store }
    }

    /// Lists WAL files in ascending order by their ID within the specified range.
    /// If `range` is unbounded, all WAL files are returned.
    pub async fn list<R: RangeBounds<u64>>(&self, range: R) -> Result<Vec<WalFile>, crate::Error> {
        let result = self.table_store.list_wal_ssts(range).await;
        Ok(result?
            .into_iter()
            .map(|wal_file| WalFile {
                id: wal_file.id.unwrap_wal_id(),
                table_store: Arc::clone(&self.table_store),
            })
            .collect())
    }

    /// Creates a [`WalFile`] handle for a WAL ID.
    pub fn get(&self, id: u64) -> WalFile {
        WalFile {
            id,
            table_store: Arc::clone(&self.table_store),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::error::Error;

    use super::*;
    use crate::config::{FlushOptions, FlushType, PutOptions, WriteOptions};
    use crate::test_utils::StringConcatMergeOperator;
    use crate::types::ValueDeletable;
    use crate::Db;
    use object_store::memory::InMemory;

    /// Reports whether the direct source of `err` is an
    /// `object_store::Error::NotFound`, held either by value or behind an `Arc`.
    fn has_not_found_object_store_source(err: &crate::Error) -> bool {
        let Some(source) = err.source() else {
            return false;
        };
        let held_by_value = source
            .downcast_ref::<object_store::Error>()
            .is_some_and(|inner| matches!(inner, object_store::Error::NotFound { .. }));
        if held_by_value {
            return true;
        }
        source
            .downcast_ref::<Arc<object_store::Error>>()
            .is_some_and(|shared| matches!(shared.as_ref(), object_store::Error::NotFound { .. }))
    }

    /// Creates a fresh in-memory object store.
    fn in_memory_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// Writes `key` -> `value` with default put and write options.
    async fn put_default(db: &Db, key: &[u8], value: &[u8]) {
        db.put_with_options(
            key,
            value,
            &PutOptions::default(),
            &WriteOptions::default(),
        )
        .await
        .unwrap();
    }

    /// Flushes the WAL of `db`.
    async fn flush_wal(db: &Db) {
        db.flush_with_options(FlushOptions {
            flush_type: FlushType::Wal,
        })
        .await
        .unwrap();
    }

    /// Opens a database at `path`, writes `k1` -> `v1`, and flushes the WAL.
    async fn open_with_flushed_put(path: &'static str, store: &Arc<dyn ObjectStore>) -> Db {
        let db = Db::open(path, Arc::clone(store)).await.unwrap();
        put_default(&db, b"k1", b"v1").await;
        flush_wal(&db).await;
        db
    }

    /// Lists every WAL file (asserting there is at least one) and drains all of
    /// their entries in listing order.
    async fn read_all_rows(reader: &WalReader) -> Vec<RowEntry> {
        let wal_files = reader.list(..).await.unwrap();
        assert!(!wal_files.is_empty());
        let mut rows = Vec::new();
        for wal_file in wal_files {
            let mut iter = wal_file.iterator().await.unwrap();
            while let Some(entry) = iter.next().await.unwrap() {
                rows.push(entry);
            }
        }
        rows
    }

    /// Returns the first WAL file reported by an unbounded listing.
    async fn first_wal_file(reader: &WalReader) -> WalFile {
        reader
            .list(..)
            .await
            .unwrap()
            .into_iter()
            .next()
            .expect("expected at least one WAL file")
    }

    /// Asserts that `row` is a plain value entry for `key` holding `value`.
    fn assert_value_row(row: &RowEntry, key: &[u8], value: &[u8]) {
        assert_eq!(row.key.as_ref(), key);
        assert!(matches!(
            &row.value,
            ValueDeletable::Value(stored) if stored.as_ref() == value
        ));
    }

    #[tokio::test]
    async fn test_list_and_iterator() {
        let main_store = in_memory_store();
        let path = "/test_wal_reader";
        let db = Db::open(path, main_store.clone()).await.unwrap();
        put_default(&db, b"k2", b"v2").await;
        put_default(&db, b"k1", b"v1").await;
        flush_wal(&db).await;

        let wal_reader = WalReader::new(path, main_store.clone());
        let rows = read_all_rows(&wal_reader).await;

        // Entries come back in the order they were written, not sorted by key.
        assert_eq!(rows.len(), 2);
        assert_value_row(&rows[0], b"k2", b"v2");
        assert_value_row(&rows[1], b"k1", b"v1");
    }

    #[tokio::test]
    async fn test_reads_from_wal_object_store() {
        let main_store = in_memory_store();
        let wal_store = in_memory_store();
        let path = "/test_wal_store";
        let db = Db::builder(path, main_store.clone())
            .with_wal_object_store(wal_store.clone())
            .build()
            .await
            .unwrap();
        put_default(&db, b"k1", b"v1").await;
        flush_wal(&db).await;

        // The reader must be pointed at the dedicated WAL store, not the main one.
        let wal_reader = WalReader::new(path, wal_store.clone());
        let rows = read_all_rows(&wal_reader).await;

        assert_eq!(rows.len(), 1);
        assert_value_row(&rows[0], b"k1", b"v1");
    }

    #[tokio::test]
    async fn test_wal_file_metadata_matches_object_store_metadata() {
        let main_store = in_memory_store();
        let path = "/test_wal_reader_metadata";
        let _db = open_with_flushed_put(path, &main_store).await;

        let wal_reader = WalReader::new(path, main_store.clone());
        let wal_files = wal_reader.list(..).await.unwrap();
        assert!(!wal_files.is_empty());

        for wal_file in wal_files {
            let reported = wal_file.metadata().await.unwrap();
            let expected = main_store.head(&reported.location).await.unwrap();
            assert_eq!(reported.last_modified_dt, expected.last_modified);
            assert_eq!(reported.size_bytes, expected.size);
            assert_eq!(reported.location, expected.location);
        }
    }

    #[tokio::test]
    async fn test_get_returns_wal_file_for_id() {
        let wal_reader = WalReader::new("/test_wal_reader_get_missing", in_memory_store());
        let wal_file = wal_reader.get(42);
        assert_eq!(wal_file.id, 42);
    }

    #[test]
    fn test_wal_file_next_id_and_next_file() {
        let wal_reader = WalReader::new("/test_wal_reader_next_id", in_memory_store());
        let wal_file = wal_reader.get(41);

        assert_eq!(wal_file.next_id(), 42);

        let successor = wal_file.next_file();
        assert_eq!(successor.id, 42);
        assert!(Arc::ptr_eq(&wal_file.table_store, &successor.table_store));
    }

    #[tokio::test]
    async fn test_metadata_returns_error_when_file_deleted() {
        let main_store = in_memory_store();
        let path = "/test_wal_reader_missing_metadata";
        let _db = open_with_flushed_put(path, &main_store).await;

        let wal_reader = WalReader::new(path, main_store.clone());
        let wal_file = first_wal_file(&wal_reader).await;
        let wal_metadata = wal_file.metadata().await.unwrap();

        main_store.delete(&wal_metadata.location).await.unwrap();
        let err = wal_file
            .metadata()
            .await
            .expect_err("expected metadata() to fail after deleting WAL file");
        assert_eq!(err.kind(), crate::ErrorKind::Data);
        assert!(has_not_found_object_store_source(&err));
    }

    #[tokio::test]
    async fn test_iterator_returns_error_when_file_deleted() {
        let main_store = in_memory_store();
        let path = "/test_wal_reader_missing_iterator";
        let _db = open_with_flushed_put(path, &main_store).await;

        let wal_reader = WalReader::new(path, main_store.clone());
        let wal_file = first_wal_file(&wal_reader).await;
        let wal_metadata = wal_file.metadata().await.unwrap();

        main_store.delete(&wal_metadata.location).await.unwrap();
        let Err(err) = wal_file.iterator().await else {
            panic!("expected iterator() to fail after deleting WAL file");
        };
        assert_eq!(err.kind(), crate::ErrorKind::Data);
        assert!(has_not_found_object_store_source(&err));
    }

    #[tokio::test]
    async fn test_iterator_returns_tombstones() {
        let main_store = in_memory_store();
        let path = "/test_wal_reader_tombstones";
        let db = Db::open(path, main_store.clone()).await.unwrap();

        db.delete(b"k_tombstone").await.unwrap();
        flush_wal(&db).await;

        let wal_reader = WalReader::new(path, main_store);
        let rows = read_all_rows(&wal_reader).await;

        let tombstone_entry = rows
            .iter()
            .find(|entry| entry.key.as_ref() == b"k_tombstone")
            .expect("expected deleted key in WAL iterator output");
        assert!(matches!(tombstone_entry.value, ValueDeletable::Tombstone));
    }

    #[tokio::test]
    async fn test_iterator_returns_merge_operands() {
        let main_store = in_memory_store();
        let path = "/test_wal_reader_merges";
        let db = Db::builder(path, main_store.clone())
            .with_merge_operator(Arc::new(StringConcatMergeOperator))
            .build()
            .await
            .unwrap();

        db.merge(b"k_merge", b"merge_operand").await.unwrap();
        flush_wal(&db).await;

        let wal_reader = WalReader::new(path, main_store);
        let rows = read_all_rows(&wal_reader).await;

        let merge_entry = rows
            .iter()
            .find(|entry| entry.key.as_ref() == b"k_merge")
            .expect("expected merge key in WAL iterator output");
        assert!(matches!(
            &merge_entry.value,
            ValueDeletable::Merge(value) if value.as_ref() == b"merge_operand"
        ));
    }
}