// log/reader.rs
//! Read-only log access and the [`LogRead`] trait.
//!
//! This module provides:
//! - [`LogRead`]: The trait defining read operations on the log.
//! - [`LogDbReader`]: A read-only view of the log that implements `LogRead`.

use std::ops::{Range, RangeBounds};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::RwLock;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;

use common::storage::factory::create_storage_read;
use common::{StorageRead, StorageSemantics};

use crate::config::{CountOptions, ReaderConfig, ScanOptions, SegmentConfig};
use crate::error::{Error, Result};
use crate::listing::LogKeyIterator;
use crate::model::{LogEntry, Segment, SegmentId, Sequence};
use crate::range::{normalize_segment_id, normalize_sequence};
use crate::segment::{LogSegment, SegmentCache};
use crate::storage::{LogStorageRead, SegmentIterator};

/// Trait for read operations on the log.
///
/// This trait defines the common read interface shared by [`LogDb`](crate::LogDb)
/// and [`LogDbReader`]. It provides methods for scanning entries and counting
/// records within a key's log.
///
/// # Implementors
///
/// - [`LogDb`](crate::LogDb): The main log interface with both read and write access.
/// - [`LogDbReader`]: A read-only view of the log.
///
/// # Example
///
/// ```ignore
/// use log::LogRead;
/// use bytes::Bytes;
///
/// async fn process_log(reader: &impl LogRead) -> Result<()> {
///     // Works with both LogDb and LogDbReader
///     let mut iter = reader.scan(Bytes::from("orders"), ..);
///     while let Some(entry) = iter.next().await? {
///         println!("seq={}: {:?}", entry.sequence, entry.value);
///     }
///     Ok(())
/// }
/// ```
#[async_trait]
pub trait LogRead {
    /// Scans entries for a key within a sequence number range.
    ///
    /// Returns an iterator that yields entries in sequence number order.
    /// The range is specified using Rust's standard range syntax.
    ///
    /// This method uses default scan options. Use [`scan_with_options`] for
    /// custom read behavior.
    ///
    /// # Read Visibility
    ///
    /// An active scan may or may not see records appended after the initial
    /// call. However, all records returned will always respect the correct
    /// ordering of records (no reordering).
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to scan.
    /// * `seq_range` - The sequence number range to scan. Supports all Rust
    ///   range types (`..`, `start..`, `..end`, `start..end`, etc.).
    ///
    /// # Errors
    ///
    /// Returns an error if the scan fails due to storage issues.
    ///
    /// [`scan_with_options`]: LogRead::scan_with_options
    async fn scan(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<LogIterator> {
        // Default implementation: delegate with default options.
        self.scan_with_options(key, seq_range, ScanOptions::default())
            .await
    }

    /// Scans entries for a key within a sequence number range with custom options.
    ///
    /// Returns an iterator that yields entries in sequence number order.
    /// See [`scan`](LogRead::scan) for read visibility semantics.
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to scan.
    /// * `seq_range` - The sequence number range to scan.
    /// * `options` - Scan options controlling read behavior.
    ///
    /// # Errors
    ///
    /// Returns an error if the scan fails due to storage issues.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator>;

    /// Counts entries for a key within a sequence number range.
    ///
    /// Returns the number of entries in the specified range. This is useful
    /// for computing lag (how far behind a consumer is) or progress metrics.
    ///
    /// This method uses default count options (exact count). Use
    /// [`count_with_options`] for approximate counts.
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to count.
    /// * `seq_range` - The sequence number range to count.
    ///
    /// # Errors
    ///
    /// Returns an error if the count fails due to storage issues.
    ///
    /// [`count_with_options`]: LogRead::count_with_options
    async fn count(&self, key: Bytes, seq_range: impl RangeBounds<Sequence> + Send) -> Result<u64> {
        // Default implementation: delegate with default (exact) options.
        self.count_with_options(key, seq_range, CountOptions::default())
            .await
    }

    /// Counts entries for a key within a sequence number range with custom options.
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to count.
    /// * `seq_range` - The sequence number range to count.
    /// * `options` - Count options, including whether to return an approximate count.
    ///
    /// # Errors
    ///
    /// Returns an error if the count fails due to storage issues.
    async fn count_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: CountOptions,
    ) -> Result<u64>;

    /// Lists distinct keys within a segment range.
    ///
    /// Returns an iterator over keys that have entries in the specified segments.
    /// Each key is returned exactly once, even if it appears in multiple segments.
    ///
    /// Pass `..` to list keys from all segments.
    ///
    /// # Arguments
    ///
    /// * `segment_range` - The segment ID range to list keys from.
    ///
    /// # Errors
    ///
    /// Returns an error if the list operation fails due to storage issues.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // List all keys
    /// let mut iter = log.list_keys(..).await?;
    ///
    /// // List keys from specific segments
    /// let segments = log.list_segments(100..200).await?;
    /// let start = segments.first().map(|s| s.id).unwrap_or(0);
    /// let end = segments.last().map(|s| s.id + 1).unwrap_or(0);
    /// let mut iter = log.list_keys(start..end).await?;
    /// ```
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator>;

    /// Lists segments overlapping a sequence number range.
    ///
    /// Returns all segments that overlap the specified sequence range. This is
    /// a precise operation—segments have well-defined boundaries, so there is
    /// no approximation.
    ///
    /// Pass `..` to list all segments.
    ///
    /// # Arguments
    ///
    /// * `seq_range` - The sequence number range to filter segments by.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails due to storage issues.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // List all segments
    /// let segments = log.list_segments(..).await?;
    ///
    /// // List segments overlapping a specific range
    /// let segments = log.list_segments(100..200).await?;
    /// ```
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>>;
}

/// Shared read component used by both `LogDb` and `LogDbReader`.
///
/// Contains the storage and segment cache needed for read operations.
/// Wrapped in `Arc<RwLock<_>>` by both consumers.
pub(crate) struct LogReadInner {
    // Read-only handle to the underlying log storage backend.
    pub(crate) storage: LogStorageRead,
    // Cache of known segments, used to resolve sequence ranges to the
    // segments that cover them (see `list_segments` / `LogIterator::open`).
    pub(crate) segments: SegmentCache,
}

225impl LogReadInner {
226    /// Creates a new `LogReadInner`.
227    pub(crate) fn new(storage: LogStorageRead, segments: SegmentCache) -> Self {
228        Self { storage, segments }
229    }
230
231    /// Scans entries for a key within a sequence number range with custom options.
232    pub(crate) fn scan_with_options(
233        &self,
234        key: Bytes,
235        seq_range: Range<Sequence>,
236        _options: &ScanOptions,
237    ) -> LogIterator {
238        LogIterator::open(self.storage.clone(), &self.segments, key, seq_range)
239    }
240
241    /// Lists distinct keys within a segment range.
242    pub(crate) async fn list_keys(
243        &self,
244        segment_range: Range<SegmentId>,
245    ) -> Result<LogKeyIterator> {
246        self.storage.list_keys(segment_range).await
247    }
248
249    /// Lists segments overlapping a sequence number range.
250    pub(crate) fn list_segments(&self, seq_range: &Range<Sequence>) -> Vec<Segment> {
251        self.segments
252            .find_covering(seq_range)
253            .into_iter()
254            .map(|s| s.into())
255            .collect()
256    }
257}
258
/// A read-only view of the log.
///
/// `LogDbReader` provides access to all read operations via the [`LogRead`]
/// trait, but not write operations. This is useful for:
///
/// - Consumers that should not have write access
/// - Sharing read access across multiple components
/// - Separating read and write concerns in your application
///
/// # Obtaining a LogDbReader
///
/// A `LogDbReader` is created by calling [`LogDbReader::open`]:
///
/// ```ignore
/// let reader = LogDbReader::open(config).await?;
/// ```
///
/// # Thread Safety
///
/// `LogDbReader` is designed to be cloned and shared across threads.
/// All methods take `&self` and are safe to call concurrently.
///
/// # Example
///
/// ```ignore
/// use log::{LogDbReader, LogRead};
/// use bytes::Bytes;
///
/// async fn consume_events(reader: LogDbReader, key: Bytes) -> Result<()> {
///     let mut checkpoint: u64 = 0;
///
///     loop {
///         let mut iter = reader.scan(key.clone(), checkpoint..);
///         while let Some(entry) = iter.next().await? {
///             process_entry(&entry);
///             checkpoint = entry.sequence + 1;
///         }
///
///         // Check how far behind we are
///         let lag = reader.count(key.clone(), checkpoint..).await?;
///         if lag == 0 {
///             // Caught up, wait for new entries
///             tokio::time::sleep(Duration::from_millis(100)).await;
///         }
///     }
/// }
/// ```
pub struct LogDbReader {
    // Shared read state (storage + segment cache); also captured by the
    // background refresh task spawned in `open`.
    inner: Arc<RwLock<LogReadInner>>,
    // Signals the background refresh task to stop (see `close`).
    shutdown_tx: watch::Sender<bool>,
    // Handle to the background refresh task; `None` when no task was
    // spawned (e.g. the test-only constructor).
    refresh_task: Option<JoinHandle<()>>,
}

312impl LogDbReader {
313    /// Opens a read-only view of the log with the given configuration.
314    ///
315    /// This creates a `LogDbReader` that can scan and count entries but cannot
316    /// append new records. Use this when you only need read access to the log.
317    ///
318    /// When `refresh_interval` is set, the reader periodically discovers new
319    /// data written by other processes.
320    ///
321    /// # Arguments
322    ///
323    /// * `config` - Reader configuration including storage and refresh settings.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the storage backend cannot be initialized.
328    ///
329    /// # Example
330    ///
331    /// ```ignore
332    /// use log::{LogDbReader, LogRead, ReaderConfig};
333    /// use common::StorageConfig;
334    /// use bytes::Bytes;
335    ///
336    /// let config = ReaderConfig {
337    ///     storage: StorageConfig::default(),
338    ///     ..Default::default()
339    /// };
340    /// let reader = LogDbReader::open(config).await?;
341    ///
342    /// // Reader will automatically discover new data
343    /// let mut iter = reader.scan(Bytes::from("orders"), ..).await?;
344    /// while let Some(entry) = iter.next().await? {
345    ///     println!("seq={}: {:?}", entry.sequence, entry.value);
346    /// }
347    ///
348    /// // Gracefully shut down when done
349    /// reader.close().await;
350    /// ```
351    pub async fn open(config: ReaderConfig) -> Result<Self> {
352        let reader_options = slatedb::config::DbReaderOptions {
353            manifest_poll_interval: config.refresh_interval,
354            ..Default::default()
355        };
356        let storage: Arc<dyn StorageRead> =
357            create_storage_read(&config.storage, StorageSemantics::new(), reader_options)
358                .await
359                .map_err(|e| Error::Storage(e.to_string()))?;
360        let log_storage = LogStorageRead::new(storage);
361        let segments = SegmentCache::open(&log_storage, SegmentConfig::default()).await?;
362        let inner = Arc::new(RwLock::new(LogReadInner::new(log_storage, segments)));
363
364        let (shutdown_tx, refresh_task) =
365            Self::spawn_refresh_task(Arc::clone(&inner), config.refresh_interval);
366
367        Ok(Self {
368            inner,
369            shutdown_tx,
370            refresh_task: Some(refresh_task),
371        })
372    }
373
374    /// Spawns a background task that periodically refreshes the segment cache.
375    fn spawn_refresh_task(
376        inner: Arc<RwLock<LogReadInner>>,
377        interval: Duration,
378    ) -> (watch::Sender<bool>, JoinHandle<()>) {
379        let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
380
381        let task = tokio::spawn(async move {
382            let mut ticker = tokio::time::interval(interval);
383            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
384
385            loop {
386                tokio::select! {
387                    _ = ticker.tick() => {
388                        // Get the latest segment ID for incremental refresh
389                        let after_segment_id = {
390                            let read_inner = inner.read().await;
391                            read_inner.segments.latest().map(|s| s.id())
392                        };
393
394                        // Refresh the cache
395                        let mut read_inner = inner.write().await;
396                        let storage = read_inner.storage.clone();
397                        if let Err(e) = read_inner.segments.refresh(&storage, after_segment_id).await {
398                            tracing::warn!("Failed to refresh segment cache: {}", e);
399                        }
400                    }
401                    _ = shutdown_rx.changed() => {
402                        if *shutdown_rx.borrow() {
403                            break;
404                        }
405                    }
406                }
407            }
408        });
409
410        (shutdown_tx, task)
411    }
412
413    /// Creates a LogDbReader from an existing storage implementation.
414    #[cfg(test)]
415    pub(crate) async fn new(storage: Arc<dyn StorageRead>) -> Result<Self> {
416        let log_storage = LogStorageRead::new(storage);
417        let segments = SegmentCache::open(&log_storage, SegmentConfig::default()).await?;
418        let inner = Arc::new(RwLock::new(LogReadInner::new(log_storage, segments)));
419        let (shutdown_tx, _) = watch::channel(false);
420        Ok(Self {
421            inner,
422            shutdown_tx,
423            refresh_task: None,
424        })
425    }
426
427    /// Closes the reader, stopping the background refresh task.
428    ///
429    /// This method consumes `self` and gracefully shuts down the background
430    /// refresh task. It waits up to 5 seconds for the task to complete.
431    pub async fn close(self) {
432        // Signal shutdown
433        let _ = self.shutdown_tx.send(true);
434
435        // Wait for the task to complete with timeout
436        if let Some(task) = self.refresh_task {
437            let timeout = tokio::time::timeout(Duration::from_secs(5), task).await;
438            if timeout.is_err() {
439                tracing::warn!("Refresh task did not stop within timeout");
440            }
441        }
442    }
443}
444
#[async_trait]
impl LogRead for LogDbReader {
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator> {
        // Normalize the caller-supplied bounds into a concrete half-open
        // range before taking the read lock.
        let seq_range = normalize_sequence(&seq_range);
        let inner = self.inner.read().await;
        Ok(inner.scan_with_options(key, seq_range, &options))
    }

    // NOTE(review): not yet implemented — calling this (or the default
    // `count`) on `LogDbReader` panics via `todo!()`. Callers must not rely
    // on counting through the reader until this is filled in.
    async fn count_with_options(
        &self,
        _key: Bytes,
        _seq_range: impl RangeBounds<Sequence> + Send,
        _options: CountOptions,
    ) -> Result<u64> {
        todo!()
    }

    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator> {
        // Normalize bounds, then delegate to the shared inner component.
        let segment_range = normalize_segment_id(&segment_range);
        let inner = self.inner.read().await;
        inner.list_keys(segment_range).await
    }

    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>> {
        // Served entirely from the in-memory segment cache.
        let seq_range = normalize_sequence(&seq_range);
        let inner = self.inner.read().await;
        Ok(inner.list_segments(&seq_range))
    }
}

/// Iterator over log entries across multiple segments.
///
/// Iterates through segments in order, fetching entries for the given key
/// within the sequence range. Instantiates a `SegmentIterator` for each
/// segment as needed.
pub struct LogIterator {
    // Storage handle used to open per-segment iterators.
    storage: LogStorageRead,
    // Segments to visit, in order.
    segments: Vec<LogSegment>,
    // Key whose entries are being scanned.
    key: Bytes,
    // Half-open sequence range restricting the scan.
    seq_range: Range<Sequence>,
    // Index into `segments` of the segment currently being read.
    current_segment_idx: usize,
    // Iterator over the current segment; `None` before the first segment is
    // opened and after each segment is exhausted.
    current_iter: Option<SegmentIterator>,
}

500impl LogIterator {
501    /// Opens a new iterator by looking up segments covering the sequence range.
502    pub(crate) fn open(
503        storage: LogStorageRead,
504        segment_cache: &SegmentCache,
505        key: Bytes,
506        seq_range: Range<Sequence>,
507    ) -> Self {
508        let segments = segment_cache.find_covering(&seq_range);
509        Self {
510            storage,
511            segments,
512            key,
513            seq_range,
514            current_segment_idx: 0,
515            current_iter: None,
516        }
517    }
518
519    /// Creates a new iterator over the given segments.
520    #[cfg(test)]
521    pub(crate) fn new(
522        storage: LogStorageRead,
523        segments: Vec<LogSegment>,
524        key: Bytes,
525        seq_range: Range<Sequence>,
526    ) -> Self {
527        Self {
528            storage,
529            segments,
530            key,
531            seq_range,
532            current_segment_idx: 0,
533            current_iter: None,
534        }
535    }
536
537    /// Returns the next log entry, or None if iteration is complete.
538    pub async fn next(&mut self) -> Result<Option<LogEntry>> {
539        loop {
540            // If we have a current iterator, try to get the next entry
541            if let Some(iter) = &mut self.current_iter {
542                if let Some(entry) = iter.next().await? {
543                    return Ok(Some(entry));
544                }
545                // Current segment exhausted, move to next
546                self.current_iter = None;
547                self.current_segment_idx += 1;
548            }
549
550            // No current iterator, try to advance to next segment
551            if !self.advance_segment().await? {
552                return Ok(None);
553            }
554        }
555    }
556
557    /// Advances to the next segment and creates its iterator.
558    ///
559    /// Returns `true` if a new iterator was created, `false` if no more segments.
560    async fn advance_segment(&mut self) -> Result<bool> {
561        if self.current_segment_idx >= self.segments.len() {
562            return Ok(false);
563        }
564
565        let segment = &self.segments[self.current_segment_idx];
566        let iter = self
567            .storage
568            .scan_entries(segment, &self.key, self.seq_range.clone())
569            .await?;
570        self.current_iter = Some(iter);
571        Ok(true)
572    }
573}
574
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serde::SegmentMeta;
    use crate::storage::LogStorage;

    // Convenience helper: builds a `LogEntry` from raw key/sequence/value.
    fn entry(key: &[u8], seq: u64, value: &[u8]) -> LogEntry {
        LogEntry {
            key: Bytes::copy_from_slice(key),
            sequence: seq,
            value: Bytes::copy_from_slice(value),
        }
    }

    // An iterator over zero segments yields nothing.
    #[tokio::test]
    async fn should_return_none_when_no_segments() {
        let storage = LogStorage::in_memory();
        let segments = vec![];

        let mut iter =
            LogIterator::new(storage.as_read(), segments, Bytes::from("key"), 0..u64::MAX);

        assert!(iter.next().await.unwrap().is_none());
    }

    // Entries within one segment come back in sequence order, then None.
    #[tokio::test]
    async fn should_iterate_entries_in_single_segment() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 2, b"value2"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment],
            Bytes::from("key"),
            0..u64::MAX,
        );

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value.as_ref(), b"value0");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);
        assert_eq!(entry.value.as_ref(), b"value1");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 2);
        assert_eq!(entry.value.as_ref(), b"value2");

        assert!(iter.next().await.unwrap().is_none());
    }

    // The iterator seamlessly crosses a segment boundary, preserving order.
    #[tokio::test]
    async fn should_iterate_entries_across_multiple_segments() {
        let storage = LogStorage::in_memory();
        let segment0 = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let segment1 = LogSegment::new(1, SegmentMeta::new(100, 2000));
        // Entries in segment 0 (start_seq = 0)
        storage
            .write_entry(&segment0, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment0, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        // Entries in segment 1 (start_seq = 100)
        storage
            .write_entry(&segment1, &entry(b"key", 100, b"value100"))
            .await
            .unwrap();
        storage
            .write_entry(&segment1, &entry(b"key", 101, b"value101"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment0, segment1],
            Bytes::from("key"),
            0..u64::MAX,
        );

        // Entries from segment 0
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value.as_ref(), b"value0");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);
        assert_eq!(entry.value.as_ref(), b"value1");

        // Entries from segment 1
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 100);
        assert_eq!(entry.value.as_ref(), b"value100");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 101);
        assert_eq!(entry.value.as_ref(), b"value101");

        assert!(iter.next().await.unwrap().is_none());
    }

    // A half-open range `1..3` yields sequences 1 and 2 only.
    #[tokio::test]
    async fn should_filter_by_sequence_range() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 2, b"value2"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 3, b"value3"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(storage.as_read(), vec![segment], Bytes::from("key"), 1..3);

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 2);

        assert!(iter.next().await.unwrap().is_none());
    }

    // Entries for other keys in the same segment are not returned.
    #[tokio::test]
    async fn should_filter_entries_for_specified_key() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key1", 0, b"k1v0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key2", 0, b"k2v0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key1", 1, b"k1v1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key2", 1, b"k2v1"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment],
            Bytes::from("key1"),
            0..u64::MAX,
        );

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.key.as_ref(), b"key1");
        assert_eq!(entry.sequence, 0);

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.key.as_ref(), b"key1");
        assert_eq!(entry.sequence, 1);

        assert!(iter.next().await.unwrap().is_none());
    }

    // A range beyond all written sequences yields nothing.
    #[tokio::test]
    async fn should_return_none_when_no_entries_in_range() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();

        let mut iter =
            LogIterator::new(storage.as_read(), vec![segment], Bytes::from("key"), 10..20);

        assert!(iter.next().await.unwrap().is_none());
    }

    // `open` stores a JoinHandle for the spawned background refresh task.
    #[tokio::test]
    async fn open_spawns_refresh_task() {
        use common::StorageConfig;

        let config = ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(100),
        };

        let reader = LogDbReader::open(config).await.unwrap();

        // Verify background task is running
        assert!(reader.refresh_task.is_some());

        // Clean up
        reader.close().await;
    }

    // `close` must return promptly (well under its internal 5s task timeout).
    #[tokio::test]
    async fn close_stops_refresh_task_gracefully() {
        use common::StorageConfig;

        let config = ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(50),
        };

        let reader = LogDbReader::open(config).await.unwrap();
        assert!(reader.refresh_task.is_some());

        // Close should complete without timeout
        let close_result =
            tokio::time::timeout(Duration::from_secs(1), async { reader.close().await }).await;

        assert!(
            close_result.is_ok(),
            "close() should complete within timeout"
        );
    }
}