// reifydb_sub_column/actor/table.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
4use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use reifydb_column::{
7	compress::Compressor,
8	registry::SnapshotRegistry,
9	snapshot::{Snapshot, SnapshotId, SnapshotSource},
10};
11use reifydb_core::{
12	common::CommitVersion,
13	interface::catalog::{id::TableId, table::Table},
14};
15use reifydb_engine::{
16	engine::StandardEngine,
17	vm::{
18		stack::SymbolTable,
19		volcano::{
20			query::{QueryContext, QueryNode},
21			scan::table::TableScanNode,
22		},
23	},
24};
25use reifydb_runtime::actor::{
26	context::Context,
27	system::ActorConfig,
28	timers::TimerHandle,
29	traits::{Actor, Directive},
30};
31use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
32use reifydb_type::{
33	Result,
34	params::Params,
35	value::{datetime::DateTime, identity::IdentityId},
36};
37use tracing::{debug, warn};
38
39use crate::actor::{TableMessage, batches::column_block_from_batches};
40
/// Mutable per-actor state owned by the table-materialization actor.
pub struct TableMaterializationState {
	/// Last `CommitVersion` successfully materialized per table. Tables whose
	/// current version equals this entry are skipped on the next tick.
	pub last_seen: HashMap<TableId, CommitVersion>,
	// Kept alive so the periodic tick keeps firing; presumably the timer is
	// cancelled when this handle drops — TODO confirm against `TimerHandle`.
	_timer_handle: Option<TimerHandle>,
}
45
// Periodic per-table materialization. Each tick: open a fresh read transaction,
// walk the catalog, skip tables whose `CommitVersion` hasn't advanced, and
// otherwise drive the engine's `TableScanNode` to collect `Columns` batches,
// concatenate into a single-chunk `ColumnBlock`, wrap as a `Snapshot`, and
// insert into the shared `SnapshotRegistry`.
pub struct TableMaterializationActor {
	// Engine used to open query transactions and reach catalog/services/clock.
	engine: StandardEngine,
	// Shared registry that receives finished snapshots.
	registry: SnapshotRegistry,
	// Compressor handed to `column_block_from_batches` when building blocks.
	compressor: Compressor,
	// Interval between materialization ticks, scheduled in `Actor::init`.
	tick_interval: Duration,
}
57
58impl TableMaterializationActor {
59	pub fn new(
60		engine: StandardEngine,
61		registry: SnapshotRegistry,
62		compressor: Compressor,
63		tick_interval: Duration,
64	) -> Self {
65		Self {
66			engine,
67			registry,
68			compressor,
69			tick_interval,
70		}
71	}
72
73	pub fn registry(&self) -> &SnapshotRegistry {
74		&self.registry
75	}
76
77	fn run_tick(&self, state: &mut TableMaterializationState, _now: DateTime) {
78		let mut query_txn = match self.engine.begin_query(IdentityId::system()) {
79			Ok(t) => t,
80			Err(e) => {
81				warn!("table materialization: begin_query failed: {e}");
82				return;
83			}
84		};
85		let current = query_txn.version();
86
87		let tables = self.engine.catalog().materialized.list_tables();
88		for table in tables {
89			if state.last_seen.get(&table.id).copied() == Some(current) {
90				continue;
91			}
92			match self.materialize_table(&mut query_txn, &table, current) {
93				Ok(()) => {
94					state.last_seen.insert(table.id, current);
95				}
96				Err(e) => {
97					warn!("table materialization skipped for {:?}: {e}", table.id);
98				}
99			}
100		}
101	}
102
103	fn materialize_table(
104		&self,
105		query_txn: &mut QueryTransaction,
106		table: &Table,
107		version: CommitVersion,
108	) -> Result<()> {
109		let services = self.engine.services();
110		let catalog = self.engine.catalog();
111		let mut tx: Transaction<'_> = (&mut *query_txn).into();
112		let resolved = catalog.resolve_table(&mut tx, table.id)?;
113
114		let context = Arc::new(QueryContext {
115			services,
116			source: None,
117			batch_size: 1024,
118			params: Params::None,
119			symbols: SymbolTable::new(),
120			identity: IdentityId::system(),
121		});
122
123		let mut scan = TableScanNode::new(resolved, Arc::clone(&context), &mut tx)?;
124		scan.initialize(&mut tx, &context)?;
125		let mut ctx = (*context).clone();
126		let mut batches = Vec::new();
127		while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
128			batches.push(batch);
129		}
130
131		let schema: Vec<_> = table.columns.iter().map(|c| (c.name.clone(), c.constraint.get_type())).collect();
132		let block = column_block_from_batches(schema, batches, &self.compressor)?;
133
134		let namespace = self
135			.engine
136			.catalog()
137			.materialized
138			.find_namespace(table.namespace)
139			.map(|ns| ns.name().to_string())
140			.unwrap_or_default();
141
142		let snapshot = Snapshot {
143			id: SnapshotId::Table {
144				table_id: table.id,
145				commit_version: version,
146			},
147			source: SnapshotSource::Table {
148				table_id: table.id,
149				commit_version: version,
150			},
151			namespace,
152			name: table.name.clone(),
153			created_at: self.engine.clock().instant(),
154			block,
155		};
156		self.registry.insert(Arc::new(snapshot));
157		Ok(())
158	}
159}
160
161impl Actor for TableMaterializationActor {
162	type State = TableMaterializationState;
163	type Message = TableMessage;
164
165	fn init(&self, ctx: &Context<TableMessage>) -> TableMaterializationState {
166		debug!("TableMaterializationActor started (tick interval = {:?})", self.tick_interval);
167		let handle =
168			ctx.schedule_tick(self.tick_interval, |nanos| TableMessage::Tick(DateTime::from_nanos(nanos)));
169		TableMaterializationState {
170			last_seen: HashMap::new(),
171			_timer_handle: Some(handle),
172		}
173	}
174
175	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
176		if ctx.is_cancelled() {
177			return Directive::Stop;
178		}
179		match msg {
180			TableMessage::Tick(now) => self.run_tick(state, now),
181			TableMessage::Shutdown => {
182				debug!("TableMaterializationActor shutting down");
183				return Directive::Stop;
184			}
185		}
186		Directive::Continue
187	}
188
189	fn post_stop(&self) {
190		debug!("TableMaterializationActor stopped");
191	}
192
193	fn config(&self) -> ActorConfig {
194		ActorConfig::new().mailbox_capacity(64)
195	}
196}