1use std::{
5 collections::HashMap,
6 sync::Arc,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use reifydb_catalog::catalog::Catalog;
11use reifydb_column::{
12 bucket::{Bucket, BucketId, bucket_for, is_closed},
13 compress::Compressor,
14 registry::SnapshotRegistry,
15 snapshot::{Snapshot, SnapshotId, SnapshotSource},
16};
17use reifydb_core::interface::{
18 catalog::{
19 id::SeriesId,
20 series::{Series, SeriesMetadata},
21 },
22 resolved::{ResolvedNamespace, ResolvedSeries},
23};
24use reifydb_engine::{
25 engine::StandardEngine,
26 vm::{
27 stack::SymbolTable,
28 volcano::{
29 query::{QueryContext, QueryNode},
30 scan::series::SeriesScanNode,
31 },
32 },
33};
34use reifydb_runtime::actor::{
35 context::Context,
36 system::ActorConfig,
37 timers::TimerHandle,
38 traits::{Actor, Directive},
39};
40use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
41use reifydb_type::{
42 Result,
43 fragment::Fragment,
44 params::Params,
45 value::{datetime::DateTime, identity::IdentityId, r#type::Type},
46};
47use tracing::{debug, warn};
48
49use crate::{
50 actor::{SeriesMessage, batches::column_block_from_batches},
51 error::SubColumnError,
52};
53
/// Bookkeeping for one materialized `(series, bucket)` pair.
///
/// Plain data: derives the common value-type traits so it can be logged, copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SeriesBucketState {
	/// Value of the series' `sequence_counter` recorded when the bucket was last materialized.
	/// A bucket is materialized again once the live counter is greater than this value.
	pub materialized_at_sequence: u64,
}
57
/// Mutable state carried by a running `SeriesMaterializationActor`.
pub struct SeriesMaterializationState {
	/// Last materialization record, keyed by `(series id, bucket id)`.
	pub bucket_state: HashMap<(SeriesId, BucketId), SeriesBucketState>,
	// Stored at init and not read in the code shown here — presumably kept so the tick timer
	// stays registered while the state lives; confirm against `TimerHandle`'s drop behavior.
	_timer_handle: Option<TimerHandle>,
}
62
/// Actor that periodically scans series buckets and publishes them as snapshots.
pub struct SeriesMaterializationActor {
	/// Source of query transactions, the catalog, the clock and the query services.
	engine: StandardEngine,
	/// Registry that receives every snapshot built by this actor.
	registry: SnapshotRegistry,
	/// Compressor handed to `column_block_from_batches` when building a snapshot block.
	compressor: Compressor,
	/// Interval passed to `schedule_tick` when the actor starts.
	tick_interval: Duration,
	/// Bucket width passed to `bucket_for` and used as the stride between bucket starts.
	bucket_width: u64,
	/// Grace period forwarded to `is_closed` when deciding whether a bucket is closed.
	grace: Duration,
}
71
72impl SeriesMaterializationActor {
	/// Creates an actor from its collaborators and tuning parameters.
	///
	/// All arguments are stored as-is; no validation is performed and nothing is scheduled
	/// until the actor is initialized.
	pub fn new(
		engine: StandardEngine,
		registry: SnapshotRegistry,
		compressor: Compressor,
		tick_interval: Duration,
		bucket_width: u64,
		grace: Duration,
	) -> Self {
		Self {
			engine,
			registry,
			compressor,
			tick_interval,
			bucket_width,
			grace,
		}
	}
90
	/// Returns the snapshot registry this actor inserts materialized buckets into.
	pub fn registry(&self) -> &SnapshotRegistry {
		&self.registry
	}
94
95 fn run_tick(&self, state: &mut SeriesMaterializationState, _now: DateTime) {
96 let Some(mut query_txn) = self.begin_query_or_warn() else {
97 return;
98 };
99 let catalog = self.engine.catalog();
100 let now_wall = UNIX_EPOCH + Duration::from_nanos(self.engine.clock().now_nanos());
101 let series_list = match catalog.list_series_all(&mut Transaction::Query(&mut query_txn)) {
102 Ok(s) => s,
103 Err(e) => {
104 warn!("series materialization: list_series_all failed: {e}");
105 return;
106 }
107 };
108 for series in series_list {
109 self.materialize_series_buckets(state, &mut query_txn, &catalog, &series, now_wall);
110 }
111 }
112
113 #[inline]
114 fn begin_query_or_warn(&self) -> Option<QueryTransaction> {
115 match self.engine.begin_query(IdentityId::system()) {
116 Ok(t) => Some(t),
117 Err(e) => {
118 warn!("series materialization: begin_query failed: {e}");
119 None
120 }
121 }
122 }
123
124 fn materialize_series_buckets(
125 &self,
126 state: &mut SeriesMaterializationState,
127 query_txn: &mut QueryTransaction,
128 catalog: &Catalog,
129 series: &Series,
130 now_wall: SystemTime,
131 ) {
132 let Some(metadata) = self.load_series_metadata_or_warn(query_txn, catalog, series) else {
133 return;
134 };
135 if metadata.row_count == 0 {
136 return;
137 }
138 let first = bucket_for(metadata.oldest_key, self.bucket_width);
139 let last = bucket_for(metadata.newest_key, self.bucket_width);
140 let mut start = first.start;
141 while start <= last.start {
142 let bucket = Bucket {
143 start,
144 end: start + self.bucket_width,
145 width: self.bucket_width,
146 };
147 start = start.saturating_add(self.bucket_width);
148 self.maybe_materialize_bucket(state, query_txn, series, &metadata, &bucket, now_wall);
149 }
150 }
151
152 #[inline]
153 fn load_series_metadata_or_warn(
154 &self,
155 query_txn: &mut QueryTransaction,
156 catalog: &Catalog,
157 series: &Series,
158 ) -> Option<SeriesMetadata> {
159 let mut tx: Transaction<'_> = query_txn.into();
160 match catalog.find_series_metadata(&mut tx, series.id) {
161 Ok(Some(m)) => Some(m),
162 Ok(None) => None,
163 Err(e) => {
164 warn!("series materialization: find_series_metadata failed for {:?}: {e}", series.id);
165 None
166 }
167 }
168 }
169
170 fn maybe_materialize_bucket(
171 &self,
172 state: &mut SeriesMaterializationState,
173 query_txn: &mut QueryTransaction,
174 series: &Series,
175 metadata: &SeriesMetadata,
176 bucket: &Bucket,
177 now_wall: SystemTime,
178 ) {
179 if !is_closed(bucket, series, metadata, now_wall, self.grace) {
180 return;
181 }
182 let key = (series.id, bucket.id());
183 let need_remat = match state.bucket_state.get(&key) {
184 None => true,
185 Some(s) => s.materialized_at_sequence < metadata.sequence_counter,
186 };
187 if !need_remat {
188 return;
189 }
190 match self.materialize_bucket(query_txn, series, metadata, bucket) {
191 Ok(()) => {
192 state.bucket_state.insert(
193 key,
194 SeriesBucketState {
195 materialized_at_sequence: metadata.sequence_counter,
196 },
197 );
198 }
199 Err(e) => {
200 warn!(
201 "series materialization skipped for {:?} bucket {:?}: {e}",
202 series.id,
203 bucket.id()
204 );
205 }
206 }
207 }
208
209 fn materialize_bucket(
210 &self,
211 query_txn: &mut QueryTransaction,
212 series: &Series,
213 metadata: &SeriesMetadata,
214 bucket: &Bucket,
215 ) -> Result<()> {
216 let services = self.engine.services();
217 let catalog = self.engine.catalog();
218
219 let namespace_def = catalog
220 .find_namespace(&mut Transaction::Query(&mut *query_txn), series.namespace)?
221 .ok_or_else(|| missing_namespace(series))?;
222 let resolved_namespace =
223 ResolvedNamespace::new(Fragment::internal(namespace_def.name()), namespace_def.clone());
224 let resolved_series = ResolvedSeries::new(
225 Fragment::internal(series.name.clone()),
226 resolved_namespace,
227 series.clone(),
228 );
229
230 let context = Arc::new(QueryContext {
231 services,
232 source: None,
233 batch_size: 1024,
234 params: Params::None,
235 symbols: SymbolTable::new(),
236 identity: IdentityId::system(),
237 });
238
239 let mut scan = SeriesScanNode::new(
240 resolved_series,
241 Some(bucket.start),
242 Some(bucket.end),
243 None,
244 Arc::clone(&context),
245 )?;
246
247 let mut tx: Transaction<'_> = (&mut *query_txn).into();
248 scan.initialize(&mut tx, &context)?;
249 let mut ctx = (*context).clone();
250 let mut batches = Vec::new();
251 while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
252 batches.push(batch);
253 }
254
255 let schema = scan_output_schema(series);
256 let block = column_block_from_batches(schema, batches, &self.compressor)?;
257
258 let snapshot = Snapshot {
259 id: SnapshotId::Series {
260 series_id: series.id,
261 bucket: bucket.id(),
262 },
263 source: SnapshotSource::Series {
264 series_id: series.id,
265 bucket: *bucket,
266 sequence_counter: metadata.sequence_counter,
267 },
268 namespace: namespace_def.name().to_string(),
269 name: series.name.clone(),
270 created_at: self.engine.clock().instant(),
271 block,
272 };
273 self.registry.insert(Arc::new(snapshot));
274 Ok(())
275 }
276}
277
278fn scan_output_schema(series: &Series) -> Vec<(String, Type)> {
279 let key_name = series.key.column().to_string();
280 let key_ty = series
281 .columns
282 .iter()
283 .find(|c| c.name == key_name)
284 .map(|c| c.constraint.get_type())
285 .unwrap_or(Type::Uint8);
286
287 let mut schema = Vec::with_capacity(series.columns.len() + 1);
288 schema.push((key_name.clone(), key_ty));
289 if series.tag.is_some() {
290 schema.push(("tag".to_string(), Type::Uint1));
291 }
292 for col in series.data_columns() {
293 schema.push((col.name.clone(), col.constraint.get_type()));
294 }
295 schema
296}
297
/// Builds the `SubColumnError::NamespaceMissing` error for a series whose namespace lookup
/// returned nothing, carrying the series' namespace and id.
fn missing_namespace(series: &Series) -> SubColumnError {
	SubColumnError::NamespaceMissing {
		namespace: series.namespace,
		series: series.id,
	}
}
304
305impl Actor for SeriesMaterializationActor {
306 type State = SeriesMaterializationState;
307 type Message = SeriesMessage;
308
309 fn init(&self, ctx: &Context<SeriesMessage>) -> SeriesMaterializationState {
310 debug!(
311 "SeriesMaterializationActor started (tick={:?}, width={}, grace={:?})",
312 self.tick_interval, self.bucket_width, self.grace
313 );
314 let handle =
315 ctx.schedule_tick(self.tick_interval, |nanos| SeriesMessage::Tick(DateTime::from_nanos(nanos)));
316 SeriesMaterializationState {
317 bucket_state: HashMap::new(),
318 _timer_handle: Some(handle),
319 }
320 }
321
322 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
323 if ctx.is_cancelled() {
324 return Directive::Stop;
325 }
326 match msg {
327 SeriesMessage::Tick(now) => self.run_tick(state, now),
328 SeriesMessage::Shutdown => {
329 debug!("SeriesMaterializationActor shutting down");
330 return Directive::Stop;
331 }
332 }
333 Directive::Continue
334 }
335
336 fn post_stop(&self) {
337 debug!("SeriesMaterializationActor stopped");
338 }
339
340 fn config(&self) -> ActorConfig {
341 ActorConfig::new().mailbox_capacity(64)
342 }
343}