1use std::{
5 collections::HashMap,
6 sync::Arc,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use reifydb_catalog::catalog::Catalog;
11use reifydb_column::{
12 bucket::{Bucket, BucketId, bucket_for, is_closed},
13 compress::Compressor,
14 registry::SnapshotRegistry,
15 snapshot::{Snapshot, SnapshotId, SnapshotSource},
16};
17use reifydb_core::interface::{
18 catalog::{
19 id::SeriesId,
20 series::{Series, SeriesMetadata},
21 },
22 resolved::{ResolvedNamespace, ResolvedSeries},
23};
24use reifydb_engine::{
25 engine::StandardEngine,
26 vm::{
27 stack::SymbolTable,
28 volcano::{
29 query::{QueryContext, QueryNode},
30 scan::series::SeriesScanNode,
31 },
32 },
33};
34use reifydb_runtime::actor::{
35 context::Context,
36 system::ActorConfig,
37 timers::TimerHandle,
38 traits::{Actor, Directive},
39};
40use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
41use reifydb_type::{
42 Result,
43 fragment::Fragment,
44 params::Params,
45 value::{datetime::DateTime, identity::IdentityId, r#type::Type},
46};
47use tracing::{debug, warn};
48
49use crate::{
50 actor::{SeriesMessage, batches::column_block_from_batches},
51 error::SubColumnError,
52};
53
/// Bookkeeping for one `(series, bucket)` pair that has been materialized.
///
/// The type is plain data, so it derives the usual value-type traits; this
/// lets callers log, copy and compare entries without hand-written impls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SeriesBucketState {
    /// Value of the series' `sequence_counter` at the time the bucket was
    /// last materialized successfully. A bucket is re-materialized once the
    /// series' counter moves past this value.
    pub materialized_at_sequence: u64,
}
57
/// Mutable state owned by the running [`SeriesMaterializationActor`].
pub struct SeriesMaterializationState {
    /// Per `(series, bucket)` record of the sequence counter at which the
    /// bucket was last materialized. Entries are inserted after a successful
    /// materialization; nothing in this file removes them.
    pub bucket_state: HashMap<(SeriesId, BucketId), SeriesBucketState>,
    /// Handle returned when the periodic tick is scheduled in `init`. It is
    /// stored but never read here.
    // NOTE(review): presumably held so the timer stays alive as long as the
    // state does — confirm against `TimerHandle`'s drop semantics.
    _timer_handle: Option<TimerHandle>,
}
62
/// Actor that, on every tick, walks all series known to the catalog and
/// publishes a columnar snapshot for each closed bucket that is new or stale.
pub struct SeriesMaterializationActor {
    /// Source of query transactions, the catalog, the clock and the query
    /// services used while scanning.
    engine: StandardEngine,
    /// Destination for the snapshots produced by this actor.
    registry: SnapshotRegistry,
    /// Handed to `column_block_from_batches` when building a snapshot block.
    compressor: Compressor,
    /// Period passed to the tick scheduler in `init`.
    tick_interval: Duration,
    /// Width of a bucket, in the same unit as the series key.
    bucket_width: u64,
    /// Grace period forwarded to `is_closed` when deciding whether a bucket
    /// may be materialized.
    grace: Duration,
}
77
78impl SeriesMaterializationActor {
79 pub fn new(
80 engine: StandardEngine,
81 registry: SnapshotRegistry,
82 compressor: Compressor,
83 tick_interval: Duration,
84 bucket_width: u64,
85 grace: Duration,
86 ) -> Self {
87 Self {
88 engine,
89 registry,
90 compressor,
91 tick_interval,
92 bucket_width,
93 grace,
94 }
95 }
96
97 pub fn registry(&self) -> &SnapshotRegistry {
98 &self.registry
99 }
100
101 fn run_tick(&self, state: &mut SeriesMaterializationState, _now: DateTime) {
102 let Some(mut query_txn) = self.begin_query_or_warn() else {
103 return;
104 };
105 let catalog = self.engine.catalog();
106 let now_wall = UNIX_EPOCH + Duration::from_nanos(self.engine.clock().now_nanos());
107 for series in catalog.materialized.list_series() {
108 self.materialize_series_buckets(state, &mut query_txn, &catalog, &series, now_wall);
109 }
110 }
111
112 #[inline]
113 fn begin_query_or_warn(&self) -> Option<QueryTransaction> {
114 match self.engine.begin_query(IdentityId::system()) {
115 Ok(t) => Some(t),
116 Err(e) => {
117 warn!("series materialization: begin_query failed: {e}");
118 None
119 }
120 }
121 }
122
123 fn materialize_series_buckets(
124 &self,
125 state: &mut SeriesMaterializationState,
126 query_txn: &mut QueryTransaction,
127 catalog: &Catalog,
128 series: &Series,
129 now_wall: SystemTime,
130 ) {
131 let Some(metadata) = self.load_series_metadata_or_warn(query_txn, catalog, series) else {
132 return;
133 };
134 if metadata.row_count == 0 {
135 return;
136 }
137 let first = bucket_for(metadata.oldest_key, self.bucket_width);
138 let last = bucket_for(metadata.newest_key, self.bucket_width);
139 let mut start = first.start;
140 while start <= last.start {
141 let bucket = Bucket {
142 start,
143 end: start + self.bucket_width,
144 width: self.bucket_width,
145 };
146 start = start.saturating_add(self.bucket_width);
147 self.maybe_materialize_bucket(state, query_txn, series, &metadata, &bucket, now_wall);
148 }
149 }
150
151 #[inline]
152 fn load_series_metadata_or_warn(
153 &self,
154 query_txn: &mut QueryTransaction,
155 catalog: &Catalog,
156 series: &Series,
157 ) -> Option<SeriesMetadata> {
158 let mut tx: Transaction<'_> = query_txn.into();
159 match catalog.find_series_metadata(&mut tx, series.id) {
160 Ok(Some(m)) => Some(m),
161 Ok(None) => None,
162 Err(e) => {
163 warn!("series materialization: find_series_metadata failed for {:?}: {e}", series.id);
164 None
165 }
166 }
167 }
168
169 fn maybe_materialize_bucket(
170 &self,
171 state: &mut SeriesMaterializationState,
172 query_txn: &mut QueryTransaction,
173 series: &Series,
174 metadata: &SeriesMetadata,
175 bucket: &Bucket,
176 now_wall: SystemTime,
177 ) {
178 if !is_closed(bucket, series, metadata, now_wall, self.grace) {
179 return;
180 }
181 let key = (series.id, bucket.id());
182 let need_remat = match state.bucket_state.get(&key) {
183 None => true,
184 Some(s) => s.materialized_at_sequence < metadata.sequence_counter,
185 };
186 if !need_remat {
187 return;
188 }
189 match self.materialize_bucket(query_txn, series, metadata, bucket) {
190 Ok(()) => {
191 state.bucket_state.insert(
192 key,
193 SeriesBucketState {
194 materialized_at_sequence: metadata.sequence_counter,
195 },
196 );
197 }
198 Err(e) => {
199 warn!(
200 "series materialization skipped for {:?} bucket {:?}: {e}",
201 series.id,
202 bucket.id()
203 );
204 }
205 }
206 }
207
208 fn materialize_bucket(
209 &self,
210 query_txn: &mut QueryTransaction,
211 series: &Series,
212 metadata: &SeriesMetadata,
213 bucket: &Bucket,
214 ) -> Result<()> {
215 let services = self.engine.services();
216 let catalog = self.engine.catalog();
217
218 let namespace_def = catalog
219 .materialized
220 .find_namespace(series.namespace)
221 .ok_or_else(|| missing_namespace(series))?;
222 let resolved_namespace =
223 ResolvedNamespace::new(Fragment::internal(namespace_def.name()), namespace_def.clone());
224 let resolved_series = ResolvedSeries::new(
225 Fragment::internal(series.name.clone()),
226 resolved_namespace,
227 series.clone(),
228 );
229
230 let context = Arc::new(QueryContext {
231 services,
232 source: None,
233 batch_size: 1024,
234 params: Params::None,
235 symbols: SymbolTable::new(),
236 identity: IdentityId::system(),
237 });
238
239 let mut scan = SeriesScanNode::new(
240 resolved_series,
241 Some(bucket.start),
242 Some(bucket.end),
243 None,
244 Arc::clone(&context),
245 )?;
246
247 let mut tx: Transaction<'_> = (&mut *query_txn).into();
248 scan.initialize(&mut tx, &context)?;
249 let mut ctx = (*context).clone();
250 let mut batches = Vec::new();
251 while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
252 batches.push(batch);
253 }
254
255 let schema = scan_output_schema(series);
256 let block = column_block_from_batches(schema, batches, &self.compressor)?;
257
258 let snapshot = Snapshot {
259 id: SnapshotId::Series {
260 series_id: series.id,
261 bucket: bucket.id(),
262 },
263 source: SnapshotSource::Series {
264 series_id: series.id,
265 bucket: *bucket,
266 sequence_counter: metadata.sequence_counter,
267 },
268 namespace: namespace_def.name().to_string(),
269 name: series.name.clone(),
270 created_at: self.engine.clock().instant(),
271 block,
272 };
273 self.registry.insert(Arc::new(snapshot));
274 Ok(())
275 }
276}
277
278fn scan_output_schema(series: &Series) -> Vec<(String, Type)> {
282 let key_name = series.key.column().to_string();
283 let key_ty = series
284 .columns
285 .iter()
286 .find(|c| c.name == key_name)
287 .map(|c| c.constraint.get_type())
288 .unwrap_or(Type::Uint8);
289
290 let mut schema = Vec::with_capacity(series.columns.len() + 1);
291 schema.push((key_name.clone(), key_ty));
292 if series.tag.is_some() {
293 schema.push(("tag".to_string(), Type::Uint1));
294 }
295 for col in series.data_columns() {
296 schema.push((col.name.clone(), col.constraint.get_type()));
297 }
298 schema
299}
300
301fn missing_namespace(series: &Series) -> SubColumnError {
302 SubColumnError::NamespaceMissing {
303 namespace: series.namespace,
304 series: series.id,
305 }
306}
307
308impl Actor for SeriesMaterializationActor {
309 type State = SeriesMaterializationState;
310 type Message = SeriesMessage;
311
312 fn init(&self, ctx: &Context<SeriesMessage>) -> SeriesMaterializationState {
313 debug!(
314 "SeriesMaterializationActor started (tick={:?}, width={}, grace={:?})",
315 self.tick_interval, self.bucket_width, self.grace
316 );
317 let handle =
318 ctx.schedule_tick(self.tick_interval, |nanos| SeriesMessage::Tick(DateTime::from_nanos(nanos)));
319 SeriesMaterializationState {
320 bucket_state: HashMap::new(),
321 _timer_handle: Some(handle),
322 }
323 }
324
325 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
326 if ctx.is_cancelled() {
327 return Directive::Stop;
328 }
329 match msg {
330 SeriesMessage::Tick(now) => self.run_tick(state, now),
331 SeriesMessage::Shutdown => {
332 debug!("SeriesMaterializationActor shutting down");
333 return Directive::Stop;
334 }
335 }
336 Directive::Continue
337 }
338
339 fn post_stop(&self) {
340 debug!("SeriesMaterializationActor stopped");
341 }
342
343 fn config(&self) -> ActorConfig {
344 ActorConfig::new().mailbox_capacity(64)
345 }
346}