Skip to main content

pulsedb/watch/
poll.rs

1//! Cross-process change detection via WAL sequence polling.
2//!
3//! Reader processes use [`ChangePoller`] to discover experience mutations
4//! made by the writer process. Each poller maintains an independent cursor
5//! (last seen sequence number), enabling multiple readers to poll at their
6//! own pace without interfering with each other.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Writer Process                    Reader Process(es)
12//! ┌──────────────┐                 ┌──────────────────┐
13//! │ PulseDB      │                 │ ChangePoller     │
14//! │ ├── write ───┤──► redb ◄──────┤── poll_changes() │
15//! │ │ (seq++)    │   (shared      │ (reads WAL log)  │
16//! │ └────────────┘    file)       └──────────────────┘
17//! └──────────────┘
18//! ```
19
20use crate::error::Result;
21use crate::storage::schema::EntityTypeTag;
22use crate::storage::StorageEngine;
23use crate::watch::WatchEvent;
24
25/// Default maximum number of events returned per poll call.
26const DEFAULT_BATCH_LIMIT: usize = 1000;
27
/// Cross-process change poller.
///
/// Tracks a cursor position in the WAL event log and returns new events
/// on each `poll()` call. Each poller is independent — multiple readers
/// can poll the same database at different rates.
///
/// The poller is plain data (a sequence cursor plus a batch limit), so it
/// derives `Debug`, `Clone`, `PartialEq` and `Eq`. A clone is a separate
/// cursor that starts from the same position as the original.
///
/// # Example
///
/// ```rust,no_run
/// # fn main() -> pulsedb::Result<()> {
/// # let dir = tempfile::tempdir().unwrap();
/// use std::time::Duration;
/// use pulsedb::{Config, ChangePoller};
/// use pulsedb::storage::{StorageEngine, RedbStorage};
///
/// let storage = RedbStorage::open(dir.path().join("test.db"), &Config::default())?;
/// let mut poller = ChangePoller::new();
///
/// loop {
///     let events = poller.poll(&storage)?;
///     for event in events {
///         println!("Change: {:?}", event.event_type);
///     }
///     std::thread::sleep(Duration::from_millis(100));
/// }
/// # }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChangePoller {
    /// Last sequence number successfully consumed.
    last_seq: u64,

    /// Maximum events returned per poll call.
    batch_limit: usize,
}
62
63impl ChangePoller {
64    /// Creates a new poller starting from sequence 0 (receives all events).
65    pub fn new() -> Self {
66        Self {
67            last_seq: 0,
68            batch_limit: DEFAULT_BATCH_LIMIT,
69        }
70    }
71
72    /// Creates a poller starting from a specific sequence number.
73    ///
74    /// Events with sequence <= `seq` will not be returned. Use this to
75    /// resume polling from a previously saved position.
76    pub fn from_sequence(seq: u64) -> Self {
77        Self {
78            last_seq: seq,
79            batch_limit: DEFAULT_BATCH_LIMIT,
80        }
81    }
82
83    /// Creates a poller with a custom batch limit.
84    pub fn with_batch_limit(mut self, limit: usize) -> Self {
85        self.batch_limit = limit;
86        self
87    }
88
89    /// Returns the last sequence number this poller has consumed.
90    ///
91    /// Save this value to resume polling after a restart.
92    pub fn last_sequence(&self) -> u64 {
93        self.last_seq
94    }
95
96    /// Polls for new experience changes since the last call.
97    ///
98    /// Returns new [`WatchEvent`]s in sequence order and advances the
99    /// internal cursor. Returns an empty vec if no new changes exist.
100    ///
101    /// **Backward compatibility**: Only returns Experience-type events.
102    /// Non-experience WAL events (relations, insights, collectives) are
103    /// skipped but the cursor still advances past them.
104    ///
105    /// # Performance
106    ///
107    /// Target: < 10ms per call. This performs a range scan on the
108    /// watch_events redb table, which is O(k) where k = number of
109    /// new events (not total events).
110    pub fn poll(&mut self, storage: &dyn StorageEngine) -> Result<Vec<WatchEvent>> {
111        let (records, new_seq) = storage.poll_watch_events(self.last_seq, self.batch_limit)?;
112        self.last_seq = new_seq;
113        Ok(records
114            .into_iter()
115            .filter(|r| r.entity_type == EntityTypeTag::Experience)
116            .map(WatchEvent::from)
117            .collect())
118    }
119
120    /// Polls for ALL entity changes since the last call (sync protocol).
121    ///
122    /// Unlike [`poll()`](Self::poll) which only returns Experience events,
123    /// this method returns all entity types with their WAL sequence numbers.
124    /// Used by the sync pusher (Phase 3) to construct `SyncChange` objects.
125    ///
126    /// Returns `(sequence, record)` pairs in ascending sequence order.
127    #[cfg(feature = "sync")]
128    pub fn poll_sync_events(
129        &mut self,
130        storage: &dyn StorageEngine,
131    ) -> Result<Vec<(u64, crate::storage::schema::WatchEventRecord)>> {
132        let events = storage.poll_sync_events(self.last_seq, self.batch_limit)?;
133        if let Some((last_seq, _)) = events.last() {
134            self.last_seq = *last_seq;
135        }
136        Ok(events)
137    }
138}
139
140impl Default for ChangePoller {
141    fn default() -> Self {
142        Self::new()
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created poller reports sequence 0.
    #[test]
    fn test_change_poller_new_starts_at_zero() {
        assert_eq!(ChangePoller::new().last_sequence(), 0);
    }

    /// Resuming from a saved sequence reports that same sequence.
    #[test]
    fn test_change_poller_from_sequence() {
        assert_eq!(ChangePoller::from_sequence(42).last_sequence(), 42);
    }

    /// The `Default` impl also starts at sequence 0.
    #[test]
    fn test_change_poller_default() {
        assert_eq!(ChangePoller::default().last_sequence(), 0);
    }

    /// The builder stores the requested batch limit.
    #[test]
    fn test_change_poller_with_batch_limit() {
        let ChangePoller { batch_limit, .. } = ChangePoller::new().with_batch_limit(10);
        assert_eq!(batch_limit, 10);
    }
}