Skip to main content

log/
reader.rs

1//! Read-only log access and the [`LogRead`] trait.
2//!
3//! This module provides:
4//! - [`LogRead`]: The trait defining read operations on the log.
5//! - [`LogDbReader`]: A read-only view of the log that implements `LogRead`.
6
7use std::ops::{Range, RangeBounds};
8use std::sync::Arc;
9use std::time::Duration;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use tokio::sync::RwLock;
14use tokio::sync::watch;
15use tokio::task::JoinHandle;
16use tokio::time::MissedTickBehavior;
17
18use crate::config::{CountOptions, ReaderConfig, ScanOptions, SegmentConfig};
19use crate::error::{Error, Result};
20use crate::listing::LogKeyIterator;
21use crate::model::{LogEntry, Segment, SegmentId, Sequence};
22use crate::range::{normalize_segment_id, normalize_sequence};
23use crate::segment::{LogSegment, SegmentCache};
24use crate::storage::{LogStorageRead as _, SegmentIterator};
25use common::storage::factory::create_storage_read;
26use common::{StorageRead, StorageReaderRuntime, StorageSemantics};
27
28/// Trait for read operations on the log.
29///
30/// This trait defines the common read interface shared by [`LogDb`](crate::LogDb)
31/// and [`LogDbReader`]. It provides methods for scanning entries and counting
32/// records within a key's log.
33///
34/// # Implementors
35///
36/// - [`LogDb`](crate::LogDb): The main log interface with both read and write access.
37/// - [`LogDbReader`]: A read-only view of the log.
38///
39/// # Example
40///
41/// ```no_run
42/// use log::{LogRead, Result};
43/// use bytes::Bytes;
44///
45/// async fn process_log(reader: &(impl LogRead + Sync)) -> Result<()> {
46///     let mut iter = reader.scan(Bytes::from("orders"), ..).await?;
47///     while let Some(entry) = iter.next().await? {
48///         println!("seq={}: {:?}", entry.sequence, entry.value);
49///     }
50///     Ok(())
51/// }
52/// ```
53#[async_trait]
54pub trait LogRead {
55    /// Scans entries for a key within a sequence number range.
56    ///
57    /// Returns an iterator that yields entries in sequence number order.
58    /// The range is specified using Rust's standard range syntax.
59    ///
60    /// This method uses default scan options. Use [`scan_with_options`] for
61    /// custom read behavior.
62    ///
63    /// # Read Visibility
64    ///
65    /// An active scan may or may not see records appended after the initial
66    /// call. However, all records returned will always respect the correct
67    /// ordering of records (no reordering).
68    ///
69    /// # Arguments
70    ///
71    /// * `key` - The key identifying the log stream to scan.
72    /// * `seq_range` - The sequence number range to scan. Supports all Rust
73    ///   range types (`..`, `start..`, `..end`, `start..end`, etc.).
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if the scan fails due to storage issues.
78    ///
79    /// [`scan_with_options`]: LogRead::scan_with_options
80    async fn scan(
81        &self,
82        key: Bytes,
83        seq_range: impl RangeBounds<Sequence> + Send,
84    ) -> Result<LogIterator> {
85        self.scan_with_options(key, seq_range, ScanOptions::default())
86            .await
87    }
88
89    /// Scans entries for a key within a sequence number range with custom options.
90    ///
91    /// Returns an iterator that yields entries in sequence number order.
92    /// See [`scan`](LogRead::scan) for read visibility semantics.
93    ///
94    /// # Arguments
95    ///
96    /// * `key` - The key identifying the log stream to scan.
97    /// * `seq_range` - The sequence number range to scan.
98    /// * `options` - Scan options controlling read behavior.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if the scan fails due to storage issues.
103    async fn scan_with_options(
104        &self,
105        key: Bytes,
106        seq_range: impl RangeBounds<Sequence> + Send,
107        options: ScanOptions,
108    ) -> Result<LogIterator>;
109
110    /// Counts entries for a key within a sequence number range.
111    ///
112    /// Returns the number of entries in the specified range. This is useful
113    /// for computing lag (how far behind a consumer is) or progress metrics.
114    ///
115    /// This method uses default count options (exact count). Use
116    /// [`count_with_options`] for approximate counts.
117    ///
118    /// # Arguments
119    ///
120    /// * `key` - The key identifying the log stream to count.
121    /// * `seq_range` - The sequence number range to count.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if the count fails due to storage issues.
126    ///
127    /// [`count_with_options`]: LogRead::count_with_options
128    async fn count(&self, key: Bytes, seq_range: impl RangeBounds<Sequence> + Send) -> Result<u64> {
129        self.count_with_options(key, seq_range, CountOptions::default())
130            .await
131    }
132
133    /// Counts entries for a key within a sequence number range with custom options.
134    ///
135    /// # Arguments
136    ///
137    /// * `key` - The key identifying the log stream to count.
138    /// * `seq_range` - The sequence number range to count.
139    /// * `options` - Count options, including whether to return an approximate count.
140    ///
141    /// # Errors
142    ///
143    /// Returns an error if the count fails due to storage issues.
144    async fn count_with_options(
145        &self,
146        key: Bytes,
147        seq_range: impl RangeBounds<Sequence> + Send,
148        options: CountOptions,
149    ) -> Result<u64>;
150
151    /// Lists distinct keys within a segment range.
152    ///
153    /// Returns an iterator over keys that have entries in the specified segments.
154    /// Each key is returned exactly once, even if it appears in multiple segments.
155    ///
156    /// Pass `..` to list keys from all segments.
157    ///
158    /// # Arguments
159    ///
160    /// * `segment_range` - The segment ID range to list keys from.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the list operation fails due to storage issues.
165    ///
166    /// # Example
167    ///
168    /// ```no_run
169    /// # use log::{LogDb, LogRead, Config};
170    /// # use common::StorageConfig;
171    /// # #[tokio::main]
172    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
173    /// # let config = Config { storage: StorageConfig::InMemory, ..Default::default() };
174    /// # let log = LogDb::open(config).await?;
175    /// // List all keys
176    /// let mut iter = log.list_keys(..).await?;
177    ///
178    /// // List keys from specific segments
179    /// let segments = log.list_segments(100..200).await?;
180    /// let start = segments.first().map(|s| s.id).unwrap_or(0);
181    /// let end = segments.last().map(|s| s.id + 1).unwrap_or(0);
182    /// let mut iter = log.list_keys(start..end).await?;
183    /// # Ok(())
184    /// # }
185    /// ```
186    async fn list_keys(
187        &self,
188        segment_range: impl RangeBounds<SegmentId> + Send,
189    ) -> Result<LogKeyIterator>;
190
191    /// Lists segments overlapping a sequence number range.
192    ///
193    /// Returns all segments that overlap the specified sequence range. This is
194    /// a precise operation—segments have well-defined boundaries, so there is
195    /// no approximation.
196    ///
197    /// Pass `..` to list all segments.
198    ///
199    /// # Arguments
200    ///
201    /// * `seq_range` - The sequence number range to filter segments by.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the operation fails due to storage issues.
206    ///
207    /// # Example
208    ///
209    /// ```no_run
210    /// # use log::{LogDb, LogRead, Config};
211    /// # use common::StorageConfig;
212    /// # #[tokio::main]
213    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
214    /// # let config = Config { storage: StorageConfig::InMemory, ..Default::default() };
215    /// # let log = LogDb::open(config).await?;
216    /// // List all segments
217    /// let segments = log.list_segments(..).await?;
218    ///
219    /// // List segments overlapping a specific range
220    /// let segments = log.list_segments(100..200).await?;
221    /// # Ok(())
222    /// # }
223    /// ```
224    async fn list_segments(
225        &self,
226        seq_range: impl RangeBounds<Sequence> + Send,
227    ) -> Result<Vec<Segment>>;
228}
229
/// Shared read component used by both `LogDb` and `LogDbReader`.
///
/// Contains the storage and segment cache needed for read operations.
/// Wrapped in `Arc<RwLock<_>>` by both consumers.
pub(crate) struct LogReadView {
    /// Storage handle that scans and key listings read from; replaced by
    /// `update_snapshot`.
    pub(crate) storage: Arc<dyn StorageRead>,
    /// Cache of known segments, consulted to find the segments covering a
    /// sequence range.
    pub(crate) segments: SegmentCache,
}
238
239impl LogReadView {
240    /// Creates a new `LogReadView`.
241    pub(crate) fn new(storage: Arc<dyn StorageRead>, segments: SegmentCache) -> Self {
242        Self { storage, segments }
243    }
244
245    /// Replaces the underlying storage snapshot with a new one.
246    pub(crate) fn update_snapshot(&mut self, snapshot: Arc<dyn StorageRead>) {
247        self.storage = snapshot;
248    }
249
250    /// Replaces the segment cache contents with the given segments.
251    pub(crate) fn replace_segments(&mut self, segments: &[LogSegment]) {
252        self.segments.replace_all(segments);
253    }
254
255    /// Scans entries for a key within a sequence number range with custom options.
256    pub(crate) fn scan_with_options(
257        &self,
258        key: Bytes,
259        seq_range: Range<Sequence>,
260        _options: &ScanOptions,
261    ) -> LogIterator {
262        LogIterator::open(Arc::clone(&self.storage), &self.segments, key, seq_range)
263    }
264
265    /// Lists distinct keys within a segment range.
266    pub(crate) async fn list_keys(
267        &self,
268        segment_range: Range<SegmentId>,
269    ) -> Result<LogKeyIterator> {
270        let keys = self.storage.list_keys(segment_range).await?;
271        Ok(LogKeyIterator::from_keys(keys))
272    }
273
274    /// Lists segments overlapping a sequence number range.
275    pub(crate) fn list_segments(&self, seq_range: &Range<Sequence>) -> Vec<Segment> {
276        self.segments
277            .find_covering(seq_range)
278            .into_iter()
279            .map(|s| s.into())
280            .collect()
281    }
282}
283
284/// A read-only view of the log.
285///
286/// `LogDbReader` provides access to all read operations via the [`LogRead`]
287/// trait, but not write operations. This is useful for:
288///
289/// - Consumers that should not have write access
290/// - Sharing read access across multiple components
291/// - Separating read and write concerns in your application
292///
293/// # Obtaining a LogDbReader
294///
295/// A `LogDbReader` is created by calling [`LogDbReader::open`]:
296///
297/// ```no_run
298/// # use log::{LogDbReader, ReaderConfig};
299/// # use common::StorageConfig;
300/// # #[tokio::main]
301/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
302/// let config = ReaderConfig { storage: StorageConfig::default(), ..Default::default() };
303/// let reader = LogDbReader::open(config).await?;
304/// # Ok(())
305/// # }
306/// ```
307///
308/// # Thread Safety
309///
/// `LogDbReader` does not implement `Clone`; to share one reader across
/// threads or tasks, wrap it in an [`Arc`](std::sync::Arc). All read methods
/// take `&self` and are safe to call concurrently.
312///
313/// # Example
314///
315/// ```no_run
316/// use log::{LogDbReader, LogRead, LogEntry};
317/// use bytes::Bytes;
318/// use std::time::Duration;
319///
320/// async fn consume_events(reader: LogDbReader, key: Bytes) -> log::Result<()> {
321///     let mut checkpoint: u64 = 0;
322///
323///     loop {
324///         let mut iter = reader.scan(key.clone(), checkpoint..).await?;
325///         while let Some(entry) = iter.next().await? {
326///             println!("entry: {:?}", entry);
327///             checkpoint = entry.sequence + 1;
328///         }
329///
330///         // Check how far behind we are
331///         let lag = reader.count(key.clone(), checkpoint..).await?;
332///         if lag == 0 {
333///             // Caught up, wait for new entries
334///             tokio::time::sleep(Duration::from_millis(100)).await;
335///         }
336///     }
337/// }
338/// ```
pub struct LogDbReader {
    /// Shared read state (storage handle and segment cache) used by every
    /// read call and by the background refresh task.
    read_view: Arc<RwLock<LogReadView>>,
    /// Sender half of the shutdown signal; `close` sends `true` on it.
    shutdown_tx: watch::Sender<bool>,
    /// Handle to the background refresh task; `None` when no task was spawned.
    refresh_task: Option<JoinHandle<()>>,
}
344
345impl LogDbReader {
346    /// Opens a read-only view of the log with the given configuration.
347    ///
348    /// This creates a `LogDbReader` that can scan and count entries but cannot
349    /// append new records. Use this when you only need read access to the log.
350    ///
351    /// When `refresh_interval` is set, the reader periodically discovers new
352    /// data written by other processes.
353    ///
354    /// # Arguments
355    ///
356    /// * `config` - Reader configuration including storage and refresh settings.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the storage backend cannot be initialized.
361    ///
362    /// # Example
363    ///
364    /// ```no_run
365    /// use log::{LogDbReader, LogRead, ReaderConfig};
366    /// use common::StorageConfig;
367    /// use bytes::Bytes;
368    ///
369    /// # #[tokio::main]
370    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
371    /// let config = ReaderConfig {
372    ///     storage: StorageConfig::default(),
373    ///     ..Default::default()
374    /// };
375    /// let reader = LogDbReader::open(config).await?;
376    ///
377    /// // Reader will automatically discover new data
378    /// let mut iter = reader.scan(Bytes::from("orders"), ..).await?;
379    /// while let Some(entry) = iter.next().await? {
380    ///     println!("seq={}: {:?}", entry.sequence, entry.value);
381    /// }
382    ///
383    /// // Gracefully shut down when done
384    /// reader.close().await;
385    /// # Ok(())
386    /// # }
387    /// ```
388    pub async fn open(config: ReaderConfig) -> Result<Self> {
389        let reader_options = slatedb::config::DbReaderOptions {
390            manifest_poll_interval: config.refresh_interval,
391            ..Default::default()
392        };
393        let storage: Arc<dyn StorageRead> = create_storage_read(
394            &config.storage,
395            StorageReaderRuntime::new(),
396            StorageSemantics::new(),
397            reader_options,
398        )
399        .await
400        .map_err(|e| Error::Storage(e.to_string()))?;
401        let segments = SegmentCache::open(storage.as_ref(), SegmentConfig::default()).await?;
402        let read_view = Arc::new(RwLock::new(LogReadView::new(storage, segments)));
403
404        let (shutdown_tx, refresh_task) =
405            Self::spawn_refresh_task(Arc::clone(&read_view), config.refresh_interval);
406
407        Ok(Self {
408            read_view,
409            shutdown_tx,
410            refresh_task: Some(refresh_task),
411        })
412    }
413
414    /// Spawns a background task that periodically refreshes the segment cache.
415    fn spawn_refresh_task(
416        read_view: Arc<RwLock<LogReadView>>,
417        interval: Duration,
418    ) -> (watch::Sender<bool>, JoinHandle<()>) {
419        let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
420
421        let task = tokio::spawn(async move {
422            let mut ticker = tokio::time::interval(interval);
423            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
424
425            loop {
426                tokio::select! {
427                    _ = ticker.tick() => {
428                        // Get the latest segment ID for incremental refresh
429                        let after_segment_id = {
430                            let view = read_view.read().await;
431                            view.segments.latest().map(|s| s.id())
432                        };
433
434                        // Refresh the cache
435                        let mut view = read_view.write().await;
436                        let storage = Arc::clone(&view.storage);
437                        if let Err(e) = view.segments.refresh(storage.as_ref(), after_segment_id).await {
438                            tracing::warn!("Failed to refresh segment cache: {}", e);
439                        }
440                    }
441                    _ = shutdown_rx.changed() => {
442                        if *shutdown_rx.borrow() {
443                            break;
444                        }
445                    }
446                }
447            }
448        });
449
450        (shutdown_tx, task)
451    }
452
453    /// Creates a LogDbReader from an existing storage implementation.
454    #[cfg(test)]
455    pub(crate) async fn new(storage: Arc<dyn StorageRead>) -> Result<Self> {
456        let segments = SegmentCache::open(storage.as_ref(), SegmentConfig::default()).await?;
457        let read_view = Arc::new(RwLock::new(LogReadView::new(storage, segments)));
458        let (shutdown_tx, _) = watch::channel(false);
459        Ok(Self {
460            read_view,
461            shutdown_tx,
462            refresh_task: None,
463        })
464    }
465
466    /// Closes the reader, stopping the background refresh task.
467    ///
468    /// This method consumes `self` and gracefully shuts down the background
469    /// refresh task. It waits up to 5 seconds for the task to complete.
470    pub async fn close(self) {
471        // Signal shutdown
472        let _ = self.shutdown_tx.send(true);
473
474        // Wait for the task to complete with timeout
475        if let Some(task) = self.refresh_task {
476            let timeout = tokio::time::timeout(Duration::from_secs(5), task).await;
477            if timeout.is_err() {
478                tracing::warn!("Refresh task did not stop within timeout");
479            }
480        }
481    }
482}
483
484#[async_trait]
485impl LogRead for LogDbReader {
486    async fn scan_with_options(
487        &self,
488        key: Bytes,
489        seq_range: impl RangeBounds<Sequence> + Send,
490        options: ScanOptions,
491    ) -> Result<LogIterator> {
492        let seq_range = normalize_sequence(&seq_range);
493        let view = self.read_view.read().await;
494        Ok(view.scan_with_options(key, seq_range, &options))
495    }
496
497    async fn count_with_options(
498        &self,
499        _key: Bytes,
500        _seq_range: impl RangeBounds<Sequence> + Send,
501        _options: CountOptions,
502    ) -> Result<u64> {
503        todo!()
504    }
505
506    async fn list_keys(
507        &self,
508        segment_range: impl RangeBounds<SegmentId> + Send,
509    ) -> Result<LogKeyIterator> {
510        let segment_range = normalize_segment_id(&segment_range);
511        let view = self.read_view.read().await;
512        view.list_keys(segment_range).await
513    }
514
515    async fn list_segments(
516        &self,
517        seq_range: impl RangeBounds<Sequence> + Send,
518    ) -> Result<Vec<Segment>> {
519        let seq_range = normalize_sequence(&seq_range);
520        let view = self.read_view.read().await;
521        Ok(view.list_segments(&seq_range))
522    }
523}
524
/// Iterator over log entries across multiple segments.
///
/// Iterates through segments in order, fetching entries for the given key
/// within the sequence range. Instantiates a `SegmentIterator` for each
/// segment as needed.
pub struct LogIterator {
    /// Storage handle used to open each per-segment iterator.
    storage: Arc<dyn StorageRead>,
    /// Segments to visit, in iteration order.
    segments: Vec<LogSegment>,
    /// Key whose entries are returned.
    key: Bytes,
    /// Sequence range passed to every per-segment scan.
    seq_range: Range<Sequence>,
    /// Index into `segments` of the segment being read, or the next one to open.
    current_segment_idx: usize,
    /// Iterator over the current segment; `None` until a segment has been opened.
    current_iter: Option<SegmentIterator>,
}
538
539impl LogIterator {
540    /// Opens a new iterator by looking up segments covering the sequence range.
541    pub(crate) fn open(
542        storage: Arc<dyn StorageRead>,
543        segment_cache: &SegmentCache,
544        key: Bytes,
545        seq_range: Range<Sequence>,
546    ) -> Self {
547        let segments = segment_cache.find_covering(&seq_range);
548        Self {
549            storage,
550            segments,
551            key,
552            seq_range,
553            current_segment_idx: 0,
554            current_iter: None,
555        }
556    }
557
558    /// Creates a new iterator over the given segments.
559    #[cfg(test)]
560    pub(crate) fn new(
561        storage: Arc<dyn StorageRead>,
562        segments: Vec<LogSegment>,
563        key: Bytes,
564        seq_range: Range<Sequence>,
565    ) -> Self {
566        Self {
567            storage,
568            segments,
569            key,
570            seq_range,
571            current_segment_idx: 0,
572            current_iter: None,
573        }
574    }
575
576    /// Returns the next log entry, or None if iteration is complete.
577    pub async fn next(&mut self) -> Result<Option<LogEntry>> {
578        loop {
579            // If we have a current iterator, try to get the next entry
580            if let Some(iter) = &mut self.current_iter {
581                if let Some(entry) = iter.next().await? {
582                    return Ok(Some(entry));
583                }
584                // Current segment exhausted, move to next
585                self.current_iter = None;
586                self.current_segment_idx += 1;
587            }
588
589            // No current iterator, try to advance to next segment
590            if !self.advance_segment().await? {
591                return Ok(None);
592            }
593        }
594    }
595
596    /// Advances to the next segment and creates its iterator.
597    ///
598    /// Returns `true` if a new iterator was created, `false` if no more segments.
599    async fn advance_segment(&mut self) -> Result<bool> {
600        if self.current_segment_idx >= self.segments.len() {
601            return Ok(false);
602        }
603
604        let segment = &self.segments[self.current_segment_idx];
605        let iter = self
606            .storage
607            .scan_entries(segment, &self.key, self.seq_range.clone())
608            .await?;
609        self.current_iter = Some(iter);
610        Ok(true)
611    }
612}
613
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serde::SegmentMeta;
    use crate::storage::LogStorageWrite;
    use common::Storage;
    use opendata_macros::storage_test;

    /// Builds a `LogEntry` from raw byte slices.
    fn entry(key: &[u8], seq: u64, value: &[u8]) -> LogEntry {
        LogEntry {
            key: Bytes::copy_from_slice(key),
            sequence: seq,
            value: Bytes::copy_from_slice(value),
        }
    }

    /// Writes every entry into `segment`, panicking on the first storage error.
    async fn write_all(storage: &Arc<dyn Storage>, segment: &LogSegment, entries: &[LogEntry]) {
        for record in entries {
            storage.write_entry(segment, record).await.unwrap();
        }
    }

    /// Builds a `LogIterator` for `key` over `segments`, limited to `seq_range`.
    fn iter_over(
        storage: &Arc<dyn Storage>,
        segments: Vec<LogSegment>,
        key: &'static str,
        seq_range: Range<Sequence>,
    ) -> LogIterator {
        LogIterator::new(
            storage.clone() as Arc<dyn StorageRead>,
            segments,
            Bytes::from(key),
            seq_range,
        )
    }

    #[storage_test]
    async fn should_return_none_when_no_segments(storage: Arc<dyn Storage>) {
        let mut iter = iter_over(&storage, Vec::new(), "key", 0..u64::MAX);

        assert!(iter.next().await.unwrap().is_none());
    }

    #[storage_test]
    async fn should_iterate_entries_in_single_segment(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[
                entry(b"key", 0, b"value0"),
                entry(b"key", 1, b"value1"),
                entry(b"key", 2, b"value2"),
            ],
        )
        .await;

        let mut iter = iter_over(&storage, vec![segment], "key", 0..u64::MAX);

        for (seq, value) in [(0, "value0"), (1, "value1"), (2, "value2")] {
            let found = iter.next().await.unwrap().unwrap();
            assert_eq!(found.sequence, seq);
            assert_eq!(&found.value[..], value.as_bytes());
        }

        assert!(iter.next().await.unwrap().is_none());
    }

    #[storage_test]
    async fn should_iterate_entries_across_multiple_segments(storage: Arc<dyn Storage>) {
        let segment0 = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let segment1 = LogSegment::new(1, SegmentMeta::new(100, 2000));
        // Entries in segment 0 (start_seq = 0)
        write_all(
            &storage,
            &segment0,
            &[entry(b"key", 0, b"value0"), entry(b"key", 1, b"value1")],
        )
        .await;
        // Entries in segment 1 (start_seq = 100)
        write_all(
            &storage,
            &segment1,
            &[
                entry(b"key", 100, b"value100"),
                entry(b"key", 101, b"value101"),
            ],
        )
        .await;

        let mut iter = iter_over(&storage, vec![segment0, segment1], "key", 0..u64::MAX);

        // The first two entries come from segment 0, the last two from segment 1.
        let expected = [
            (0, "value0"),
            (1, "value1"),
            (100, "value100"),
            (101, "value101"),
        ];
        for (seq, value) in expected {
            let found = iter.next().await.unwrap().unwrap();
            assert_eq!(found.sequence, seq);
            assert_eq!(&found.value[..], value.as_bytes());
        }

        assert!(iter.next().await.unwrap().is_none());
    }

    #[storage_test]
    async fn should_filter_by_sequence_range(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[
                entry(b"key", 0, b"value0"),
                entry(b"key", 1, b"value1"),
                entry(b"key", 2, b"value2"),
                entry(b"key", 3, b"value3"),
            ],
        )
        .await;

        let mut iter = iter_over(&storage, vec![segment], "key", 1..3);

        for seq in [1, 2] {
            let found = iter.next().await.unwrap().unwrap();
            assert_eq!(found.sequence, seq);
        }

        assert!(iter.next().await.unwrap().is_none());
    }

    #[storage_test]
    async fn should_filter_entries_for_specified_key(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[
                entry(b"key1", 0, b"k1v0"),
                entry(b"key2", 0, b"k2v0"),
                entry(b"key1", 1, b"k1v1"),
                entry(b"key2", 1, b"k2v1"),
            ],
        )
        .await;

        let mut iter = iter_over(&storage, vec![segment], "key1", 0..u64::MAX);

        for seq in [0, 1] {
            let found = iter.next().await.unwrap().unwrap();
            assert_eq!(&found.key[..], &b"key1"[..]);
            assert_eq!(found.sequence, seq);
        }

        assert!(iter.next().await.unwrap().is_none());
    }

    #[storage_test]
    async fn should_return_none_when_no_entries_in_range(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[entry(b"key", 0, b"value0"), entry(b"key", 1, b"value1")],
        )
        .await;

        let mut iter = iter_over(&storage, vec![segment], "key", 10..20);

        assert!(iter.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn open_spawns_refresh_task() {
        use common::StorageConfig;

        let reader = LogDbReader::open(ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(100),
        })
        .await
        .unwrap();

        // The background task handle must be present after `open`.
        assert!(reader.refresh_task.is_some());

        // Clean up
        reader.close().await;
    }

    #[tokio::test]
    async fn close_stops_refresh_task_gracefully() {
        use common::StorageConfig;

        let reader = LogDbReader::open(ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(50),
        })
        .await
        .unwrap();
        assert!(reader.refresh_task.is_some());

        // `close` must finish well before the outer one-second deadline.
        let outcome = tokio::time::timeout(Duration::from_secs(1), reader.close()).await;

        assert!(outcome.is_ok(), "close() should complete within timeout");
    }
}