Skip to main content

azoth_projector/
projector.rs

1use azoth_core::{
2    error::Result,
3    traits::{CanonicalStore, ProjectionStore, ProjectionTxn},
4    types::EventId,
5    ProjectorConfig,
6};
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::Notify;
11
12/// Projector: consumes events from canonical store and applies to projection
13pub struct Projector<C, P>
14where
15    C: CanonicalStore,
16    P: ProjectionStore,
17{
18    canonical: Arc<C>,
19    projection: Arc<P>,
20    config: ProjectorConfig,
21    shutdown: Arc<AtomicBool>,
22    /// When set, the projector awaits this notification instead of polling.
23    /// The `FileEventLog` fires `notify_waiters()` after every successful append,
24    /// giving near-zero-latency projection with zero CPU waste when idle.
25    event_notify: Option<Arc<Notify>>,
26    /// Running count of event ID gaps detected across all `run_once()` calls.
27    /// Used for operational monitoring / health checks.
28    total_gaps_detected: Arc<AtomicU64>,
29    /// Running count of individual missing event IDs across all gaps.
30    total_events_skipped: Arc<AtomicU64>,
31}
32
33impl<C, P> Projector<C, P>
34where
35    C: CanonicalStore,
36    P: ProjectionStore,
37{
38    pub fn new(canonical: Arc<C>, projection: Arc<P>, config: ProjectorConfig) -> Self {
39        Self {
40            canonical,
41            projection,
42            config,
43            shutdown: Arc::new(AtomicBool::new(false)),
44            event_notify: None,
45            total_gaps_detected: Arc::new(AtomicU64::new(0)),
46            total_events_skipped: Arc::new(AtomicU64::new(0)),
47        }
48    }
49
50    /// Attach an event notification handle for push-based projection.
51    ///
52    /// When set, `run_continuous()` awaits this notification instead of
53    /// sleeping for `poll_interval_ms` when caught up, giving near-zero
54    /// latency event processing with zero idle CPU usage.
55    pub fn with_event_notify(mut self, notify: Arc<Notify>) -> Self {
56        self.event_notify = Some(notify);
57        self
58    }
59
60    /// Run one iteration of the projector loop.
61    ///
62    /// Fetches a batch of events from the canonical store and applies them
63    /// to the projection. Detects and tolerates event ID gaps: if the event
64    /// log is missing events (e.g. due to DLQ fallback), the projector logs
65    /// a warning and advances past the gap instead of blocking.
66    pub fn run_once(&self) -> Result<ProjectorStats> {
67        let start = Instant::now();
68
69        // Get current cursor
70        let cursor = self.projection.get_cursor()?;
71
72        // Determine target event ID
73        let meta = self.canonical.meta()?;
74        let target = if let Some(sealed) = meta.sealed_event_id {
75            sealed
76        } else if meta.next_event_id > 0 {
77            meta.next_event_id - 1
78        } else {
79            return Ok(ProjectorStats::empty());
80        };
81
82        // Check if caught up (but not if cursor is u64::MAX which means no events processed)
83        if cursor != u64::MAX && cursor >= target {
84            return Ok(ProjectorStats::empty());
85        }
86
87        // Determine batch size
88        // Note: cursor of u64::MAX indicates -1 was cast from i64 (no events processed yet)
89        let from = if cursor == u64::MAX { 0 } else { cursor + 1 };
90        let to = std::cmp::min(target + 1, from + self.config.batch_events_max as u64);
91
92        // Fetch events
93        let mut events = Vec::new();
94        let mut total_bytes = 0;
95        let mut iter = self.canonical.iter_events(from, Some(to))?;
96
97        while let Some((id, bytes)) = iter.next()? {
98            total_bytes += bytes.len();
99            events.push((id, bytes));
100
101            // Check byte limit
102            if total_bytes >= self.config.batch_bytes_max {
103                break;
104            }
105
106            // Check latency limit
107            if start.elapsed().as_millis() > self.config.max_apply_latency_ms as u128 {
108                break;
109            }
110        }
111
112        if events.is_empty() {
113            return Ok(ProjectorStats::empty());
114        }
115
116        // Detect event ID gaps (events lost to DLQ)
117        let (gaps_detected, events_skipped) = detect_gaps(from, &events);
118
119        if gaps_detected > 0 {
120            self.total_gaps_detected
121                .fetch_add(gaps_detected, Ordering::Relaxed);
122            self.total_events_skipped
123                .fetch_add(events_skipped, Ordering::Relaxed);
124        }
125
126        // Apply events in a transaction (only events we actually have)
127        let mut txn = self.projection.begin_txn()?;
128        txn.apply_batch(&events)?;
129
130        let last_id = events.last().unwrap().0;
131        Box::new(txn).commit(last_id)?;
132
133        Ok(ProjectorStats {
134            events_applied: events.len(),
135            bytes_processed: total_bytes,
136            duration: start.elapsed(),
137            new_cursor: last_id,
138            gaps_detected,
139            events_skipped,
140        })
141    }
142
143    /// Run the projector continuously until shutdown.
144    ///
145    /// When an `event_notify` handle is set (via [`Self::with_event_notify`]),
146    /// the projector awaits the notification instead of polling, giving
147    /// near-zero-latency projection with zero CPU waste when idle.
148    /// Falls back to `poll_interval_ms` sleep if no notifier is present.
149    pub async fn run_continuous(&self) -> Result<()> {
150        while !self.shutdown.load(Ordering::SeqCst) {
151            match self.run_once() {
152                Ok(stats) => {
153                    if stats.events_applied == 0 {
154                        // Caught up -- wait for new events
155                        if let Some(notify) = &self.event_notify {
156                            // Push-based: await notification from event log
157                            // Use tokio::select! so we also wake on shutdown
158                            tokio::select! {
159                                _ = notify.notified() => {}
160                                _ = tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) => {}
161                            }
162                        } else {
163                            // Legacy polling fallback
164                            tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
165                                .await;
166                        }
167                    } else if stats.gaps_detected > 0 {
168                        tracing::warn!(
169                            events = stats.events_applied,
170                            bytes = stats.bytes_processed,
171                            gaps = stats.gaps_detected,
172                            skipped = stats.events_skipped,
173                            elapsed = ?stats.duration,
174                            "Applied events with gaps (events may be in DLQ)"
175                        );
176                    } else {
177                        tracing::debug!(
178                            "Applied {} events, {} bytes in {:?}",
179                            stats.events_applied,
180                            stats.bytes_processed,
181                            stats.duration
182                        );
183                    }
184                }
185                Err(e) => {
186                    tracing::error!("Projector error: {}", e);
187                    // Back off on error
188                    tokio::time::sleep(Duration::from_secs(1)).await;
189                }
190            }
191        }
192
193        Ok(())
194    }
195
196    /// Signal graceful shutdown
197    pub fn shutdown(&self) {
198        self.shutdown.store(true, Ordering::SeqCst);
199    }
200
201    /// Returns the total number of event ID gaps detected across all iterations.
202    pub fn total_gaps_detected(&self) -> u64 {
203        self.total_gaps_detected.load(Ordering::Relaxed)
204    }
205
206    /// Returns the total number of individual event IDs that were skipped due to gaps.
207    pub fn total_events_skipped(&self) -> u64 {
208        self.total_events_skipped.load(Ordering::Relaxed)
209    }
210
211    /// Check lag (events behind)
212    pub fn get_lag(&self) -> Result<u64> {
213        let cursor = self.projection.get_cursor()?;
214        let meta = self.canonical.meta()?;
215
216        let tip = if let Some(sealed) = meta.sealed_event_id {
217            sealed
218        } else if meta.next_event_id > 0 {
219            meta.next_event_id - 1
220        } else {
221            return Ok(0);
222        };
223
224        // Handle cursor = u64::MAX (represents -1, no events processed)
225        if cursor == u64::MAX {
226            return Ok(tip + 1);
227        }
228
229        Ok(tip.saturating_sub(cursor))
230    }
231}
232
233#[derive(Debug, Clone)]
234pub struct ProjectorStats {
235    pub events_applied: usize,
236    pub bytes_processed: usize,
237    pub duration: Duration,
238    pub new_cursor: EventId,
239    /// Number of event ID gaps detected in this batch.
240    pub gaps_detected: u64,
241    /// Number of individual event IDs that were skipped due to gaps.
242    pub events_skipped: u64,
243}
244
245impl ProjectorStats {
246    fn empty() -> Self {
247        Self {
248            events_applied: 0,
249            bytes_processed: 0,
250            duration: Duration::from_secs(0),
251            new_cursor: 0,
252            gaps_detected: 0,
253            events_skipped: 0,
254        }
255    }
256}
257
258/// Detect event ID gaps in a batch of events.
259///
260/// Checks two types of gaps:
261/// 1. Between the expected first event ID and the actual first event
262/// 2. Between consecutive events within the batch
263///
264/// Returns `(gaps_detected, events_skipped)`.
265fn detect_gaps(expected_first: EventId, events: &[(EventId, Vec<u8>)]) -> (u64, u64) {
266    if events.is_empty() {
267        return (0, 0);
268    }
269
270    let mut gaps_detected: u64 = 0;
271    let mut events_skipped: u64 = 0;
272
273    // Check gap between cursor and first event
274    let actual_first = events[0].0;
275    if actual_first > expected_first {
276        let missing = actual_first - expected_first;
277        gaps_detected += 1;
278        events_skipped += missing;
279        tracing::warn!(
280            expected_id = expected_first,
281            actual_id = actual_first,
282            missing_count = missing,
283            "Projector: event gap detected between cursor and first event \
284             (events {}-{} missing, likely in DLQ). Advancing past gap.",
285            expected_first,
286            actual_first - 1,
287        );
288    }
289
290    // Check gaps within the batch
291    for window in events.windows(2) {
292        let prev_id = window[0].0;
293        let curr_id = window[1].0;
294        let expected = prev_id + 1;
295        if curr_id > expected {
296            let missing = curr_id - expected;
297            gaps_detected += 1;
298            events_skipped += missing;
299            tracing::warn!(
300                expected_id = expected,
301                actual_id = curr_id,
302                missing_count = missing,
303                "Projector: event gap detected within batch \
304                 (events {}-{} missing, likely in DLQ). Skipping gap.",
305                expected,
306                curr_id - 1,
307            );
308        }
309    }
310
311    (gaps_detected, events_skipped)
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a batch with a one-byte payload for each ID in `ids`.
    fn batch(ids: &[EventId]) -> Vec<(EventId, Vec<u8>)> {
        ids.iter().map(|&id| (id, vec![id as u8])).collect()
    }

    /// Asserts that `detect_gaps` over `ids` (expected to start at
    /// `expected_first`) reports exactly `gaps` gaps and `skipped` missing IDs.
    fn assert_gaps(expected_first: EventId, ids: &[EventId], gaps: u64, skipped: u64) {
        assert_eq!(detect_gaps(expected_first, &batch(ids)), (gaps, skipped));
    }

    #[test]
    fn test_no_gaps_sequential() {
        assert_gaps(5, &[5, 6, 7], 0, 0);
    }

    #[test]
    fn test_gap_before_first_event() {
        // Expected 5, but first available event is 8 (5, 6, 7 missing).
        assert_gaps(5, &[8, 9, 10], 1, 3);
    }

    #[test]
    fn test_gap_within_batch() {
        // 5, 6 present; 7, 8 missing; 9 present.
        assert_gaps(5, &[5, 6, 9], 1, 2);
    }

    #[test]
    fn test_multiple_gaps() {
        // Gaps: 0-1 before the first event, 4 mid-batch, 6-7 before the last.
        assert_gaps(0, &[2, 3, 5, 8], 3, 2 + 1 + 2);
    }

    #[test]
    fn test_single_event_no_gap() {
        assert_gaps(42, &[42], 0, 0);
    }

    #[test]
    fn test_single_event_with_gap() {
        // Events 42, 43, 44 missing.
        assert_gaps(42, &[45], 1, 3);
    }

    #[test]
    fn test_empty_events() {
        assert_gaps(0, &[], 0, 0);
    }

    #[test]
    fn test_first_event_matches_exactly() {
        assert_gaps(0, &[0, 1, 2], 0, 0);
    }
}