reifydb_sub_column/actor/
table.rs1use std::{collections::HashMap, sync::Arc, time::Duration};
5
6use reifydb_column::{
7 compress::Compressor,
8 registry::SnapshotRegistry,
9 snapshot::{Snapshot, SnapshotId, SnapshotSource},
10};
11use reifydb_core::{
12 common::CommitVersion,
13 interface::catalog::{id::TableId, table::Table},
14};
15use reifydb_engine::{
16 engine::StandardEngine,
17 vm::{
18 stack::SymbolTable,
19 volcano::{
20 query::{QueryContext, QueryNode},
21 scan::table::TableScanNode,
22 },
23 },
24};
25use reifydb_runtime::actor::{
26 context::Context,
27 system::ActorConfig,
28 timers::TimerHandle,
29 traits::{Actor, Directive},
30};
31use reifydb_transaction::transaction::{Transaction, query::QueryTransaction};
32use reifydb_type::{
33 Result,
34 params::Params,
35 value::{datetime::DateTime, identity::IdentityId},
36};
37use tracing::{debug, warn};
38
39use crate::actor::{TableMessage, batches::column_block_from_batches};
40
41pub struct TableMaterializationState {
42 pub last_seen: HashMap<TableId, CommitVersion>,
43 _timer_handle: Option<TimerHandle>,
44}
45
46pub struct TableMaterializationActor {
52 engine: StandardEngine,
53 registry: SnapshotRegistry,
54 compressor: Compressor,
55 tick_interval: Duration,
56}
57
58impl TableMaterializationActor {
59 pub fn new(
60 engine: StandardEngine,
61 registry: SnapshotRegistry,
62 compressor: Compressor,
63 tick_interval: Duration,
64 ) -> Self {
65 Self {
66 engine,
67 registry,
68 compressor,
69 tick_interval,
70 }
71 }
72
73 pub fn registry(&self) -> &SnapshotRegistry {
74 &self.registry
75 }
76
77 fn run_tick(&self, state: &mut TableMaterializationState, _now: DateTime) {
78 let mut query_txn = match self.engine.begin_query(IdentityId::system()) {
79 Ok(t) => t,
80 Err(e) => {
81 warn!("table materialization: begin_query failed: {e}");
82 return;
83 }
84 };
85 let current = query_txn.version();
86
87 let tables = self.engine.catalog().materialized.list_tables();
88 for table in tables {
89 if state.last_seen.get(&table.id).copied() == Some(current) {
90 continue;
91 }
92 match self.materialize_table(&mut query_txn, &table, current) {
93 Ok(()) => {
94 state.last_seen.insert(table.id, current);
95 }
96 Err(e) => {
97 warn!("table materialization skipped for {:?}: {e}", table.id);
98 }
99 }
100 }
101 }
102
103 fn materialize_table(
104 &self,
105 query_txn: &mut QueryTransaction,
106 table: &Table,
107 version: CommitVersion,
108 ) -> Result<()> {
109 let services = self.engine.services();
110 let catalog = self.engine.catalog();
111 let mut tx: Transaction<'_> = (&mut *query_txn).into();
112 let resolved = catalog.resolve_table(&mut tx, table.id)?;
113
114 let context = Arc::new(QueryContext {
115 services,
116 source: None,
117 batch_size: 1024,
118 params: Params::None,
119 symbols: SymbolTable::new(),
120 identity: IdentityId::system(),
121 });
122
123 let mut scan = TableScanNode::new(resolved, Arc::clone(&context), &mut tx)?;
124 scan.initialize(&mut tx, &context)?;
125 let mut ctx = (*context).clone();
126 let mut batches = Vec::new();
127 while let Some(batch) = scan.next(&mut tx, &mut ctx)? {
128 batches.push(batch);
129 }
130
131 let schema: Vec<_> = table.columns.iter().map(|c| (c.name.clone(), c.constraint.get_type())).collect();
132 let block = column_block_from_batches(schema, batches, &self.compressor)?;
133
134 let namespace = self
135 .engine
136 .catalog()
137 .materialized
138 .find_namespace(table.namespace)
139 .map(|ns| ns.name().to_string())
140 .unwrap_or_default();
141
142 let snapshot = Snapshot {
143 id: SnapshotId::Table {
144 table_id: table.id,
145 commit_version: version,
146 },
147 source: SnapshotSource::Table {
148 table_id: table.id,
149 commit_version: version,
150 },
151 namespace,
152 name: table.name.clone(),
153 created_at: self.engine.clock().instant(),
154 block,
155 };
156 self.registry.insert(Arc::new(snapshot));
157 Ok(())
158 }
159}
160
161impl Actor for TableMaterializationActor {
162 type State = TableMaterializationState;
163 type Message = TableMessage;
164
165 fn init(&self, ctx: &Context<TableMessage>) -> TableMaterializationState {
166 debug!("TableMaterializationActor started (tick interval = {:?})", self.tick_interval);
167 let handle =
168 ctx.schedule_tick(self.tick_interval, |nanos| TableMessage::Tick(DateTime::from_nanos(nanos)));
169 TableMaterializationState {
170 last_seen: HashMap::new(),
171 _timer_handle: Some(handle),
172 }
173 }
174
175 fn handle(&self, state: &mut Self::State, msg: Self::Message, ctx: &Context<Self::Message>) -> Directive {
176 if ctx.is_cancelled() {
177 return Directive::Stop;
178 }
179 match msg {
180 TableMessage::Tick(now) => self.run_tick(state, now),
181 TableMessage::Shutdown => {
182 debug!("TableMaterializationActor shutting down");
183 return Directive::Stop;
184 }
185 }
186 Directive::Continue
187 }
188
189 fn post_stop(&self) {
190 debug!("TableMaterializationActor stopped");
191 }
192
193 fn config(&self) -> ActorConfig {
194 ActorConfig::new().mailbox_capacity(64)
195 }
196}