azoth_projector/
projector.rs1use azoth_core::{
2 error::Result,
3 traits::{CanonicalStore, ProjectionStore, ProjectionTxn},
4 types::EventId,
5 ProjectorConfig,
6};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::Notify;
11
12pub struct Projector<C, P>
14where
15 C: CanonicalStore,
16 P: ProjectionStore,
17{
18 canonical: Arc<C>,
19 projection: Arc<P>,
20 config: ProjectorConfig,
21 shutdown: Arc<AtomicBool>,
22 event_notify: Option<Arc<Notify>>,
26}
27
28impl<C, P> Projector<C, P>
29where
30 C: CanonicalStore,
31 P: ProjectionStore,
32{
33 pub fn new(canonical: Arc<C>, projection: Arc<P>, config: ProjectorConfig) -> Self {
34 Self {
35 canonical,
36 projection,
37 config,
38 shutdown: Arc::new(AtomicBool::new(false)),
39 event_notify: None,
40 }
41 }
42
43 pub fn with_event_notify(mut self, notify: Arc<Notify>) -> Self {
49 self.event_notify = Some(notify);
50 self
51 }
52
53 pub fn run_once(&self) -> Result<ProjectorStats> {
55 let start = Instant::now();
56
57 let cursor = self.projection.get_cursor()?;
59
60 let meta = self.canonical.meta()?;
62 let target = if let Some(sealed) = meta.sealed_event_id {
63 sealed
64 } else if meta.next_event_id > 0 {
65 meta.next_event_id - 1
66 } else {
67 return Ok(ProjectorStats::empty());
68 };
69
70 if cursor != u64::MAX && cursor >= target {
72 return Ok(ProjectorStats::empty());
73 }
74
75 let from = if cursor == u64::MAX { 0 } else { cursor + 1 };
78 let to = std::cmp::min(target + 1, from + self.config.batch_events_max as u64);
79
80 let mut events = Vec::new();
82 let mut total_bytes = 0;
83 let mut iter = self.canonical.iter_events(from, Some(to))?;
84
85 while let Some((id, bytes)) = iter.next()? {
86 total_bytes += bytes.len();
87 events.push((id, bytes));
88
89 if total_bytes >= self.config.batch_bytes_max {
91 break;
92 }
93
94 if start.elapsed().as_millis() > self.config.max_apply_latency_ms as u128 {
96 break;
97 }
98 }
99
100 if events.is_empty() {
101 return Ok(ProjectorStats::empty());
102 }
103
104 let mut txn = self.projection.begin_txn()?;
106 txn.apply_batch(&events)?;
107
108 let last_id = events.last().unwrap().0;
109 Box::new(txn).commit(last_id)?;
110
111 Ok(ProjectorStats {
112 events_applied: events.len(),
113 bytes_processed: total_bytes,
114 duration: start.elapsed(),
115 new_cursor: last_id,
116 })
117 }
118
119 pub async fn run_continuous(&self) -> Result<()> {
126 while !self.shutdown.load(Ordering::SeqCst) {
127 match self.run_once() {
128 Ok(stats) => {
129 if stats.events_applied == 0 {
130 if let Some(notify) = &self.event_notify {
132 tokio::select! {
135 _ = notify.notified() => {}
136 _ = tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms)) => {}
137 }
138 } else {
139 tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
141 .await;
142 }
143 } else {
144 tracing::debug!(
145 "Applied {} events, {} bytes in {:?}",
146 stats.events_applied,
147 stats.bytes_processed,
148 stats.duration
149 );
150 }
151 }
152 Err(e) => {
153 tracing::error!("Projector error: {}", e);
154 tokio::time::sleep(Duration::from_secs(1)).await;
156 }
157 }
158 }
159
160 Ok(())
161 }
162
163 pub fn shutdown(&self) {
165 self.shutdown.store(true, Ordering::SeqCst);
166 }
167
168 pub fn get_lag(&self) -> Result<u64> {
170 let cursor = self.projection.get_cursor()?;
171 let meta = self.canonical.meta()?;
172
173 let tip = if let Some(sealed) = meta.sealed_event_id {
174 sealed
175 } else if meta.next_event_id > 0 {
176 meta.next_event_id - 1
177 } else {
178 return Ok(0);
179 };
180
181 if cursor == u64::MAX {
183 return Ok(tip + 1);
184 }
185
186 Ok(tip.saturating_sub(cursor))
187 }
188}
189
190#[derive(Debug, Clone)]
191pub struct ProjectorStats {
192 pub events_applied: usize,
193 pub bytes_processed: usize,
194 pub duration: Duration,
195 pub new_cursor: EventId,
196}
197
198impl ProjectorStats {
199 fn empty() -> Self {
200 Self {
201 events_applied: 0,
202 bytes_processed: 0,
203 duration: Duration::from_secs(0),
204 new_cursor: 0,
205 }
206 }
207}