// azoth_projector/projector.rs

1use azoth_core::{
2    error::Result,
3    traits::{CanonicalStore, ProjectionStore, ProjectionTxn},
4    types::EventId,
5    ProjectorConfig,
6};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11/// Projector: consumes events from canonical store and applies to projection
12pub struct Projector<C, P>
13where
14    C: CanonicalStore,
15    P: ProjectionStore,
16{
17    canonical: Arc<C>,
18    projection: Arc<P>,
19    config: ProjectorConfig,
20    shutdown: Arc<AtomicBool>,
21}
22
23impl<C, P> Projector<C, P>
24where
25    C: CanonicalStore,
26    P: ProjectionStore,
27{
28    pub fn new(canonical: Arc<C>, projection: Arc<P>, config: ProjectorConfig) -> Self {
29        Self {
30            canonical,
31            projection,
32            config,
33            shutdown: Arc::new(AtomicBool::new(false)),
34        }
35    }
36
37    /// Run one iteration of the projector loop
38    pub fn run_once(&self) -> Result<ProjectorStats> {
39        let start = Instant::now();
40
41        // Get current cursor
42        let cursor = self.projection.get_cursor()?;
43
44        // Determine target event ID
45        let meta = self.canonical.meta()?;
46        let target = if let Some(sealed) = meta.sealed_event_id {
47            sealed
48        } else if meta.next_event_id > 0 {
49            meta.next_event_id - 1
50        } else {
51            return Ok(ProjectorStats::empty());
52        };
53
54        // Check if caught up (but not if cursor is u64::MAX which means no events processed)
55        if cursor != u64::MAX && cursor >= target {
56            return Ok(ProjectorStats::empty());
57        }
58
59        // Determine batch size
60        // Note: cursor of u64::MAX indicates -1 was cast from i64 (no events processed yet)
61        let from = if cursor == u64::MAX { 0 } else { cursor + 1 };
62        let to = std::cmp::min(target + 1, from + self.config.batch_events_max as u64);
63
64        // Fetch events
65        let mut events = Vec::new();
66        let mut total_bytes = 0;
67        let mut iter = self.canonical.iter_events(from, Some(to))?;
68
69        while let Some((id, bytes)) = iter.next()? {
70            total_bytes += bytes.len();
71            events.push((id, bytes));
72
73            // Check byte limit
74            if total_bytes >= self.config.batch_bytes_max {
75                break;
76            }
77
78            // Check latency limit
79            if start.elapsed().as_millis() > self.config.max_apply_latency_ms as u128 {
80                break;
81            }
82        }
83
84        if events.is_empty() {
85            return Ok(ProjectorStats::empty());
86        }
87
88        // Apply events in a transaction
89        let mut txn = self.projection.begin_txn()?;
90        txn.apply_batch(&events)?;
91
92        let last_id = events.last().unwrap().0;
93        Box::new(txn).commit(last_id)?;
94
95        Ok(ProjectorStats {
96            events_applied: events.len(),
97            bytes_processed: total_bytes,
98            duration: start.elapsed(),
99            new_cursor: last_id,
100        })
101    }
102
103    /// Run the projector continuously until shutdown
104    pub async fn run_continuous(&self) -> Result<()> {
105        while !self.shutdown.load(Ordering::SeqCst) {
106            match self.run_once() {
107                Ok(stats) => {
108                    if stats.events_applied == 0 {
109                        // Caught up, sleep briefly
110                        tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
111                            .await;
112                    } else {
113                        tracing::debug!(
114                            "Applied {} events, {} bytes in {:?}",
115                            stats.events_applied,
116                            stats.bytes_processed,
117                            stats.duration
118                        );
119                    }
120                }
121                Err(e) => {
122                    tracing::error!("Projector error: {}", e);
123                    // Back off on error
124                    tokio::time::sleep(Duration::from_secs(1)).await;
125                }
126            }
127        }
128
129        Ok(())
130    }
131
132    /// Signal graceful shutdown
133    pub fn shutdown(&self) {
134        self.shutdown.store(true, Ordering::SeqCst);
135    }
136
137    /// Check lag (events behind)
138    pub fn get_lag(&self) -> Result<u64> {
139        let cursor = self.projection.get_cursor()?;
140        let meta = self.canonical.meta()?;
141
142        let tip = if let Some(sealed) = meta.sealed_event_id {
143            sealed
144        } else if meta.next_event_id > 0 {
145            meta.next_event_id - 1
146        } else {
147            return Ok(0);
148        };
149
150        // Handle cursor = u64::MAX (represents -1, no events processed)
151        if cursor == u64::MAX {
152            return Ok(tip + 1);
153        }
154
155        Ok(tip.saturating_sub(cursor))
156    }
157}
158
159#[derive(Debug, Clone)]
160pub struct ProjectorStats {
161    pub events_applied: usize,
162    pub bytes_processed: usize,
163    pub duration: Duration,
164    pub new_cursor: EventId,
165}
166
167impl ProjectorStats {
168    fn empty() -> Self {
169        Self {
170            events_applied: 0,
171            bytes_processed: 0,
172            duration: Duration::from_secs(0),
173            new_cursor: 0,
174        }
175    }
176}