spacetimedb/subscription/execution_unit.rs

1use super::query::{self, Supported};
2use super::subscription::{IncrementalJoin, SupportedQuery};
3use crate::db::datastore::locking_tx_datastore::tx::TxId;
4use crate::db::relational_db::{RelationalDB, Tx};
5use crate::error::DBError;
6use crate::estimation;
7use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue, UpdatesRelValue};
8use crate::messages::websocket::TableUpdate;
9use crate::util::slow::SlowQueryLogger;
10use crate::vm::{build_query, TxMode};
11use spacetimedb_client_api_messages::websocket::{
12    Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
13};
14use spacetimedb_lib::db::error::AuthError;
15use spacetimedb_lib::relation::DbTable;
16use spacetimedb_lib::{Identity, ProductValue};
17use spacetimedb_primitives::TableId;
18use spacetimedb_sats::u256;
19use spacetimedb_vm::eval::IterRows;
20use spacetimedb_vm::expr::{AuthAccess, NoInMemUsed, Query, QueryExpr, SourceExpr, SourceId};
21use spacetimedb_vm::rel_ops::RelOps;
22use spacetimedb_vm::relation::RelValue;
23use std::hash::Hash;
24use std::time::Duration;
25
/// A hash for uniquely identifying query execution units,
/// to avoid recompilation of queries that have an open subscription.
///
/// Currently we are using a cryptographic hash (BLAKE3),
/// which is almost certainly overkill.
/// However the benefits include collisions being vanishingly unlikely,
/// so in practice distinct queries get distinct hashes,
/// and a compact, fixed-size representation for equality comparisons.
///
/// It also decouples the hash from the physical plan.
///
/// Note that we could hash QueryExprs directly,
/// using the standard library's hasher.
/// However some execution units are composed of several query plans,
/// as is the case for incremental joins.
/// And we want to associate a hash with the entire unit of execution,
/// rather than an individual plan.
///
/// The derived `Ord` compares the 32 bytes lexicographically.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct QueryHash {
    data: [u8; 32],
}
46
47impl From<QueryHash> for u256 {
48    fn from(hash: QueryHash) -> Self {
49        u256::from_le_bytes(hash.data)
50    }
51}
52
53impl QueryHash {
54    /// The zero value of a QueryHash
55    pub const NONE: Self = Self { data: [0; 32] };
56
57    /// The min value of a QueryHash
58    pub const MIN: Self = Self::NONE;
59
60    /// The max value of a QueryHash
61    pub const MAX: Self = Self { data: [0xFFu8; 32] };
62
63    pub fn from_bytes(bytes: &[u8]) -> Self {
64        Self {
65            data: blake3::hash(bytes).into(),
66        }
67    }
68
69    /// Generate a hash from a query string
70    pub fn from_string(str: &str, identity: Identity, has_param: bool) -> Self {
71        if has_param {
72            return Self::from_string_and_identity(str, identity);
73        }
74        Self::from_bytes(str.as_bytes())
75    }
76
77    /// If a query is parameterized with `:sender`, we must use the value of `:sender`,
78    /// i.e. the identity of the caller, when hashing the query text,
79    /// so that two identical queries from different clients aren't hashed to the same value.
80    ///
81    /// TODO: Once we have RLS, this hash must computed after name resolution.
82    /// It can no longer be computed from the source text.
83    pub fn from_string_and_identity(str: &str, identity: Identity) -> Self {
84        let mut hasher = blake3::Hasher::new();
85        hasher.update(str.as_bytes());
86        hasher.update(&identity.to_byte_array());
87        Self {
88            data: hasher.finalize().into(),
89        }
90    }
91}
92
93#[derive(Debug)]
94enum EvalIncrPlan {
95    /// For semijoins, store several versions of the plan,
96    /// for querying all combinations of L_{inserts/deletes/committed} * R_(inserts/deletes/committed).
97    Semijoin(IncrementalJoin),
98
99    /// For single-table selects, store only one version of the plan,
100    /// which has a single source, an in-memory table, produced by [`query::query_to_mem_table`].
101    Select(QueryExpr),
102}
103
104/// An atomic unit of execution within a subscription set.
105/// Currently just a single query plan,
106/// however in the future this could be multiple query plans,
107/// such as those of an incremental join.
108#[derive(Debug)]
109pub struct ExecutionUnit {
110    hash: QueryHash,
111
112    pub(crate) sql: String,
113    /// A version of the plan optimized for `eval`,
114    /// whose source is a [`DbTable`].
115    ///
116    /// This is a direct compilation of the source query.
117    eval_plan: QueryExpr,
118    /// A version of the plan optimized for `eval_incr`,
119    /// whose source is an in-memory table, as if by [`query::to_mem_table`].
120    eval_incr_plan: EvalIncrPlan,
121}
122
123/// An ExecutionUnit is uniquely identified by its QueryHash.
124impl Eq for ExecutionUnit {}
125
126impl PartialEq for ExecutionUnit {
127    fn eq(&self, other: &Self) -> bool {
128        self.hash == other.hash
129    }
130}
131
132impl From<SupportedQuery> for ExecutionUnit {
133    // Used in tests and benches.
134    // TODO(bikeshedding): Remove this impl,
135    // in favor of more explicit calls to `ExecutionUnit::new` with `QueryHash::NONE`.
136    fn from(plan: SupportedQuery) -> Self {
137        Self::new(plan, QueryHash::NONE).unwrap()
138    }
139}
140
141impl ExecutionUnit {
142    /// Pre-compute a plan for `eval_incr` which reads from an in-memory table
143    /// rather than re-planning on every incremental update.
144    fn compile_select_eval_incr(expr: &QueryExpr) -> QueryExpr {
145        let source = &expr.source;
146        assert!(
147            source.is_db_table(),
148            "The plan passed to `compile_select_eval_incr` must read from `DbTable`s, but found in-mem table"
149        );
150        let source = SourceExpr::from_mem_table(source.head().clone(), source.table_access(), SourceId(0));
151        let query = expr.query.clone();
152        QueryExpr { source, query }
153    }
154
155    pub fn new(eval_plan: SupportedQuery, hash: QueryHash) -> Result<Self, DBError> {
156        // Pre-compile the `expr` as fully as possible, twice, for two different paths:
157        // - `eval_incr_plan`, for incremental updates from an `SourceExpr::InMemory` table.
158        // - `eval_plan`, for initial subscriptions from a `SourceExpr::DbTable`.
159
160        let eval_incr_plan = match &eval_plan {
161            SupportedQuery {
162                kind: query::Supported::Select,
163                expr,
164                ..
165            } => EvalIncrPlan::Select(Self::compile_select_eval_incr(expr)),
166            SupportedQuery {
167                kind: query::Supported::Semijoin,
168                expr,
169                ..
170            } => EvalIncrPlan::Semijoin(IncrementalJoin::new(expr)?),
171        };
172        Ok(ExecutionUnit {
173            hash,
174            sql: eval_plan.sql,
175            eval_plan: eval_plan.expr,
176            eval_incr_plan,
177        })
178    }
179
180    /// Is this a single table select or a semijoin?
181    pub fn kind(&self) -> Supported {
182        match self.eval_incr_plan {
183            EvalIncrPlan::Select(_) => Supported::Select,
184            EvalIncrPlan::Semijoin(_) => Supported::Semijoin,
185        }
186    }
187
188    /// The unique query hash for this execution unit.
189    pub fn hash(&self) -> QueryHash {
190        self.hash
191    }
192
193    fn return_db_table(&self) -> &DbTable {
194        self.eval_plan
195            .source
196            .get_db_table()
197            .expect("ExecutionUnit eval_plan should have DbTable source, but found in-mem table")
198    }
199
200    /// The table from which this query returns rows.
201    pub fn return_table(&self) -> TableId {
202        self.return_db_table().table_id
203    }
204
205    pub fn return_name(&self) -> Box<str> {
206        self.return_db_table().head.table_name.clone()
207    }
208
209    /// The table on which this query filters rows.
210    /// In the case of a single table select,
211    /// this is the same as the return table.
212    /// In the case of a semijoin,
213    /// it is the auxiliary table against which we are joining.
214    pub fn filter_table(&self) -> TableId {
215        let return_table = self.return_table();
216        self.eval_plan
217            .query
218            .first()
219            .and_then(|op| {
220                if let Query::IndexJoin(join) = op {
221                    Some(join)
222                } else {
223                    None
224                }
225            })
226            .and_then(|join| {
227                join.index_side
228                    .get_db_table()
229                    .filter(|t| t.table_id != return_table)
230                    .or_else(|| join.probe_side.source.get_db_table())
231                    .filter(|t| t.table_id != return_table)
232                    .map(|t| t.table_id)
233            })
234            .unwrap_or(return_table)
235    }
236
237    /// Evaluate this execution unit against the database using the specified format.
238    #[tracing::instrument(level = "trace", skip_all)]
239    pub fn eval<F: WebsocketFormat>(
240        &self,
241        db: &RelationalDB,
242        tx: &Tx,
243        sql: &str,
244        slow_query_threshold: Option<Duration>,
245        compression: Compression,
246    ) -> Option<TableUpdate<F>> {
247        let _slow_query = SlowQueryLogger::new(sql, slow_query_threshold, tx.ctx.workload()).log_guard();
248
249        // Build & execute the query and then encode it to a row list.
250        let tx = &tx.into();
251        let mut inserts = build_query(db, tx, &self.eval_plan, &mut NoInMemUsed);
252        let inserts = inserts.iter();
253        let (inserts, num_rows) = F::encode_list(inserts);
254
255        (!inserts.is_empty()).then(|| {
256            let deletes = F::List::default();
257            let qu = QueryUpdate { deletes, inserts };
258            let update = F::into_query_update(qu, compression);
259            TableUpdate::new(
260                self.return_table(),
261                self.return_name(),
262                SingleQueryUpdate { update, num_rows },
263            )
264        })
265    }
266
267    /// Evaluate this execution unit against the given delta tables.
268    pub fn eval_incr<'a>(
269        &'a self,
270        db: &'a RelationalDB,
271        tx: &'a TxMode<'a>,
272        sql: &'a str,
273        tables: impl 'a + Clone + Iterator<Item = &'a DatabaseTableUpdate>,
274        slow_query_threshold: Option<Duration>,
275    ) -> Option<DatabaseTableUpdateRelValue<'a>> {
276        let _slow_query = SlowQueryLogger::new(sql, slow_query_threshold, tx.ctx().workload()).log_guard();
277        let updates = match &self.eval_incr_plan {
278            EvalIncrPlan::Select(plan) => Self::eval_incr_query_expr(db, tx, tables, plan, self.return_table()),
279            EvalIncrPlan::Semijoin(plan) => plan.eval(db, tx, tables),
280        };
281
282        updates.has_updates().then(|| DatabaseTableUpdateRelValue {
283            table_id: self.return_table(),
284            table_name: self.return_name(),
285            updates,
286        })
287    }
288
289    fn eval_query_expr_against_memtable<'a>(
290        db: &'a RelationalDB,
291        tx: &'a TxMode,
292        mem_table: &'a [ProductValue],
293        eval_incr_plan: &'a QueryExpr,
294    ) -> Box<IterRows<'a>> {
295        // Provide the updates from `table`.
296        let sources = &mut Some(mem_table.iter().map(RelValue::ProjRef));
297        // Evaluate the saved plan against the new updates,
298        // returning an iterator over the selected rows.
299        build_query(db, tx, eval_incr_plan, sources)
300    }
301
302    fn eval_incr_query_expr<'a>(
303        db: &'a RelationalDB,
304        tx: &'a TxMode<'a>,
305        tables: impl Iterator<Item = &'a DatabaseTableUpdate>,
306        eval_incr_plan: &'a QueryExpr,
307        return_table: TableId,
308    ) -> UpdatesRelValue<'a> {
309        assert!(
310            eval_incr_plan.source.is_mem_table(),
311            "Expected in-mem table in `eval_incr_plan`, but found `DbTable`"
312        );
313
314        let mut deletes = Vec::new();
315        let mut inserts = Vec::new();
316        for table in tables.filter(|table| table.table_id == return_table) {
317            // Evaluate the query separately against inserts and deletes,
318            // so that we can pass each row to the query engine unaltered,
319            // without forgetting which are inserts and which are deletes.
320            // Previously, we used to add such a column `"__op_type: AlgebraicType::U8"`.
321            if !table.inserts.is_empty() {
322                inserts.extend(Self::eval_query_expr_against_memtable(db, tx, &table.inserts, eval_incr_plan).iter());
323            }
324            if !table.deletes.is_empty() {
325                deletes.extend(Self::eval_query_expr_against_memtable(db, tx, &table.deletes, eval_incr_plan).iter());
326            }
327        }
328
329        UpdatesRelValue { deletes, inserts }
330    }
331
332    /// The estimated number of rows returned by this execution unit.
333    pub fn row_estimate(&self, tx: &TxId) -> u64 {
334        estimation::num_rows(tx, &self.eval_plan)
335    }
336}
337
338impl AuthAccess for ExecutionUnit {
339    fn check_auth(&self, owner: Identity, caller: Identity) -> Result<(), AuthError> {
340        self.eval_plan.check_auth(owner, caller)
341    }
342}