reifydb_sub_column/actor/
table.rs1use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use reifydb_column::{
7 compress::Compressor,
8 registry::SnapshotRegistry,
9 snapshot::{Snapshot, SnapshotId, SnapshotSource},
10};
11use reifydb_core::{
12 common::CommitVersion,
13 interface::catalog::{id::TableId, table::Table},
14};
15use reifydb_engine::{
16 engine::StandardEngine,
17 vm::{
18 stack::SymbolTable,
19 volcano::{
20 query::{QueryContext, QueryNode},
21 scan::table::TableScanNode,
22 },
23 },
24};
25use reifydb_runtime::actor::{
26 context::Context,
27 system::ActorConfig,
28 timers::TimerHandle,
29 traits::{Actor, Directive},
30};
31use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
32use reifydb_type::{
33 Result,
34 params::Params,
35 value::{datetime::DateTime, identity::IdentityId},
36};
37use tracing::{debug, warn};
38
39use crate::actor::{TableMessage, batches::column_block_from_batches};
40
41pub struct TableMaterializationState {
42 pub last_seen: HashMap<TableId, CommitVersion>,
43 _timer_handle: Option<TimerHandle>,
44}
45
46pub struct TableMaterializationActor {
47 engine: StandardEngine,
48 registry: SnapshotRegistry,
49 compressor: Compressor,
50 tick_interval: Duration,
51}
52
53impl TableMaterializationActor {
54 pub fn new(
55 engine: StandardEngine,
56 registry: SnapshotRegistry,
57 compressor: Compressor,
58 tick_interval: Duration,
59 ) -> Self {
60 Self {
61 engine,
62 registry,
63 compressor,
64 tick_interval,
65 }
66 }
67
68 pub fn registry(&self) -> &SnapshotRegistry {
69 &self.registry
70 }
71
72 fn run_tick(&self, state: &mut TableMaterializationState, _now: DateTime) {
73 let mut query_txn = match self.engine.begin_query(IdentityId::system()) {
74 Ok(t) => t,
75 Err(e) => {
76 warn!("table materialization: begin_query failed: {e}");
77 return;
78 }
79 };
80 let current = query_txn.version();
81
82 let tables = match self.engine.catalog().list_tables_all(&mut Transaction::Query(&mut query_txn)) {
83 Ok(t) => t,
84 Err(e) => {
85 warn!("table materialization: list_tables_all failed: {e}");
86 return;
87 }
88 };
89 for table in tables {
90 if state.last_seen.get(&table.id).copied() == Some(current) {
91 continue;
92 }
93 match self.materialize_table(&mut query_txn, &table, current) {
94 Ok(()) => {
95 state.last_seen.insert(table.id, current);
96 }
97 Err(e) => {
98 warn!("table materialization skipped for {:?}: {e}", table.id);
99 }
100 }
101 }
102 }
103
104 fn materialize_table(
105 &self,
106 query_txn: &mut QueryTransaction,
107 table: &Table,
108 version: CommitVersion,
109 ) -> Result<()> {
110 let services = self.engine.services();
111 let catalog = self.engine.catalog();
112 let mut tx: Transaction<'_> = (&mut *query_txn).into();
113 let resolved = catalog.resolve_table(&mut tx, table.id)?;
114
115 let context = Arc::new(QueryContext {
116 services,
117 source: None,
118 batch_size: 1024,
119 params: Params::None,
120 symbols: SymbolTable::new(),
121 identity: IdentityId::system(),
122 });
123
124 let mut scan = TableScanNode::new(resolved, Arc::clone(&context), &mut tx)?;
125 scan.initialize(&mut tx, &context)?;
126 let mut ctx = (*context).clone();
127 let mut batches = Vec::new();
128 while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
129 batches.push(batch);
130 }
131
132 let schema: Vec<_> = table.columns.iter().map(|c| (c.name.clone(), c.constraint.get_type())).collect();
133 let block = column_block_from_batches(schema, batches, &self.compressor)?;
134
135 let namespace = self
136 .engine
137 .catalog()
138 .find_namespace(&mut Transaction::Query(&mut *query_txn), table.namespace)
139 .ok()
140 .flatten()
141 .map(|ns| ns.name().to_string())
142 .unwrap_or_default();
143
144 let snapshot = Snapshot {
145 id: SnapshotId::Table {
146 table_id: table.id,
147 commit_version: version,
148 },
149 source: SnapshotSource::Table {
150 table_id: table.id,
151 commit_version: version,
152 },
153 namespace,
154 name: table.name.clone(),
155 created_at: self.engine.clock().instant(),
156 block,
157 };
158 self.registry.insert(Arc::new(snapshot));
159 Ok(())
160 }
161}
162
163impl Actor for TableMaterializationActor {
164 type State = TableMaterializationState;
165 type Message = TableMessage;
166
167 fn init(&self, ctx: &Context<TableMessage>) -> TableMaterializationState {
168 debug!("TableMaterializationActor started (tick interval = {:?})", self.tick_interval);
169 let handle =
170 ctx.schedule_tick(self.tick_interval, |nanos| TableMessage::Tick(DateTime::from_nanos(nanos)));
171 TableMaterializationState {
172 last_seen: HashMap::new(),
173 _timer_handle: Some(handle),
174 }
175 }
176
177 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
178 if ctx.is_cancelled() {
179 return Directive::Stop;
180 }
181 match msg {
182 TableMessage::Tick(now) => self.run_tick(state, now),
183 TableMessage::Shutdown => {
184 debug!("TableMaterializationActor shutting down");
185 return Directive::Stop;
186 }
187 }
188 Directive::Continue
189 }
190
191 fn post_stop(&self) {
192 debug!("TableMaterializationActor stopped");
193 }
194
195 fn config(&self) -> ActorConfig {
196 ActorConfig::new().mailbox_capacity(64)
197 }
198}