pulsedb/watch/poll.rs
1//! Cross-process change detection via WAL sequence polling.
2//!
3//! Reader processes use [`ChangePoller`] to discover experience mutations
4//! made by the writer process. Each poller maintains an independent cursor
5//! (last seen sequence number), enabling multiple readers to poll at their
6//! own pace without interfering with each other.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Writer Process Reader Process(es)
12//! ┌──────────────┐ ┌──────────────────┐
13//! │ PulseDB │ │ ChangePoller │
14//! │ ├── write ───┤──► redb ◄──────┤── poll_changes() │
15//! │ │ (seq++) │ (shared │ (reads WAL log) │
16//! │ └────────────┘ file) └──────────────────┘
17//! └──────────────┘
18//! ```
19
20use crate::error::Result;
21use crate::storage::schema::EntityTypeTag;
22use crate::storage::StorageEngine;
23use crate::watch::WatchEvent;
24
25/// Default maximum number of events returned per poll call.
26const DEFAULT_BATCH_LIMIT: usize = 1000;
27
28/// Cross-process change poller.
29///
30/// Tracks a cursor position in the WAL event log and returns new events
31/// on each `poll()` call. Each poller is independent — multiple readers
32/// can poll the same database at different rates.
33///
34/// # Example
35///
36/// ```rust,no_run
37/// # fn main() -> pulsedb::Result<()> {
38/// # let dir = tempfile::tempdir().unwrap();
39/// use std::time::Duration;
40/// use pulsedb::{Config, ChangePoller};
41/// use pulsedb::storage::{StorageEngine, RedbStorage};
42///
43/// let storage = RedbStorage::open(dir.path().join("test.db"), &Config::default())?;
44/// let mut poller = ChangePoller::new();
45///
46/// loop {
47/// let events = poller.poll(&storage)?;
48/// for event in events {
49/// println!("Change: {:?}", event.event_type);
50/// }
51/// std::thread::sleep(Duration::from_millis(100));
52/// }
53/// # }
54/// ```
55pub struct ChangePoller {
56 /// Last sequence number successfully consumed.
57 last_seq: u64,
58
59 /// Maximum events returned per poll call.
60 batch_limit: usize,
61}
62
63impl ChangePoller {
64 /// Creates a new poller starting from sequence 0 (receives all events).
65 pub fn new() -> Self {
66 Self {
67 last_seq: 0,
68 batch_limit: DEFAULT_BATCH_LIMIT,
69 }
70 }
71
72 /// Creates a poller starting from a specific sequence number.
73 ///
74 /// Events with sequence <= `seq` will not be returned. Use this to
75 /// resume polling from a previously saved position.
76 pub fn from_sequence(seq: u64) -> Self {
77 Self {
78 last_seq: seq,
79 batch_limit: DEFAULT_BATCH_LIMIT,
80 }
81 }
82
83 /// Creates a poller with a custom batch limit.
84 pub fn with_batch_limit(mut self, limit: usize) -> Self {
85 self.batch_limit = limit;
86 self
87 }
88
89 /// Returns the last sequence number this poller has consumed.
90 ///
91 /// Save this value to resume polling after a restart.
92 pub fn last_sequence(&self) -> u64 {
93 self.last_seq
94 }
95
96 /// Polls for new experience changes since the last call.
97 ///
98 /// Returns new [`WatchEvent`]s in sequence order and advances the
99 /// internal cursor. Returns an empty vec if no new changes exist.
100 ///
101 /// **Backward compatibility**: Only returns Experience-type events.
102 /// Non-experience WAL events (relations, insights, collectives) are
103 /// skipped but the cursor still advances past them.
104 ///
105 /// # Performance
106 ///
107 /// Target: < 10ms per call. This performs a range scan on the
108 /// watch_events redb table, which is O(k) where k = number of
109 /// new events (not total events).
110 pub fn poll(&mut self, storage: &dyn StorageEngine) -> Result<Vec<WatchEvent>> {
111 let (records, new_seq) = storage.poll_watch_events(self.last_seq, self.batch_limit)?;
112 self.last_seq = new_seq;
113 Ok(records
114 .into_iter()
115 .filter(|r| r.entity_type == EntityTypeTag::Experience)
116 .map(WatchEvent::from)
117 .collect())
118 }
119
120 /// Polls for ALL entity changes since the last call (sync protocol).
121 ///
122 /// Unlike [`poll()`](Self::poll) which only returns Experience events,
123 /// this method returns all entity types with their WAL sequence numbers.
124 /// Used by the sync pusher (Phase 3) to construct `SyncChange` objects.
125 ///
126 /// Returns `(sequence, record)` pairs in ascending sequence order.
127 #[cfg(feature = "sync")]
128 pub fn poll_sync_events(
129 &mut self,
130 storage: &dyn StorageEngine,
131 ) -> Result<Vec<(u64, crate::storage::schema::WatchEventRecord)>> {
132 let events = storage.poll_sync_events(self.last_seq, self.batch_limit)?;
133 if let Some((last_seq, _)) = events.last() {
134 self.last_seq = *last_seq;
135 }
136 Ok(events)
137 }
138}
139
140impl Default for ChangePoller {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 use super::*;
149
150 #[test]
151 fn test_change_poller_new_starts_at_zero() {
152 let poller = ChangePoller::new();
153 assert_eq!(poller.last_sequence(), 0);
154 }
155
156 #[test]
157 fn test_change_poller_from_sequence() {
158 let poller = ChangePoller::from_sequence(42);
159 assert_eq!(poller.last_sequence(), 42);
160 }
161
162 #[test]
163 fn test_change_poller_default() {
164 let poller = ChangePoller::default();
165 assert_eq!(poller.last_sequence(), 0);
166 }
167
168 #[test]
169 fn test_change_poller_with_batch_limit() {
170 let poller = ChangePoller::new().with_batch_limit(10);
171 assert_eq!(poller.batch_limit, 10);
172 }
173}