Skip to main content

log/
log.rs

1//! Core LogDb implementation with read and write APIs.
2//!
3//! This module provides the [`LogDb`] struct, the primary entry point for
4//! interacting with OpenData Log. It exposes both write operations
5//! ([`try_append`](LogDb::try_append), [`append_timeout`](LogDb::append_timeout))
6//! and read operations ([`scan`], [`count`]) via the [`LogRead`] trait.
7
8use std::ops::RangeBounds;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use common::clock::{Clock, SystemClock};
15use common::coordinator::{Durability, EpochWatcher, EpochWatermarks};
16use common::storage::factory::create_storage;
17use common::{StorageRuntime, StorageSemantics};
18use tokio::sync::RwLock;
19use tokio::sync::watch;
20use tokio::task::JoinHandle;
21
22use crate::config::{CountOptions, ScanOptions, SegmentConfig};
23use crate::error::{AppendResult, Error, Result};
24use crate::listing::ListingCache;
25use crate::listing::LogKeyIterator;
26use crate::model::{AppendOutput, Record, Segment, SegmentId, Sequence};
27use crate::range::{normalize_segment_id, normalize_sequence};
28use crate::reader::{LogIterator, LogRead, LogReadView};
29use crate::segment::{LogSegment, SegmentCache};
30use crate::serde::SEQ_BLOCK_KEY;
31use crate::writer::{LogWrite, LogWriteHandle, LogWriter, LogWriterConfig, WrittenView};
32
33/// The main log interface providing read and write operations.
34///
35/// `LogDb` is the primary entry point for interacting with OpenData Log.
36/// It provides methods to append records, scan entries, and count records
37/// within a key's log.
38///
39/// # Read Operations
40///
41/// Read operations are provided via the [`LogRead`] trait, which `LogDb`
42/// implements. This allows generic code to work with either `LogDb` or
43/// [`LogDbReader`](crate::LogDbReader).
44///
45/// # Thread Safety
46///
47/// `LogDb` is designed to be shared across threads. All methods take `&self`
48/// and internal synchronization is handled automatically.
49///
50/// # Visibility and Durability
51///
52/// Appended records are not immediately visible to reads. The
53/// [`flush()`](LogDb::flush) method ensures all pending data is durably
54/// persisted to storage, but does not block on reader synchronization.
55/// Instead, read operations ([`scan`](LogRead::scan),
56/// [`list_keys`](LogRead::list_keys),
57/// [`list_segments`](LogRead::list_segments)) synchronize themselves by
58/// waiting for the reader view to catch up to the coordinator's current
59/// flushed watermark before returning results. This means:
60///
61/// - After `flush()`, subsequent reads are guaranteed to see all flushed
62///   data.
63/// - Data that becomes visible through background flush activity (without
64///   an explicit `flush()` call) may not yet be durable.
65///
66/// In the future, stronger guarantees such as a "read-durable" mode may
67/// be introduced, where the read view advances only after data is
68/// confirmed durable.
69///
70/// # Writer Semantics
71///
72/// Currently, each log supports a single writer. Multi-writer support may
73/// be added in the future, but would require each key to have a single
74/// writer to maintain monotonic ordering within that key's log.
75///
76/// # Example
77///
78/// ```
79/// # use log::{LogDb, LogRead, Config, Record};
80/// # use bytes::Bytes;
81/// # use common::StorageConfig;
82/// # #[tokio::main]
83/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
84/// let config = Config { storage: StorageConfig::InMemory, ..Default::default() };
85/// let log = LogDb::open(config).await?;
86///
87/// // Append records
88/// let records = vec![
89///     Record { key: Bytes::from("user:123"), value: Bytes::from("event-a") },
90///     Record { key: Bytes::from("user:456"), value: Bytes::from("event-b") },
91/// ];
92/// log.try_append(records).await?;
93/// log.flush().await?;
94///
95/// // Scan entries for a specific key
96/// let mut iter = log.scan(Bytes::from("user:123"), ..).await?;
97/// while let Some(entry) = iter.next().await? {
98///     println!("seq={}: {:?}", entry.sequence, entry.value);
99/// }
100/// # Ok(())
101/// # }
102/// ```
pub struct LogDb {
    /// Handle used to submit writes to the background writer task.
    handle: LogWriteHandle,
    /// Background task that owns the `LogWriter` and drains the write queue.
    writer_task: JoinHandle<()>,
    /// Underlying storage backend; also used by health checks and `close()`.
    storage: Arc<dyn common::Storage>,
    /// Clock used to timestamp appended batches.
    clock: Arc<dyn Clock>,
    /// Shared read view, refreshed by the flushed-subscriber task.
    read_view: Arc<RwLock<LogReadView>>,
    /// Watcher that lets read paths wait until the view reaches the
    /// writer's flushed epoch.
    epoch_watcher: EpochWatcher,
    /// Task that mirrors writer snapshots/segments into `read_view`.
    flushed_subscriber_task: JoinHandle<()>,
}
112
113impl LogDb {
114    /// Opens or creates a log with the given configuration.
115    ///
116    /// This is the primary entry point for creating a `LogDb` instance. The
117    /// configuration specifies the storage backend and other settings.
118    ///
119    /// # Arguments
120    ///
121    /// * `config` - Configuration specifying storage backend and settings.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if the storage backend cannot be initialized.
126    ///
127    /// # Example
128    ///
129    /// ```ignore
130    /// use log::{LogDb, Config};
131    ///
132    /// let log = LogDb::open(test_config()).await?;
133    /// ```
134    pub async fn open(config: crate::config::Config) -> Result<Self> {
135        LogDbBuilder::new(config).build().await
136    }
137
    /// Registers storage engine metrics into the given Prometheus registry.
    ///
    /// Only available with the `http-server` feature; delegates to the
    /// underlying storage backend's metric registration.
    #[cfg(feature = "http-server")]
    pub fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
        self.storage.register_metrics(registry);
    }
143
144    /// Appends records to the log without blocking.
145    ///
146    /// Records are assigned sequence numbers in the order they appear in the
147    /// input vector. All records in a single append call are written atomically.
148    ///
149    /// Fails immediately with [`AppendError::QueueFull`](crate::AppendError::QueueFull) if the write queue
150    /// is full. The returned error contains the original batch so callers can
151    /// retry without cloning.
152    ///
153    /// Durability is **not** awaited. Call [`flush()`](LogDb::flush) after
154    /// appending to ensure records are persisted.
155    ///
156    /// # Arguments
157    ///
158    /// * `records` - The records to append. Each record specifies its target
159    ///   key and value.
160    ///
161    /// # Returns
162    ///
163    /// On success, returns an [`AppendOutput`] containing the starting sequence
164    /// number assigned to the batch.
165    ///
166    /// # Example
167    ///
168    /// ```ignore
169    /// let records = vec![
170    ///     Record { key: Bytes::from("events"), value: Bytes::from("event-1") },
171    ///     Record { key: Bytes::from("events"), value: Bytes::from("event-2") },
172    /// ];
173    /// let result = log.try_append(records).await?;
174    /// println!("Appended at seq {}", result.start_sequence);
175    /// ```
176    pub async fn try_append(&self, records: Vec<Record>) -> AppendResult<AppendOutput> {
177        self.append_inner(records, None).await
178    }
179
180    /// Appends records to the log, blocking up to `timeout` for queue space.
181    ///
182    /// Records are assigned sequence numbers in the order they appear in the
183    /// input vector. All records in a single append call are written atomically.
184    ///
185    /// Returns [`AppendError::Timeout`](crate::AppendError::Timeout) if the queue does not drain within the
186    /// deadline. The returned error contains the original batch for retry.
187    ///
188    /// Durability is **not** awaited. Call [`flush()`](LogDb::flush) after
189    /// appending to ensure records are persisted.
190    ///
191    /// # Arguments
192    ///
193    /// * `records` - The records to append.
194    /// * `timeout` - Maximum duration to wait for queue space.
195    ///
196    /// # Returns
197    ///
198    /// On success, returns an [`AppendOutput`] containing the starting sequence
199    /// number assigned to the batch.
200    ///
201    /// # Example
202    ///
203    /// ```ignore
204    /// use std::time::Duration;
205    ///
206    /// let records = vec![
207    ///     Record { key: Bytes::from("events"), value: Bytes::from("critical-event") },
208    /// ];
209    /// let result = log.append_timeout(records, Duration::from_secs(5)).await?;
210    /// println!("Started at sequence {}", result.start_sequence);
211    /// ```
212    pub async fn append_timeout(
213        &self,
214        records: Vec<Record>,
215        timeout: Duration,
216    ) -> AppendResult<AppendOutput> {
217        self.append_inner(records, Some(timeout)).await
218    }
219
220    /// Shared implementation for `try_append` and `append_timeout`.
221    async fn append_inner(
222        &self,
223        records: Vec<Record>,
224        timeout: Option<Duration>,
225    ) -> AppendResult<AppendOutput> {
226        if records.is_empty() {
227            return Ok(AppendOutput { start_sequence: 0 });
228        }
229
230        let write = LogWrite {
231            records,
232            timestamp_ms: self.current_time_ms(),
233        };
234
235        let result = if let Some(t) = timeout {
236            self.handle.append_timeout(write, t).await
237        } else {
238            self.handle.try_append(write).await
239        }?;
240
241        // Safe to unwrap: append_inner is only called with non-empty records,
242        // and the writer returns Some for non-empty writes.
243        Ok(result.expect("non-empty append must produce output"))
244    }
245
246    /// Returns the current time in milliseconds since Unix epoch.
247    fn current_time_ms(&self) -> i64 {
248        self.clock
249            .now()
250            .duration_since(std::time::UNIX_EPOCH)
251            .unwrap()
252            .as_millis() as i64
253    }
254
255    /// Checks if the underlying storage is accessible.
256    ///
257    /// This performs a lightweight read operation to verify that the storage
258    /// backend is responding. Use this for health/readiness checks.
259    ///
260    /// # Returns
261    ///
262    /// Returns `Ok(())` if storage is accessible, or an error if the check fails.
263    #[cfg(feature = "http-server")]
264    pub(crate) async fn check_storage(&self) -> Result<()> {
265        // Read the sequence block - this is a single key lookup that verifies
266        // storage is accessible without scanning or listing data.
267        let seq_key = Bytes::from_static(&crate::serde::SEQ_BLOCK_KEY);
268        let _ = self.storage.get(seq_key).await?;
269        Ok(())
270    }
271
272    /// Forces creation of a new segment, sealing the current one.
273    ///
274    /// This is an internal API for testing multi-segment scenarios. It forces
275    /// subsequent appends to write to a new segment, regardless of any
276    /// configured seal interval.
277    #[cfg(test)]
278    pub(crate) async fn seal_segment(&self) -> Result<()> {
279        self.handle.force_seal(self.current_time_ms()).await?;
280        self.flush().await?;
281        Ok(())
282    }
283
284    /// Flushes all pending writes to durable storage.
285    ///
286    /// This method ensures that all acknowledged writes are durably persisted
287    /// to storage.
288    pub async fn flush(&self) -> Result<()> {
289        self.handle.flush().await
290    }
291
    /// Waits for the flushed subscriber to apply all writes up to the current epoch.
    ///
    /// Read paths call this before serving so results reflect everything the
    /// writer has flushed so far.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Internal`] if the writer shut down before the target
    /// epoch was reached.
    async fn sync_to_flushed(&self) -> Result<()> {
        let target = self.handle.flushed_epoch();
        // Each waiter operates on its own clone of the watcher.
        self.epoch_watcher
            .clone()
            .wait(target, Durability::Written)
            .await
            .map_err(|_| Error::Internal("writer shut down".into()))?;
        Ok(())
    }
302
    /// Closes the log, flushing any pending data and releasing resources.
    ///
    /// All appended data is flushed to durable storage before the log is
    /// closed. For SlateDB-backed storage, this also releases the database
    /// fence.
    ///
    /// # Errors
    ///
    /// Returns an error if the final flush fails or the storage backend
    /// fails to close cleanly.
    pub async fn close(self) -> Result<()> {
        self.flush().await?;
        // Drop the handle to signal the writer to stop
        drop(self.handle);
        // Wait for the writer task to exit; its result is irrelevant once
        // the flush above has succeeded.
        let _ = self.writer_task.await;
        // The subscriber only mirrors writer state, so abort rather than join.
        self.flushed_subscriber_task.abort();
        self.storage
            .close()
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        Ok(())
    }
320
321    /// Creates a LogDb from an existing storage implementation.
322    #[cfg(test)]
323    pub(crate) async fn new(storage: Arc<dyn common::Storage>) -> Result<Self> {
324        Self::from_storage(storage, SegmentConfig::default()).await
325    }
326
    /// Shared construction logic used by both `LogDb::new` and `LogDbBuilder::build`.
    ///
    /// Wires together the sequence allocator, segment cache, writer task,
    /// read view, and the flushed-subscriber task that keeps the view fresh.
    async fn from_storage(
        storage: Arc<dyn common::Storage>,
        segment_config: SegmentConfig,
    ) -> Result<Self> {
        let clock: Arc<dyn Clock> = Arc::new(SystemClock);

        // Recover the sequence allocator from the persisted sequence block.
        let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
        let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
            .await
            .map_err(|e| Error::Internal(e.to_string()))?;
        // One snapshot seeds both the segment cache and the initial read view.
        let snapshot = storage
            .snapshot()
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        let segment_cache = SegmentCache::open(snapshot.as_ref(), segment_config).await?;
        let listing_cache = ListingCache::new();

        let (writer, mut handle) = LogWriter::new(
            storage.clone(),
            sequence_allocator,
            segment_cache.clone(),
            listing_cache,
            LogWriterConfig::default(),
        )
        .await
        .map_err(Error::Storage)?;

        // Take the progress receiver before handing the writer to its task.
        let written_rx = handle.written_rx();
        let writer_task = handle.spawn(writer);

        let read_view = Arc::new(RwLock::new(LogReadView::new(
            snapshot as Arc<dyn common::StorageRead>,
            segment_cache,
        )));

        // Background task that mirrors writer snapshots/segments into the
        // read view and advances the watermark readers wait on.
        let (epoch_watcher, flushed_subscriber_task) =
            spawn_flushed_subscriber(written_rx, Arc::clone(&read_view));

        Ok(Self {
            handle,
            writer_task,
            storage,
            clock,
            read_view,
            epoch_watcher,
            flushed_subscriber_task,
        })
    }
376}
377
378#[async_trait]
379impl LogRead for LogDb {
380    async fn scan_with_options(
381        &self,
382        key: Bytes,
383        seq_range: impl RangeBounds<Sequence> + Send,
384        options: ScanOptions,
385    ) -> Result<LogIterator> {
386        self.sync_to_flushed().await?;
387        let seq_range = normalize_sequence(&seq_range);
388        let view = self.read_view.read().await;
389        Ok(view.scan_with_options(key, seq_range, &options))
390    }
391
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!()`; counting support is pending.
    async fn count_with_options(
        &self,
        _key: Bytes,
        _seq_range: impl RangeBounds<Sequence> + Send,
        _options: CountOptions,
    ) -> Result<u64> {
        todo!()
    }
400
401    async fn list_keys(
402        &self,
403        segment_range: impl RangeBounds<SegmentId> + Send,
404    ) -> Result<LogKeyIterator> {
405        self.sync_to_flushed().await?;
406        let segment_range = normalize_segment_id(&segment_range);
407        let view = self.read_view.read().await;
408        view.list_keys(segment_range).await
409    }
410
411    async fn list_segments(
412        &self,
413        seq_range: impl RangeBounds<Sequence> + Send,
414    ) -> Result<Vec<Segment>> {
415        self.sync_to_flushed().await?;
416        let seq_range = normalize_sequence(&seq_range);
417        let view = self.read_view.read().await;
418        Ok(view.list_segments(&seq_range))
419    }
420}
421
422/// Builder for creating LogDb instances with custom options.
423///
424/// This builder provides a fluent API for configuring a LogDb, including
425/// runtime options that cannot be serialized in configuration files.
426///
427/// # Example
428///
429/// ```ignore
430/// use log::LogDbBuilder;
431/// use log::Config;
432/// use common::StorageRuntime;
433///
434/// // Create a separate runtime for compaction (important for sync/JNI usage)
435/// let compaction_runtime = tokio::runtime::Builder::new_multi_thread()
436///     .worker_threads(2)
437///     .enable_all()
438///     .build()
439///     .unwrap();
440///
441/// let runtime = StorageRuntime::new()
442///     .with_compaction_runtime(compaction_runtime.handle().clone());
443///
444/// let log = LogDbBuilder::new(config)
445///     .with_storage_runtime(runtime)
446///     .build()
447///     .await?;
448/// ```
pub struct LogDbBuilder {
    /// Declarative configuration (storage backend, segmentation settings).
    config: crate::config::Config,
    /// Runtime-only options that cannot be expressed in serialized config.
    storage_runtime: StorageRuntime,
}
453
454impl LogDbBuilder {
455    /// Creates a new log builder with the given configuration.
456    pub fn new(config: crate::config::Config) -> Self {
457        Self {
458            config,
459            storage_runtime: StorageRuntime::new(),
460        }
461    }
462
463    /// Sets the storage runtime options.
464    ///
465    /// Use this to configure runtime options like the compaction runtime handle.
466    pub fn with_storage_runtime(mut self, runtime: StorageRuntime) -> Self {
467        self.storage_runtime = runtime;
468        self
469    }
470
471    /// Builds the LogDb instance.
472    pub async fn build(self) -> Result<LogDb> {
473        let storage = create_storage(
474            &self.config.storage,
475            self.storage_runtime,
476            StorageSemantics::new(),
477        )
478        .await
479        .map_err(|e| Error::Storage(e.to_string()))?;
480
481        LogDb::from_storage(storage, self.config.segmentation).await
482    }
483}
484
485fn spawn_flushed_subscriber(
486    mut written_rx: watch::Receiver<WrittenView>,
487    read_view: Arc<RwLock<LogReadView>>,
488) -> (EpochWatcher, JoinHandle<()>) {
489    let (watermarks, watcher) = EpochWatermarks::new();
490    let task = tokio::spawn(async move {
491        let mut last_segments: Option<Arc<[LogSegment]>> = None;
492        while written_rx.changed().await.is_ok() {
493            let view = written_rx.borrow_and_update().clone();
494
495            let mut rv = read_view.write().await;
496            rv.update_snapshot(view.snapshot as Arc<dyn common::StorageRead>);
497
498            // Only rebuild segment cache when segments actually changed
499            if !last_segments
500                .as_ref()
501                .is_some_and(|s| Arc::ptr_eq(s, &view.segments))
502            {
503                rv.replace_segments(&view.segments);
504                last_segments = Some(Arc::clone(&view.segments));
505            }
506
507            watermarks.update_written(view.epoch);
508        }
509    });
510    (watcher, task)
511}
512
513#[cfg(test)]
514mod tests {
515    use common::StorageConfig;
516    use common::storage::factory::create_storage;
517
518    use super::*;
519    use crate::config::Config;
520    use crate::reader::LogDbReader;
521
522    fn test_config() -> Config {
523        Config {
524            storage: StorageConfig::InMemory,
525            ..Default::default()
526        }
527    }
528
529    #[tokio::test]
530    async fn should_open_log_with_in_memory_config() {
531        // given
532        let config = test_config();
533
534        // when
535        let result = LogDb::open(config).await;
536
537        // then
538        assert!(result.is_ok());
539    }
540
541    #[tokio::test]
542    async fn should_append_single_record() {
543        // given
544        let log = LogDb::open(test_config()).await.unwrap();
545        let records = vec![Record {
546            key: Bytes::from("orders"),
547            value: Bytes::from("order-1"),
548        }];
549
550        // when
551        log.try_append(records).await.unwrap();
552
553        // then - verify entry can be read back
554        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
555        let entry = iter.next().await.unwrap().unwrap();
556        assert_eq!(entry.sequence, 0);
557        assert_eq!(entry.value, Bytes::from("order-1"));
558        assert!(iter.next().await.unwrap().is_none());
559    }
560
561    #[tokio::test]
562    async fn should_append_multiple_records_in_batch() {
563        // given
564        let log = LogDb::open(test_config()).await.unwrap();
565        let records = vec![
566            Record {
567                key: Bytes::from("orders"),
568                value: Bytes::from("order-1"),
569            },
570            Record {
571                key: Bytes::from("orders"),
572                value: Bytes::from("order-2"),
573            },
574            Record {
575                key: Bytes::from("orders"),
576                value: Bytes::from("order-3"),
577            },
578        ];
579
580        // when
581        log.try_append(records).await.unwrap();
582
583        // then - verify entries with sequential sequence numbers
584        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
585
586        let entry0 = iter.next().await.unwrap().unwrap();
587        assert_eq!(entry0.sequence, 0);
588        assert_eq!(entry0.value, Bytes::from("order-1"));
589
590        let entry1 = iter.next().await.unwrap().unwrap();
591        assert_eq!(entry1.sequence, 1);
592        assert_eq!(entry1.value, Bytes::from("order-2"));
593
594        let entry2 = iter.next().await.unwrap().unwrap();
595        assert_eq!(entry2.sequence, 2);
596        assert_eq!(entry2.value, Bytes::from("order-3"));
597
598        assert!(iter.next().await.unwrap().is_none());
599    }
600
601    #[tokio::test]
602    async fn should_append_empty_records_without_error() {
603        // given
604        let log = LogDb::open(test_config()).await.unwrap();
605        let records: Vec<Record> = vec![];
606
607        // when
608        let result = log.try_append(records).await;
609
610        // then
611        assert!(result.is_ok());
612
613        // verify no entries exist
614        let mut iter = log.scan(Bytes::from("any-key"), ..).await.unwrap();
615        assert!(iter.next().await.unwrap().is_none());
616    }
617
618    #[tokio::test]
619    async fn should_assign_sequential_sequences_across_appends() {
620        // given
621        let log = LogDb::open(test_config()).await.unwrap();
622
623        // when - first append
624        log.try_append(vec![
625            Record {
626                key: Bytes::from("events"),
627                value: Bytes::from("event-1"),
628            },
629            Record {
630                key: Bytes::from("events"),
631                value: Bytes::from("event-2"),
632            },
633        ])
634        .await
635        .unwrap();
636
637        // when - second append
638        log.try_append(vec![Record {
639            key: Bytes::from("events"),
640            value: Bytes::from("event-3"),
641        }])
642        .await
643        .unwrap();
644
645        // then - verify sequences are 0, 1, 2 across appends
646        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
647
648        let entry0 = iter.next().await.unwrap().unwrap();
649        assert_eq!(entry0.sequence, 0);
650
651        let entry1 = iter.next().await.unwrap().unwrap();
652        assert_eq!(entry1.sequence, 1);
653
654        let entry2 = iter.next().await.unwrap().unwrap();
655        assert_eq!(entry2.sequence, 2);
656
657        assert!(iter.next().await.unwrap().is_none());
658    }
659
660    #[tokio::test]
661    async fn should_store_records_with_correct_keys_and_values() {
662        // given
663        let log = LogDb::open(test_config()).await.unwrap();
664        let records = vec![
665            Record {
666                key: Bytes::from("topic-a"),
667                value: Bytes::from("message-a"),
668            },
669            Record {
670                key: Bytes::from("topic-b"),
671                value: Bytes::from("message-b"),
672            },
673        ];
674
675        // when
676        log.try_append(records).await.unwrap();
677
678        // then - verify entries for topic-a
679        let mut iter_a = log.scan(Bytes::from("topic-a"), ..).await.unwrap();
680        let entry_a = iter_a.next().await.unwrap().unwrap();
681        assert_eq!(entry_a.key, Bytes::from("topic-a"));
682        assert_eq!(entry_a.value, Bytes::from("message-a"));
683        assert!(iter_a.next().await.unwrap().is_none());
684
685        // then - verify entries for topic-b
686        let mut iter_b = log.scan(Bytes::from("topic-b"), ..).await.unwrap();
687        let entry_b = iter_b.next().await.unwrap().unwrap();
688        assert_eq!(entry_b.key, Bytes::from("topic-b"));
689        assert_eq!(entry_b.value, Bytes::from("message-b"));
690        assert!(iter_b.next().await.unwrap().is_none());
691    }
692
693    #[tokio::test]
694    async fn should_scan_all_entries_for_key() {
695        // given
696        let log = LogDb::open(test_config()).await.unwrap();
697        log.try_append(vec![
698            Record {
699                key: Bytes::from("orders"),
700                value: Bytes::from("order-1"),
701            },
702            Record {
703                key: Bytes::from("orders"),
704                value: Bytes::from("order-2"),
705            },
706            Record {
707                key: Bytes::from("orders"),
708                value: Bytes::from("order-3"),
709            },
710        ])
711        .await
712        .unwrap();
713
714        // when
715        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
716        let mut entries = vec![];
717        while let Some(entry) = iter.next().await.unwrap() {
718            entries.push(entry);
719        }
720
721        // then
722        assert_eq!(entries.len(), 3);
723        assert_eq!(entries[0].sequence, 0);
724        assert_eq!(entries[0].value, Bytes::from("order-1"));
725        assert_eq!(entries[1].sequence, 1);
726        assert_eq!(entries[1].value, Bytes::from("order-2"));
727        assert_eq!(entries[2].sequence, 2);
728        assert_eq!(entries[2].value, Bytes::from("order-3"));
729    }
730
731    #[tokio::test]
732    async fn should_scan_with_sequence_range() {
733        // given
734        let log = LogDb::open(test_config()).await.unwrap();
735        log.try_append(vec![
736            Record {
737                key: Bytes::from("events"),
738                value: Bytes::from("event-0"),
739            },
740            Record {
741                key: Bytes::from("events"),
742                value: Bytes::from("event-1"),
743            },
744            Record {
745                key: Bytes::from("events"),
746                value: Bytes::from("event-2"),
747            },
748            Record {
749                key: Bytes::from("events"),
750                value: Bytes::from("event-3"),
751            },
752            Record {
753                key: Bytes::from("events"),
754                value: Bytes::from("event-4"),
755            },
756        ])
757        .await
758        .unwrap();
759
760        // when - scan sequences 1..4 (exclusive end)
761        let mut iter = log.scan(Bytes::from("events"), 1..4).await.unwrap();
762        let mut entries = vec![];
763        while let Some(entry) = iter.next().await.unwrap() {
764            entries.push(entry);
765        }
766
767        // then
768        assert_eq!(entries.len(), 3);
769        assert_eq!(entries[0].sequence, 1);
770        assert_eq!(entries[1].sequence, 2);
771        assert_eq!(entries[2].sequence, 3);
772    }
773
774    #[tokio::test]
775    async fn should_scan_from_starting_sequence() {
776        // given
777        let log = LogDb::open(test_config()).await.unwrap();
778        log.try_append(vec![
779            Record {
780                key: Bytes::from("logs"),
781                value: Bytes::from("log-0"),
782            },
783            Record {
784                key: Bytes::from("logs"),
785                value: Bytes::from("log-1"),
786            },
787            Record {
788                key: Bytes::from("logs"),
789                value: Bytes::from("log-2"),
790            },
791        ])
792        .await
793        .unwrap();
794
795        // when - scan from sequence 1 onwards
796        let mut iter = log.scan(Bytes::from("logs"), 1..).await.unwrap();
797        let mut entries = vec![];
798        while let Some(entry) = iter.next().await.unwrap() {
799            entries.push(entry);
800        }
801
802        // then
803        assert_eq!(entries.len(), 2);
804        assert_eq!(entries[0].sequence, 1);
805        assert_eq!(entries[1].sequence, 2);
806    }
807
808    #[tokio::test]
809    async fn should_scan_up_to_ending_sequence() {
810        // given
811        let log = LogDb::open(test_config()).await.unwrap();
812        log.try_append(vec![
813            Record {
814                key: Bytes::from("logs"),
815                value: Bytes::from("log-0"),
816            },
817            Record {
818                key: Bytes::from("logs"),
819                value: Bytes::from("log-1"),
820            },
821            Record {
822                key: Bytes::from("logs"),
823                value: Bytes::from("log-2"),
824            },
825        ])
826        .await
827        .unwrap();
828
829        // when - scan up to sequence 2 (exclusive)
830        let mut iter = log.scan(Bytes::from("logs"), ..2).await.unwrap();
831        let mut entries = vec![];
832        while let Some(entry) = iter.next().await.unwrap() {
833            entries.push(entry);
834        }
835
836        // then
837        assert_eq!(entries.len(), 2);
838        assert_eq!(entries[0].sequence, 0);
839        assert_eq!(entries[1].sequence, 1);
840    }
841
842    #[tokio::test]
843    async fn should_scan_only_entries_for_specified_key() {
844        // given
845        let log = LogDb::open(test_config()).await.unwrap();
846        log.try_append(vec![
847            Record {
848                key: Bytes::from("key-a"),
849                value: Bytes::from("value-a-0"),
850            },
851            Record {
852                key: Bytes::from("key-b"),
853                value: Bytes::from("value-b-0"),
854            },
855            Record {
856                key: Bytes::from("key-a"),
857                value: Bytes::from("value-a-1"),
858            },
859            Record {
860                key: Bytes::from("key-b"),
861                value: Bytes::from("value-b-1"),
862            },
863        ])
864        .await
865        .unwrap();
866
867        // when - scan only key-a
868        let mut iter = log.scan(Bytes::from("key-a"), ..).await.unwrap();
869        let mut entries = vec![];
870        while let Some(entry) = iter.next().await.unwrap() {
871            entries.push(entry);
872        }
873
874        // then - should only have entries for key-a
875        assert_eq!(entries.len(), 2);
876        assert_eq!(entries[0].key, Bytes::from("key-a"));
877        assert_eq!(entries[0].value, Bytes::from("value-a-0"));
878        assert_eq!(entries[1].key, Bytes::from("key-a"));
879        assert_eq!(entries[1].value, Bytes::from("value-a-1"));
880    }
881
882    #[tokio::test]
883    async fn should_return_empty_iterator_for_unknown_key() {
884        // given
885        let log = LogDb::open(test_config()).await.unwrap();
886        log.try_append(vec![Record {
887            key: Bytes::from("existing"),
888            value: Bytes::from("value"),
889        }])
890        .await
891        .unwrap();
892
893        // when - scan for non-existent key
894        let mut iter = log.scan(Bytes::from("unknown"), ..).await.unwrap();
895        let entry = iter.next().await.unwrap();
896
897        // then
898        assert!(entry.is_none());
899    }
900
901    #[tokio::test]
902    async fn should_return_empty_iterator_for_empty_range() {
903        // given
904        let log = LogDb::open(test_config()).await.unwrap();
905        log.try_append(vec![
906            Record {
907                key: Bytes::from("key"),
908                value: Bytes::from("value-0"),
909            },
910            Record {
911                key: Bytes::from("key"),
912                value: Bytes::from("value-1"),
913            },
914        ])
915        .await
916        .unwrap();
917
918        // when - scan range that doesn't include any existing sequences
919        let mut iter = log.scan(Bytes::from("key"), 10..20).await.unwrap();
920        let entry = iter.next().await.unwrap();
921
922        // then
923        assert!(entry.is_none());
924    }
925
926    #[tokio::test]
927    async fn should_scan_entries_via_log_reader() {
928        // given - create shared storage
929        let storage = create_storage(
930            &StorageConfig::InMemory,
931            StorageRuntime::new(),
932            StorageSemantics::new(),
933        )
934        .await
935        .unwrap();
936        let log = LogDb::new(storage.clone()).await.unwrap();
937        log.try_append(vec![
938            Record {
939                key: Bytes::from("orders"),
940                value: Bytes::from("order-1"),
941            },
942            Record {
943                key: Bytes::from("orders"),
944                value: Bytes::from("order-2"),
945            },
946            Record {
947                key: Bytes::from("orders"),
948                value: Bytes::from("order-3"),
949            },
950        ])
951        .await
952        .unwrap();
953        log.flush().await.unwrap();
954
955        // when - create LogDbReader sharing the same storage
956        let reader = LogDbReader::new(storage).await.unwrap();
957        let mut iter = reader.scan(Bytes::from("orders"), ..).await.unwrap();
958        let mut entries = vec![];
959        while let Some(entry) = iter.next().await.unwrap() {
960            entries.push(entry);
961        }
962
963        // then
964        assert_eq!(entries.len(), 3);
965        assert_eq!(entries[0].sequence, 0);
966        assert_eq!(entries[0].value, Bytes::from("order-1"));
967        assert_eq!(entries[1].sequence, 1);
968        assert_eq!(entries[1].value, Bytes::from("order-2"));
969        assert_eq!(entries[2].sequence, 2);
970        assert_eq!(entries[2].value, Bytes::from("order-3"));
971    }
972
973    #[tokio::test]
974    async fn should_scan_across_multiple_segments() {
975        // given - log with entries across multiple segments
976        let log = LogDb::open(test_config()).await.unwrap();
977
978        // write to segment 0
979        log.try_append(vec![
980            Record {
981                key: Bytes::from("events"),
982                value: Bytes::from("event-0"),
983            },
984            Record {
985                key: Bytes::from("events"),
986                value: Bytes::from("event-1"),
987            },
988        ])
989        .await
990        .unwrap();
991
992        // seal and create segment 1
993        log.seal_segment().await.unwrap();
994
995        // write to segment 1
996        log.try_append(vec![
997            Record {
998                key: Bytes::from("events"),
999                value: Bytes::from("event-2"),
1000            },
1001            Record {
1002                key: Bytes::from("events"),
1003                value: Bytes::from("event-3"),
1004            },
1005        ])
1006        .await
1007        .unwrap();
1008
1009        // when - scan all entries
1010        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
1011        let mut entries = vec![];
1012        while let Some(entry) = iter.next().await.unwrap() {
1013            entries.push(entry);
1014        }
1015
1016        // then - should see all 4 entries in order
1017        assert_eq!(entries.len(), 4);
1018        assert_eq!(entries[0].sequence, 0);
1019        assert_eq!(entries[0].value, Bytes::from("event-0"));
1020        assert_eq!(entries[1].sequence, 1);
1021        assert_eq!(entries[1].value, Bytes::from("event-1"));
1022        assert_eq!(entries[2].sequence, 2);
1023        assert_eq!(entries[2].value, Bytes::from("event-2"));
1024        assert_eq!(entries[3].sequence, 3);
1025        assert_eq!(entries[3].value, Bytes::from("event-3"));
1026    }
1027
1028    #[tokio::test]
1029    async fn should_scan_range_spanning_segments() {
1030        // given - log with entries across multiple segments
1031        let log = LogDb::open(test_config()).await.unwrap();
1032
1033        // segment 0: seq 0, 1
1034        log.try_append(vec![
1035            Record {
1036                key: Bytes::from("data"),
1037                value: Bytes::from("seg0-0"),
1038            },
1039            Record {
1040                key: Bytes::from("data"),
1041                value: Bytes::from("seg0-1"),
1042            },
1043        ])
1044        .await
1045        .unwrap();
1046
1047        log.seal_segment().await.unwrap();
1048
1049        // segment 1: seq 2, 3
1050        log.try_append(vec![
1051            Record {
1052                key: Bytes::from("data"),
1053                value: Bytes::from("seg1-2"),
1054            },
1055            Record {
1056                key: Bytes::from("data"),
1057                value: Bytes::from("seg1-3"),
1058            },
1059        ])
1060        .await
1061        .unwrap();
1062
1063        log.seal_segment().await.unwrap();
1064
1065        // segment 2: seq 4, 5
1066        log.try_append(vec![
1067            Record {
1068                key: Bytes::from("data"),
1069                value: Bytes::from("seg2-4"),
1070            },
1071            Record {
1072                key: Bytes::from("data"),
1073                value: Bytes::from("seg2-5"),
1074            },
1075        ])
1076        .await
1077        .unwrap();
1078
1079        // when - scan range 1..5 (spans segments 0, 1, 2)
1080        let mut iter = log.scan(Bytes::from("data"), 1..5).await.unwrap();
1081        let mut entries = vec![];
1082        while let Some(entry) = iter.next().await.unwrap() {
1083            entries.push(entry);
1084        }
1085
1086        // then - should see entries 1, 2, 3, 4
1087        assert_eq!(entries.len(), 4);
1088        assert_eq!(entries[0].sequence, 1);
1089        assert_eq!(entries[1].sequence, 2);
1090        assert_eq!(entries[2].sequence, 3);
1091        assert_eq!(entries[3].sequence, 4);
1092    }
1093
1094    #[tokio::test]
1095    async fn should_scan_single_segment_in_multi_segment_log() {
1096        // given - log with entries across multiple segments
1097        let log = LogDb::open(test_config()).await.unwrap();
1098
1099        // segment 0: seq 0, 1
1100        log.try_append(vec![
1101            Record {
1102                key: Bytes::from("key"),
1103                value: Bytes::from("v0"),
1104            },
1105            Record {
1106                key: Bytes::from("key"),
1107                value: Bytes::from("v1"),
1108            },
1109        ])
1110        .await
1111        .unwrap();
1112
1113        log.seal_segment().await.unwrap();
1114
1115        // segment 1: seq 2, 3
1116        log.try_append(vec![
1117            Record {
1118                key: Bytes::from("key"),
1119                value: Bytes::from("v2"),
1120            },
1121            Record {
1122                key: Bytes::from("key"),
1123                value: Bytes::from("v3"),
1124            },
1125        ])
1126        .await
1127        .unwrap();
1128
1129        // when - scan only segment 1's range
1130        let mut iter = log.scan(Bytes::from("key"), 2..4).await.unwrap();
1131        let mut entries = vec![];
1132        while let Some(entry) = iter.next().await.unwrap() {
1133            entries.push(entry);
1134        }
1135
1136        // then - should see only segment 1's entries
1137        assert_eq!(entries.len(), 2);
1138        assert_eq!(entries[0].sequence, 2);
1139        assert_eq!(entries[1].sequence, 3);
1140    }
1141
1142    #[tokio::test]
1143    async fn should_list_keys_returns_iterator() {
1144        // given
1145        let log = LogDb::open(test_config()).await.unwrap();
1146        log.try_append(vec![
1147            Record {
1148                key: Bytes::from("key-a"),
1149                value: Bytes::from("value-a"),
1150            },
1151            Record {
1152                key: Bytes::from("key-b"),
1153                value: Bytes::from("value-b"),
1154            },
1155        ])
1156        .await
1157        .unwrap();
1158
1159        // when
1160        let _iter = log.list_keys(..).await.unwrap();
1161
1162        // then - iterator is returned (full iteration tested when LogKeyIterator is implemented)
1163    }
1164
1165    #[tokio::test]
1166    async fn should_list_keys_via_log_reader() {
1167        // given - create shared storage
1168        let storage = create_storage(
1169            &StorageConfig::InMemory,
1170            StorageRuntime::new(),
1171            StorageSemantics::new(),
1172        )
1173        .await
1174        .unwrap();
1175        let log = LogDb::new(storage.clone()).await.unwrap();
1176        log.try_append(vec![
1177            Record {
1178                key: Bytes::from("key-a"),
1179                value: Bytes::from("value-a"),
1180            },
1181            Record {
1182                key: Bytes::from("key-b"),
1183                value: Bytes::from("value-b"),
1184            },
1185        ])
1186        .await
1187        .unwrap();
1188        log.flush().await.unwrap();
1189
1190        // when - create LogDbReader sharing the same storage
1191        let reader = LogDbReader::new(storage).await.unwrap();
1192        let _iter = reader.list_keys(..).await.unwrap();
1193
1194        // then - iterator is returned
1195    }
1196
1197    #[tokio::test]
1198    async fn should_list_keys_in_single_segment() {
1199        // given
1200        let log = LogDb::open(test_config()).await.unwrap();
1201        log.try_append(vec![
1202            Record {
1203                key: Bytes::from("key-a"),
1204                value: Bytes::from("value-a"),
1205            },
1206            Record {
1207                key: Bytes::from("key-b"),
1208                value: Bytes::from("value-b"),
1209            },
1210            Record {
1211                key: Bytes::from("key-c"),
1212                value: Bytes::from("value-c"),
1213            },
1214        ])
1215        .await
1216        .unwrap();
1217
1218        // when
1219        let mut iter = log.list_keys(..).await.unwrap();
1220        let mut keys = vec![];
1221        while let Some(key) = iter.next().await.unwrap() {
1222            keys.push(key.key);
1223        }
1224
1225        // then - keys returned in lexicographic order
1226        assert_eq!(keys.len(), 3);
1227        assert_eq!(keys[0], Bytes::from("key-a"));
1228        assert_eq!(keys[1], Bytes::from("key-b"));
1229        assert_eq!(keys[2], Bytes::from("key-c"));
1230    }
1231
1232    #[tokio::test]
1233    async fn should_list_keys_across_segments_after_roll() {
1234        // given - log with entries across multiple segments
1235        let log = LogDb::open(test_config()).await.unwrap();
1236
1237        // write to segment 0
1238        log.try_append(vec![
1239            Record {
1240                key: Bytes::from("key-a"),
1241                value: Bytes::from("value-a-0"),
1242            },
1243            Record {
1244                key: Bytes::from("key-b"),
1245                value: Bytes::from("value-b-0"),
1246            },
1247        ])
1248        .await
1249        .unwrap();
1250
1251        // seal and create segment 1
1252        log.seal_segment().await.unwrap();
1253
1254        // write to segment 1 with different keys
1255        log.try_append(vec![
1256            Record {
1257                key: Bytes::from("key-c"),
1258                value: Bytes::from("value-c-1"),
1259            },
1260            Record {
1261                key: Bytes::from("key-d"),
1262                value: Bytes::from("value-d-1"),
1263            },
1264        ])
1265        .await
1266        .unwrap();
1267
1268        // when
1269        let mut iter = log.list_keys(..).await.unwrap();
1270        let mut keys = vec![];
1271        while let Some(key) = iter.next().await.unwrap() {
1272            keys.push(key.key);
1273        }
1274
1275        // then - all keys from both segments
1276        assert_eq!(keys.len(), 4);
1277        assert_eq!(keys[0], Bytes::from("key-a"));
1278        assert_eq!(keys[1], Bytes::from("key-b"));
1279        assert_eq!(keys[2], Bytes::from("key-c"));
1280        assert_eq!(keys[3], Bytes::from("key-d"));
1281    }
1282
1283    #[tokio::test]
1284    async fn should_deduplicate_keys_across_segments() {
1285        // given - same key written to multiple segments
1286        let log = LogDb::open(test_config()).await.unwrap();
1287
1288        // write to segment 0
1289        log.try_append(vec![Record {
1290            key: Bytes::from("shared-key"),
1291            value: Bytes::from("value-0"),
1292        }])
1293        .await
1294        .unwrap();
1295
1296        // seal and create segment 1
1297        log.seal_segment().await.unwrap();
1298
1299        // write same key to segment 1
1300        log.try_append(vec![Record {
1301            key: Bytes::from("shared-key"),
1302            value: Bytes::from("value-1"),
1303        }])
1304        .await
1305        .unwrap();
1306
1307        // seal and create segment 2
1308        log.seal_segment().await.unwrap();
1309
1310        // write same key to segment 2
1311        log.try_append(vec![Record {
1312            key: Bytes::from("shared-key"),
1313            value: Bytes::from("value-2"),
1314        }])
1315        .await
1316        .unwrap();
1317
1318        // when
1319        let mut iter = log.list_keys(..).await.unwrap();
1320        let mut keys = vec![];
1321        while let Some(key) = iter.next().await.unwrap() {
1322            keys.push(key.key);
1323        }
1324
1325        // then - key appears only once despite being in 3 segments
1326        assert_eq!(keys.len(), 1);
1327        assert_eq!(keys[0], Bytes::from("shared-key"));
1328    }
1329
1330    #[tokio::test]
1331    async fn should_list_keys_in_lexicographic_order() {
1332        // given - keys inserted out of order
1333        let log = LogDb::open(test_config()).await.unwrap();
1334        log.try_append(vec![
1335            Record {
1336                key: Bytes::from("zebra"),
1337                value: Bytes::from("value"),
1338            },
1339            Record {
1340                key: Bytes::from("apple"),
1341                value: Bytes::from("value"),
1342            },
1343            Record {
1344                key: Bytes::from("mango"),
1345                value: Bytes::from("value"),
1346            },
1347        ])
1348        .await
1349        .unwrap();
1350
1351        // when
1352        let mut iter = log.list_keys(..).await.unwrap();
1353        let mut keys = vec![];
1354        while let Some(key) = iter.next().await.unwrap() {
1355            keys.push(key.key);
1356        }
1357
1358        // then - sorted lexicographically
1359        assert_eq!(keys[0], Bytes::from("apple"));
1360        assert_eq!(keys[1], Bytes::from("mango"));
1361        assert_eq!(keys[2], Bytes::from("zebra"));
1362    }
1363
1364    #[tokio::test]
1365    async fn should_list_empty_when_no_entries() {
1366        // given
1367        let log = LogDb::open(test_config()).await.unwrap();
1368
1369        // when
1370        let mut iter = log.list_keys(..).await.unwrap();
1371
1372        // then
1373        assert!(iter.next().await.unwrap().is_none());
1374    }
1375
1376    #[tokio::test]
1377    async fn should_list_keys_respects_segment_range() {
1378        // given - entries in different segments
1379        let log = LogDb::open(test_config()).await.unwrap();
1380
1381        // segment 0
1382        log.try_append(vec![
1383            Record {
1384                key: Bytes::from("key-seg0"),
1385                value: Bytes::from("value"),
1386            },
1387            Record {
1388                key: Bytes::from("key-seg0-b"),
1389                value: Bytes::from("value"),
1390            },
1391        ])
1392        .await
1393        .unwrap();
1394
1395        log.seal_segment().await.unwrap();
1396
1397        // segment 1
1398        log.try_append(vec![
1399            Record {
1400                key: Bytes::from("key-seg1"),
1401                value: Bytes::from("value"),
1402            },
1403            Record {
1404                key: Bytes::from("key-seg1-b"),
1405                value: Bytes::from("value"),
1406            },
1407        ])
1408        .await
1409        .unwrap();
1410
1411        log.seal_segment().await.unwrap();
1412
1413        // segment 2
1414        log.try_append(vec![
1415            Record {
1416                key: Bytes::from("key-seg2"),
1417                value: Bytes::from("value"),
1418            },
1419            Record {
1420                key: Bytes::from("key-seg2-b"),
1421                value: Bytes::from("value"),
1422            },
1423        ])
1424        .await
1425        .unwrap();
1426
1427        // when - list only keys from segment 1
1428        let mut iter = log.list_keys(1..2).await.unwrap();
1429        let mut keys = vec![];
1430        while let Some(key) = iter.next().await.unwrap() {
1431            keys.push(key.key);
1432        }
1433
1434        // then - only keys from segment 1
1435        assert_eq!(keys.len(), 2);
1436        assert_eq!(keys[0], Bytes::from("key-seg1"));
1437        assert_eq!(keys[1], Bytes::from("key-seg1-b"));
1438    }
1439
1440    #[tokio::test]
1441    async fn should_list_segments_returns_empty_when_no_segments() {
1442        // given
1443        let log = LogDb::open(test_config()).await.unwrap();
1444
1445        // when
1446        let segments = log.list_segments(..).await.unwrap();
1447
1448        // then
1449        assert!(segments.is_empty());
1450    }
1451
1452    #[tokio::test]
1453    async fn should_list_segments_returns_single_segment() {
1454        // given
1455        let log = LogDb::open(test_config()).await.unwrap();
1456        log.try_append(vec![Record {
1457            key: Bytes::from("key"),
1458            value: Bytes::from("value"),
1459        }])
1460        .await
1461        .unwrap();
1462
1463        // when
1464        let segments = log.list_segments(..).await.unwrap();
1465
1466        // then
1467        assert_eq!(segments.len(), 1);
1468        assert_eq!(segments[0].id, 0);
1469        assert_eq!(segments[0].start_seq, 0);
1470    }
1471
1472    #[tokio::test]
1473    async fn should_list_segments_returns_multiple_segments() {
1474        // given
1475        let log = LogDb::open(test_config()).await.unwrap();
1476
1477        // segment 0
1478        log.try_append(vec![Record {
1479            key: Bytes::from("key"),
1480            value: Bytes::from("value-0"),
1481        }])
1482        .await
1483        .unwrap();
1484
1485        log.seal_segment().await.unwrap();
1486
1487        // segment 1
1488        log.try_append(vec![Record {
1489            key: Bytes::from("key"),
1490            value: Bytes::from("value-1"),
1491        }])
1492        .await
1493        .unwrap();
1494
1495        log.seal_segment().await.unwrap();
1496
1497        // segment 2
1498        log.try_append(vec![Record {
1499            key: Bytes::from("key"),
1500            value: Bytes::from("value-2"),
1501        }])
1502        .await
1503        .unwrap();
1504
1505        // when
1506        let segments = log.list_segments(..).await.unwrap();
1507
1508        // then
1509        assert_eq!(segments.len(), 3);
1510        assert_eq!(segments[0].id, 0);
1511        assert_eq!(segments[0].start_seq, 0);
1512        assert_eq!(segments[1].id, 1);
1513        assert_eq!(segments[1].start_seq, 1);
1514        assert_eq!(segments[2].id, 2);
1515        assert_eq!(segments[2].start_seq, 2);
1516    }
1517
1518    #[tokio::test]
1519    async fn should_list_segments_filters_by_sequence_range() {
1520        // given
1521        let log = LogDb::open(test_config()).await.unwrap();
1522
1523        // segment 0: seq 0, 1
1524        log.try_append(vec![
1525            Record {
1526                key: Bytes::from("key"),
1527                value: Bytes::from("v0"),
1528            },
1529            Record {
1530                key: Bytes::from("key"),
1531                value: Bytes::from("v1"),
1532            },
1533        ])
1534        .await
1535        .unwrap();
1536
1537        log.seal_segment().await.unwrap();
1538
1539        // segment 1: seq 2, 3
1540        log.try_append(vec![
1541            Record {
1542                key: Bytes::from("key"),
1543                value: Bytes::from("v2"),
1544            },
1545            Record {
1546                key: Bytes::from("key"),
1547                value: Bytes::from("v3"),
1548            },
1549        ])
1550        .await
1551        .unwrap();
1552
1553        log.seal_segment().await.unwrap();
1554
1555        // segment 2: seq 4, 5
1556        log.try_append(vec![
1557            Record {
1558                key: Bytes::from("key"),
1559                value: Bytes::from("v4"),
1560            },
1561            Record {
1562                key: Bytes::from("key"),
1563                value: Bytes::from("v5"),
1564            },
1565        ])
1566        .await
1567        .unwrap();
1568
1569        // when - query range that spans segment 1
1570        let segments = log.list_segments(2..4).await.unwrap();
1571
1572        // then - only segment 1 matches
1573        assert_eq!(segments.len(), 1);
1574        assert_eq!(segments[0].id, 1);
1575        assert_eq!(segments[0].start_seq, 2);
1576    }
1577
1578    #[tokio::test]
1579    async fn should_list_segments_via_log_reader() {
1580        // given
1581        let storage = create_storage(
1582            &StorageConfig::InMemory,
1583            StorageRuntime::new(),
1584            StorageSemantics::new(),
1585        )
1586        .await
1587        .unwrap();
1588        let log = LogDb::new(storage.clone()).await.unwrap();
1589
1590        log.try_append(vec![Record {
1591            key: Bytes::from("key"),
1592            value: Bytes::from("value-0"),
1593        }])
1594        .await
1595        .unwrap();
1596
1597        log.seal_segment().await.unwrap();
1598
1599        log.try_append(vec![Record {
1600            key: Bytes::from("key"),
1601            value: Bytes::from("value-1"),
1602        }])
1603        .await
1604        .unwrap();
1605        log.flush().await.unwrap();
1606
1607        // when
1608        let reader = LogDbReader::new(storage).await.unwrap();
1609        let segments = reader.list_segments(..).await.unwrap();
1610
1611        // then
1612        assert_eq!(segments.len(), 2);
1613        assert_eq!(segments[0].id, 0);
1614        assert_eq!(segments[1].id, 1);
1615    }
1616
1617    #[tokio::test]
1618    async fn should_list_segments_includes_start_time() {
1619        // given
1620        let log = LogDb::open(test_config()).await.unwrap();
1621        log.try_append(vec![Record {
1622            key: Bytes::from("key"),
1623            value: Bytes::from("value"),
1624        }])
1625        .await
1626        .unwrap();
1627
1628        // when
1629        let segments = log.list_segments(..).await.unwrap();
1630
1631        // then - start_time_ms should be a reasonable timestamp (after year 2020)
1632        assert_eq!(segments.len(), 1);
1633        assert!(segments[0].start_time_ms > 1577836800000); // 2020-01-01
1634    }
1635
1636    #[tokio::test]
1637    async fn should_try_append_single_record() {
1638        // given
1639        let log = LogDb::open(test_config()).await.unwrap();
1640        let records = vec![Record {
1641            key: Bytes::from("orders"),
1642            value: Bytes::from("order-1"),
1643        }];
1644
1645        // when
1646        let result = log.try_append(records).await.unwrap();
1647
1648        // then
1649        assert_eq!(result.start_sequence, 0);
1650        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
1651        let entry = iter.next().await.unwrap().unwrap();
1652        assert_eq!(entry.value, Bytes::from("order-1"));
1653    }
1654
1655    #[tokio::test]
1656    async fn should_append_timeout_single_record() {
1657        // given
1658        let log = LogDb::open(test_config()).await.unwrap();
1659        let records = vec![Record {
1660            key: Bytes::from("orders"),
1661            value: Bytes::from("order-1"),
1662        }];
1663
1664        // when
1665        let result = log
1666            .append_timeout(records, Duration::from_secs(5))
1667            .await
1668            .unwrap();
1669
1670        // then
1671        assert_eq!(result.start_sequence, 0);
1672        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
1673        let entry = iter.next().await.unwrap().unwrap();
1674        assert_eq!(entry.value, Bytes::from("order-1"));
1675    }
1676
1677    #[tokio::test]
1678    async fn should_return_empty_records_on_try_append_empty() {
1679        // given
1680        let log = LogDb::open(test_config()).await.unwrap();
1681
1682        // when
1683        let result = log.try_append(vec![]).await.unwrap();
1684
1685        // then
1686        assert_eq!(result.start_sequence, 0);
1687    }
1688
1689    #[tokio::test]
1690    async fn should_return_empty_records_on_append_timeout_empty() {
1691        // given
1692        let log = LogDb::open(test_config()).await.unwrap();
1693
1694        // when
1695        let result = log
1696            .append_timeout(vec![], Duration::from_secs(1))
1697            .await
1698            .unwrap();
1699
1700        // then
1701        assert_eq!(result.start_sequence, 0);
1702    }
1703
1704    #[tokio::test]
1705    async fn should_scan_without_flush() {
1706        // given
1707        let log = LogDb::open(test_config()).await.unwrap();
1708        log.try_append(vec![
1709            Record {
1710                key: Bytes::from("key"),
1711                value: Bytes::from("v0"),
1712            },
1713            Record {
1714                key: Bytes::from("key"),
1715                value: Bytes::from("v1"),
1716            },
1717        ])
1718        .await
1719        .unwrap();
1720
1721        // when/then - reads see unflushed data via sync_to_flushed
1722        let mut iter = log.scan(Bytes::from("key"), ..).await.unwrap();
1723        let e0 = iter.next().await.unwrap().unwrap();
1724        assert_eq!(e0.sequence, 0);
1725        assert_eq!(e0.value, Bytes::from("v0"));
1726        let e1 = iter.next().await.unwrap().unwrap();
1727        assert_eq!(e1.sequence, 1);
1728        assert_eq!(e1.value, Bytes::from("v1"));
1729        assert!(iter.next().await.unwrap().is_none());
1730    }
1731
1732    #[tokio::test]
1733    async fn should_list_keys_without_flush() {
1734        // given
1735        let log = LogDb::open(test_config()).await.unwrap();
1736        log.try_append(vec![
1737            Record {
1738                key: Bytes::from("alpha"),
1739                value: Bytes::from("v"),
1740            },
1741            Record {
1742                key: Bytes::from("beta"),
1743                value: Bytes::from("v"),
1744            },
1745        ])
1746        .await
1747        .unwrap();
1748
1749        // when/then - reads see unflushed data via sync_to_flushed
1750        let mut iter = log.list_keys(..).await.unwrap();
1751        let mut keys = vec![];
1752        while let Some(key) = iter.next().await.unwrap() {
1753            keys.push(key.key);
1754        }
1755        assert_eq!(keys, vec![Bytes::from("alpha"), Bytes::from("beta")]);
1756    }
1757
1758    #[tokio::test]
1759    async fn should_list_segments_without_flush() {
1760        // given
1761        let log = LogDb::open(test_config()).await.unwrap();
1762        log.try_append(vec![Record {
1763            key: Bytes::from("key"),
1764            value: Bytes::from("value"),
1765        }])
1766        .await
1767        .unwrap();
1768
1769        // when/then - reads see unflushed data via sync_to_flushed
1770        let segments = log.list_segments(..).await.unwrap();
1771        assert_eq!(segments.len(), 1);
1772        assert_eq!(segments[0].id, 0);
1773        assert_eq!(segments[0].start_seq, 0);
1774    }
1775}