Skip to main content

azoth_projector/
projector.rs

1use azoth_core::{
2    error::Result,
3    traits::{CanonicalStore, ProjectionStore, ProjectionTxn},
4    types::EventId,
5    ProjectorConfig,
6};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::Notify;
11
12/// Projector: consumes events from canonical store and applies to projection
13pub struct Projector<C, P>
14where
15    C: CanonicalStore,
16    P: ProjectionStore,
17{
18    canonical: Arc<C>,
19    projection: Arc<P>,
20    config: ProjectorConfig,
21    shutdown: Arc<AtomicBool>,
22    /// When set, the projector awaits this notification instead of polling.
23    /// The `FileEventLog` fires `notify_waiters()` after every successful append,
24    /// giving near-zero-latency projection with zero CPU waste when idle.
25    event_notify: Option<Arc<Notify>>,
26}
27
28impl<C, P> Projector<C, P>
29where
30    C: CanonicalStore,
31    P: ProjectionStore,
32{
33    pub fn new(canonical: Arc<C>, projection: Arc<P>, config: ProjectorConfig) -> Self {
34        Self {
35            canonical,
36            projection,
37            config,
38            shutdown: Arc::new(AtomicBool::new(false)),
39            event_notify: None,
40        }
41    }
42
43    /// Attach an event notification handle for push-based projection.
44    ///
45    /// When set, `run_continuous()` awaits this notification instead of
46    /// sleeping for `poll_interval_ms` when caught up, giving near-zero
47    /// latency event processing with zero idle CPU usage.
48    pub fn with_event_notify(mut self, notify: Arc<Notify>) -> Self {
49        self.event_notify = Some(notify);
50        self
51    }
52
53    /// Run one iteration of the projector loop
54    pub fn run_once(&self) -> Result<ProjectorStats> {
55        let start = Instant::now();
56
57        // Get current cursor
58        let cursor = self.projection.get_cursor()?;
59
60        // Determine target event ID
61        let meta = self.canonical.meta()?;
62        let target = if let Some(sealed) = meta.sealed_event_id {
63            sealed
64        } else if meta.next_event_id > 0 {
65            meta.next_event_id - 1
66        } else {
67            return Ok(ProjectorStats::empty());
68        };
69
70        // Check if caught up (but not if cursor is u64::MAX which means no events processed)
71        if cursor != u64::MAX && cursor >= target {
72            return Ok(ProjectorStats::empty());
73        }
74
75        // Determine batch size
76        // Note: cursor of u64::MAX indicates -1 was cast from i64 (no events processed yet)
77        let from = if cursor == u64::MAX { 0 } else { cursor + 1 };
78        let to = std::cmp::min(target + 1, from + self.config.batch_events_max as u64);
79
80        // Fetch events
81        let mut events = Vec::new();
82        let mut total_bytes = 0;
83        let mut iter = self.canonical.iter_events(from, Some(to))?;
84
85        while let Some((id, bytes)) = iter.next()? {
86            total_bytes += bytes.len();
87            events.push((id, bytes));
88
89            // Check byte limit
90            if total_bytes >= self.config.batch_bytes_max {
91                break;
92            }
93
94            // Check latency limit
95            if start.elapsed().as_millis() > self.config.max_apply_latency_ms as u128 {
96                break;
97            }
98        }
99
100        if events.is_empty() {
101            return Ok(ProjectorStats::empty());
102        }
103
104        // Apply events in a transaction
105        let mut txn = self.projection.begin_txn()?;
106        txn.apply_batch(&events)?;
107
108        let last_id = events.last().unwrap().0;
109        Box::new(txn).commit(last_id)?;
110
111        Ok(ProjectorStats {
112            events_applied: events.len(),
113            bytes_processed: total_bytes,
114            duration: start.elapsed(),
115            new_cursor: last_id,
116        })
117    }
118
119    /// Run the projector continuously until shutdown.
120    ///
121    /// When an `event_notify` handle is set (via [`Self::with_event_notify`]),
122    /// the projector awaits the notification instead of polling, giving
123    /// near-zero-latency projection with zero CPU waste when idle.
124    /// Falls back to `poll_interval_ms` sleep if no notifier is present.
125    pub async fn run_continuous(&self) -> Result<()> {
126        while !self.shutdown.load(Ordering::SeqCst) {
127            match self.run_once() {
128                Ok(stats) => {
129                    if stats.events_applied == 0 {
130                        // Caught up -- wait for new events
131                        if let Some(notify) = &self.event_notify {
132                            // Push-based: await notification from event log
133                            // Use tokio::select! so we also wake on shutdown
134                            tokio::select! {
135                                _ = notify.notified() => {}
136                                _ = tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) => {}
137                            }
138                        } else {
139                            // Legacy polling fallback
140                            tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
141                                .await;
142                        }
143                    } else {
144                        tracing::debug!(
145                            "Applied {} events, {} bytes in {:?}",
146                            stats.events_applied,
147                            stats.bytes_processed,
148                            stats.duration
149                        );
150                    }
151                }
152                Err(e) => {
153                    tracing::error!("Projector error: {}", e);
154                    // Back off on error
155                    tokio::time::sleep(Duration::from_secs(1)).await;
156                }
157            }
158        }
159
160        Ok(())
161    }
162
163    /// Signal graceful shutdown
164    pub fn shutdown(&self) {
165        self.shutdown.store(true, Ordering::SeqCst);
166    }
167
168    /// Check lag (events behind)
169    pub fn get_lag(&self) -> Result<u64> {
170        let cursor = self.projection.get_cursor()?;
171        let meta = self.canonical.meta()?;
172
173        let tip = if let Some(sealed) = meta.sealed_event_id {
174            sealed
175        } else if meta.next_event_id > 0 {
176            meta.next_event_id - 1
177        } else {
178            return Ok(0);
179        };
180
181        // Handle cursor = u64::MAX (represents -1, no events processed)
182        if cursor == u64::MAX {
183            return Ok(tip + 1);
184        }
185
186        Ok(tip.saturating_sub(cursor))
187    }
188}
189
190#[derive(Debug, Clone)]
191pub struct ProjectorStats {
192    pub events_applied: usize,
193    pub bytes_processed: usize,
194    pub duration: Duration,
195    pub new_cursor: EventId,
196}
197
198impl ProjectorStats {
199    fn empty() -> Self {
200        Self {
201            events_applied: 0,
202            bytes_processed: 0,
203            duration: Duration::from_secs(0),
204            new_cursor: 0,
205        }
206    }
207}