// log/reader.rs
1//! Read-only log access and the [`LogRead`] trait.
2//!
3//! This module provides:
4//! - [`LogRead`]: The trait defining read operations on the log.
5//! - [`LogDbReader`]: A read-only view of the log that implements `LogRead`.
6
7use std::ops::{Range, RangeBounds};
8use std::sync::Arc;
9use std::time::Duration;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use tokio::sync::RwLock;
14use tokio::sync::watch;
15use tokio::task::JoinHandle;
16use tokio::time::MissedTickBehavior;
17
18use crate::config::{CountOptions, ReaderConfig, ScanOptions, SegmentConfig};
19use crate::error::{Error, Result};
20use crate::listing::LogKeyIterator;
21use crate::model::{LogEntry, Segment, SegmentId, Sequence};
22use crate::range::{normalize_segment_id, normalize_sequence};
23use crate::segment::{LogSegment, SegmentCache};
24use crate::storage::{LogStorageRead as _, SegmentIterator};
25use common::storage::factory::create_storage_read;
26use common::{StorageRead, StorageReaderRuntime, StorageSemantics};
27
/// Trait for read operations on the log.
///
/// This trait defines the common read interface shared by [`LogDb`](crate::LogDb)
/// and [`LogDbReader`]. It provides methods for scanning entries and counting
/// records within a key's log.
///
/// Implementors only need to supply the `*_with_options` variants plus the
/// listing methods; `scan` and `count` are provided methods that delegate
/// with default options.
///
/// # Implementors
///
/// - [`LogDb`](crate::LogDb): The main log interface with both read and write access.
/// - [`LogDbReader`]: A read-only view of the log.
///
/// # Example
///
/// ```no_run
/// use log::{LogRead, Result};
/// use bytes::Bytes;
///
/// async fn process_log(reader: &(impl LogRead + Sync)) -> Result<()> {
///     let mut iter = reader.scan(Bytes::from("orders"), ..).await?;
///     while let Some(entry) = iter.next().await? {
///         println!("seq={}: {:?}", entry.sequence, entry.value);
///     }
///     Ok(())
/// }
/// ```
#[async_trait]
pub trait LogRead {
    /// Scans entries for a key within a sequence number range.
    ///
    /// Returns an iterator that yields entries in sequence number order.
    /// The range is specified using Rust's standard range syntax.
    ///
    /// This method uses default scan options. Use [`scan_with_options`] for
    /// custom read behavior.
    ///
    /// # Read Visibility
    ///
    /// An active scan may or may not see records appended after the initial
    /// call. However, all records returned will always respect the correct
    /// ordering of records (no reordering).
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to scan.
    /// * `seq_range` - The sequence number range to scan. Supports all Rust
    ///   range types (`..`, `start..`, `..end`, `start..end`, etc.).
    ///
    /// # Errors
    ///
    /// Returns an error if the scan fails due to storage issues.
    ///
    /// [`scan_with_options`]: LogRead::scan_with_options
    async fn scan(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<LogIterator> {
        // Provided method: forwards to the customizable variant with defaults.
        self.scan_with_options(key, seq_range, ScanOptions::default())
            .await
    }

    /// Scans entries for a key within a sequence number range with custom options.
    ///
    /// Returns an iterator that yields entries in sequence number order.
    /// See [`scan`](LogRead::scan) for read visibility semantics.
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to scan.
    /// * `seq_range` - The sequence number range to scan.
    /// * `options` - Scan options controlling read behavior.
    ///
    /// # Errors
    ///
    /// Returns an error if the scan fails due to storage issues.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator>;

    /// Counts entries for a key within a sequence number range.
    ///
    /// Returns the number of entries in the specified range. This is useful
    /// for computing lag (how far behind a consumer is) or progress metrics.
    ///
    /// This method uses default count options (exact count). Use
    /// [`count_with_options`] for approximate counts.
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to count.
    /// * `seq_range` - The sequence number range to count.
    ///
    /// # Errors
    ///
    /// Returns an error if the count fails due to storage issues.
    ///
    /// [`count_with_options`]: LogRead::count_with_options
    async fn count(&self, key: Bytes, seq_range: impl RangeBounds<Sequence> + Send) -> Result<u64> {
        // Provided method: forwards to the customizable variant with defaults.
        self.count_with_options(key, seq_range, CountOptions::default())
            .await
    }

    /// Counts entries for a key within a sequence number range with custom options.
    ///
    /// # Arguments
    ///
    /// * `key` - The key identifying the log stream to count.
    /// * `seq_range` - The sequence number range to count.
    /// * `options` - Count options, including whether to return an approximate count.
    ///
    /// # Errors
    ///
    /// Returns an error if the count fails due to storage issues.
    async fn count_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: CountOptions,
    ) -> Result<u64>;

    /// Lists distinct keys within a segment range.
    ///
    /// Returns an iterator over keys that have entries in the specified segments.
    /// Each key is returned exactly once, even if it appears in multiple segments.
    ///
    /// Pass `..` to list keys from all segments.
    ///
    /// # Arguments
    ///
    /// * `segment_range` - The segment ID range to list keys from.
    ///
    /// # Errors
    ///
    /// Returns an error if the list operation fails due to storage issues.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use log::{LogDb, LogRead, Config};
    /// # use common::StorageConfig;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = Config { storage: StorageConfig::InMemory, ..Default::default() };
    /// # let log = LogDb::open(config).await?;
    /// // List all keys
    /// let mut iter = log.list_keys(..).await?;
    ///
    /// // List keys from specific segments
    /// let segments = log.list_segments(100..200).await?;
    /// let start = segments.first().map(|s| s.id).unwrap_or(0);
    /// let end = segments.last().map(|s| s.id + 1).unwrap_or(0);
    /// let mut iter = log.list_keys(start..end).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator>;

    /// Lists segments overlapping a sequence number range.
    ///
    /// Returns all segments that overlap the specified sequence range. This is
    /// a precise operation—segments have well-defined boundaries, so there is
    /// no approximation.
    ///
    /// Pass `..` to list all segments.
    ///
    /// # Arguments
    ///
    /// * `seq_range` - The sequence number range to filter segments by.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails due to storage issues.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use log::{LogDb, LogRead, Config};
    /// # use common::StorageConfig;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// # let config = Config { storage: StorageConfig::InMemory, ..Default::default() };
    /// # let log = LogDb::open(config).await?;
    /// // List all segments
    /// let segments = log.list_segments(..).await?;
    ///
    /// // List segments overlapping a specific range
    /// let segments = log.list_segments(100..200).await?;
    /// # Ok(())
    /// # }
    /// ```
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>>;
}
229
/// Shared read component used by both `LogDb` and `LogDbReader`.
///
/// Contains the storage and segment cache needed for read operations.
/// Wrapped in `Arc<RwLock<_>>` by both consumers.
pub(crate) struct LogReadView {
    /// Storage snapshot reads are served from; replaced wholesale via
    /// `update_snapshot` rather than mutated in place.
    pub(crate) storage: Arc<dyn StorageRead>,
    /// Cached segment metadata used to locate segments for scans and listings.
    pub(crate) segments: SegmentCache,
}
238
239impl LogReadView {
240 /// Creates a new `LogReadView`.
241 pub(crate) fn new(storage: Arc<dyn StorageRead>, segments: SegmentCache) -> Self {
242 Self { storage, segments }
243 }
244
245 /// Replaces the underlying storage snapshot with a new one.
246 pub(crate) fn update_snapshot(&mut self, snapshot: Arc<dyn StorageRead>) {
247 self.storage = snapshot;
248 }
249
250 /// Replaces the segment cache contents with the given segments.
251 pub(crate) fn replace_segments(&mut self, segments: &[LogSegment]) {
252 self.segments.replace_all(segments);
253 }
254
255 /// Scans entries for a key within a sequence number range with custom options.
256 pub(crate) fn scan_with_options(
257 &self,
258 key: Bytes,
259 seq_range: Range<Sequence>,
260 _options: &ScanOptions,
261 ) -> LogIterator {
262 LogIterator::open(Arc::clone(&self.storage), &self.segments, key, seq_range)
263 }
264
265 /// Lists distinct keys within a segment range.
266 pub(crate) async fn list_keys(
267 &self,
268 segment_range: Range<SegmentId>,
269 ) -> Result<LogKeyIterator> {
270 let keys = self.storage.list_keys(segment_range).await?;
271 Ok(LogKeyIterator::from_keys(keys))
272 }
273
274 /// Lists segments overlapping a sequence number range.
275 pub(crate) fn list_segments(&self, seq_range: &Range<Sequence>) -> Vec<Segment> {
276 self.segments
277 .find_covering(seq_range)
278 .into_iter()
279 .map(|s| s.into())
280 .collect()
281 }
282}
283
/// A read-only view of the log.
///
/// `LogDbReader` provides access to all read operations via the [`LogRead`]
/// trait, but not write operations. This is useful for:
///
/// - Consumers that should not have write access
/// - Sharing read access across multiple components
/// - Separating read and write concerns in your application
///
/// # Obtaining a LogDbReader
///
/// A `LogDbReader` is created by calling [`LogDbReader::open`]:
///
/// ```no_run
/// # use log::{LogDbReader, ReaderConfig};
/// # use common::StorageConfig;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let config = ReaderConfig { storage: StorageConfig::default(), ..Default::default() };
/// let reader = LogDbReader::open(config).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Thread Safety
///
/// `LogDbReader` is designed to be cloned and shared across threads.
/// All methods take `&self` and are safe to call concurrently.
///
/// # Example
///
/// ```no_run
/// use log::{LogDbReader, LogRead, LogEntry};
/// use bytes::Bytes;
/// use std::time::Duration;
///
/// async fn consume_events(reader: LogDbReader, key: Bytes) -> log::Result<()> {
///     let mut checkpoint: u64 = 0;
///
///     loop {
///         let mut iter = reader.scan(key.clone(), checkpoint..).await?;
///         while let Some(entry) = iter.next().await? {
///             println!("entry: {:?}", entry);
///             checkpoint = entry.sequence + 1;
///         }
///
///         // Check how far behind we are
///         let lag = reader.count(key.clone(), checkpoint..).await?;
///         if lag == 0 {
///             // Caught up, wait for new entries
///             tokio::time::sleep(Duration::from_millis(100)).await;
///         }
///     }
/// }
/// ```
pub struct LogDbReader {
    /// Shared read state (storage snapshot + segment cache); also held by the
    /// background refresh task.
    read_view: Arc<RwLock<LogReadView>>,
    /// Signals the background refresh task to stop when `close` is called.
    shutdown_tx: watch::Sender<bool>,
    /// Handle to the background refresh task; `None` for test-only readers
    /// built via `LogDbReader::new`.
    refresh_task: Option<JoinHandle<()>>,
}
344
345impl LogDbReader {
346 /// Opens a read-only view of the log with the given configuration.
347 ///
348 /// This creates a `LogDbReader` that can scan and count entries but cannot
349 /// append new records. Use this when you only need read access to the log.
350 ///
351 /// When `refresh_interval` is set, the reader periodically discovers new
352 /// data written by other processes.
353 ///
354 /// # Arguments
355 ///
356 /// * `config` - Reader configuration including storage and refresh settings.
357 ///
358 /// # Errors
359 ///
360 /// Returns an error if the storage backend cannot be initialized.
361 ///
362 /// # Example
363 ///
364 /// ```no_run
365 /// use log::{LogDbReader, LogRead, ReaderConfig};
366 /// use common::StorageConfig;
367 /// use bytes::Bytes;
368 ///
369 /// # #[tokio::main]
370 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
371 /// let config = ReaderConfig {
372 /// storage: StorageConfig::default(),
373 /// ..Default::default()
374 /// };
375 /// let reader = LogDbReader::open(config).await?;
376 ///
377 /// // Reader will automatically discover new data
378 /// let mut iter = reader.scan(Bytes::from("orders"), ..).await?;
379 /// while let Some(entry) = iter.next().await? {
380 /// println!("seq={}: {:?}", entry.sequence, entry.value);
381 /// }
382 ///
383 /// // Gracefully shut down when done
384 /// reader.close().await;
385 /// # Ok(())
386 /// # }
387 /// ```
388 pub async fn open(config: ReaderConfig) -> Result<Self> {
389 let reader_options = slatedb::config::DbReaderOptions {
390 manifest_poll_interval: config.refresh_interval,
391 ..Default::default()
392 };
393 let storage: Arc<dyn StorageRead> = create_storage_read(
394 &config.storage,
395 StorageReaderRuntime::new(),
396 StorageSemantics::new(),
397 reader_options,
398 )
399 .await
400 .map_err(|e| Error::Storage(e.to_string()))?;
401 let segments = SegmentCache::open(storage.as_ref(), SegmentConfig::default()).await?;
402 let read_view = Arc::new(RwLock::new(LogReadView::new(storage, segments)));
403
404 let (shutdown_tx, refresh_task) =
405 Self::spawn_refresh_task(Arc::clone(&read_view), config.refresh_interval);
406
407 Ok(Self {
408 read_view,
409 shutdown_tx,
410 refresh_task: Some(refresh_task),
411 })
412 }
413
414 /// Spawns a background task that periodically refreshes the segment cache.
415 fn spawn_refresh_task(
416 read_view: Arc<RwLock<LogReadView>>,
417 interval: Duration,
418 ) -> (watch::Sender<bool>, JoinHandle<()>) {
419 let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
420
421 let task = tokio::spawn(async move {
422 let mut ticker = tokio::time::interval(interval);
423 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
424
425 loop {
426 tokio::select! {
427 _ = ticker.tick() => {
428 // Get the latest segment ID for incremental refresh
429 let after_segment_id = {
430 let view = read_view.read().await;
431 view.segments.latest().map(|s| s.id())
432 };
433
434 // Refresh the cache
435 let mut view = read_view.write().await;
436 let storage = Arc::clone(&view.storage);
437 if let Err(e) = view.segments.refresh(storage.as_ref(), after_segment_id).await {
438 tracing::warn!("Failed to refresh segment cache: {}", e);
439 }
440 }
441 _ = shutdown_rx.changed() => {
442 if *shutdown_rx.borrow() {
443 break;
444 }
445 }
446 }
447 }
448 });
449
450 (shutdown_tx, task)
451 }
452
453 /// Creates a LogDbReader from an existing storage implementation.
454 #[cfg(test)]
455 pub(crate) async fn new(storage: Arc<dyn StorageRead>) -> Result<Self> {
456 let segments = SegmentCache::open(storage.as_ref(), SegmentConfig::default()).await?;
457 let read_view = Arc::new(RwLock::new(LogReadView::new(storage, segments)));
458 let (shutdown_tx, _) = watch::channel(false);
459 Ok(Self {
460 read_view,
461 shutdown_tx,
462 refresh_task: None,
463 })
464 }
465
466 /// Closes the reader, stopping the background refresh task.
467 ///
468 /// This method consumes `self` and gracefully shuts down the background
469 /// refresh task. It waits up to 5 seconds for the task to complete.
470 pub async fn close(self) {
471 // Signal shutdown
472 let _ = self.shutdown_tx.send(true);
473
474 // Wait for the task to complete with timeout
475 if let Some(task) = self.refresh_task {
476 let timeout = tokio::time::timeout(Duration::from_secs(5), task).await;
477 if timeout.is_err() {
478 tracing::warn!("Refresh task did not stop within timeout");
479 }
480 }
481 }
482}
483
#[async_trait]
impl LogRead for LogDbReader {
    // Normalizes the range to a concrete `Range<Sequence>`, then delegates to
    // the shared read view under a read lock.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator> {
        let seq_range = normalize_sequence(&seq_range);
        let view = self.read_view.read().await;
        Ok(view.scan_with_options(key, seq_range, &options))
    }

    // NOTE(review): not implemented yet — `todo!()` panics at runtime if a
    // caller invokes `count`/`count_with_options` on a `LogDbReader`. Consider
    // returning a proper `Error` until counting is supported here.
    async fn count_with_options(
        &self,
        _key: Bytes,
        _seq_range: impl RangeBounds<Sequence> + Send,
        _options: CountOptions,
    ) -> Result<u64> {
        todo!()
    }

    // Normalizes the segment-id bounds, then lists keys via the read view.
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator> {
        let segment_range = normalize_segment_id(&segment_range);
        let view = self.read_view.read().await;
        view.list_keys(segment_range).await
    }

    // Served entirely from the cached segment metadata; no storage I/O beyond
    // acquiring the read lock.
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>> {
        let seq_range = normalize_sequence(&seq_range);
        let view = self.read_view.read().await;
        Ok(view.list_segments(&seq_range))
    }
}
524
/// Iterator over log entries across multiple segments.
///
/// Iterates through segments in order, fetching entries for the given key
/// within the sequence range. Instantiates a `SegmentIterator` for each
/// segment as needed.
pub struct LogIterator {
    /// Storage snapshot the per-segment iterators read from.
    storage: Arc<dyn StorageRead>,
    /// Segments covering the requested range, in iteration order.
    segments: Vec<LogSegment>,
    /// Key whose log stream is being scanned.
    key: Bytes,
    /// Half-open sequence range to yield entries from.
    seq_range: Range<Sequence>,
    /// Index into `segments` of the segment currently being iterated.
    current_segment_idx: usize,
    /// Iterator over the current segment; `None` before the first segment is
    /// opened and after each segment is exhausted.
    current_iter: Option<SegmentIterator>,
}
538
539impl LogIterator {
540 /// Opens a new iterator by looking up segments covering the sequence range.
541 pub(crate) fn open(
542 storage: Arc<dyn StorageRead>,
543 segment_cache: &SegmentCache,
544 key: Bytes,
545 seq_range: Range<Sequence>,
546 ) -> Self {
547 let segments = segment_cache.find_covering(&seq_range);
548 Self {
549 storage,
550 segments,
551 key,
552 seq_range,
553 current_segment_idx: 0,
554 current_iter: None,
555 }
556 }
557
558 /// Creates a new iterator over the given segments.
559 #[cfg(test)]
560 pub(crate) fn new(
561 storage: Arc<dyn StorageRead>,
562 segments: Vec<LogSegment>,
563 key: Bytes,
564 seq_range: Range<Sequence>,
565 ) -> Self {
566 Self {
567 storage,
568 segments,
569 key,
570 seq_range,
571 current_segment_idx: 0,
572 current_iter: None,
573 }
574 }
575
576 /// Returns the next log entry, or None if iteration is complete.
577 pub async fn next(&mut self) -> Result<Option<LogEntry>> {
578 loop {
579 // If we have a current iterator, try to get the next entry
580 if let Some(iter) = &mut self.current_iter {
581 if let Some(entry) = iter.next().await? {
582 return Ok(Some(entry));
583 }
584 // Current segment exhausted, move to next
585 self.current_iter = None;
586 self.current_segment_idx += 1;
587 }
588
589 // No current iterator, try to advance to next segment
590 if !self.advance_segment().await? {
591 return Ok(None);
592 }
593 }
594 }
595
596 /// Advances to the next segment and creates its iterator.
597 ///
598 /// Returns `true` if a new iterator was created, `false` if no more segments.
599 async fn advance_segment(&mut self) -> Result<bool> {
600 if self.current_segment_idx >= self.segments.len() {
601 return Ok(false);
602 }
603
604 let segment = &self.segments[self.current_segment_idx];
605 let iter = self
606 .storage
607 .scan_entries(segment, &self.key, self.seq_range.clone())
608 .await?;
609 self.current_iter = Some(iter);
610 Ok(true)
611 }
612}
613
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serde::SegmentMeta;
    use crate::storage::LogStorageWrite;
    use common::Storage;
    use opendata_macros::storage_test;

    /// Test helper: builds a `LogEntry` from raw key/sequence/value parts.
    fn entry(key: &[u8], seq: u64, value: &[u8]) -> LogEntry {
        LogEntry {
            key: Bytes::copy_from_slice(key),
            sequence: seq,
            value: Bytes::copy_from_slice(value),
        }
    }

    // An iterator constructed with an empty segment list is immediately done.
    #[storage_test]
    async fn should_return_none_when_no_segments(storage: Arc<dyn Storage>) {
        let segments = vec![];

        let mut iter = LogIterator::new(
            storage.clone() as Arc<dyn StorageRead>,
            segments,
            Bytes::from("key"),
            0..u64::MAX,
        );

        assert!(iter.next().await.unwrap().is_none());
    }

    // Entries in a single segment come back in sequence order, then None.
    #[storage_test]
    async fn should_iterate_entries_in_single_segment(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 2, b"value2"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.clone() as Arc<dyn StorageRead>,
            vec![segment],
            Bytes::from("key"),
            0..u64::MAX,
        );

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value.as_ref(), b"value0");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);
        assert_eq!(entry.value.as_ref(), b"value1");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 2);
        assert_eq!(entry.value.as_ref(), b"value2");

        assert!(iter.next().await.unwrap().is_none());
    }

    // Iteration crosses segment boundaries seamlessly, preserving order.
    #[storage_test]
    async fn should_iterate_entries_across_multiple_segments(storage: Arc<dyn Storage>) {
        let segment0 = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let segment1 = LogSegment::new(1, SegmentMeta::new(100, 2000));
        // Entries in segment 0 (start_seq = 0)
        storage
            .write_entry(&segment0, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment0, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        // Entries in segment 1 (start_seq = 100)
        storage
            .write_entry(&segment1, &entry(b"key", 100, b"value100"))
            .await
            .unwrap();
        storage
            .write_entry(&segment1, &entry(b"key", 101, b"value101"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.clone() as Arc<dyn StorageRead>,
            vec![segment0, segment1],
            Bytes::from("key"),
            0..u64::MAX,
        );

        // Entries from segment 0
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value.as_ref(), b"value0");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);
        assert_eq!(entry.value.as_ref(), b"value1");

        // Entries from segment 1
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 100);
        assert_eq!(entry.value.as_ref(), b"value100");

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 101);
        assert_eq!(entry.value.as_ref(), b"value101");

        assert!(iter.next().await.unwrap().is_none());
    }

    // The half-open sequence range 1..3 excludes sequences 0 and 3.
    #[storage_test]
    async fn should_filter_by_sequence_range(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 2, b"value2"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 3, b"value3"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.clone() as Arc<dyn StorageRead>,
            vec![segment],
            Bytes::from("key"),
            1..3,
        );

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 2);

        assert!(iter.next().await.unwrap().is_none());
    }

    // Only entries for the requested key are yielded; other keys' entries in
    // the same segment are skipped.
    #[storage_test]
    async fn should_filter_entries_for_specified_key(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key1", 0, b"k1v0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key2", 0, b"k2v0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key1", 1, b"k1v1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key2", 1, b"k2v1"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.clone() as Arc<dyn StorageRead>,
            vec![segment],
            Bytes::from("key1"),
            0..u64::MAX,
        );

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.key.as_ref(), b"key1");
        assert_eq!(entry.sequence, 0);

        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.key.as_ref(), b"key1");
        assert_eq!(entry.sequence, 1);

        assert!(iter.next().await.unwrap().is_none());
    }

    // A range that matches no stored sequences yields nothing.
    #[storage_test]
    async fn should_return_none_when_no_entries_in_range(storage: Arc<dyn Storage>) {
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();

        let mut iter = LogIterator::new(
            storage.clone() as Arc<dyn StorageRead>,
            vec![segment],
            Bytes::from("key"),
            10..20,
        );

        assert!(iter.next().await.unwrap().is_none());
    }

    // `open` should wire up the background refresh task.
    #[tokio::test]
    async fn open_spawns_refresh_task() {
        use common::StorageConfig;

        let config = ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(100),
        };

        let reader = LogDbReader::open(config).await.unwrap();

        // Verify background task is running
        assert!(reader.refresh_task.is_some());

        // Clean up
        reader.close().await;
    }

    // `close` must shut the refresh task down promptly (well under its own
    // internal 5s timeout).
    #[tokio::test]
    async fn close_stops_refresh_task_gracefully() {
        use common::StorageConfig;

        let config = ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(50),
        };

        let reader = LogDbReader::open(config).await.unwrap();
        assert!(reader.refresh_task.is_some());

        // Close should complete without timeout
        let close_result =
            tokio::time::timeout(Duration::from_secs(1), async { reader.close().await }).await;

        assert!(
            close_result.is_ok(),
            "close() should complete within timeout"
        );
    }
}