1use azoth_core::{
2 error::Result,
3 traits::{CanonicalStore, ProjectionStore, ProjectionTxn},
4 types::EventId,
5 ProjectorConfig,
6};
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::Notify;
11
12pub struct Projector<C, P>
14where
15 C: CanonicalStore,
16 P: ProjectionStore,
17{
18 canonical: Arc<C>,
19 projection: Arc<P>,
20 config: ProjectorConfig,
21 shutdown: Arc<AtomicBool>,
22 event_notify: Option<Arc<Notify>>,
26 total_gaps_detected: Arc<AtomicU64>,
29 total_events_skipped: Arc<AtomicU64>,
31}
32
33impl<C, P> Projector<C, P>
34where
35 C: CanonicalStore,
36 P: ProjectionStore,
37{
38 pub fn new(canonical: Arc<C>, projection: Arc<P>, config: ProjectorConfig) -> Self {
39 Self {
40 canonical,
41 projection,
42 config,
43 shutdown: Arc::new(AtomicBool::new(false)),
44 event_notify: None,
45 total_gaps_detected: Arc::new(AtomicU64::new(0)),
46 total_events_skipped: Arc::new(AtomicU64::new(0)),
47 }
48 }
49
50 pub fn with_event_notify(mut self, notify: Arc<Notify>) -> Self {
56 self.event_notify = Some(notify);
57 self
58 }
59
60 pub fn run_once(&self) -> Result<ProjectorStats> {
67 let start = Instant::now();
68
69 let cursor = self.projection.get_cursor()?;
71
72 let meta = self.canonical.meta()?;
74 let target = if let Some(sealed) = meta.sealed_event_id {
75 sealed
76 } else if meta.next_event_id > 0 {
77 meta.next_event_id - 1
78 } else {
79 return Ok(ProjectorStats::empty());
80 };
81
82 if cursor != u64::MAX && cursor >= target {
84 return Ok(ProjectorStats::empty());
85 }
86
87 let from = if cursor == u64::MAX { 0 } else { cursor + 1 };
90 let to = std::cmp::min(target + 1, from + self.config.batch_events_max as u64);
91
92 let mut events = Vec::new();
94 let mut total_bytes = 0;
95 let mut iter = self.canonical.iter_events(from, Some(to))?;
96
97 while let Some((id, bytes)) = iter.next()? {
98 total_bytes += bytes.len();
99 events.push((id, bytes));
100
101 if total_bytes >= self.config.batch_bytes_max {
103 break;
104 }
105
106 if start.elapsed().as_millis() > self.config.max_apply_latency_ms as u128 {
108 break;
109 }
110 }
111
112 if events.is_empty() {
113 return Ok(ProjectorStats::empty());
114 }
115
116 let (gaps_detected, events_skipped) = detect_gaps(from, &events);
118
119 if gaps_detected > 0 {
120 self.total_gaps_detected
121 .fetch_add(gaps_detected, Ordering::Relaxed);
122 self.total_events_skipped
123 .fetch_add(events_skipped, Ordering::Relaxed);
124 }
125
126 let mut txn = self.projection.begin_txn()?;
128 txn.apply_batch(&events)?;
129
130 let last_id = events.last().unwrap().0;
131 Box::new(txn).commit(last_id)?;
132
133 Ok(ProjectorStats {
134 events_applied: events.len(),
135 bytes_processed: total_bytes,
136 duration: start.elapsed(),
137 new_cursor: last_id,
138 gaps_detected,
139 events_skipped,
140 })
141 }
142
143 pub async fn run_continuous(&self) -> Result<()> {
150 while !self.shutdown.load(Ordering::SeqCst) {
151 match self.run_once() {
152 Ok(stats) => {
153 if stats.events_applied == 0 {
154 if let Some(notify) = &self.event_notify {
156 tokio::select! {
159 _ = notify.notified() => {}
160 _ = tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) => {}
161 }
162 } else {
163 tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
165 .await;
166 }
167 } else if stats.gaps_detected > 0 {
168 tracing::warn!(
169 events = stats.events_applied,
170 bytes = stats.bytes_processed,
171 gaps = stats.gaps_detected,
172 skipped = stats.events_skipped,
173 elapsed = ?stats.duration,
174 "Applied events with gaps (events may be in DLQ)"
175 );
176 } else {
177 tracing::debug!(
178 "Applied {} events, {} bytes in {:?}",
179 stats.events_applied,
180 stats.bytes_processed,
181 stats.duration
182 );
183 }
184 }
185 Err(e) => {
186 tracing::error!("Projector error: {}", e);
187 tokio::time::sleep(Duration::from_secs(1)).await;
189 }
190 }
191 }
192
193 Ok(())
194 }
195
196 pub fn shutdown(&self) {
198 self.shutdown.store(true, Ordering::SeqCst);
199 }
200
201 pub fn total_gaps_detected(&self) -> u64 {
203 self.total_gaps_detected.load(Ordering::Relaxed)
204 }
205
206 pub fn total_events_skipped(&self) -> u64 {
208 self.total_events_skipped.load(Ordering::Relaxed)
209 }
210
211 pub fn get_lag(&self) -> Result<u64> {
213 let cursor = self.projection.get_cursor()?;
214 let meta = self.canonical.meta()?;
215
216 let tip = if let Some(sealed) = meta.sealed_event_id {
217 sealed
218 } else if meta.next_event_id > 0 {
219 meta.next_event_id - 1
220 } else {
221 return Ok(0);
222 };
223
224 if cursor == u64::MAX {
226 return Ok(tip + 1);
227 }
228
229 Ok(tip.saturating_sub(cursor))
230 }
231}
232
233#[derive(Debug, Clone)]
234pub struct ProjectorStats {
235 pub events_applied: usize,
236 pub bytes_processed: usize,
237 pub duration: Duration,
238 pub new_cursor: EventId,
239 pub gaps_detected: u64,
241 pub events_skipped: u64,
243}
244
245impl ProjectorStats {
246 fn empty() -> Self {
247 Self {
248 events_applied: 0,
249 bytes_processed: 0,
250 duration: Duration::from_secs(0),
251 new_cursor: 0,
252 gaps_detected: 0,
253 events_skipped: 0,
254 }
255 }
256}
257
258fn detect_gaps(expected_first: EventId, events: &[(EventId, Vec<u8>)]) -> (u64, u64) {
266 if events.is_empty() {
267 return (0, 0);
268 }
269
270 let mut gaps_detected: u64 = 0;
271 let mut events_skipped: u64 = 0;
272
273 let actual_first = events[0].0;
275 if actual_first > expected_first {
276 let missing = actual_first - expected_first;
277 gaps_detected += 1;
278 events_skipped += missing;
279 tracing::warn!(
280 expected_id = expected_first,
281 actual_id = actual_first,
282 missing_count = missing,
283 "Projector: event gap detected between cursor and first event \
284 (events {}-{} missing, likely in DLQ). Advancing past gap.",
285 expected_first,
286 actual_first - 1,
287 );
288 }
289
290 for window in events.windows(2) {
292 let prev_id = window[0].0;
293 let curr_id = window[1].0;
294 let expected = prev_id + 1;
295 if curr_id > expected {
296 let missing = curr_id - expected;
297 gaps_detected += 1;
298 events_skipped += missing;
299 tracing::warn!(
300 expected_id = expected,
301 actual_id = curr_id,
302 missing_count = missing,
303 "Projector: event gap detected within batch \
304 (events {}-{} missing, likely in DLQ). Skipping gap.",
305 expected,
306 curr_id - 1,
307 );
308 }
309 }
310
311 (gaps_detected, events_skipped)
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a batch of events with the given ids and a one-byte payload each.
    fn batch(ids: &[EventId]) -> Vec<(EventId, Vec<u8>)> {
        ids.iter().map(|&id| (id, vec![id as u8])).collect()
    }

    #[test]
    fn test_no_gaps_sequential() {
        assert_eq!(detect_gaps(5, &batch(&[5, 6, 7])), (0, 0));
    }

    #[test]
    fn test_gap_before_first_event() {
        // Expected 5 but the batch starts at 8: ids 5, 6, 7 are missing.
        assert_eq!(detect_gaps(5, &batch(&[8, 9, 10])), (1, 3));
    }

    #[test]
    fn test_gap_within_batch() {
        // 6 is followed by 9: ids 7 and 8 are missing.
        assert_eq!(detect_gaps(5, &batch(&[5, 6, 9])), (1, 2));
    }

    #[test]
    fn test_multiple_gaps() {
        // Missing 0-1 before the batch, 4 after id 3, and 6-7 after id 5.
        assert_eq!(detect_gaps(0, &batch(&[2, 3, 5, 8])), (3, 2 + 1 + 2));
    }

    #[test]
    fn test_single_event_no_gap() {
        assert_eq!(detect_gaps(42, &batch(&[42])), (0, 0));
    }

    #[test]
    fn test_single_event_with_gap() {
        // Expected 42 but got 45: ids 42, 43, 44 are missing.
        assert_eq!(detect_gaps(42, &batch(&[45])), (1, 3));
    }

    #[test]
    fn test_empty_events() {
        assert_eq!(detect_gaps(0, &batch(&[])), (0, 0));
    }

    #[test]
    fn test_first_event_matches_exactly() {
        assert_eq!(detect_gaps(0, &batch(&[0, 1, 2])), (0, 0));
    }
}