Skip to main content

pulsedb/watch/
poll.rs

1//! Cross-process change detection via WAL sequence polling.
2//!
3//! Reader processes use [`ChangePoller`] to discover experience mutations
4//! made by the writer process. Each poller maintains an independent cursor
5//! (last seen sequence number), enabling multiple readers to poll at their
6//! own pace without interfering with each other.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Writer Process                    Reader Process(es)
12//! ┌──────────────┐                 ┌──────────────────┐
13//! │ PulseDB      │                 │ ChangePoller     │
14//! │ ├── write ───┤──► redb ◄──────┤── poll_changes() │
15//! │ │ (seq++)    │   (shared      │ (reads WAL log)  │
16//! │ └────────────┘    file)       └──────────────────┘
17//! └──────────────┘
18//! ```
19
20use crate::error::Result;
21use crate::storage::StorageEngine;
22use crate::watch::WatchEvent;
23
/// Upper bound on the number of events a single poll call returns when the
/// caller has not chosen a batch limit of its own.
const DEFAULT_BATCH_LIMIT: usize = 1_000;
26
/// Cross-process change poller.
///
/// Tracks a cursor position in the WAL event log and returns new events
/// on each `poll()` call. Each poller is independent — multiple readers
/// can poll the same database at different rates.
///
/// The type is `Debug` so it can be logged, and `Clone` so a cursor can be
/// duplicated; a clone starts at the same sequence number and batch limit
/// and then advances on its own.
///
/// # Example
///
/// ```rust,no_run
/// # fn main() -> pulsedb::Result<()> {
/// # let dir = tempfile::tempdir().unwrap();
/// use std::time::Duration;
/// use pulsedb::{Config, ChangePoller};
/// use pulsedb::storage::{StorageEngine, RedbStorage};
///
/// let storage = RedbStorage::open(dir.path().join("test.db"), &Config::default())?;
/// let mut poller = ChangePoller::new();
///
/// loop {
///     let events = poller.poll(&storage)?;
///     for event in events {
///         println!("Change: {:?}", event.event_type);
///     }
///     std::thread::sleep(Duration::from_millis(100));
/// }
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct ChangePoller {
    /// Last sequence number successfully consumed.
    last_seq: u64,

    /// Maximum events returned per poll call.
    batch_limit: usize,
}
61
62impl ChangePoller {
63    /// Creates a new poller starting from sequence 0 (receives all events).
64    pub fn new() -> Self {
65        Self {
66            last_seq: 0,
67            batch_limit: DEFAULT_BATCH_LIMIT,
68        }
69    }
70
71    /// Creates a poller starting from a specific sequence number.
72    ///
73    /// Events with sequence <= `seq` will not be returned. Use this to
74    /// resume polling from a previously saved position.
75    pub fn from_sequence(seq: u64) -> Self {
76        Self {
77            last_seq: seq,
78            batch_limit: DEFAULT_BATCH_LIMIT,
79        }
80    }
81
82    /// Creates a poller with a custom batch limit.
83    pub fn with_batch_limit(mut self, limit: usize) -> Self {
84        self.batch_limit = limit;
85        self
86    }
87
88    /// Returns the last sequence number this poller has consumed.
89    ///
90    /// Save this value to resume polling after a restart.
91    pub fn last_sequence(&self) -> u64 {
92        self.last_seq
93    }
94
95    /// Polls for new experience changes since the last call.
96    ///
97    /// Returns new [`WatchEvent`]s in sequence order and advances the
98    /// internal cursor. Returns an empty vec if no new changes exist.
99    ///
100    /// # Performance
101    ///
102    /// Target: < 10ms per call. This performs a range scan on the
103    /// watch_events redb table, which is O(k) where k = number of
104    /// new events (not total events).
105    pub fn poll(&mut self, storage: &dyn StorageEngine) -> Result<Vec<WatchEvent>> {
106        let (records, new_seq) = storage.poll_watch_events(self.last_seq, self.batch_limit)?;
107        self.last_seq = new_seq;
108        Ok(records.into_iter().map(WatchEvent::from).collect())
109    }
110}
111
112impl Default for ChangePoller {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// A poller built with `new` reports sequence 0.
    #[test]
    fn test_change_poller_new_starts_at_zero() {
        assert_eq!(ChangePoller::new().last_sequence(), 0);
    }

    /// A poller built with `from_sequence` reports the sequence it was given.
    #[test]
    fn test_change_poller_from_sequence() {
        let resumed = ChangePoller::from_sequence(42);
        assert_eq!(resumed.last_sequence(), 42);
    }

    /// The `Default` impl also starts at sequence 0.
    #[test]
    fn test_change_poller_default() {
        let fallback: ChangePoller = Default::default();
        assert_eq!(fallback.last_sequence(), 0);
    }

    /// `with_batch_limit` stores the requested limit on the poller.
    #[test]
    fn test_change_poller_with_batch_limit() {
        assert_eq!(ChangePoller::new().with_batch_limit(10).batch_limit, 10);
    }
}
145}