Skip to main content

reifydb_sub_column/actor/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use reifydb_column::{
7	compress::Compressor,
8	registry::SnapshotRegistry,
9	snapshot::{Snapshot, SnapshotId, SnapshotSource},
10};
11use reifydb_core::{
12	common::CommitVersion,
13	interface::catalog::{id::TableId, table::Table},
14};
15use reifydb_engine::{
16	engine::StandardEngine,
17	vm::{
18		stack::SymbolTable,
19		volcano::{
20			query::{QueryContext, QueryNode},
21			scan::table::TableScanNode,
22		},
23	},
24};
25use reifydb_runtime::actor::{
26	context::Context,
27	system::ActorConfig,
28	timers::TimerHandle,
29	traits::{Actor, Directive},
30};
31use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
32use reifydb_type::{
33	Result,
34	params::Params,
35	value::{datetime::DateTime, identity::IdentityId},
36};
37use tracing::{debug, warn};
38
39use crate::actor::{TableMessage, batches::column_block_from_batches};
40
41pub struct TableMaterializationState {
42	pub last_seen: HashMap<TableId, CommitVersion>,
43	_timer_handle: Option<TimerHandle>,
44}
45
46pub struct TableMaterializationActor {
47	engine: StandardEngine,
48	registry: SnapshotRegistry,
49	compressor: Compressor,
50	tick_interval: Duration,
51}
52
53impl TableMaterializationActor {
54	pub fn new(
55		engine: StandardEngine,
56		registry: SnapshotRegistry,
57		compressor: Compressor,
58		tick_interval: Duration,
59	) -> Self {
60		Self {
61			engine,
62			registry,
63			compressor,
64			tick_interval,
65		}
66	}
67
68	pub fn registry(&self) -> &SnapshotRegistry {
69		&self.registry
70	}
71
72	fn run_tick(&self, state: &mut TableMaterializationState, _now: DateTime) {
73		let mut query_txn = match self.engine.begin_query(IdentityId::system()) {
74			Ok(t) => t,
75			Err(e) => {
76				warn!("table materialization: begin_query failed: {e}");
77				return;
78			}
79		};
80		let current = query_txn.version();
81
82		let tables = match self.engine.catalog().list_tables_all(&mut Transaction::Query(&mut query_txn)) {
83			Ok(t) => t,
84			Err(e) => {
85				warn!("table materialization: list_tables_all failed: {e}");
86				return;
87			}
88		};
89		for table in tables {
90			if state.last_seen.get(&table.id).copied() == Some(current) {
91				continue;
92			}
93			match self.materialize_table(&mut query_txn, &table, current) {
94				Ok(()) => {
95					state.last_seen.insert(table.id, current);
96				}
97				Err(e) => {
98					warn!("table materialization skipped for {:?}: {e}", table.id);
99				}
100			}
101		}
102	}
103
104	fn materialize_table(
105		&self,
106		query_txn: &mut QueryTransaction,
107		table: &Table,
108		version: CommitVersion,
109	) -> Result<()> {
110		let services = self.engine.services();
111		let catalog = self.engine.catalog();
112		let mut tx: Transaction<'_> = (&mut *query_txn).into();
113		let resolved = catalog.resolve_table(&mut tx, table.id)?;
114
115		let context = Arc::new(QueryContext {
116			services,
117			source: None,
118			batch_size: 1024,
119			params: Params::None,
120			symbols: SymbolTable::new(),
121			identity: IdentityId::system(),
122		});
123
124		let mut scan = TableScanNode::new(resolved, Arc::clone(&context), &mut tx)?;
125		scan.initialize(&mut tx, &context)?;
126		let mut ctx = (*context).clone();
127		let mut batches = Vec::new();
128		while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
129			batches.push(batch);
130		}
131
132		let schema: Vec<_> = table.columns.iter().map(|c| (c.name.clone(), c.constraint.get_type())).collect();
133		let block = column_block_from_batches(schema, batches, &self.compressor)?;
134
135		let namespace = self
136			.engine
137			.catalog()
138			.find_namespace(&mut Transaction::Query(&mut *query_txn), table.namespace)
139			.ok()
140			.flatten()
141			.map(|ns| ns.name().to_string())
142			.unwrap_or_default();
143
144		let snapshot = Snapshot {
145			id: SnapshotId::Table {
146				table_id: table.id,
147				commit_version: version,
148			},
149			source: SnapshotSource::Table {
150				table_id: table.id,
151				commit_version: version,
152			},
153			namespace,
154			name: table.name.clone(),
155			created_at: self.engine.clock().instant(),
156			block,
157		};
158		self.registry.insert(Arc::new(snapshot));
159		Ok(())
160	}
161}
162
163impl Actor for TableMaterializationActor {
164	type State = TableMaterializationState;
165	type Message = TableMessage;
166
167	fn init(&self, ctx: &Context<TableMessage>) -> TableMaterializationState {
168		debug!("TableMaterializationActor started (tick interval = {:?})", self.tick_interval);
169		let handle =
170			ctx.schedule_tick(self.tick_interval, |nanos| TableMessage::Tick(DateTime::from_nanos(nanos)));
171		TableMaterializationState {
172			last_seen: HashMap::new(),
173			_timer_handle: Some(handle),
174		}
175	}
176
177	fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
178		if ctx.is_cancelled() {
179			return Directive::Stop;
180		}
181		match msg {
182			TableMessage::Tick(now) => self.run_tick(state, now),
183			TableMessage::Shutdown => {
184				debug!("TableMaterializationActor shutting down");
185				return Directive::Stop;
186			}
187		}
188		Directive::Continue
189	}
190
191	fn post_stop(&self) {
192		debug!("TableMaterializationActor stopped");
193	}
194
195	fn config(&self) -> ActorConfig {
196		ActorConfig::new().mailbox_capacity(64)
197	}
198}