Skip to main content

reifydb_sub_column/actor/
series.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::HashMap,
6	sync::Arc,
7	time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use reifydb_catalog::catalog::Catalog;
11use reifydb_column::{
12	bucket::{Bucket, BucketId, bucket_for, is_closed},
13	compress::Compressor,
14	registry::SnapshotRegistry,
15	snapshot::{Snapshot, SnapshotId, SnapshotSource},
16};
17use reifydb_core::interface::{
18	catalog::{
19		id::SeriesId,
20		series::{Series, SeriesMetadata},
21	},
22	resolved::{ResolvedNamespace, ResolvedSeries},
23};
24use reifydb_engine::{
25	engine::StandardEngine,
26	vm::{
27		stack::SymbolTable,
28		volcano::{
29			query::{QueryContext, QueryNode},
30			scan::series::SeriesScanNode,
31		},
32	},
33};
34use reifydb_runtime::actor::{
35	context::Context,
36	system::ActorConfig,
37	timers::TimerHandle,
38	traits::{Actor, Directive},
39};
40use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
41use reifydb_type::{
42	Result,
43	fragment::Fragment,
44	params::Params,
45	value::{datetime::DateTime, identity::IdentityId, r#type::Type},
46};
47use tracing::{debug, warn};
48
49use crate::{
50	actor::{SeriesMessage, batches::column_block_from_batches},
51	error::SubColumnError,
52};
53
/// Bookkeeping for one `(series, bucket)` pair that has been materialized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SeriesBucketState {
	/// The series `sequence_counter` value at which this bucket was last materialized
	/// successfully; a larger counter in the series metadata marks the bucket stale.
	pub materialized_at_sequence: u64,
}
57
58pub struct SeriesMaterializationState {
59	pub bucket_state: HashMap<(SeriesId, BucketId), SeriesBucketState>,
60	_timer_handle: Option<TimerHandle>,
61}
62
63pub struct SeriesMaterializationActor {
64	engine: StandardEngine,
65	registry: SnapshotRegistry,
66	compressor: Compressor,
67	tick_interval: Duration,
68	bucket_width: u64,
69	grace: Duration,
70}
71
72impl SeriesMaterializationActor {
73	pub fn new(
74		engine: StandardEngine,
75		registry: SnapshotRegistry,
76		compressor: Compressor,
77		tick_interval: Duration,
78		bucket_width: u64,
79		grace: Duration,
80	) -> Self {
81		Self {
82			engine,
83			registry,
84			compressor,
85			tick_interval,
86			bucket_width,
87			grace,
88		}
89	}
90
91	pub fn registry(&self) -> &SnapshotRegistry {
92		&self.registry
93	}
94
95	fn run_tick(&self, state: &mut SeriesMaterializationState, _now: DateTime) {
96		let Some(mut query_txn) = self.begin_query_or_warn() else {
97			return;
98		};
99		let catalog = self.engine.catalog();
100		let now_wall = UNIX_EPOCH + Duration::from_nanos(self.engine.clock().now_nanos());
101		let series_list = match catalog.list_series_all(&mut Transaction::Query(&mut query_txn)) {
102			Ok(s) => s,
103			Err(e) => {
104				warn!("series materialization: list_series_all failed: {e}");
105				return;
106			}
107		};
108		for series in series_list {
109			self.materialize_series_buckets(state, &mut query_txn, &catalog, &series, now_wall);
110		}
111	}
112
113	#[inline]
114	fn begin_query_or_warn(&self) -> Option<QueryTransaction> {
115		match self.engine.begin_query(IdentityId::system()) {
116			Ok(t) => Some(t),
117			Err(e) => {
118				warn!("series materialization: begin_query failed: {e}");
119				None
120			}
121		}
122	}
123
124	fn materialize_series_buckets(
125		&self,
126		state: &mut SeriesMaterializationState,
127		query_txn: &mut QueryTransaction,
128		catalog: &Catalog,
129		series: &Series,
130		now_wall: SystemTime,
131	) {
132		let Some(metadata) = self.load_series_metadata_or_warn(query_txn, catalog, series) else {
133			return;
134		};
135		if metadata.row_count == 0 {
136			return;
137		}
138		let first = bucket_for(metadata.oldest_key, self.bucket_width);
139		let last = bucket_for(metadata.newest_key, self.bucket_width);
140		let mut start = first.start;
141		while start <= last.start {
142			let bucket = Bucket {
143				start,
144				end: start + self.bucket_width,
145				width: self.bucket_width,
146			};
147			start = start.saturating_add(self.bucket_width);
148			self.maybe_materialize_bucket(state, query_txn, series, &metadata, &bucket, now_wall);
149		}
150	}
151
152	#[inline]
153	fn load_series_metadata_or_warn(
154		&self,
155		query_txn: &mut QueryTransaction,
156		catalog: &Catalog,
157		series: &Series,
158	) -> Option<SeriesMetadata> {
159		let mut tx: Transaction<'_> = query_txn.into();
160		match catalog.find_series_metadata(&mut tx, series.id) {
161			Ok(Some(m)) => Some(m),
162			Ok(None) => None,
163			Err(e) => {
164				warn!("series materialization: find_series_metadata failed for {:?}: {e}", series.id);
165				None
166			}
167		}
168	}
169
170	fn maybe_materialize_bucket(
171		&self,
172		state: &mut SeriesMaterializationState,
173		query_txn: &mut QueryTransaction,
174		series: &Series,
175		metadata: &SeriesMetadata,
176		bucket: &Bucket,
177		now_wall: SystemTime,
178	) {
179		if !is_closed(bucket, series, metadata, now_wall, self.grace) {
180			return;
181		}
182		let key = (series.id, bucket.id());
183		let need_remat = match state.bucket_state.get(&key) {
184			None => true,
185			Some(s) => s.materialized_at_sequence < metadata.sequence_counter,
186		};
187		if !need_remat {
188			return;
189		}
190		match self.materialize_bucket(query_txn, series, metadata, bucket) {
191			Ok(()) => {
192				state.bucket_state.insert(
193					key,
194					SeriesBucketState {
195						materialized_at_sequence: metadata.sequence_counter,
196					},
197				);
198			}
199			Err(e) => {
200				warn!(
201					"series materialization skipped for {:?} bucket {:?}: {e}",
202					series.id,
203					bucket.id()
204				);
205			}
206		}
207	}
208
209	fn materialize_bucket(
210		&self,
211		query_txn: &mut QueryTransaction,
212		series: &Series,
213		metadata: &SeriesMetadata,
214		bucket: &Bucket,
215	) -> Result<()> {
216		let services = self.engine.services();
217		let catalog = self.engine.catalog();
218
219		let namespace_def = catalog
220			.find_namespace(&mut Transaction::Query(&mut *query_txn), series.namespace)?
221			.ok_or_else(|| missing_namespace(series))?;
222		let resolved_namespace =
223			ResolvedNamespace::new(Fragment::internal(namespace_def.name()), namespace_def.clone());
224		let resolved_series = ResolvedSeries::new(
225			Fragment::internal(series.name.clone()),
226			resolved_namespace,
227			series.clone(),
228		);
229
230		let context = Arc::new(QueryContext {
231			services,
232			source: None,
233			batch_size: 1024,
234			params: Params::None,
235			symbols: SymbolTable::new(),
236			identity: IdentityId::system(),
237		});
238
239		let mut scan = SeriesScanNode::new(
240			resolved_series,
241			Some(bucket.start),
242			Some(bucket.end),
243			None,
244			Arc::clone(&context),
245		)?;
246
247		let mut tx: Transaction<'_> = (&mut *query_txn).into();
248		scan.initialize(&mut tx, &context)?;
249		let mut ctx = (*context).clone();
250		let mut batches = Vec::new();
251		while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
252			batches.push(batch);
253		}
254
255		let schema = scan_output_schema(series);
256		let block = column_block_from_batches(schema, batches, &self.compressor)?;
257
258		let snapshot = Snapshot {
259			id: SnapshotId::Series {
260				series_id: series.id,
261				bucket: bucket.id(),
262			},
263			source: SnapshotSource::Series {
264				series_id: series.id,
265				bucket: *bucket,
266				sequence_counter: metadata.sequence_counter,
267			},
268			namespace: namespace_def.name().to_string(),
269			name: series.name.clone(),
270			created_at: self.engine.clock().instant(),
271			block,
272		};
273		self.registry.insert(Arc::new(snapshot));
274		Ok(())
275	}
276}
277
278fn scan_output_schema(series: &Series) -> Vec<(String, Type)> {
279	let key_name = series.key.column().to_string();
280	let key_ty = series
281		.columns
282		.iter()
283		.find(|c| c.name == key_name)
284		.map(|c| c.constraint.get_type())
285		.unwrap_or(Type::Uint8);
286
287	let mut schema = Vec::with_capacity(series.columns.len() + 1);
288	schema.push((key_name.clone(), key_ty));
289	if series.tag.is_some() {
290		schema.push(("tag".to_string(), Type::Uint1));
291	}
292	for col in series.data_columns() {
293		schema.push((col.name.clone(), col.constraint.get_type()));
294	}
295	schema
296}
297
298fn missing_namespace(series: &Series) -> SubColumnError {
299	SubColumnError::NamespaceMissing {
300		namespace: series.namespace,
301		series: series.id,
302	}
303}
304
305impl Actor for SeriesMaterializationActor {
306	type State = SeriesMaterializationState;
307	type Message = SeriesMessage;
308
309	fn init(&self, ctx: &Context<SeriesMessage>) -> SeriesMaterializationState {
310		debug!(
311			"SeriesMaterializationActor started (tick={:?}, width={}, grace={:?})",
312			self.tick_interval, self.bucket_width, self.grace
313		);
314		let handle =
315			ctx.schedule_tick(self.tick_interval, |nanos| SeriesMessage::Tick(DateTime::from_nanos(nanos)));
316		SeriesMaterializationState {
317			bucket_state: HashMap::new(),
318			_timer_handle: Some(handle),
319		}
320	}
321
322	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
323		if ctx.is_cancelled() {
324			return Directive::Stop;
325		}
326		match msg {
327			SeriesMessage::Tick(now) => self.run_tick(state, now),
328			SeriesMessage::Shutdown => {
329				debug!("SeriesMaterializationActor shutting down");
330				return Directive::Stop;
331			}
332		}
333		Directive::Continue
334	}
335
336	fn post_stop(&self) {
337		debug!("SeriesMaterializationActor stopped");
338	}
339
340	fn config(&self) -> ActorConfig {
341		ActorConfig::new().mailbox_capacity(64)
342	}
343}