// log/log.rs
1//! Core LogDb implementation with read and write APIs.
2//!
3//! This module provides the [`LogDb`] struct, the primary entry point for
4//! interacting with OpenData Log. It exposes both write operations ([`append`])
5//! and read operations ([`scan`], [`count`]) via the [`LogRead`] trait.
6
7use std::ops::RangeBounds;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12use common::clock::{Clock, SystemClock};
13use common::coordinator::{
14    Durability, WriteCoordinator, WriteCoordinatorConfig, WriteCoordinatorHandle,
15};
16use common::storage::factory::create_storage;
17use common::{StorageRuntime, StorageSemantics};
18use tokio::sync::RwLock;
19use tokio::task::JoinHandle;
20
21use crate::config::{CountOptions, ScanOptions, WriteOptions};
22use crate::delta::{LogContext, LogDelta, LogFlusher, LogWrite};
23use crate::error::{Error, Result};
24use crate::listing::ListingCache;
25use crate::listing::LogKeyIterator;
26use crate::model::{AppendResult, Record, Segment, SegmentId, Sequence};
27use crate::range::{normalize_segment_id, normalize_sequence};
28use crate::reader::{LogIterator, LogRead};
29use crate::segment::SegmentCache;
30use crate::storage::LogStorage;
31
/// Name of the single write channel registered with the [`WriteCoordinator`].
const WRITE_CHANNEL: &str = "write";
33
/// The main log interface providing read and write operations.
///
/// `LogDb` is the primary entry point for interacting with OpenData Log.
/// It provides methods to append records, scan entries, and count records
/// within a key's log.
///
/// # Read Operations
///
/// Read operations are provided via the [`LogRead`] trait, which `LogDb`
/// implements. This allows generic code to work with either `LogDb` or
/// [`LogDbReader`](crate::LogDbReader).
///
/// # Thread Safety
///
/// `LogDb` is designed to be shared across threads. All methods take `&self`
/// and internal synchronization is handled automatically.
///
/// # Writer Semantics
///
/// Currently, each log supports a single writer. Multi-writer support may
/// be added in the future, but would require each key to have a single
/// writer to maintain monotonic ordering within that key's log.
///
/// # Example
///
/// ```ignore
/// use log::{LogDb, LogRead, Record, WriteOptions};
/// use bytes::Bytes;
///
/// // Open a log (implementation details TBD)
/// let log = LogDb::open(config).await?;
///
/// // Append records
/// let records = vec![
///     Record { key: Bytes::from("user:123"), value: Bytes::from("event-a") },
///     Record { key: Bytes::from("user:456"), value: Bytes::from("event-b") },
/// ];
/// log.append(records).await?;
///
/// // Scan entries for a specific key
/// let mut iter = log.scan(Bytes::from("user:123"), ..).await?;
/// while let Some(entry) = iter.next().await? {
///     println!("seq={}: {:?}", entry.sequence, entry.value);
/// }
/// ```
pub struct LogDb {
    // Handle used to submit writes/flushes to the coordinator's write channel.
    handle: WriteCoordinatorHandle<LogDelta>,
    // Owns the write pipeline; stopped explicitly in `close()`.
    coordinator: WriteCoordinator<LogDelta, LogFlusher>,
    storage: LogStorage,
    // Time source for record timestamps; `SystemClock` in production.
    clock: Arc<dyn Clock>,
    // Shared read-side state (segments + storage view), also updated by the
    // flush subscriber task.
    read_inner: Arc<RwLock<crate::reader::LogReadInner>>,
    // Background task that applies flushed segment metadata to `read_inner`;
    // aborted in `close()`.
    flush_subscriber_task: JoinHandle<()>,
}
87
88impl LogDb {
89    /// Opens or creates a log with the given configuration.
90    ///
91    /// This is the primary entry point for creating a `LogDb` instance. The
92    /// configuration specifies the storage backend and other settings.
93    ///
94    /// # Arguments
95    ///
96    /// * `config` - Configuration specifying storage backend and settings.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if the storage backend cannot be initialized.
101    ///
102    /// # Example
103    ///
104    /// ```ignore
105    /// use log::{LogDb, Config};
106    ///
107    /// let log = LogDb::open(test_config()).await?;
108    /// ```
109    pub async fn open(config: crate::config::Config) -> Result<Self> {
110        LogDbBuilder::new(config).build().await
111    }
112
113    /// Appends records to the log.
114    ///
115    /// Records are assigned sequence numbers in the order they appear in the
116    /// input vector. All records in a single append call are written atomically.
117    ///
118    /// This method uses default write options. Use [`append_with_options`] for
119    /// custom durability settings.
120    ///
121    /// # Arguments
122    ///
123    /// * `records` - The records to append. Each record specifies its target
124    ///   key and value.
125    ///
126    /// # Returns
127    ///
128    /// On success, returns an [`AppendResult`] containing the starting sequence
129    /// number assigned to the batch.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the write fails due to storage issues.
134    ///
135    /// # Example
136    ///
137    /// ```ignore
138    /// let records = vec![
139    ///     Record { key: Bytes::from("events"), value: Bytes::from("event-1") },
140    ///     Record { key: Bytes::from("events"), value: Bytes::from("event-2") },
141    /// ];
142    /// let result = log.append(records).await?;
143    /// println!("Appended at seq {}", result.start_sequence);
144    /// ```
145    ///
146    /// [`append_with_options`]: LogDb::append_with_options
147    pub async fn append(&self, records: Vec<Record>) -> Result<AppendResult> {
148        self.append_with_options(records, WriteOptions::default())
149            .await
150    }
151
152    /// Appends records to the log with custom options.
153    ///
154    /// Records are assigned sequence numbers in the order they appear in the
155    /// input vector. All records in a single append call are written atomically.
156    ///
157    /// # Arguments
158    ///
159    /// * `records` - The records to append. Each record specifies its target
160    ///   key and value.
161    /// * `options` - Write options controlling durability behavior.
162    ///
163    /// # Returns
164    ///
165    /// On success, returns an [`AppendResult`] containing the starting sequence
166    /// number assigned to the batch.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if the write fails due to storage issues.
171    ///
172    /// # Example
173    ///
174    /// ```ignore
175    /// let records = vec![
176    ///     Record { key: Bytes::from("events"), value: Bytes::from("critical-event") },
177    /// ];
178    /// let options = WriteOptions { await_durable: true };
179    /// let result = log.append_with_options(records, options).await?;
180    /// println!("Started at sequence {}", result.start_sequence);
181    /// ```
182    pub async fn append_with_options(
183        &self,
184        records: Vec<Record>,
185        options: WriteOptions,
186    ) -> Result<AppendResult> {
187        if records.is_empty() {
188            return Ok(AppendResult { start_sequence: 0 });
189        }
190
191        let write = LogWrite {
192            records,
193            timestamp_ms: self.current_time_ms(),
194            force_seal: false,
195        };
196        let mut write_handle = self.handle.write(write).await?;
197        let result = write_handle.wait(Durability::Applied).await?;
198
199        if options.await_durable {
200            self.flush().await?;
201        }
202
203        Ok(result)
204    }
205
206    /// Returns the current time in milliseconds since Unix epoch.
207    fn current_time_ms(&self) -> i64 {
208        self.clock
209            .now()
210            .duration_since(std::time::UNIX_EPOCH)
211            .unwrap()
212            .as_millis() as i64
213    }
214
215    /// Checks if the underlying storage is accessible.
216    ///
217    /// This performs a lightweight read operation to verify that the storage
218    /// backend is responding. Use this for health/readiness checks.
219    ///
220    /// # Returns
221    ///
222    /// Returns `Ok(())` if storage is accessible, or an error if the check fails.
223    pub async fn check_storage(&self) -> Result<()> {
224        // Read the sequence block - this is a single key lookup that verifies
225        // storage is accessible without scanning or listing data.
226        let seq_key = Bytes::from_static(&crate::serde::SEQ_BLOCK_KEY);
227        let _ = self.storage.as_read().get(seq_key).await?;
228        Ok(())
229    }
230
231    /// Forces creation of a new segment, sealing the current one.
232    ///
233    /// This is an internal API for testing multi-segment scenarios. It forces
234    /// subsequent appends to write to a new segment, regardless of any
235    /// configured seal interval.
236    #[cfg(test)]
237    pub(crate) async fn seal_segment(&self) -> Result<()> {
238        let write = LogWrite {
239            records: vec![],
240            timestamp_ms: self.current_time_ms(),
241            force_seal: true,
242        };
243        self.handle.write(write).await?;
244        self.flush().await?;
245        Ok(())
246    }
247
    /// Flushes all pending writes to durable storage.
    ///
    /// This ensures that all writes that have been acknowledged are persisted
    /// to durable storage. For SlateDB-backed storage, this flushes the memtable
    /// to the WAL and object store.
    pub async fn flush(&self) -> Result<()> {
        let mut flush_handle = self.handle.flush(true).await?;
        flush_handle.wait(Durability::Durable).await?;
        // Synchronously refresh read_inner segments for immediate visibility
        let mut inner = self.read_inner.write().await;
        // Only refresh segments newer than the latest one already cached.
        let after = inner.segments.latest().map(|s| s.id());
        // Clone the storage handle first: `refresh` needs `inner.segments`
        // mutably, so `inner.storage` cannot be borrowed at the same time.
        let storage = inner.storage.clone();
        inner.segments.refresh(&storage, after).await?;
        Ok(())
    }
263
    /// Closes the log, releasing any resources.
    ///
    /// This method should be called before dropping the log to ensure
    /// proper cleanup. For SlateDB-backed storage, this releases the database fence.
    pub async fn close(self) -> Result<()> {
        // Stop the write pipeline first so no further deltas are produced.
        self.coordinator.stop().await.map_err(Error::Internal)?;
        // The subscriber task loops on recv(); abort it explicitly.
        self.flush_subscriber_task.abort();
        self.storage.close().await?;
        Ok(())
    }
274
    /// Creates a LogDb from an existing storage implementation.
    ///
    /// Test-only constructor that mirrors [`LogDbBuilder::build`] but accepts
    /// an already-created storage handle (e.g. shared with a `LogDbReader`)
    /// and uses `SegmentConfig::default()` instead of a full `Config`.
    #[cfg(test)]
    pub(crate) async fn new(storage: Arc<dyn common::Storage>) -> Result<Self> {
        use crate::config::SegmentConfig;
        use crate::reader::LogReadInner;
        use crate::serde::SEQ_BLOCK_KEY;

        let log_storage = LogStorage::new(storage.clone());
        let clock: Arc<dyn Clock> = Arc::new(SystemClock);

        // Load the persisted sequence allocator state from the seq block key.
        let log_storage_read = log_storage.as_read();
        let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
        let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
            .await
            .map_err(|e| Error::Internal(e.to_string()))?;
        let segment_cache = SegmentCache::open(&log_storage_read, SegmentConfig::default()).await?;
        let listing_cache = ListingCache::new();

        // Context handed to the write coordinator for applying deltas.
        let context = LogContext {
            sequence_allocator,
            segment_cache: segment_cache.clone(),
            listing_cache,
        };

        let flusher = LogFlusher::new(log_storage.clone());
        let mut coordinator = WriteCoordinator::new(
            WriteCoordinatorConfig::default(),
            vec![WRITE_CHANNEL.to_string()],
            context,
            log_storage.snapshot().await?,
            flusher,
        );
        let handle = coordinator.handle(WRITE_CHANNEL);

        // Shared read-side state; also updated by the flush subscriber below.
        let read_inner = Arc::new(RwLock::new(LogReadInner::new(
            log_storage_read,
            segment_cache,
        )));

        // NOTE(review): subscription happens before `start()` — presumably so
        // no flush notifications are missed; confirm subscribe semantics.
        let flush_subscriber_task = spawn_flush_subscriber(&coordinator, Arc::clone(&read_inner));
        coordinator.start();

        Ok(Self {
            handle,
            coordinator,
            storage: log_storage,
            clock,
            read_inner,
            flush_subscriber_task,
        })
    }
326}
327
#[async_trait]
impl LogRead for LogDb {
    /// Returns an iterator over entries for `key` within `seq_range`.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator> {
        // Normalize the generic bounds into a concrete range up front.
        let seq_range = normalize_sequence(&seq_range);
        // Read lock is held only while constructing the iterator.
        let inner = self.read_inner.read().await;
        Ok(inner.scan_with_options(key, seq_range, &options))
    }

    /// Not implemented yet: calling this panics via `todo!()`.
    async fn count_with_options(
        &self,
        _key: Bytes,
        _seq_range: impl RangeBounds<Sequence> + Send,
        _options: CountOptions,
    ) -> Result<u64> {
        todo!()
    }

    /// Lists keys present in the given segment range.
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator> {
        let segment_range = normalize_segment_id(&segment_range);
        let inner = self.read_inner.read().await;
        inner.list_keys(segment_range).await
    }

    /// Lists segments overlapping the given sequence range.
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>> {
        let seq_range = normalize_sequence(&seq_range);
        let inner = self.read_inner.read().await;
        Ok(inner.list_segments(&seq_range))
    }
}
368
/// Builder for creating LogDb instances with custom options.
///
/// This builder provides a fluent API for configuring a LogDb, including
/// runtime options that cannot be serialized in configuration files.
///
/// # Example
///
/// ```ignore
/// use log::LogDbBuilder;
/// use log::Config;
/// use common::StorageRuntime;
///
/// // Create a separate runtime for compaction (important for sync/JNI usage)
/// let compaction_runtime = tokio::runtime::Builder::new_multi_thread()
///     .worker_threads(2)
///     .enable_all()
///     .build()
///     .unwrap();
///
/// let runtime = StorageRuntime::new()
///     .with_compaction_runtime(compaction_runtime.handle().clone());
///
/// let log = LogDbBuilder::new(config)
///     .with_storage_runtime(runtime)
///     .build()
///     .await?;
/// ```
pub struct LogDbBuilder {
    // Serializable configuration (storage backend, segmentation, ...).
    config: crate::config::Config,
    // Non-serializable runtime options; defaults via `StorageRuntime::new()`.
    storage_runtime: StorageRuntime,
}
400
401impl LogDbBuilder {
402    /// Creates a new log builder with the given configuration.
403    pub fn new(config: crate::config::Config) -> Self {
404        Self {
405            config,
406            storage_runtime: StorageRuntime::new(),
407        }
408    }
409
410    /// Sets the storage runtime options.
411    ///
412    /// Use this to configure runtime options like the compaction runtime handle.
413    pub fn with_storage_runtime(mut self, runtime: StorageRuntime) -> Self {
414        self.storage_runtime = runtime;
415        self
416    }
417
    /// Builds the LogDb instance.
    ///
    /// Creates the storage backend from the configuration, restores sequence
    /// and segment state, wires up the write coordinator, and starts the
    /// background flush subscriber.
    pub async fn build(self) -> Result<LogDb> {
        use crate::reader::LogReadInner;
        use crate::serde::SEQ_BLOCK_KEY;

        let storage = create_storage(
            &self.config.storage,
            self.storage_runtime,
            StorageSemantics::new(),
        )
        .await
        .map_err(|e| Error::Storage(e.to_string()))?;

        let log_storage = LogStorage::new(storage.clone());
        let clock: Arc<dyn Clock> = Arc::new(SystemClock);

        // Restore the persisted sequence allocator state from the seq block key.
        let log_storage_read = log_storage.as_read();
        let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
        let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
            .await
            .map_err(|e| Error::Internal(e.to_string()))?;
        let segment_cache = SegmentCache::open(&log_storage_read, self.config.segmentation).await?;
        let listing_cache = ListingCache::new();

        // Context handed to the write coordinator for applying deltas.
        let context = LogContext {
            sequence_allocator,
            segment_cache: segment_cache.clone(),
            listing_cache,
        };

        let flusher = LogFlusher::new(log_storage.clone());
        let snapshot = log_storage.snapshot().await?;
        let mut coordinator = WriteCoordinator::new(
            WriteCoordinatorConfig::default(),
            vec![WRITE_CHANNEL.to_string()],
            context,
            snapshot,
            flusher,
        );
        let handle = coordinator.handle(WRITE_CHANNEL);

        // Shared read-side state; also updated by the flush subscriber below.
        let read_inner = Arc::new(RwLock::new(LogReadInner::new(
            log_storage_read,
            segment_cache,
        )));

        // NOTE(review): subscription happens before `start()` — presumably so
        // no flush notifications are missed; confirm subscribe semantics.
        let flush_subscriber_task = spawn_flush_subscriber(&coordinator, Arc::clone(&read_inner));
        coordinator.start();

        Ok(LogDb {
            handle,
            coordinator,
            storage: log_storage,
            clock,
            read_inner,
            flush_subscriber_task,
        })
    }
476}
477
/// Spawns a background task that subscribes to coordinator flush events and
/// inserts any newly flushed segments into the shared read state, making them
/// visible to readers.
///
/// The returned task runs until `recv()` errors (coordinator shut down) or it
/// is aborted by [`LogDb::close`].
fn spawn_flush_subscriber(
    handle: &WriteCoordinator<LogDelta, LogFlusher>,
    read_inner: Arc<RwLock<crate::reader::LogReadInner>>,
) -> JoinHandle<()> {
    let (mut subscriber, _) = handle.subscribe();
    tokio::spawn(async move {
        while let Ok(result) = subscriber.recv().await {
            // Only flush results that carry a flushed delta are interesting.
            let Some(broadcast) = &result.last_flushed_delta else {
                continue;
            };
            // Take the write lock only when there are segments to publish.
            if !broadcast.val.new_segments.is_empty() {
                let mut inner = read_inner.write().await;
                for segment in &broadcast.val.new_segments {
                    inner.segments.insert(segment.clone());
                }
            }
        }
    })
}
497
498#[cfg(test)]
499mod tests {
500    use common::StorageConfig;
501    use common::storage::factory::create_storage;
502
503    use super::*;
504    use crate::config::Config;
505    use crate::reader::LogDbReader;
506
507    fn test_config() -> Config {
508        Config {
509            storage: StorageConfig::InMemory,
510            ..Default::default()
511        }
512    }
513
    // Opening against the in-memory backend should succeed.
    #[tokio::test]
    async fn should_open_log_with_in_memory_config() {
        // given
        let config = test_config();

        // when
        let result = LogDb::open(config).await;

        // then
        assert!(result.is_ok());
    }
525
    // A single appended record is readable back with sequence 0.
    #[tokio::test]
    async fn should_append_single_record() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![Record {
            key: Bytes::from("orders"),
            value: Bytes::from("order-1"),
        }];

        // when
        log.append(records).await.unwrap();
        log.flush().await.unwrap();

        // then - verify entry can be read back
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value, Bytes::from("order-1"));
        assert!(iter.next().await.unwrap().is_none());
    }
546
    // A single batch append assigns consecutive sequences 0, 1, 2.
    #[tokio::test]
    async fn should_append_multiple_records_in_batch() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ];

        // when
        log.append(records).await.unwrap();
        log.flush().await.unwrap();

        // then - verify entries with sequential sequence numbers
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();

        let entry0 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry0.sequence, 0);
        assert_eq!(entry0.value, Bytes::from("order-1"));

        let entry1 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry1.sequence, 1);
        assert_eq!(entry1.value, Bytes::from("order-2"));

        let entry2 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry2.sequence, 2);
        assert_eq!(entry2.value, Bytes::from("order-3"));

        assert!(iter.next().await.unwrap().is_none());
    }
587
    // Appending an empty batch is a no-op that succeeds and writes nothing.
    #[tokio::test]
    async fn should_append_empty_records_without_error() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        let records: Vec<Record> = vec![];

        // when
        let result = log.append(records).await;

        // then
        assert!(result.is_ok());

        // verify no entries exist
        let mut iter = log.scan(Bytes::from("any-key"), ..).await.unwrap();
        assert!(iter.next().await.unwrap().is_none());
    }
604
    // Sequence numbers continue monotonically across separate append calls.
    #[tokio::test]
    async fn should_assign_sequential_sequences_across_appends() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();

        // when - first append
        log.append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
        ])
        .await
        .unwrap();

        // when - second append
        log.append(vec![Record {
            key: Bytes::from("events"),
            value: Bytes::from("event-3"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // then - verify sequences are 0, 1, 2 across appends
        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();

        let entry0 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry0.sequence, 0);

        let entry1 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry1.sequence, 1);

        let entry2 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry2.sequence, 2);

        assert!(iter.next().await.unwrap().is_none());
    }
647
    // Records written to different keys are retrievable under their own key.
    #[tokio::test]
    async fn should_store_records_with_correct_keys_and_values() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![
            Record {
                key: Bytes::from("topic-a"),
                value: Bytes::from("message-a"),
            },
            Record {
                key: Bytes::from("topic-b"),
                value: Bytes::from("message-b"),
            },
        ];

        // when
        log.append(records).await.unwrap();
        log.flush().await.unwrap();

        // then - verify entries for topic-a
        let mut iter_a = log.scan(Bytes::from("topic-a"), ..).await.unwrap();
        let entry_a = iter_a.next().await.unwrap().unwrap();
        assert_eq!(entry_a.key, Bytes::from("topic-a"));
        assert_eq!(entry_a.value, Bytes::from("message-a"));
        assert!(iter_a.next().await.unwrap().is_none());

        // then - verify entries for topic-b
        let mut iter_b = log.scan(Bytes::from("topic-b"), ..).await.unwrap();
        let entry_b = iter_b.next().await.unwrap().unwrap();
        assert_eq!(entry_b.key, Bytes::from("topic-b"));
        assert_eq!(entry_b.value, Bytes::from("message-b"));
        assert!(iter_b.next().await.unwrap().is_none());
    }
681
    // An unbounded scan returns every entry for the key in sequence order.
    #[tokio::test]
    async fn should_scan_all_entries_for_key() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        log.append(vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // when
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }

        // then
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[0].value, Bytes::from("order-1"));
        assert_eq!(entries[1].sequence, 1);
        assert_eq!(entries[1].value, Bytes::from("order-2"));
        assert_eq!(entries[2].sequence, 2);
        assert_eq!(entries[2].value, Bytes::from("order-3"));
    }
720
    // A bounded range scan (1..4) is start-inclusive and end-exclusive.
    #[tokio::test]
    async fn should_scan_with_sequence_range() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        log.append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-0"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-3"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-4"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // when - scan sequences 1..4 (exclusive end)
        let mut iter = log.scan(Bytes::from("events"), 1..4).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }

        // then
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
        assert_eq!(entries[2].sequence, 3);
    }
764
    // A half-open range (1..) skips entries before the start sequence.
    #[tokio::test]
    async fn should_scan_from_starting_sequence() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        log.append(vec![
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-0"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-1"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-2"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // when - scan from sequence 1 onwards
        let mut iter = log.scan(Bytes::from("logs"), 1..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }

        // then
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
    }
799
    // An end-bounded range (..2) excludes the end sequence.
    #[tokio::test]
    async fn should_scan_up_to_ending_sequence() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        log.append(vec![
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-0"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-1"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-2"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // when - scan up to sequence 2 (exclusive)
        let mut iter = log.scan(Bytes::from("logs"), ..2).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }

        // then
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[1].sequence, 1);
    }
834
    // Scanning one key never returns entries interleaved from another key.
    #[tokio::test]
    async fn should_scan_only_entries_for_specified_key() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        log.append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-0"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-0"),
            },
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-1"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-1"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // when - scan only key-a
        let mut iter = log.scan(Bytes::from("key-a"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }

        // then - should only have entries for key-a
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key, Bytes::from("key-a"));
        assert_eq!(entries[0].value, Bytes::from("value-a-0"));
        assert_eq!(entries[1].key, Bytes::from("key-a"));
        assert_eq!(entries[1].value, Bytes::from("value-a-1"));
    }
875
    // Scanning a key that was never written yields an empty iterator.
    #[tokio::test]
    async fn should_return_empty_iterator_for_unknown_key() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        log.append(vec![Record {
            key: Bytes::from("existing"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // when - scan for non-existent key
        let mut iter = log.scan(Bytes::from("unknown"), ..).await.unwrap();
        let entry = iter.next().await.unwrap();

        // then
        assert!(entry.is_none());
    }
895
    // A range past the last written sequence yields an empty iterator.
    #[tokio::test]
    async fn should_return_empty_iterator_for_empty_range() {
        // given
        let log = LogDb::open(test_config()).await.unwrap();
        log.append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("value-0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("value-1"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();

        // when - scan range that doesn't include any existing sequences
        let mut iter = log.scan(Bytes::from("key"), 10..20).await.unwrap();
        let entry = iter.next().await.unwrap();

        // then
        assert!(entry.is_none());
    }
921
922    #[tokio::test]
923    async fn should_scan_entries_via_log_reader() {
924        // given - create shared storage
925        let storage = create_storage(
926            &StorageConfig::InMemory,
927            StorageRuntime::new(),
928            StorageSemantics::new(),
929        )
930        .await
931        .unwrap();
932        let log = LogDb::new(storage.clone()).await.unwrap();
933        log.append(vec![
934            Record {
935                key: Bytes::from("orders"),
936                value: Bytes::from("order-1"),
937            },
938            Record {
939                key: Bytes::from("orders"),
940                value: Bytes::from("order-2"),
941            },
942            Record {
943                key: Bytes::from("orders"),
944                value: Bytes::from("order-3"),
945            },
946        ])
947        .await
948        .unwrap();
949        log.flush().await.unwrap();
950
951        // when - create LogDbReader sharing the same storage
952        let reader = LogDbReader::new(storage).await.unwrap();
953        let mut iter = reader.scan(Bytes::from("orders"), ..).await.unwrap();
954        let mut entries = vec![];
955        while let Some(entry) = iter.next().await.unwrap() {
956            entries.push(entry);
957        }
958
959        // then
960        assert_eq!(entries.len(), 3);
961        assert_eq!(entries[0].sequence, 0);
962        assert_eq!(entries[0].value, Bytes::from("order-1"));
963        assert_eq!(entries[1].sequence, 1);
964        assert_eq!(entries[1].value, Bytes::from("order-2"));
965        assert_eq!(entries[2].sequence, 2);
966        assert_eq!(entries[2].value, Bytes::from("order-3"));
967    }
968
969    #[tokio::test]
970    async fn should_scan_across_multiple_segments() {
971        // given - log with entries across multiple segments
972        let log = LogDb::open(test_config()).await.unwrap();
973
974        // write to segment 0
975        log.append(vec![
976            Record {
977                key: Bytes::from("events"),
978                value: Bytes::from("event-0"),
979            },
980            Record {
981                key: Bytes::from("events"),
982                value: Bytes::from("event-1"),
983            },
984        ])
985        .await
986        .unwrap();
987
988        // seal and create segment 1
989        log.seal_segment().await.unwrap();
990
991        // write to segment 1
992        log.append(vec![
993            Record {
994                key: Bytes::from("events"),
995                value: Bytes::from("event-2"),
996            },
997            Record {
998                key: Bytes::from("events"),
999                value: Bytes::from("event-3"),
1000            },
1001        ])
1002        .await
1003        .unwrap();
1004        log.flush().await.unwrap();
1005
1006        // when - scan all entries
1007        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
1008        let mut entries = vec![];
1009        while let Some(entry) = iter.next().await.unwrap() {
1010            entries.push(entry);
1011        }
1012
1013        // then - should see all 4 entries in order
1014        assert_eq!(entries.len(), 4);
1015        assert_eq!(entries[0].sequence, 0);
1016        assert_eq!(entries[0].value, Bytes::from("event-0"));
1017        assert_eq!(entries[1].sequence, 1);
1018        assert_eq!(entries[1].value, Bytes::from("event-1"));
1019        assert_eq!(entries[2].sequence, 2);
1020        assert_eq!(entries[2].value, Bytes::from("event-2"));
1021        assert_eq!(entries[3].sequence, 3);
1022        assert_eq!(entries[3].value, Bytes::from("event-3"));
1023    }
1024
1025    #[tokio::test]
1026    async fn should_scan_range_spanning_segments() {
1027        // given - log with entries across multiple segments
1028        let log = LogDb::open(test_config()).await.unwrap();
1029
1030        // segment 0: seq 0, 1
1031        log.append(vec![
1032            Record {
1033                key: Bytes::from("data"),
1034                value: Bytes::from("seg0-0"),
1035            },
1036            Record {
1037                key: Bytes::from("data"),
1038                value: Bytes::from("seg0-1"),
1039            },
1040        ])
1041        .await
1042        .unwrap();
1043
1044        log.seal_segment().await.unwrap();
1045
1046        // segment 1: seq 2, 3
1047        log.append(vec![
1048            Record {
1049                key: Bytes::from("data"),
1050                value: Bytes::from("seg1-2"),
1051            },
1052            Record {
1053                key: Bytes::from("data"),
1054                value: Bytes::from("seg1-3"),
1055            },
1056        ])
1057        .await
1058        .unwrap();
1059
1060        log.seal_segment().await.unwrap();
1061
1062        // segment 2: seq 4, 5
1063        log.append(vec![
1064            Record {
1065                key: Bytes::from("data"),
1066                value: Bytes::from("seg2-4"),
1067            },
1068            Record {
1069                key: Bytes::from("data"),
1070                value: Bytes::from("seg2-5"),
1071            },
1072        ])
1073        .await
1074        .unwrap();
1075        log.flush().await.unwrap();
1076
1077        // when - scan range 1..5 (spans segments 0, 1, 2)
1078        let mut iter = log.scan(Bytes::from("data"), 1..5).await.unwrap();
1079        let mut entries = vec![];
1080        while let Some(entry) = iter.next().await.unwrap() {
1081            entries.push(entry);
1082        }
1083
1084        // then - should see entries 1, 2, 3, 4
1085        assert_eq!(entries.len(), 4);
1086        assert_eq!(entries[0].sequence, 1);
1087        assert_eq!(entries[1].sequence, 2);
1088        assert_eq!(entries[2].sequence, 3);
1089        assert_eq!(entries[3].sequence, 4);
1090    }
1091
1092    #[tokio::test]
1093    async fn should_scan_single_segment_in_multi_segment_log() {
1094        // given - log with entries across multiple segments
1095        let log = LogDb::open(test_config()).await.unwrap();
1096
1097        // segment 0: seq 0, 1
1098        log.append(vec![
1099            Record {
1100                key: Bytes::from("key"),
1101                value: Bytes::from("v0"),
1102            },
1103            Record {
1104                key: Bytes::from("key"),
1105                value: Bytes::from("v1"),
1106            },
1107        ])
1108        .await
1109        .unwrap();
1110
1111        log.seal_segment().await.unwrap();
1112
1113        // segment 1: seq 2, 3
1114        log.append(vec![
1115            Record {
1116                key: Bytes::from("key"),
1117                value: Bytes::from("v2"),
1118            },
1119            Record {
1120                key: Bytes::from("key"),
1121                value: Bytes::from("v3"),
1122            },
1123        ])
1124        .await
1125        .unwrap();
1126        log.flush().await.unwrap();
1127
1128        // when - scan only segment 1's range
1129        let mut iter = log.scan(Bytes::from("key"), 2..4).await.unwrap();
1130        let mut entries = vec![];
1131        while let Some(entry) = iter.next().await.unwrap() {
1132            entries.push(entry);
1133        }
1134
1135        // then - should see only segment 1's entries
1136        assert_eq!(entries.len(), 2);
1137        assert_eq!(entries[0].sequence, 2);
1138        assert_eq!(entries[1].sequence, 3);
1139    }
1140
1141    #[tokio::test]
1142    async fn should_list_keys_returns_iterator() {
1143        // given
1144        let log = LogDb::open(test_config()).await.unwrap();
1145        log.append(vec![
1146            Record {
1147                key: Bytes::from("key-a"),
1148                value: Bytes::from("value-a"),
1149            },
1150            Record {
1151                key: Bytes::from("key-b"),
1152                value: Bytes::from("value-b"),
1153            },
1154        ])
1155        .await
1156        .unwrap();
1157        log.flush().await.unwrap();
1158
1159        // when
1160        let _iter = log.list_keys(..).await.unwrap();
1161
1162        // then - iterator is returned (full iteration tested when LogKeyIterator is implemented)
1163    }
1164
1165    #[tokio::test]
1166    async fn should_list_keys_via_log_reader() {
1167        // given - create shared storage
1168        let storage = create_storage(
1169            &StorageConfig::InMemory,
1170            StorageRuntime::new(),
1171            StorageSemantics::new(),
1172        )
1173        .await
1174        .unwrap();
1175        let log = LogDb::new(storage.clone()).await.unwrap();
1176        log.append(vec![
1177            Record {
1178                key: Bytes::from("key-a"),
1179                value: Bytes::from("value-a"),
1180            },
1181            Record {
1182                key: Bytes::from("key-b"),
1183                value: Bytes::from("value-b"),
1184            },
1185        ])
1186        .await
1187        .unwrap();
1188        log.flush().await.unwrap();
1189
1190        // when - create LogDbReader sharing the same storage
1191        let reader = LogDbReader::new(storage).await.unwrap();
1192        let _iter = reader.list_keys(..).await.unwrap();
1193
1194        // then - iterator is returned
1195    }
1196
1197    #[tokio::test]
1198    async fn should_list_keys_in_single_segment() {
1199        // given
1200        let log = LogDb::open(test_config()).await.unwrap();
1201        log.append(vec![
1202            Record {
1203                key: Bytes::from("key-a"),
1204                value: Bytes::from("value-a"),
1205            },
1206            Record {
1207                key: Bytes::from("key-b"),
1208                value: Bytes::from("value-b"),
1209            },
1210            Record {
1211                key: Bytes::from("key-c"),
1212                value: Bytes::from("value-c"),
1213            },
1214        ])
1215        .await
1216        .unwrap();
1217        log.flush().await.unwrap();
1218
1219        // when
1220        let mut iter = log.list_keys(..).await.unwrap();
1221        let mut keys = vec![];
1222        while let Some(key) = iter.next().await.unwrap() {
1223            keys.push(key.key);
1224        }
1225
1226        // then - keys returned in lexicographic order
1227        assert_eq!(keys.len(), 3);
1228        assert_eq!(keys[0], Bytes::from("key-a"));
1229        assert_eq!(keys[1], Bytes::from("key-b"));
1230        assert_eq!(keys[2], Bytes::from("key-c"));
1231    }
1232
1233    #[tokio::test]
1234    async fn should_list_keys_across_segments_after_roll() {
1235        // given - log with entries across multiple segments
1236        let log = LogDb::open(test_config()).await.unwrap();
1237
1238        // write to segment 0
1239        log.append(vec![
1240            Record {
1241                key: Bytes::from("key-a"),
1242                value: Bytes::from("value-a-0"),
1243            },
1244            Record {
1245                key: Bytes::from("key-b"),
1246                value: Bytes::from("value-b-0"),
1247            },
1248        ])
1249        .await
1250        .unwrap();
1251
1252        // seal and create segment 1
1253        log.seal_segment().await.unwrap();
1254
1255        // write to segment 1 with different keys
1256        log.append(vec![
1257            Record {
1258                key: Bytes::from("key-c"),
1259                value: Bytes::from("value-c-1"),
1260            },
1261            Record {
1262                key: Bytes::from("key-d"),
1263                value: Bytes::from("value-d-1"),
1264            },
1265        ])
1266        .await
1267        .unwrap();
1268        log.flush().await.unwrap();
1269
1270        // when
1271        let mut iter = log.list_keys(..).await.unwrap();
1272        let mut keys = vec![];
1273        while let Some(key) = iter.next().await.unwrap() {
1274            keys.push(key.key);
1275        }
1276
1277        // then - all keys from both segments
1278        assert_eq!(keys.len(), 4);
1279        assert_eq!(keys[0], Bytes::from("key-a"));
1280        assert_eq!(keys[1], Bytes::from("key-b"));
1281        assert_eq!(keys[2], Bytes::from("key-c"));
1282        assert_eq!(keys[3], Bytes::from("key-d"));
1283    }
1284
1285    #[tokio::test]
1286    async fn should_deduplicate_keys_across_segments() {
1287        // given - same key written to multiple segments
1288        let log = LogDb::open(test_config()).await.unwrap();
1289
1290        // write to segment 0
1291        log.append(vec![Record {
1292            key: Bytes::from("shared-key"),
1293            value: Bytes::from("value-0"),
1294        }])
1295        .await
1296        .unwrap();
1297
1298        // seal and create segment 1
1299        log.seal_segment().await.unwrap();
1300
1301        // write same key to segment 1
1302        log.append(vec![Record {
1303            key: Bytes::from("shared-key"),
1304            value: Bytes::from("value-1"),
1305        }])
1306        .await
1307        .unwrap();
1308
1309        // seal and create segment 2
1310        log.seal_segment().await.unwrap();
1311
1312        // write same key to segment 2
1313        log.append(vec![Record {
1314            key: Bytes::from("shared-key"),
1315            value: Bytes::from("value-2"),
1316        }])
1317        .await
1318        .unwrap();
1319        log.flush().await.unwrap();
1320
1321        // when
1322        let mut iter = log.list_keys(..).await.unwrap();
1323        let mut keys = vec![];
1324        while let Some(key) = iter.next().await.unwrap() {
1325            keys.push(key.key);
1326        }
1327
1328        // then - key appears only once despite being in 3 segments
1329        assert_eq!(keys.len(), 1);
1330        assert_eq!(keys[0], Bytes::from("shared-key"));
1331    }
1332
1333    #[tokio::test]
1334    async fn should_list_keys_in_lexicographic_order() {
1335        // given - keys inserted out of order
1336        let log = LogDb::open(test_config()).await.unwrap();
1337        log.append(vec![
1338            Record {
1339                key: Bytes::from("zebra"),
1340                value: Bytes::from("value"),
1341            },
1342            Record {
1343                key: Bytes::from("apple"),
1344                value: Bytes::from("value"),
1345            },
1346            Record {
1347                key: Bytes::from("mango"),
1348                value: Bytes::from("value"),
1349            },
1350        ])
1351        .await
1352        .unwrap();
1353        log.flush().await.unwrap();
1354
1355        // when
1356        let mut iter = log.list_keys(..).await.unwrap();
1357        let mut keys = vec![];
1358        while let Some(key) = iter.next().await.unwrap() {
1359            keys.push(key.key);
1360        }
1361
1362        // then - sorted lexicographically
1363        assert_eq!(keys[0], Bytes::from("apple"));
1364        assert_eq!(keys[1], Bytes::from("mango"));
1365        assert_eq!(keys[2], Bytes::from("zebra"));
1366    }
1367
1368    #[tokio::test]
1369    async fn should_list_empty_when_no_entries() {
1370        // given
1371        let log = LogDb::open(test_config()).await.unwrap();
1372
1373        // when
1374        let mut iter = log.list_keys(..).await.unwrap();
1375
1376        // then
1377        assert!(iter.next().await.unwrap().is_none());
1378    }
1379
1380    #[tokio::test]
1381    async fn should_list_keys_respects_segment_range() {
1382        // given - entries in different segments
1383        let log = LogDb::open(test_config()).await.unwrap();
1384
1385        // segment 0
1386        log.append(vec![
1387            Record {
1388                key: Bytes::from("key-seg0"),
1389                value: Bytes::from("value"),
1390            },
1391            Record {
1392                key: Bytes::from("key-seg0-b"),
1393                value: Bytes::from("value"),
1394            },
1395        ])
1396        .await
1397        .unwrap();
1398
1399        log.seal_segment().await.unwrap();
1400
1401        // segment 1
1402        log.append(vec![
1403            Record {
1404                key: Bytes::from("key-seg1"),
1405                value: Bytes::from("value"),
1406            },
1407            Record {
1408                key: Bytes::from("key-seg1-b"),
1409                value: Bytes::from("value"),
1410            },
1411        ])
1412        .await
1413        .unwrap();
1414
1415        log.seal_segment().await.unwrap();
1416
1417        // segment 2
1418        log.append(vec![
1419            Record {
1420                key: Bytes::from("key-seg2"),
1421                value: Bytes::from("value"),
1422            },
1423            Record {
1424                key: Bytes::from("key-seg2-b"),
1425                value: Bytes::from("value"),
1426            },
1427        ])
1428        .await
1429        .unwrap();
1430        log.flush().await.unwrap();
1431
1432        // when - list only keys from segment 1
1433        let mut iter = log.list_keys(1..2).await.unwrap();
1434        let mut keys = vec![];
1435        while let Some(key) = iter.next().await.unwrap() {
1436            keys.push(key.key);
1437        }
1438
1439        // then - only keys from segment 1
1440        assert_eq!(keys.len(), 2);
1441        assert_eq!(keys[0], Bytes::from("key-seg1"));
1442        assert_eq!(keys[1], Bytes::from("key-seg1-b"));
1443    }
1444
1445    #[tokio::test]
1446    async fn should_list_segments_returns_empty_when_no_segments() {
1447        // given
1448        let log = LogDb::open(test_config()).await.unwrap();
1449
1450        // when
1451        let segments = log.list_segments(..).await.unwrap();
1452
1453        // then
1454        assert!(segments.is_empty());
1455    }
1456
1457    #[tokio::test]
1458    async fn should_list_segments_returns_single_segment() {
1459        // given
1460        let log = LogDb::open(test_config()).await.unwrap();
1461        log.append(vec![Record {
1462            key: Bytes::from("key"),
1463            value: Bytes::from("value"),
1464        }])
1465        .await
1466        .unwrap();
1467        log.flush().await.unwrap();
1468
1469        // when
1470        let segments = log.list_segments(..).await.unwrap();
1471
1472        // then
1473        assert_eq!(segments.len(), 1);
1474        assert_eq!(segments[0].id, 0);
1475        assert_eq!(segments[0].start_seq, 0);
1476    }
1477
1478    #[tokio::test]
1479    async fn should_list_segments_returns_multiple_segments() {
1480        // given
1481        let log = LogDb::open(test_config()).await.unwrap();
1482
1483        // segment 0
1484        log.append(vec![Record {
1485            key: Bytes::from("key"),
1486            value: Bytes::from("value-0"),
1487        }])
1488        .await
1489        .unwrap();
1490
1491        log.seal_segment().await.unwrap();
1492
1493        // segment 1
1494        log.append(vec![Record {
1495            key: Bytes::from("key"),
1496            value: Bytes::from("value-1"),
1497        }])
1498        .await
1499        .unwrap();
1500
1501        log.seal_segment().await.unwrap();
1502
1503        // segment 2
1504        log.append(vec![Record {
1505            key: Bytes::from("key"),
1506            value: Bytes::from("value-2"),
1507        }])
1508        .await
1509        .unwrap();
1510        log.flush().await.unwrap();
1511
1512        // when
1513        let segments = log.list_segments(..).await.unwrap();
1514
1515        // then
1516        assert_eq!(segments.len(), 3);
1517        assert_eq!(segments[0].id, 0);
1518        assert_eq!(segments[0].start_seq, 0);
1519        assert_eq!(segments[1].id, 1);
1520        assert_eq!(segments[1].start_seq, 1);
1521        assert_eq!(segments[2].id, 2);
1522        assert_eq!(segments[2].start_seq, 2);
1523    }
1524
1525    #[tokio::test]
1526    async fn should_list_segments_filters_by_sequence_range() {
1527        // given
1528        let log = LogDb::open(test_config()).await.unwrap();
1529
1530        // segment 0: seq 0, 1
1531        log.append(vec![
1532            Record {
1533                key: Bytes::from("key"),
1534                value: Bytes::from("v0"),
1535            },
1536            Record {
1537                key: Bytes::from("key"),
1538                value: Bytes::from("v1"),
1539            },
1540        ])
1541        .await
1542        .unwrap();
1543
1544        log.seal_segment().await.unwrap();
1545
1546        // segment 1: seq 2, 3
1547        log.append(vec![
1548            Record {
1549                key: Bytes::from("key"),
1550                value: Bytes::from("v2"),
1551            },
1552            Record {
1553                key: Bytes::from("key"),
1554                value: Bytes::from("v3"),
1555            },
1556        ])
1557        .await
1558        .unwrap();
1559
1560        log.seal_segment().await.unwrap();
1561
1562        // segment 2: seq 4, 5
1563        log.append(vec![
1564            Record {
1565                key: Bytes::from("key"),
1566                value: Bytes::from("v4"),
1567            },
1568            Record {
1569                key: Bytes::from("key"),
1570                value: Bytes::from("v5"),
1571            },
1572        ])
1573        .await
1574        .unwrap();
1575        log.flush().await.unwrap();
1576
1577        // when - query range that spans segment 1
1578        let segments = log.list_segments(2..4).await.unwrap();
1579
1580        // then - only segment 1 matches
1581        assert_eq!(segments.len(), 1);
1582        assert_eq!(segments[0].id, 1);
1583        assert_eq!(segments[0].start_seq, 2);
1584    }
1585
1586    #[tokio::test]
1587    async fn should_list_segments_via_log_reader() {
1588        // given
1589        let storage = create_storage(
1590            &StorageConfig::InMemory,
1591            StorageRuntime::new(),
1592            StorageSemantics::new(),
1593        )
1594        .await
1595        .unwrap();
1596        let log = LogDb::new(storage.clone()).await.unwrap();
1597
1598        log.append(vec![Record {
1599            key: Bytes::from("key"),
1600            value: Bytes::from("value-0"),
1601        }])
1602        .await
1603        .unwrap();
1604
1605        log.seal_segment().await.unwrap();
1606
1607        log.append(vec![Record {
1608            key: Bytes::from("key"),
1609            value: Bytes::from("value-1"),
1610        }])
1611        .await
1612        .unwrap();
1613        log.flush().await.unwrap();
1614
1615        // when
1616        let reader = LogDbReader::new(storage).await.unwrap();
1617        let segments = reader.list_segments(..).await.unwrap();
1618
1619        // then
1620        assert_eq!(segments.len(), 2);
1621        assert_eq!(segments[0].id, 0);
1622        assert_eq!(segments[1].id, 1);
1623    }
1624
1625    #[tokio::test]
1626    async fn should_list_segments_includes_start_time() {
1627        // given
1628        let log = LogDb::open(test_config()).await.unwrap();
1629        log.append(vec![Record {
1630            key: Bytes::from("key"),
1631            value: Bytes::from("value"),
1632        }])
1633        .await
1634        .unwrap();
1635        log.flush().await.unwrap();
1636
1637        // when
1638        let segments = log.list_segments(..).await.unwrap();
1639
1640        // then - start_time_ms should be a reasonable timestamp (after year 2020)
1641        assert_eq!(segments.len(), 1);
1642        assert!(segments[0].start_time_ms > 1577836800000); // 2020-01-01
1643    }
1644}