azoth_projector/
projector.rs1use azoth_core::{
2 error::Result,
3 traits::{CanonicalStore, ProjectionStore, ProjectionTxn},
4 types::EventId,
5 ProjectorConfig,
6};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11pub struct Projector<C, P>
13where
14 C: CanonicalStore,
15 P: ProjectionStore,
16{
17 canonical: Arc<C>,
18 projection: Arc<P>,
19 config: ProjectorConfig,
20 shutdown: Arc<AtomicBool>,
21}
22
23impl<C, P> Projector<C, P>
24where
25 C: CanonicalStore,
26 P: ProjectionStore,
27{
28 pub fn new(canonical: Arc<C>, projection: Arc<P>, config: ProjectorConfig) -> Self {
29 Self {
30 canonical,
31 projection,
32 config,
33 shutdown: Arc::new(AtomicBool::new(false)),
34 }
35 }
36
37 pub fn run_once(&self) -> Result<ProjectorStats> {
39 let start = Instant::now();
40
41 let cursor = self.projection.get_cursor()?;
43
44 let meta = self.canonical.meta()?;
46 let target = if let Some(sealed) = meta.sealed_event_id {
47 sealed
48 } else if meta.next_event_id > 0 {
49 meta.next_event_id - 1
50 } else {
51 return Ok(ProjectorStats::empty());
52 };
53
54 if cursor != u64::MAX && cursor >= target {
56 return Ok(ProjectorStats::empty());
57 }
58
59 let from = if cursor == u64::MAX { 0 } else { cursor + 1 };
62 let to = std::cmp::min(target + 1, from + self.config.batch_events_max as u64);
63
64 let mut events = Vec::new();
66 let mut total_bytes = 0;
67 let mut iter = self.canonical.iter_events(from, Some(to))?;
68
69 while let Some((id, bytes)) = iter.next()? {
70 total_bytes += bytes.len();
71 events.push((id, bytes));
72
73 if total_bytes >= self.config.batch_bytes_max {
75 break;
76 }
77
78 if start.elapsed().as_millis() > self.config.max_apply_latency_ms as u128 {
80 break;
81 }
82 }
83
84 if events.is_empty() {
85 return Ok(ProjectorStats::empty());
86 }
87
88 let mut txn = self.projection.begin_txn()?;
90 txn.apply_batch(&events)?;
91
92 let last_id = events.last().unwrap().0;
93 Box::new(txn).commit(last_id)?;
94
95 Ok(ProjectorStats {
96 events_applied: events.len(),
97 bytes_processed: total_bytes,
98 duration: start.elapsed(),
99 new_cursor: last_id,
100 })
101 }
102
103 pub async fn run_continuous(&self) -> Result<()> {
105 while !self.shutdown.load(Ordering::SeqCst) {
106 match self.run_once() {
107 Ok(stats) => {
108 if stats.events_applied == 0 {
109 tokio::time::sleep(Duration::from_millis(self.config.poll_interval_ms))
111 .await;
112 } else {
113 tracing::debug!(
114 "Applied {} events, {} bytes in {:?}",
115 stats.events_applied,
116 stats.bytes_processed,
117 stats.duration
118 );
119 }
120 }
121 Err(e) => {
122 tracing::error!("Projector error: {}", e);
123 tokio::time::sleep(Duration::from_secs(1)).await;
125 }
126 }
127 }
128
129 Ok(())
130 }
131
132 pub fn shutdown(&self) {
134 self.shutdown.store(true, Ordering::SeqCst);
135 }
136
137 pub fn get_lag(&self) -> Result<u64> {
139 let cursor = self.projection.get_cursor()?;
140 let meta = self.canonical.meta()?;
141
142 let tip = if let Some(sealed) = meta.sealed_event_id {
143 sealed
144 } else if meta.next_event_id > 0 {
145 meta.next_event_id - 1
146 } else {
147 return Ok(0);
148 };
149
150 if cursor == u64::MAX {
152 return Ok(tip + 1);
153 }
154
155 Ok(tip.saturating_sub(cursor))
156 }
157}
158
159#[derive(Debug, Clone)]
160pub struct ProjectorStats {
161 pub events_applied: usize,
162 pub bytes_processed: usize,
163 pub duration: Duration,
164 pub new_cursor: EventId,
165}
166
167impl ProjectorStats {
168 fn empty() -> Self {
169 Self {
170 events_applied: 0,
171 bytes_processed: 0,
172 duration: Duration::from_secs(0),
173 new_cursor: 0,
174 }
175 }
176}