slatedb 0.12.1

A cloud native embedded storage engine built on object storage.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
//! Read-only, metadata-only API for inspecting compacted SST files.
//!
//! The main types are:
//!
//! - [`SstReader`]: opens compacted SSTs by ULID or from an existing handle.
//! - [`SstFile`]: one compacted SST with accessors for metadata, stats, and index.
//!
//! # Example
//!
//! ```
//! use slatedb::config::{FlushOptions, FlushType, PutOptions, WriteOptions};
//! use slatedb::object_store::memory::InMemory;
//! use slatedb::{Db, SstReader};
//! use std::sync::Arc;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), slatedb::Error> {
//!     let object_store = Arc::new(InMemory::new());
//!     let path = "/sst_reader_example";
//!     let db = Db::open(path, object_store.clone()).await?;
//!
//!     db.put_with_options(
//!         b"k1",
//!         b"v1",
//!         &PutOptions::default(),
//!         &WriteOptions::default(),
//!     )
//!     .await?;
//!     db.flush_with_options(FlushOptions {
//!         flush_type: FlushType::MemTable,
//!     })
//!     .await?;
//!
//!     let manifest = db.manifest();
//!     let reader = SstReader::new(path, object_store, None, None);
//!
//!     // Inspect L0 SSTs
//!     for view in &manifest.l0 {
//!         let sst_file = reader.open_with_handle(view.sst.clone())?;
//!         if let Some(stats) = sst_file.stats().await? {
//!             let _ = stats.num_puts;
//!         }
//!     }
//!     Ok(())
//! }
//! ```

use std::sync::Arc;

use bytes::Bytes;
use object_store::path::Path;
use object_store::ObjectStore;
use ulid::Ulid;

use crate::db_cache::DbCache;
use crate::db_state::{SsTableHandle, SsTableId, SsTableInfo};
use crate::format::sst::{BlockTransformer, SsTableFormat};
use crate::object_stores::ObjectStores;
use crate::sst_stats::SstStats;
use crate::tablestore::{SstFileMetadata, TableStore};

/// Opens compacted SST files for read-only inspection.
///
/// `SstReader` wraps the read-only functionality needed to open individual
/// SSTs and inspect their metadata, stats, and index data. SST paths are
/// resolved as `{root}/compacted/{ulid}.sst`.
pub struct SstReader {
    table_store: Arc<TableStore>,
}

impl SstReader {
    /// Creates a new SST reader for the database at the given path.
    ///
    /// The `object_store` should point at the same store used by the `Db`.
    /// An optional `DbCache` can be provided for in-memory caching of index
    /// and stats blocks. If the database was opened with a `BlockTransformer`
    /// (e.g. for encryption), the same transformer must be provided here.
    pub fn new<P: Into<Path>>(
        root_path: P,
        object_store: Arc<dyn ObjectStore>,
        cache: Option<Arc<dyn DbCache>>,
        block_transformer: Option<Arc<dyn BlockTransformer>>,
    ) -> Self {
        let sst_format = SsTableFormat {
            block_transformer,
            ..SsTableFormat::default()
        };
        let table_store = Arc::new(TableStore::new(
            ObjectStores::new(object_store, None),
            sst_format,
            root_path.into(),
            cache,
        ));
        Self { table_store }
    }

    /// Opens a compacted SST by its ULID.
    ///
    /// This reads the SST footer and metadata from object storage.
    ///
    /// ## Errors
    ///
    /// Returns an error if the SST file does not exist (e.g. it was GC'd)
    /// or if there is an issue reading from object storage.
    pub async fn open(&self, id: Ulid) -> Result<SstFile, crate::Error> {
        let handle = self.table_store.open_sst(&SsTableId::Compacted(id)).await?;
        Ok(SstFile {
            id,
            handle,
            table_store: Arc::clone(&self.table_store),
        })
    }

    /// Creates an `SstFile` from an existing `SsTableHandle` (no I/O needed).
    ///
    /// ## Errors
    ///
    /// Returns an error if the handle is not a compacted SST (e.g. a WAL SST).
    pub fn open_with_handle(&self, handle: SsTableHandle) -> Result<SstFile, crate::Error> {
        let id = match handle.id {
            SsTableId::Compacted(ulid) => ulid,
            SsTableId::Wal(_) => {
                return Err(crate::Error::invalid(
                    "SstReader only supports compacted SSTs, not WAL SSTs".to_string(),
                ));
            }
        };
        Ok(SstFile {
            id,
            handle,
            table_store: Arc::clone(&self.table_store),
        })
    }
}

/// A single compacted SST file with accessors for metadata, stats, and index.
///
/// Instances are created by [`SstReader::open`] or
/// [`SstReader::open_with_handle`] and share the reader's table store.
pub struct SstFile {
    id: Ulid,
    handle: SsTableHandle,
    table_store: Arc<TableStore>,
}

impl SstFile {
    /// Returns the SST's ULID identifier.
    pub fn id(&self) -> Ulid {
        self.id
    }

    /// Returns the SST file metadata from `SsTableInfo`.
    ///
    /// This is available without any I/O since the info is cached in the handle.
    pub fn info(&self) -> &SsTableInfo {
        &self.handle.info
    }

    /// Returns object store metadata for this SST file.
    ///
    /// This performs one HEAD request to the object store.
    ///
    /// ## Errors
    ///
    /// Returns an error if the SST file does not exist or if there is an
    /// issue reading from object storage.
    pub async fn metadata(&self) -> Result<SstFileMetadata, crate::Error> {
        self.table_store
            .metadata(&self.handle.id)
            .await
            .map_err(Into::into)
    }

    /// Reads the stats block from object storage.
    ///
    /// Returns `None` for old SSTs that were written before the stats block
    /// was added (i.e. `stats_offset == 0 && stats_len == 0`).
    ///
    /// ## Errors
    ///
    /// Returns an error if there is an issue reading from object storage.
    pub async fn stats(&self) -> Result<Option<SstStats>, crate::Error> {
        self.table_store
            .read_stats(&self.handle, true)
            .await
            .map_err(Into::into)
    }

    /// Returns `(block_offset, index_key)` pairs from the SST index block.
    ///
    /// The returned vector has one entry per data block, in block order, so
    /// it is parallel to the data blocks (and to the per-block stats). Each
    /// entry holds the on-disk byte offset of the block and the key the
    /// index records for that block.
    ///
    /// The index key is not guaranteed to be the exact first key stored in
    /// the block. It may be a shortened separator key that sorts no later
    /// than the block's first key. Compare against it; do not treat it as a
    /// key that exists in the SST.
    ///
    /// ## Errors
    ///
    /// Returns an error if there is an issue reading from object storage.
    pub async fn index(&self) -> Result<Vec<(u64, Bytes)>, crate::Error> {
        let index = self.table_store.read_index(&self.handle, true).await?;
        let entries = index
            .borrow()
            .block_meta()
            .iter()
            .map(|meta| {
                (
                    meta.offset(),
                    // Copy out of the flatbuffer so the result does not borrow `index`.
                    Bytes::copy_from_slice(meta.first_key().bytes()),
                )
            })
            .collect();
        Ok(entries)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{FlushOptions, FlushType, PutOptions, SstBlockSize, WriteOptions};
    use crate::test_utils::StringConcatMergeOperator;
    use crate::Db;
    use object_store::memory::InMemory;

    /// Helper: create a DB with 10 puts, 3 deletes, and 2 merges, flush to
    /// L0, and return the object store + path + manifest for inspection.
    ///
    /// Every write targets a distinct key (`[b'k', i]` for `i` in `0..15`),
    /// so the flushed SST holds 15 rows. The tiny 64-byte block size is
    /// presumably chosen so the SST spans several data blocks, which makes
    /// the index-ordering and per-block stats checks meaningful. The DB is
    /// closed before returning, so callers only see what was persisted to
    /// `object_store`.
    async fn setup_db_with_l0() -> (
        Arc<dyn ObjectStore>,
        &'static str,
        crate::manifest::ManifestCore,
    ) {
        let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = "/test_sst_reader";
        // A merge operator is required for the `merge` calls below.
        let db = Db::builder(path, object_store.clone())
            .with_merge_operator(Arc::new(StringConcatMergeOperator))
            .with_sst_block_size(SstBlockSize::Other(64))
            .build()
            .await
            .unwrap();

        // 10 puts
        for i in 0..10u8 {
            db.put_with_options(
                &[b'k', i],
                &[b'v', i],
                &PutOptions::default(),
                &WriteOptions::default(),
            )
            .await
            .unwrap();
        }

        // 3 deletes (keys disjoint from the puts, so no put is shadowed)
        for i in 10..13u8 {
            db.delete(&[b'k', i]).await.unwrap();
        }

        // 2 merges (again on fresh keys)
        for i in 13..15u8 {
            db.merge(&[b'k', i], &[b'm', i]).await.unwrap();
        }

        // Flush the memtable so the writes land in an L0 SST listed in the manifest.
        db.flush_with_options(FlushOptions {
            flush_type: FlushType::MemTable,
        })
        .await
        .unwrap();

        // Snapshot the manifest before closing the DB.
        let manifest = db.manifest();
        db.close().await.unwrap();
        (object_store, path, manifest)
    }

    /// `SstReader::open` by ULID loads an SST whose info has a first entry
    /// and a non-zero index offset.
    #[tokio::test]
    async fn test_open_and_info() {
        let (store, path, manifest) = setup_db_with_l0().await;
        let reader = SstReader::new(path, store, None, None);

        assert!(!manifest.l0.is_empty(), "expected at least one L0 SST");
        let view = &manifest.l0[0];
        let sst_file = reader
            .open(view.sst.id.unwrap_compacted_id())
            .await
            .unwrap();

        let info = sst_file.info();
        assert!(info.first_entry.is_some());
        assert!(info.index_offset > 0);
    }

    /// `open_with_handle` keeps the manifest's handle as-is: same ULID and
    /// identical `SsTableInfo`.
    #[tokio::test]
    async fn test_open_with_handle() {
        let (store, path, manifest) = setup_db_with_l0().await;
        let reader = SstReader::new(path, store, None, None);

        let view = &manifest.l0[0];
        let sst_file = reader.open_with_handle(view.sst.clone()).unwrap();

        assert_eq!(sst_file.id(), view.sst.id.unwrap_compacted_id());
        assert_eq!(sst_file.info(), &view.sst.info);
    }

    /// The stats block reports the write mix produced by `setup_db_with_l0`.
    #[tokio::test]
    async fn test_stats() {
        let (store, path, manifest) = setup_db_with_l0().await;
        let reader = SstReader::new(path, store, None, None);

        let view = &manifest.l0[0];
        let sst_file = reader.open_with_handle(view.sst.clone()).unwrap();
        let stats = sst_file.stats().await.unwrap();

        let stats = stats.expect("expected stats block to be present");
        assert_eq!(stats.num_puts, 10);
        assert_eq!(stats.num_deletes, 3);
        assert_eq!(stats.num_merges, 2);
        assert_eq!(stats.num_rows(), 15);
        assert!(stats.raw_key_size > 0);
        assert!(stats.raw_val_size > 0);
    }

    /// The index is non-empty. Its first key sorts at or before the SST's
    /// first entry, and block offsets strictly increase.
    #[tokio::test]
    async fn test_index() {
        let (store, path, manifest) = setup_db_with_l0().await;
        let reader = SstReader::new(path, store, None, None);

        let view = &manifest.l0[0];
        let sst_file = reader.open_with_handle(view.sst.clone()).unwrap();
        let index = sst_file.index().await.unwrap();

        assert!(!index.is_empty());
        // First index key should be <= the SST's first entry (it may be a
        // shortened separator key rather than the exact first key).
        if let Some(first_entry) = sst_file.info().first_entry.as_ref() {
            assert!(index[0].1.as_ref() <= first_entry.as_ref());
        }
        // Offsets should be monotonically increasing
        for window in index.windows(2) {
            assert!(window[0].0 < window[1].0);
        }
    }

    /// Per-block stats have exactly one entry per index entry, and their put
    /// counts sum to the aggregate put count.
    #[tokio::test]
    async fn test_block_stats_parallel_to_index() {
        let (store, path, manifest) = setup_db_with_l0().await;
        let reader = SstReader::new(path, store, None, None);

        let view = &manifest.l0[0];
        let sst_file = reader.open_with_handle(view.sst.clone()).unwrap();

        let stats = sst_file
            .stats()
            .await
            .unwrap()
            .expect("expected stats block");
        let index = sst_file.index().await.unwrap();

        assert_eq!(
            stats.block_stats.len(),
            index.len(),
            "block_stats should be parallel to the index"
        );

        // Sum of per-block puts should equal aggregate puts
        let sum_puts: u64 = stats.block_stats.iter().map(|b| b.num_puts as u64).sum();
        assert_eq!(sum_puts, stats.num_puts);
    }

    /// `metadata` reports a non-zero object size and a compacted SST id.
    #[tokio::test]
    async fn test_metadata() {
        let (store, path, manifest) = setup_db_with_l0().await;
        let reader = SstReader::new(path, store, None, None);

        let view = &manifest.l0[0];
        let sst_file = reader.open_with_handle(view.sst.clone()).unwrap();
        let metadata = sst_file.metadata().await.unwrap();

        assert!(metadata.size > 0);
        assert!(matches!(metadata.id, SsTableId::Compacted(_)));
    }

    /// `open_with_handle` rejects WAL handles. No SST is read, so an empty
    /// store and a default `SsTableInfo` suffice.
    // NOTE(review): this test never awaits; a plain `#[test]` would suffice.
    #[tokio::test]
    async fn test_open_with_wal_handle_returns_error() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let reader = SstReader::new("/test", store, None, None);

        let wal_handle = SsTableHandle::new(
            SsTableId::Wal(42),
            0,
            crate::db_state::SsTableInfo::default(),
        );
        let result = reader.open_with_handle(wal_handle);
        assert!(result.is_err());
    }

    /// Opening a ULID with no object behind it returns an error instead of
    /// an `SstFile`.
    #[tokio::test]
    async fn test_open_missing_sst() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let reader = SstReader::new("/nonexistent", store, None, None);

        let result = reader.open(Ulid::new()).await;
        assert!(result.is_err());
    }

    /// Test-only `BlockTransformer` that XORs every byte with `key`.
    ///
    /// XOR with a fixed byte is its own inverse, so `decode` re-applies
    /// `encode`.
    struct XorTransformer {
        /// Byte XORed into every byte of each block.
        key: u8,
    }

    #[async_trait::async_trait]
    impl BlockTransformer for XorTransformer {
        async fn encode(&self, data: Bytes) -> Result<Bytes, crate::Error> {
            let transformed: Vec<u8> = data.iter().map(|b| b ^ self.key).collect();
            Ok(Bytes::from(transformed))
        }

        async fn decode(&self, data: Bytes) -> Result<Bytes, crate::Error> {
            self.encode(data).await
        }
    }

    /// SSTs written through a block transformer can have their stats read
    /// only by a reader built with that same transformer.
    #[tokio::test]
    async fn test_stats_with_block_transformer() {
        let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = "/test_sst_reader_transformer";
        let transformer: Arc<dyn BlockTransformer> = Arc::new(XorTransformer { key: 0x42 });

        // Write data with a block transformer
        let db = Db::builder(path, object_store.clone())
            .with_block_transformer(transformer.clone())
            .build()
            .await
            .unwrap();

        for i in 0..5u8 {
            db.put_with_options(
                &[b'k', i],
                &[b'v', i],
                &PutOptions::default(),
                &WriteOptions::default(),
            )
            .await
            .unwrap();
        }
        db.flush_with_options(FlushOptions {
            flush_type: FlushType::MemTable,
        })
        .await
        .unwrap();

        let manifest = db.manifest();
        db.close().await.unwrap();

        // Reading with the correct transformer should succeed
        let reader = SstReader::new(path, object_store.clone(), None, Some(transformer));
        let view = &manifest.l0[0];
        let sst_file = reader.open_with_handle(view.sst.clone()).unwrap();
        let stats = sst_file.stats().await.unwrap().expect("expected stats");
        assert_eq!(stats.num_puts, 5);

        // Reading without the transformer should fail
        // (`open_with_handle` does no I/O, so the failure surfaces on `stats()`).
        let reader_no_transformer = SstReader::new(path, object_store, None, None);
        let sst_file = reader_no_transformer
            .open_with_handle(view.sst.clone())
            .unwrap();
        assert!(sst_file.stats().await.is_err());
    }
}