pulsedb/watch/poll.rs
1//! Cross-process change detection via WAL sequence polling.
2//!
3//! Reader processes use [`ChangePoller`] to discover experience mutations
4//! made by the writer process. Each poller maintains an independent cursor
5//! (last seen sequence number), enabling multiple readers to poll at their
6//! own pace without interfering with each other.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Writer Process Reader Process(es)
12//! ┌──────────────┐ ┌──────────────────┐
13//! │ PulseDB │ │ ChangePoller │
14//! │ ├── write ───┤──► redb ◄──────┤── poll_changes() │
15//! │ │ (seq++) │ (shared │ (reads WAL log) │
16//! │ └────────────┘ file) └──────────────────┘
17//! └──────────────┘
18//! ```
19
20use crate::error::Result;
21use crate::storage::StorageEngine;
22use crate::watch::WatchEvent;
23
24/// Default maximum number of events returned per poll call.
25const DEFAULT_BATCH_LIMIT: usize = 1000;
26
27/// Cross-process change poller.
28///
29/// Tracks a cursor position in the WAL event log and returns new events
30/// on each `poll()` call. Each poller is independent — multiple readers
31/// can poll the same database at different rates.
32///
33/// # Example
34///
35/// ```rust,no_run
36/// # fn main() -> pulsedb::Result<()> {
37/// # let dir = tempfile::tempdir().unwrap();
38/// use std::time::Duration;
39/// use pulsedb::{Config, ChangePoller};
40/// use pulsedb::storage::{StorageEngine, RedbStorage};
41///
42/// let storage = RedbStorage::open(dir.path().join("test.db"), &Config::default())?;
43/// let mut poller = ChangePoller::new();
44///
45/// loop {
46/// let events = poller.poll(&storage)?;
47/// for event in events {
48/// println!("Change: {:?}", event.event_type);
49/// }
50/// std::thread::sleep(Duration::from_millis(100));
51/// }
52/// # }
53/// ```
54pub struct ChangePoller {
55 /// Last sequence number successfully consumed.
56 last_seq: u64,
57
58 /// Maximum events returned per poll call.
59 batch_limit: usize,
60}
61
62impl ChangePoller {
63 /// Creates a new poller starting from sequence 0 (receives all events).
64 pub fn new() -> Self {
65 Self {
66 last_seq: 0,
67 batch_limit: DEFAULT_BATCH_LIMIT,
68 }
69 }
70
71 /// Creates a poller starting from a specific sequence number.
72 ///
73 /// Events with sequence <= `seq` will not be returned. Use this to
74 /// resume polling from a previously saved position.
75 pub fn from_sequence(seq: u64) -> Self {
76 Self {
77 last_seq: seq,
78 batch_limit: DEFAULT_BATCH_LIMIT,
79 }
80 }
81
82 /// Creates a poller with a custom batch limit.
83 pub fn with_batch_limit(mut self, limit: usize) -> Self {
84 self.batch_limit = limit;
85 self
86 }
87
88 /// Returns the last sequence number this poller has consumed.
89 ///
90 /// Save this value to resume polling after a restart.
91 pub fn last_sequence(&self) -> u64 {
92 self.last_seq
93 }
94
95 /// Polls for new experience changes since the last call.
96 ///
97 /// Returns new [`WatchEvent`]s in sequence order and advances the
98 /// internal cursor. Returns an empty vec if no new changes exist.
99 ///
100 /// # Performance
101 ///
102 /// Target: < 10ms per call. This performs a range scan on the
103 /// watch_events redb table, which is O(k) where k = number of
104 /// new events (not total events).
105 pub fn poll(&mut self, storage: &dyn StorageEngine) -> Result<Vec<WatchEvent>> {
106 let (records, new_seq) = storage.poll_watch_events(self.last_seq, self.batch_limit)?;
107 self.last_seq = new_seq;
108 Ok(records.into_iter().map(WatchEvent::from).collect())
109 }
110}
111
112impl Default for ChangePoller {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use super::*;
121
122 #[test]
123 fn test_change_poller_new_starts_at_zero() {
124 let poller = ChangePoller::new();
125 assert_eq!(poller.last_sequence(), 0);
126 }
127
128 #[test]
129 fn test_change_poller_from_sequence() {
130 let poller = ChangePoller::from_sequence(42);
131 assert_eq!(poller.last_sequence(), 42);
132 }
133
134 #[test]
135 fn test_change_poller_default() {
136 let poller = ChangePoller::default();
137 assert_eq!(poller.last_sequence(), 0);
138 }
139
140 #[test]
141 fn test_change_poller_with_batch_limit() {
142 let poller = ChangePoller::new().with_batch_limit(10);
143 assert_eq!(poller.batch_limit, 10);
144 }
145}