Skip to main content

reifydb_sub_column/actor/
series.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::HashMap,
6	sync::Arc,
7	time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use reifydb_catalog::catalog::Catalog;
11use reifydb_column::{
12	bucket::{Bucket, BucketId, bucket_for, is_closed},
13	compress::Compressor,
14	registry::SnapshotRegistry,
15	snapshot::{Snapshot, SnapshotId, SnapshotSource},
16};
17use reifydb_core::interface::{
18	catalog::{
19		id::SeriesId,
20		series::{Series, SeriesMetadata},
21	},
22	resolved::{ResolvedNamespace, ResolvedSeries},
23};
24use reifydb_engine::{
25	engine::StandardEngine,
26	vm::{
27		stack::SymbolTable,
28		volcano::{
29			query::{QueryContext, QueryNode},
30			scan::series::SeriesScanNode,
31		},
32	},
33};
34use reifydb_runtime::actor::{
35	context::Context,
36	system::ActorConfig,
37	timers::TimerHandle,
38	traits::{Actor, Directive},
39};
40use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
41use reifydb_type::{
42	Result,
43	fragment::Fragment,
44	params::Params,
45	value::{datetime::DateTime, identity::IdentityId, r#type::Type},
46};
47use tracing::{debug, warn};
48
49use crate::{
50	actor::{SeriesMessage, batches::column_block_from_batches},
51	error::SubColumnError,
52};
53
/// Per-(series, bucket) bookkeeping kept by the materialization actor.
pub struct SeriesBucketState {
	// Value of the series' `sequence_counter` when this bucket was last
	// materialized; a larger counter observed later signals late-arriving
	// rows and triggers re-materialization.
	pub materialized_at_sequence: u64,
}
57
/// Mutable actor state: which buckets have been materialized, and at what
/// sequence counter, plus the handle that keeps the tick timer alive.
pub struct SeriesMaterializationState {
	// Last materialized sequence per (series, bucket) pair; entries are
	// only written after a successful materialization.
	pub bucket_state: HashMap<(SeriesId, BucketId), SeriesBucketState>,
	// Held only so the periodic tick timer stays armed for the actor's
	// lifetime; never read — presumably cancelled on drop (TODO confirm
	// against `TimerHandle` semantics).
	_timer_handle: Option<TimerHandle>,
}
62
// Per-series, per-closed-bucket materialization. Each tick opens one read
// transaction, reads `SeriesMetadata` per series, enumerates candidate buckets
// in `[oldest_key, newest_key]`, filters to closed buckets via `is_closed`,
// materializes those whose `sequence_counter` has advanced since last
// materialization (late-arrival signal), and overwrites the registry entry
// atomically under the shared `(series_id, bucket)` key.
pub struct SeriesMaterializationActor {
	// Engine providing the clock, catalog, and read transactions.
	engine: StandardEngine,
	// Shared snapshot registry that receives the materialized blocks.
	registry: SnapshotRegistry,
	// Compressor applied when batches are folded into a column block.
	compressor: Compressor,
	// Period between materialization passes.
	tick_interval: Duration,
	// Key-space width of each bucket.
	bucket_width: u64,
	// Extra wall-clock age a bucket must reach before it counts as closed.
	grace: Duration,
}
77
78impl SeriesMaterializationActor {
79	pub fn new(
80		engine: StandardEngine,
81		registry: SnapshotRegistry,
82		compressor: Compressor,
83		tick_interval: Duration,
84		bucket_width: u64,
85		grace: Duration,
86	) -> Self {
87		Self {
88			engine,
89			registry,
90			compressor,
91			tick_interval,
92			bucket_width,
93			grace,
94		}
95	}
96
97	pub fn registry(&self) -> &SnapshotRegistry {
98		&self.registry
99	}
100
101	fn run_tick(&self, state: &mut SeriesMaterializationState, _now: DateTime) {
102		let Some(mut query_txn) = self.begin_query_or_warn() else {
103			return;
104		};
105		let catalog = self.engine.catalog();
106		let now_wall = UNIX_EPOCH + Duration::from_nanos(self.engine.clock().now_nanos());
107		for series in catalog.materialized.list_series() {
108			self.materialize_series_buckets(state, &mut query_txn, &catalog, &series, now_wall);
109		}
110	}
111
112	#[inline]
113	fn begin_query_or_warn(&self) -> Option<QueryTransaction> {
114		match self.engine.begin_query(IdentityId::system()) {
115			Ok(t) => Some(t),
116			Err(e) => {
117				warn!("series materialization: begin_query failed: {e}");
118				None
119			}
120		}
121	}
122
123	fn materialize_series_buckets(
124		&self,
125		state: &mut SeriesMaterializationState,
126		query_txn: &mut QueryTransaction,
127		catalog: &Catalog,
128		series: &Series,
129		now_wall: SystemTime,
130	) {
131		let Some(metadata) = self.load_series_metadata_or_warn(query_txn, catalog, series) else {
132			return;
133		};
134		if metadata.row_count == 0 {
135			return;
136		}
137		let first = bucket_for(metadata.oldest_key, self.bucket_width);
138		let last = bucket_for(metadata.newest_key, self.bucket_width);
139		let mut start = first.start;
140		while start <= last.start {
141			let bucket = Bucket {
142				start,
143				end: start + self.bucket_width,
144				width: self.bucket_width,
145			};
146			start = start.saturating_add(self.bucket_width);
147			self.maybe_materialize_bucket(state, query_txn, series, &metadata, &bucket, now_wall);
148		}
149	}
150
151	#[inline]
152	fn load_series_metadata_or_warn(
153		&self,
154		query_txn: &mut QueryTransaction,
155		catalog: &Catalog,
156		series: &Series,
157	) -> Option<SeriesMetadata> {
158		let mut tx: Transaction<'_> = query_txn.into();
159		match catalog.find_series_metadata(&mut tx, series.id) {
160			Ok(Some(m)) => Some(m),
161			Ok(None) => None,
162			Err(e) => {
163				warn!("series materialization: find_series_metadata failed for {:?}: {e}", series.id);
164				None
165			}
166		}
167	}
168
169	fn maybe_materialize_bucket(
170		&self,
171		state: &mut SeriesMaterializationState,
172		query_txn: &mut QueryTransaction,
173		series: &Series,
174		metadata: &SeriesMetadata,
175		bucket: &Bucket,
176		now_wall: SystemTime,
177	) {
178		if !is_closed(bucket, series, metadata, now_wall, self.grace) {
179			return;
180		}
181		let key = (series.id, bucket.id());
182		let need_remat = match state.bucket_state.get(&key) {
183			None => true,
184			Some(s) => s.materialized_at_sequence < metadata.sequence_counter,
185		};
186		if !need_remat {
187			return;
188		}
189		match self.materialize_bucket(query_txn, series, metadata, bucket) {
190			Ok(()) => {
191				state.bucket_state.insert(
192					key,
193					SeriesBucketState {
194						materialized_at_sequence: metadata.sequence_counter,
195					},
196				);
197			}
198			Err(e) => {
199				warn!(
200					"series materialization skipped for {:?} bucket {:?}: {e}",
201					series.id,
202					bucket.id()
203				);
204			}
205		}
206	}
207
208	fn materialize_bucket(
209		&self,
210		query_txn: &mut QueryTransaction,
211		series: &Series,
212		metadata: &SeriesMetadata,
213		bucket: &Bucket,
214	) -> Result<()> {
215		let services = self.engine.services();
216		let catalog = self.engine.catalog();
217
218		let namespace_def = catalog
219			.materialized
220			.find_namespace(series.namespace)
221			.ok_or_else(|| missing_namespace(series))?;
222		let resolved_namespace =
223			ResolvedNamespace::new(Fragment::internal(namespace_def.name()), namespace_def.clone());
224		let resolved_series = ResolvedSeries::new(
225			Fragment::internal(series.name.clone()),
226			resolved_namespace,
227			series.clone(),
228		);
229
230		let context = Arc::new(QueryContext {
231			services,
232			source: None,
233			batch_size: 1024,
234			params: Params::None,
235			symbols: SymbolTable::new(),
236			identity: IdentityId::system(),
237		});
238
239		let mut scan = SeriesScanNode::new(
240			resolved_series,
241			Some(bucket.start),
242			Some(bucket.end),
243			None,
244			Arc::clone(&context),
245		)?;
246
247		let mut tx: Transaction<'_> = (&mut *query_txn).into();
248		scan.initialize(&mut tx, &context)?;
249		let mut ctx = (*context).clone();
250		let mut batches = Vec::new();
251		while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
252			batches.push(batch);
253		}
254
255		let schema = scan_output_schema(series);
256		let block = column_block_from_batches(schema, batches, &self.compressor)?;
257
258		let snapshot = Snapshot {
259			id: SnapshotId::Series {
260				series_id: series.id,
261				bucket: bucket.id(),
262			},
263			source: SnapshotSource::Series {
264				series_id: series.id,
265				bucket: *bucket,
266				sequence_counter: metadata.sequence_counter,
267			},
268			namespace: namespace_def.name().to_string(),
269			name: series.name.clone(),
270			created_at: self.engine.clock().instant(),
271			block,
272		};
273		self.registry.insert(Arc::new(snapshot));
274		Ok(())
275	}
276}
277
278// The scan emits: [key column, optional "tag", then the remaining data
279// columns (excluding the key)]. Build the schema in the same order so the
280// batch-concat helper finds columns by name.
281fn scan_output_schema(series: &Series) -> Vec<(String, Type)> {
282	let key_name = series.key.column().to_string();
283	let key_ty = series
284		.columns
285		.iter()
286		.find(|c| c.name == key_name)
287		.map(|c| c.constraint.get_type())
288		.unwrap_or(Type::Uint8);
289
290	let mut schema = Vec::with_capacity(series.columns.len() + 1);
291	schema.push((key_name.clone(), key_ty));
292	if series.tag.is_some() {
293		schema.push(("tag".to_string(), Type::Uint1));
294	}
295	for col in series.data_columns() {
296		schema.push((col.name.clone(), col.constraint.get_type()));
297	}
298	schema
299}
300
301fn missing_namespace(series: &Series) -> SubColumnError {
302	SubColumnError::NamespaceMissing {
303		namespace: series.namespace,
304		series: series.id,
305	}
306}
307
308impl Actor for SeriesMaterializationActor {
309	type State = SeriesMaterializationState;
310	type Message = SeriesMessage;
311
312	fn init(&self, ctx: &Context<SeriesMessage>) -> SeriesMaterializationState {
313		debug!(
314			"SeriesMaterializationActor started (tick={:?}, width={}, grace={:?})",
315			self.tick_interval, self.bucket_width, self.grace
316		);
317		let handle =
318			ctx.schedule_tick(self.tick_interval, |nanos| SeriesMessage::Tick(DateTime::from_nanos(nanos)));
319		SeriesMaterializationState {
320			bucket_state: HashMap::new(),
321			_timer_handle: Some(handle),
322		}
323	}
324
325	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
326		if ctx.is_cancelled() {
327			return Directive::Stop;
328		}
329		match msg {
330			SeriesMessage::Tick(now) => self.run_tick(state, now),
331			SeriesMessage::Shutdown => {
332				debug!("SeriesMaterializationActor shutting down");
333				return Directive::Stop;
334			}
335		}
336		Directive::Continue
337	}
338
339	fn post_stop(&self) {
340		debug!("SeriesMaterializationActor stopped");
341	}
342
343	fn config(&self) -> ActorConfig {
344		ActorConfig::new().mailbox_capacity(64)
345	}
346}