spacetimedb/subscription/
execution_unit.rs1use super::query::{self, Supported};
2use super::subscription::{IncrementalJoin, SupportedQuery};
3use crate::db::datastore::locking_tx_datastore::tx::TxId;
4use crate::db::relational_db::{RelationalDB, Tx};
5use crate::error::DBError;
6use crate::estimation;
7use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue};
8use crate::messages::websocket::TableUpdate;
9use crate::util::slow::SlowQueryLogger;
10use crate::vm::{build_query, TxMode};
11use spacetimedb_client_api_messages::websocket::{
12 Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
13};
14use spacetimedb_lib::db::error::AuthError;
15use spacetimedb_lib::relation::DbTable;
16use spacetimedb_lib::{Identity, ProductValue};
17use spacetimedb_primitives::TableId;
18use spacetimedb_sats::u256;
19use spacetimedb_vm::eval::IterRows;
20use spacetimedb_vm::expr::{AuthAccess, NoInMemUsed, Query, QueryExpr, SourceExpr, SourceId};
21use spacetimedb_vm::rel_ops::RelOps;
22use spacetimedb_vm::relation::RelValue;
23use std::hash::Hash;
24use std::time::Duration;
25
/// A 32-byte digest used to identify a subscription query.
///
/// Values are compared, ordered and hashed by their raw bytes (all of these
/// traits are derived), and the type is cheap to copy.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct QueryHash {
    /// The raw digest bytes.
    data: [u8; 32],
}
46
47impl From<QueryHash> for u256 {
48 fn from(hash: QueryHash) -> Self {
49 u256::from_le_bytes(hash.data)
50 }
51}
52
53impl QueryHash {
54 pub const NONE: Self = Self { data: [0; 32] };
56
57 pub const MIN: Self = Self::NONE;
59
60 pub const MAX: Self = Self { data: [0xFFu8; 32] };
62
63 pub fn from_bytes(bytes: &[u8]) -> Self {
64 Self {
65 data: blake3::hash(bytes).into(),
66 }
67 }
68
69 pub fn from_string(str: &str, identity: Identity, has_param: bool) -> Self {
71 if has_param {
72 return Self::from_string_and_identity(str, identity);
73 }
74 Self::from_bytes(str.as_bytes())
75 }
76
77 pub fn from_string_and_identity(str: &str, identity: Identity) -> Self {
84 let mut hasher = blake3::Hasher::new();
85 hasher.update(str.as_bytes());
86 hasher.update(&identity.to_byte_array());
87 Self {
88 data: hasher.finalize().into(),
89 }
90 }
91}
92
93#[derive(Debug)]
94enum EvalIncrPlan {
95 Semijoin(IncrementalJoin),
98
99 Select(QueryExpr),
102}
103
104#[derive(Debug)]
109pub struct ExecutionUnit {
110 hash: QueryHash,
111
112 pub(crate) sql: String,
113 eval_plan: QueryExpr,
118 eval_incr_plan: EvalIncrPlan,
121}
122
123impl Eq for ExecutionUnit {}
125
126impl PartialEq for ExecutionUnit {
127 fn eq(&self, other: &Self) -> bool {
128 self.hash == other.hash
129 }
130}
131
132impl From<SupportedQuery> for ExecutionUnit {
133 fn from(plan: SupportedQuery) -> Self {
137 Self::new(plan, QueryHash::NONE).unwrap()
138 }
139}
140
141impl ExecutionUnit {
142 fn compile_select_eval_incr(expr: &QueryExpr) -> QueryExpr {
145 let source = &expr.source;
146 assert!(
147 source.is_db_table(),
148 "The plan passed to `compile_select_eval_incr` must read from `DbTable`s, but found in-mem table"
149 );
150 let source = SourceExpr::from_mem_table(source.head().clone(), source.table_access(), SourceId(0));
151 let query = expr.query.clone();
152 QueryExpr { source, query }
153 }
154
155 pub fn new(eval_plan: SupportedQuery, hash: QueryHash) -> Result<Self, DBError> {
156 let eval_incr_plan = match &eval_plan {
161 SupportedQuery {
162 kind: query::Supported::Select,
163 expr,
164 ..
165 } => EvalIncrPlan::Select(Self::compile_select_eval_incr(expr)),
166 SupportedQuery {
167 kind: query::Supported::Semijoin,
168 expr,
169 ..
170 } => EvalIncrPlan::Semijoin(IncrementalJoin::new(expr)?),
171 };
172 Ok(ExecutionUnit {
173 hash,
174 sql: eval_plan.sql,
175 eval_plan: eval_plan.expr,
176 eval_incr_plan,
177 })
178 }
179
180 pub fn kind(&self) -> Supported {
182 match self.eval_incr_plan {
183 EvalIncrPlan::Select(_) => Supported::Select,
184 EvalIncrPlan::Semijoin(_) => Supported::Semijoin,
185 }
186 }
187
188 pub fn hash(&self) -> QueryHash {
190 self.hash
191 }
192
193 fn return_db_table(&self) -> &DbTable {
194 self.eval_plan
195 .source
196 .get_db_table()
197 .expect("ExecutionUnit eval_plan should have DbTable source, but found in-mem table")
198 }
199
200 pub fn return_table(&self) -> TableId {
202 self.return_db_table().table_id
203 }
204
205 pub fn return_name(&self) -> Box<str> {
206 self.return_db_table().head.table_name.clone()
207 }
208
209 pub fn filter_table(&self) -> TableId {
215 let return_table = self.return_table();
216 self.eval_plan
217 .query
218 .first()
219 .and_then(|op| {
220 if let Query::IndexJoin(join) = op {
221 Some(join)
222 } else {
223 None
224 }
225 })
226 .and_then(|join| {
227 join.index_side
228 .get_db_table()
229 .filter(|t| t.table_id != return_table)
230 .or_else(|| join.probe_side.source.get_db_table())
231 .filter(|t| t.table_id != return_table)
232 .map(|t| t.table_id)
233 })
234 .unwrap_or(return_table)
235 }
236
237 #[tracing::instrument(level = "trace", skip_all)]
239 pub fn eval<F: WebsocketFormat>(
240 &self,
241 db: &RelationalDB,
242 tx: &Tx,
243 sql: &str,
244 slow_query_threshold: Option<Duration>,
245 compression: Compression,
246 ) -> Option<TableUpdate<F>> {
247 let _slow_query = SlowQueryLogger::new(sql, slow_query_threshold, tx.ctx.workload()).log_guard();
248
249 let tx = &tx.into();
251 let mut inserts = build_query(db, tx, &self.eval_plan, &mut NoInMemUsed);
252 let inserts = inserts.iter();
253 let (inserts, num_rows) = F::encode_list(inserts);
254
255 (!inserts.is_empty()).then(|| {
256 let deletes = F::List::default();
257 let qu = QueryUpdate { deletes, inserts };
258 let update = F::into_query_update(qu, compression);
259 TableUpdate::new(
260 self.return_table(),
261 self.return_name(),
262 SingleQueryUpdate { update, num_rows },
263 )
264 })
265 }
266
267 pub fn eval_incr<'a>(
269 &'a self,
270 db: &'a RelationalDB,
271 tx: &'a TxMode<'a>,
272 sql: &'a str,
273 tables: impl 'a + Clone + Iterator<Item = &'a DatabaseTableUpdate>,
274 slow_query_threshold: Option<Duration>,
275 ) -> Option<DatabaseTableUpdateRelValue<'a>> {
276 let _slow_query = SlowQueryLogger::new(sql, slow_query_threshold, tx.ctx().workload()).log_guard();
277 let updates = match &self.eval_incr_plan {
278 EvalIncrPlan::Select(plan) => Self::eval_incr_query_expr(db, tx, tables, plan, self.return_table()),
279 EvalIncrPlan::Semijoin(plan) => plan.eval(db, tx, tables),
280 };
281
282 updates.has_updates().then(|| DatabaseTableUpdateRelValue {
283 table_id: self.return_table(),
284 table_name: self.return_name(),
285 updates,
286 })
287 }
288
289 fn eval_query_expr_against_memtable<'a>(
290 db: &'a RelationalDB,
291 tx: &'a TxMode,
292 mem_table: &'a [ProductValue],
293 eval_incr_plan: &'a QueryExpr,
294 ) -> Box<IterRows<'a>> {
295 let sources = &mut Some(mem_table.iter().map(RelValue::ProjRef));
297 build_query(db, tx, eval_incr_plan, sources)
300 }
301
302 fn eval_incr_query_expr<'a>(
303 db: &'a RelationalDB,
304 tx: &'a TxMode<'a>,
305 tables: impl Iterator<Item = &'a DatabaseTableUpdate>,
306 eval_incr_plan: &'a QueryExpr,
307 return_table: TableId,
308 ) -> UpdatesRelValue<'a> {
309 assert!(
310 eval_incr_plan.source.is_mem_table(),
311 "Expected in-mem table in `eval_incr_plan`, but found `DbTable`"
312 );
313
314 let mut deletes = Vec::new();
315 let mut inserts = Vec::new();
316 for table in tables.filter(|table| table.table_id == return_table) {
317 if !table.inserts.is_empty() {
322 inserts.extend(Self::eval_query_expr_against_memtable(db, tx, &table.inserts, eval_incr_plan).iter());
323 }
324 if !table.deletes.is_empty() {
325 deletes.extend(Self::eval_query_expr_against_memtable(db, tx, &table.deletes, eval_incr_plan).iter());
326 }
327 }
328
329 UpdatesRelValue { deletes, inserts }
330 }
331
332 pub fn row_estimate(&self, tx: &TxId) -> u64 {
334 estimation::num_rows(tx, &self.eval_plan)
335 }
336}
337
338impl AuthAccess for ExecutionUnit {
339 fn check_auth(&self, owner: Identity, caller: Identity) -> Result<(), AuthError> {
340 self.eval_plan.check_auth(owner, caller)
341 }
342}