spacetimedb_client_api_messages/websocket.rs
1//! Messages sent over the SpacetimeDB WebSocket protocol.
2//!
3//! Client -> Server messages are encoded as [`ClientMessage`].
4//! Server -> Client messages are encoded as [`ServerMessage`].
5//!
6//! Any changes to this file must be paired with a change to the WebSocket protocol identifiers
7//! defined in `crates/client-api/src/routes/subscribe.rs`,
8//! and be paired with changes to all of:
9//!
10//! - The C# SDK.
11//! - The TypeScript SDK.
12//! - The SpacetimeDB website.
13//!
14//! Changes to the Rust SDK are not necessarily required, as it depends on this crate
15//! rather than using an external mirror of this schema.
16
17use crate::energy::EnergyQuanta;
18use bytes::Bytes;
19use bytestring::ByteString;
20use core::{
21 fmt::Debug,
22 ops::{Deref, Range},
23};
24use enum_as_inner::EnumAsInner;
25use smallvec::SmallVec;
26use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
27use spacetimedb_primitives::TableId;
28use spacetimedb_sats::{
29 de::{Deserialize, Error},
30 impl_deserialize, impl_serialize, impl_st,
31 ser::Serialize,
32 AlgebraicType, SpacetimeType,
33};
34use std::sync::Arc;
35
/// Identifier of the text flavor of the v1 protocol.
// NOTE(review): presumably the WebSocket subprotocol name for JSON-encoded messages — confirm.
pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
/// Identifier of the binary flavor of the v1 protocol.
// NOTE(review): presumably the WebSocket subprotocol name for BSATN-encoded messages — confirm.
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
38
/// Row-count accessors shared by the row-list representations.
pub trait RowListLen {
    /// Returns how many rows the list holds; this is a row count, not a byte count.
    fn len(&self) -> usize;

    /// Returns `true` when the list holds no rows.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}

/// Anything that dereferences to a slice reports the slice's element count as its row count.
impl<T, L: Deref<Target = [T]>> RowListLen for L {
    fn len(&self) -> usize {
        // Deref coercion turns `&L` into `&[T]`.
        let rows: &[T] = self;
        rows.len()
    }

    fn is_empty(&self) -> bool {
        let rows: &[T] = self;
        rows.is_empty()
    }
}
56
57pub trait ByteListLen {
58 /// Returns the uncompressed size of the list in bytes
59 fn num_bytes(&self) -> usize;
60}
61
62impl ByteListLen for Vec<ByteString> {
63 fn num_bytes(&self) -> usize {
64 self.iter().map(|str| str.len()).sum()
65 }
66}
67
68/// A format / codec used by the websocket API.
69///
70/// This can be e.g., BSATN, JSON.
71pub trait WebsocketFormat: Sized {
72 /// The type used for the encoding of a single item.
73 type Single: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone;
74
75 /// The type used for the encoding of a list of items.
76 type List: SpacetimeType
77 + for<'de> Deserialize<'de>
78 + Serialize
79 + RowListLen
80 + ByteListLen
81 + Debug
82 + Clone
83 + Default;
84
85 /// The type used to encode query updates.
86 /// This type exists so that some formats, e.g., BSATN, can compress an update.
87 type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
88}
89
90/// Messages sent from the client to the server.
91///
92/// Parametric over the reducer argument type to enable [`ClientMessage::map_args`].
93#[derive(SpacetimeType)]
94#[sats(crate = spacetimedb_lib)]
95pub enum ClientMessage<Args> {
96 /// Request a reducer run.
97 CallReducer(CallReducer<Args>),
98 /// Register SQL queries on which to receive updates.
99 Subscribe(Subscribe),
100 /// Send a one-off SQL query without establishing a subscription.
101 OneOffQuery(OneOffQuery),
102 /// Register a SQL query to to subscribe to updates. This does not affect other subscriptions.
103 SubscribeSingle(SubscribeSingle),
104 SubscribeMulti(SubscribeMulti),
105 /// Remove a subscription to a SQL query that was added with SubscribeSingle.
106 Unsubscribe(Unsubscribe),
107 UnsubscribeMulti(UnsubscribeMulti),
108 /// Request a procedure run.
109 CallProcedure(CallProcedure<Args>),
110}
111
112impl<Args> ClientMessage<Args> {
113 pub fn map_args<Args2>(self, f: impl FnOnce(Args) -> Args2) -> ClientMessage<Args2> {
114 match self {
115 ClientMessage::CallReducer(CallReducer {
116 reducer,
117 args,
118 request_id,
119 flags,
120 }) => ClientMessage::CallReducer(CallReducer {
121 reducer,
122 args: f(args),
123 request_id,
124 flags,
125 }),
126 ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x),
127 ClientMessage::SubscribeSingle(x) => ClientMessage::SubscribeSingle(x),
128 ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x),
129 ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
130 ClientMessage::SubscribeMulti(x) => ClientMessage::SubscribeMulti(x),
131 ClientMessage::UnsubscribeMulti(x) => ClientMessage::UnsubscribeMulti(x),
132 ClientMessage::CallProcedure(CallProcedure {
133 procedure,
134 args,
135 request_id,
136 flags,
137 }) => ClientMessage::CallProcedure(CallProcedure {
138 procedure,
139 args: f(args),
140 request_id,
141 flags,
142 }),
143 }
144 }
145}
146
147/// Request a reducer run.
148///
149/// Parametric over the argument type to enable [`ClientMessage::map_args`].
150#[derive(SpacetimeType)]
151#[sats(crate = spacetimedb_lib)]
152pub struct CallReducer<Args> {
153 /// The name of the reducer to call.
154 pub reducer: Box<str>,
155 /// The arguments to the reducer.
156 ///
157 /// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
158 /// and the enclosing message format.
159 pub args: Args,
160 /// An identifier for a client request.
161 ///
162 /// The server will include the same ID in the response [`TransactionUpdate`].
163 pub request_id: u32,
164 /// Assorted flags that can be passed when calling a reducer.
165 ///
166 /// Currently accepts 0 or 1 where the latter means
167 /// that the caller does not want to be notified about the reducer
168 /// without being subscribed to any relevant queries.
169 pub flags: CallReducerFlags,
170}
171
172#[derive(Clone, Copy, Default, PartialEq, Eq)]
173pub enum CallReducerFlags {
174 /// The reducer's caller does want to be notified about the reducer completing successfully
175 /// regardless of whether the caller had subscribed to a relevant query.
176 ///
177 /// Note that updates to a reducer's caller are always sent as full updates
178 /// whether subscribed to a relevant query or not.
179 /// That is, the light tx mode setting does not apply to the reducer's caller.
180 ///
181 /// This is the default flag.
182 #[default]
183 FullUpdate,
184 /// The reducer's caller does not want to be notified about the reducer completing successfully
185 /// without having subscribed to any of the relevant queries.
186 NoSuccessNotify,
187}
188
189impl_st!([] CallReducerFlags, AlgebraicType::U8);
190impl_serialize!([] CallReducerFlags, (self, ser) => ser.serialize_u8(*self as u8));
191impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
192 0 => Ok(Self::FullUpdate),
193 1 => Ok(Self::NoSuccessNotify),
194 x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))),
195});
196
197/// An opaque id generated by the client to refer to a subscription.
198/// This is used in Unsubscribe messages and errors.
199#[derive(SpacetimeType, Copy, Clone, Debug, PartialEq, Eq, Hash)]
200#[sats(crate = spacetimedb_lib)]
201pub struct QueryId {
202 pub id: u32,
203}
204
205impl QueryId {
206 pub fn new(id: u32) -> Self {
207 Self { id }
208 }
209}
210
211/// Sent by client to database to register a set of queries, about which the client will
212/// receive `TransactionUpdate`s.
213///
214/// After issuing a `Subscribe` message, the client will receive a single
215/// `SubscriptionUpdate` message containing every current row of every table which matches
216/// the subscribed queries. Then, after each reducer run which updates one or more
217/// subscribed rows, the client will receive a `TransactionUpdate` containing the updates.
218///
219/// A `Subscribe` message sets or replaces the entire set of queries to which the client
220/// is subscribed. If the client is previously subscribed to some set of queries `A`, and
221/// then sends a `Subscribe` message to subscribe to a set `B`, afterwards, the client
222/// will be subscribed to `B` but not `A`. In this case, the client will receive a
223/// `SubscriptionUpdate` containing every existing row that matches `B`, even if some were
224/// already in `A`.
225#[derive(SpacetimeType)]
226#[sats(crate = spacetimedb_lib)]
227pub struct Subscribe {
228 /// A sequence of SQL queries.
229 pub query_strings: Box<[Box<str>]>,
230 pub request_id: u32,
231}
232
233/// Sent by client to register a subscription to single query, for which the client should receive
234/// receive relevant `TransactionUpdate`s.
235///
236/// After issuing a `SubscribeSingle` message, the client will receive a single
237/// `SubscribeApplied` message containing every current row which matches the query. Then, any
238/// time a reducer updates the query's results, the client will receive a `TransactionUpdate`
239/// containing the relevant updates.
240///
241/// If a client subscribes to queries with overlapping results, the client will receive
242/// multiple copies of rows that appear in multiple queries.
243#[derive(SpacetimeType)]
244#[sats(crate = spacetimedb_lib)]
245pub struct SubscribeSingle {
246 /// A single SQL `SELECT` query to subscribe to.
247 pub query: Box<str>,
248 /// An identifier for a client request.
249 pub request_id: u32,
250
251 /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
252 /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
253 /// These only have meaning given a ConnectionId.
254 pub query_id: QueryId,
255}
256
257#[derive(SpacetimeType)]
258#[sats(crate = spacetimedb_lib)]
259pub struct SubscribeMulti {
260 /// A single SQL `SELECT` query to subscribe to.
261 pub query_strings: Box<[Box<str>]>,
262 /// An identifier for a client request.
263 pub request_id: u32,
264
265 /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
266 /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
267 /// These only have meaning given a ConnectionId.
268 pub query_id: QueryId,
269}
270
271/// Client request for removing a query from a subscription.
272#[derive(SpacetimeType)]
273#[sats(crate = spacetimedb_lib)]
274pub struct Unsubscribe {
275 /// An identifier for a client request.
276 pub request_id: u32,
277
278 /// The ID used in the corresponding `SubscribeSingle` message.
279 pub query_id: QueryId,
280}
281
282/// Client request for removing a query from a subscription.
283#[derive(SpacetimeType)]
284#[sats(crate = spacetimedb_lib)]
285pub struct UnsubscribeMulti {
286 /// An identifier for a client request.
287 pub request_id: u32,
288
289 /// The ID used in the corresponding `SubscribeSingle` message.
290 pub query_id: QueryId,
291}
292
293/// A one-off query submission.
294///
295/// Query should be a "SELECT * FROM Table WHERE ...". Other types of queries will be rejected.
296/// Multiple such semicolon-delimited queries are allowed.
297///
298/// One-off queries are identified by a client-generated messageID.
299/// To avoid data leaks, the server will NOT cache responses to messages based on UUID!
300/// It also will not check for duplicate IDs. They are just a way to match responses to messages.
301#[derive(SpacetimeType)]
302#[sats(crate = spacetimedb_lib)]
303pub struct OneOffQuery {
304 pub message_id: Box<[u8]>,
305 pub query_string: Box<str>,
306}
307
308#[derive(SpacetimeType)]
309#[sats(crate = spacetimedb_lib)]
310/// Request a procedure run.
311///
312/// Parametric over the argument type to enable [`ClientMessage::map_args`].
313pub struct CallProcedure<Args> {
314 /// The name of the procedure to call.
315 pub procedure: Box<str>,
316 /// The arguments to the procedure.
317 ///
318 /// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
319 /// and the enclosing message format.
320 pub args: Args,
321 /// An identifier for a client request.
322 ///
323 /// The server will include the same ID in the response [`ProcedureResult`].
324 pub request_id: u32,
325 /// Reserved space for future extensions.
326 pub flags: CallProcedureFlags,
327}
328
329#[derive(Clone, Copy, Default, PartialEq, Eq)]
330pub enum CallProcedureFlags {
331 #[default]
332 Default,
333}
334
335impl_st!([] CallProcedureFlags, AlgebraicType::U8);
336impl_serialize!([] CallProcedureFlags, (self, ser) => ser.serialize_u8(*self as u8));
337impl_deserialize!([] CallProcedureFlags, de => match de.deserialize_u8()? {
338 0 => Ok(Self::Default),
339 x => Err(D::Error::custom(format_args!("invalid call procedure flag {x}"))),
340});
341
/// The tag recognized by the host and SDKs to mean no compression of a [`ServerMessage`].
pub const SERVER_MSG_COMPRESSION_TAG_NONE: u8 = 0;

/// The tag recognized by the host and SDKs to mean brotli compression of a [`ServerMessage`].
pub const SERVER_MSG_COMPRESSION_TAG_BROTLI: u8 = 1;

/// The tag recognized by the host and SDKs to mean gzip compression of a [`ServerMessage`].
pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2;
350
351/// Messages sent from the server to the client.
352#[derive(SpacetimeType, derive_more::From)]
353#[sats(crate = spacetimedb_lib)]
354pub enum ServerMessage<F: WebsocketFormat> {
355 /// Informs of changes to subscribed rows.
356 /// This will be removed when we switch to `SubscribeSingle`.
357 InitialSubscription(InitialSubscription<F>),
358 /// Upon reducer run.
359 TransactionUpdate(TransactionUpdate<F>),
360 /// Upon reducer run, but limited to just the table updates.
361 TransactionUpdateLight(TransactionUpdateLight<F>),
362 /// After connecting, to inform client of its identity.
363 IdentityToken(IdentityToken),
364 /// Return results to a one off SQL query.
365 OneOffQueryResponse(OneOffQueryResponse<F>),
366 /// Sent in response to a `SubscribeSingle` message. This contains the initial matching rows.
367 SubscribeApplied(SubscribeApplied<F>),
368 /// Sent in response to an `Unsubscribe` message. This contains the matching rows.
369 UnsubscribeApplied(UnsubscribeApplied<F>),
370 /// Communicate an error in the subscription lifecycle.
371 SubscriptionError(SubscriptionError),
372 /// Sent in response to a `SubscribeMulti` message. This contains the initial matching rows.
373 SubscribeMultiApplied(SubscribeMultiApplied<F>),
374 /// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows.
375 UnsubscribeMultiApplied(UnsubscribeMultiApplied<F>),
376 /// Sent in response to a [`CallProcedure`] message. This contains the return value.
377 ProcedureResult(ProcedureResult<F>),
378}
379
380/// The matching rows of a subscription query.
381#[derive(SpacetimeType)]
382#[sats(crate = spacetimedb_lib)]
383pub struct SubscribeRows<F: WebsocketFormat> {
384 /// The table ID of the query.
385 pub table_id: TableId,
386 /// The table name of the query.
387 pub table_name: Box<str>,
388 /// The BSATN row values.
389 pub table_rows: TableUpdate<F>,
390}
391
392/// Response to [`Subscribe`] containing the initial matching rows.
393#[derive(SpacetimeType)]
394#[sats(crate = spacetimedb_lib)]
395pub struct SubscribeApplied<F: WebsocketFormat> {
396 /// The request_id of the corresponding `SubscribeSingle` message.
397 pub request_id: u32,
398 /// The overall time between the server receiving a request and sending the response.
399 pub total_host_execution_duration_micros: u64,
400 /// An identifier for the subscribed query sent by the client.
401 pub query_id: QueryId,
402 /// The matching rows for this query.
403 pub rows: SubscribeRows<F>,
404}
405
406/// Server response to a client [`Unsubscribe`] request.
407#[derive(SpacetimeType)]
408#[sats(crate = spacetimedb_lib)]
409pub struct UnsubscribeApplied<F: WebsocketFormat> {
410 /// Provided by the client via the `Subscribe` message.
411 /// TODO: switch to subscription id?
412 pub request_id: u32,
413 /// The overall time between the server receiving a request and sending the response.
414 pub total_host_execution_duration_micros: u64,
415 /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
416 pub query_id: QueryId,
417 /// The matching rows for this query.
418 /// Note, this makes unsubscribing potentially very expensive.
419 /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
420 /// and we would need clients to track which rows exist in which queries.
421 pub rows: SubscribeRows<F>,
422}
423
424/// Server response to an error at any point of the subscription lifecycle.
425/// If this error doesn't have a request_id, the client should drop all subscriptions.
426#[derive(SpacetimeType)]
427#[sats(crate = spacetimedb_lib)]
428pub struct SubscriptionError {
429 /// The overall time between the server receiving a request and sending the response.
430 pub total_host_execution_duration_micros: u64,
431 /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
432 /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
433 pub request_id: Option<u32>,
434 /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
435 /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
436 pub query_id: Option<u32>,
437 /// The return table of the query in question.
438 /// The server is not required to set this field.
439 /// It has been added to avoid a breaking change post 1.0.
440 ///
441 /// If unset, an error results in the entire subscription being dropped.
442 /// Otherwise only queries of this table type must be dropped.
443 pub table_id: Option<TableId>,
444 /// An error message describing the failure.
445 ///
446 /// This should reference specific fragments of the query where applicable,
447 /// but should not include the full text of the query,
448 /// as the client can retrieve that from the `request_id`.
449 ///
450 /// This is intended for diagnostic purposes.
451 /// It need not have a predictable/parseable format.
452 pub error: Box<str>,
453}
454
455/// Response to [`Subscribe`] containing the initial matching rows.
456#[derive(SpacetimeType)]
457#[sats(crate = spacetimedb_lib)]
458pub struct SubscribeMultiApplied<F: WebsocketFormat> {
459 /// The request_id of the corresponding `SubscribeSingle` message.
460 pub request_id: u32,
461 /// The overall time between the server receiving a request and sending the response.
462 pub total_host_execution_duration_micros: u64,
463 /// An identifier for the subscribed query sent by the client.
464 pub query_id: QueryId,
465 /// The matching rows for this query.
466 pub update: DatabaseUpdate<F>,
467}
468
469/// Server response to a client [`Unsubscribe`] request.
470#[derive(SpacetimeType)]
471#[sats(crate = spacetimedb_lib)]
472pub struct UnsubscribeMultiApplied<F: WebsocketFormat> {
473 /// Provided by the client via the `Subscribe` message.
474 /// TODO: switch to subscription id?
475 pub request_id: u32,
476 /// The overall time between the server receiving a request and sending the response.
477 pub total_host_execution_duration_micros: u64,
478 /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
479 pub query_id: QueryId,
480 /// The matching rows for this query set.
481 /// Note, this makes unsubscribing potentially very expensive.
482 /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
483 /// and we would need clients to track which rows exist in which queries.
484 pub update: DatabaseUpdate<F>,
485}
486
487/// Response to [`Subscribe`] containing the initial matching rows.
488#[derive(SpacetimeType)]
489#[sats(crate = spacetimedb_lib)]
490pub struct SubscriptionUpdate<F: WebsocketFormat> {
491 /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
492 pub database_update: DatabaseUpdate<F>,
493 /// An identifier sent by the client in requests.
494 /// The server will include the same request_id in the response.
495 pub request_id: u32,
496 /// The overall time between the server receiving a request and sending the response.
497 pub total_host_execution_duration_micros: u64,
498}
499
500/// Response to [`Subscribe`] containing the initial matching rows.
501#[derive(SpacetimeType)]
502#[sats(crate = spacetimedb_lib)]
503pub struct InitialSubscription<F: WebsocketFormat> {
504 /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
505 pub database_update: DatabaseUpdate<F>,
506 /// An identifier sent by the client in requests.
507 /// The server will include the same request_id in the response.
508 pub request_id: u32,
509 /// The overall time between the server receiving a request and sending the response.
510 pub total_host_execution_duration: TimeDuration,
511}
512
513/// Received by database from client to inform of user's identity, token and client connection id.
514///
515/// The database will always send an `IdentityToken` message
516/// as the first message for a new WebSocket connection.
517/// If the client is re-connecting with existing credentials,
518/// the message will include those credentials.
519/// If the client connected anonymously,
520/// the database will generate new credentials to identify it.
521#[derive(SpacetimeType, Debug)]
522#[sats(crate = spacetimedb_lib)]
523pub struct IdentityToken {
524 pub identity: Identity,
525 pub token: Box<str>,
526 pub connection_id: ConnectionId,
527}
528
529/// Received by client from database upon a reducer run.
530///
531/// Clients receive `TransactionUpdate`s only for reducers
532/// which update at least one of their subscribed rows,
533/// or for their own `Failed` or `OutOfEnergy` reducer invocations.
534#[derive(SpacetimeType, Debug)]
535#[sats(crate = spacetimedb_lib)]
536pub struct TransactionUpdate<F: WebsocketFormat> {
537 /// The status of the transaction. Contains the updated rows, if successful.
538 pub status: UpdateStatus<F>,
539 /// The time when the reducer started.
540 ///
541 /// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
542 pub timestamp: Timestamp,
543 /// The identity of the user who requested the reducer run. For event-driven and
544 /// scheduled reducers, it is the identity of the database owner.
545 pub caller_identity: Identity,
546
547 /// The 16-byte [`ConnectionId`] of the user who requested the reducer run.
548 ///
549 /// The all-zeros id is a sentinel which denotes no meaningful value.
550 /// This can occur in the following situations:
551 /// - `init` and `update` reducers will have a `caller_connection_id`
552 /// if and only if one was provided to the `publish` HTTP endpoint.
553 /// - Scheduled reducers will never have a `caller_connection_id`.
554 /// - Reducers invoked by WebSocket or the HTTP API will always have a `caller_connection_id`.
555 pub caller_connection_id: ConnectionId,
556 /// The original CallReducer request that triggered this reducer.
557 pub reducer_call: ReducerCallInfo<F>,
558 /// The amount of energy credits consumed by running the reducer.
559 pub energy_quanta_used: EnergyQuanta,
560 /// How long the reducer took to run.
561 pub total_host_execution_duration: TimeDuration,
562}
563
564/// Received by client from database upon a reducer run.
565///
566/// Clients receive `TransactionUpdateLight`s only for reducers
567/// which update at least one of their subscribed rows.
568/// Failed reducers result in full [`TransactionUpdate`]s
569#[derive(SpacetimeType, Debug)]
570#[sats(crate = spacetimedb_lib)]
571pub struct TransactionUpdateLight<F: WebsocketFormat> {
572 /// An identifier for a client request
573 pub request_id: u32,
574
575 /// The reducer ran successfully and its changes were committed to the database.
576 /// The rows altered in the database/ are recorded in this `DatabaseUpdate`.
577 pub update: DatabaseUpdate<F>,
578}
579
580/// Contained in a [`TransactionUpdate`], metadata about a reducer invocation.
581#[derive(SpacetimeType, Debug)]
582#[sats(crate = spacetimedb_lib)]
583pub struct ReducerCallInfo<F: WebsocketFormat> {
584 /// The name of the reducer that was called.
585 ///
586 /// NOTE(centril, 1.0): For bandwidth resource constrained clients
587 /// this can encourage them to have poor naming of reducers like `a`.
588 /// We should consider not sending this at all and instead
589 /// having a startup message where the name <-> id bindings
590 /// are established between the host and the client.
591 pub reducer_name: Box<str>,
592 /// The numerical id of the reducer that was called.
593 pub reducer_id: u32,
594 /// The arguments to the reducer, encoded as BSATN or JSON according to the reducer's argument schema
595 /// and the client's requested protocol.
596 pub args: F::Single,
597 /// An identifier for a client request
598 pub request_id: u32,
599}
600
601/// The status of a [`TransactionUpdate`].
602#[derive(SpacetimeType, Debug)]
603#[sats(crate = spacetimedb_lib)]
604pub enum UpdateStatus<F: WebsocketFormat> {
605 /// The reducer ran successfully and its changes were committed to the database.
606 /// The rows altered in the database/ will be recorded in the `DatabaseUpdate`.
607 Committed(DatabaseUpdate<F>),
608 /// The reducer errored, and any changes it attempted to were rolled back.
609 /// This is the error message.
610 Failed(Box<str>),
611 /// The reducer was interrupted due to insufficient energy/funds,
612 /// and any changes it attempted to make were rolled back.
613 OutOfEnergy,
614}
615
616/// A collection of inserted and deleted rows, contained in a [`TransactionUpdate`] or [`SubscriptionUpdate`].
617#[derive(SpacetimeType, Debug, Clone, Default)]
618#[sats(crate = spacetimedb_lib)]
619pub struct DatabaseUpdate<F: WebsocketFormat> {
620 pub tables: Vec<TableUpdate<F>>,
621}
622
623impl<F: WebsocketFormat> DatabaseUpdate<F> {
624 pub fn is_empty(&self) -> bool {
625 self.tables.is_empty()
626 }
627
628 pub fn num_rows(&self) -> usize {
629 self.tables.iter().map(|t| t.num_rows()).sum()
630 }
631}
632
633impl<F: WebsocketFormat> FromIterator<TableUpdate<F>> for DatabaseUpdate<F> {
634 fn from_iter<T: IntoIterator<Item = TableUpdate<F>>>(iter: T) -> Self {
635 DatabaseUpdate {
636 tables: iter.into_iter().collect(),
637 }
638 }
639}
640
641/// Part of a [`DatabaseUpdate`] received by client from database for alterations to a single table.
642///
643/// NOTE(centril): in 0.12 we added `num_rows` and `table_name` to the struct.
644/// These inflate the size of messages, which for some customers is the wrong default.
645/// We might want to consider `v1.spacetimedb.bsatn.lightweight`
646#[derive(SpacetimeType, Debug, Clone)]
647#[sats(crate = spacetimedb_lib)]
648pub struct TableUpdate<F: WebsocketFormat> {
649 /// The id of the table. Clients should prefer `table_name`, as it is a stable part of a module's API,
650 /// whereas `table_id` may change between runs.
651 pub table_id: TableId,
652 /// The name of the table.
653 ///
654 /// NOTE(centril, 1.0): we might want to remove this and instead
655 /// tell clients about changes to table_name <-> table_id mappings.
656 pub table_name: Box<str>,
657 /// The sum total of rows in `self.updates`,
658 pub num_rows: u64,
659 /// The actual insert and delete updates for this table.
660 pub updates: SmallVec<[F::QueryUpdate; 1]>,
661}
662
663/// Computed update for a single query, annotated with the number of matching rows.
664#[derive(Debug)]
665pub struct SingleQueryUpdate<F: WebsocketFormat> {
666 pub update: F::QueryUpdate,
667 pub num_rows: u64,
668}
669
670impl<F: WebsocketFormat> TableUpdate<F> {
671 pub fn new(table_id: TableId, table_name: Box<str>, update: SingleQueryUpdate<F>) -> Self {
672 Self {
673 table_id,
674 table_name,
675 num_rows: update.num_rows,
676 updates: [update.update].into(),
677 }
678 }
679
680 pub fn empty(table_id: TableId, table_name: Box<str>) -> Self {
681 Self {
682 table_id,
683 table_name,
684 num_rows: 0,
685 updates: SmallVec::new(),
686 }
687 }
688
689 pub fn push(&mut self, update: SingleQueryUpdate<F>) {
690 self.updates.push(update.update);
691 self.num_rows += update.num_rows;
692 }
693
694 pub fn num_rows(&self) -> usize {
695 self.num_rows as usize
696 }
697}
698
699#[derive(SpacetimeType, Debug, Clone, EnumAsInner)]
700#[sats(crate = spacetimedb_lib)]
701pub enum CompressableQueryUpdate<F: WebsocketFormat> {
702 Uncompressed(QueryUpdate<F>),
703 Brotli(Bytes),
704 Gzip(Bytes),
705}
706
707#[derive(SpacetimeType, Debug, Clone)]
708#[sats(crate = spacetimedb_lib)]
709pub struct QueryUpdate<F: WebsocketFormat> {
710 /// When in a [`TransactionUpdate`], the matching rows of this table deleted by the transaction.
711 ///
712 /// Rows are encoded as BSATN or JSON according to the table's schema
713 /// and the client's requested protocol.
714 ///
715 /// Always empty when in an [`InitialSubscription`].
716 pub deletes: F::List,
717 /// When in a [`TransactionUpdate`], the matching rows of this table inserted by the transaction.
718 /// When in an [`InitialSubscription`], the matching rows of this table in the entire committed state.
719 ///
720 /// Rows are encoded as BSATN or JSON according to the table's schema
721 /// and the client's requested protocol.
722 pub inserts: F::List,
723}
724
725/// A response to a [`OneOffQuery`].
726/// Will contain either one error or some number of response rows.
727/// At most one of these messages will be sent in reply to any query.
728///
729/// The messageId will be identical to the one sent in the original query.
730#[derive(SpacetimeType, Debug)]
731#[sats(crate = spacetimedb_lib)]
732pub struct OneOffQueryResponse<F: WebsocketFormat> {
733 pub message_id: Box<[u8]>,
734 /// If query compilation or evaluation errored, an error message.
735 pub error: Option<Box<str>>,
736
737 /// If query compilation and evaluation succeeded, a set of resulting rows, grouped by table.
738 pub tables: Box<[OneOffTable<F>]>,
739
740 /// The total duration of query compilation and evaluation on the server, in microseconds.
741 pub total_host_execution_duration: TimeDuration,
742}
743
744/// A table included as part of a [`OneOffQueryResponse`].
745#[derive(SpacetimeType, Debug)]
746#[sats(crate = spacetimedb_lib)]
747pub struct OneOffTable<F: WebsocketFormat> {
748 /// The name of the table.
749 pub table_name: Box<str>,
750 /// The set of rows which matched the query, encoded as BSATN or JSON according to the table's schema
751 /// and the client's requested protocol.
752 ///
753 /// TODO(centril, 1.0): Evaluate whether we want to conditionally compress these.
754 pub rows: F::List,
755}
756
757/// The result of running a procedure,
758/// including the return value of the procedure on success.
759///
760/// Sent in response to a [`CallProcedure`] message.
761#[derive(SpacetimeType, Debug)]
762#[sats(crate = spacetimedb_lib)]
763pub struct ProcedureResult<F: WebsocketFormat> {
764 /// The status of the procedure run.
765 ///
766 /// Contains the return value if successful, or the error message if not.
767 pub status: ProcedureStatus<F>,
768 /// The time when the reducer started.
769 ///
770 /// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
771 pub timestamp: Timestamp,
772 /// The time the procedure took to run.
773 pub total_host_execution_duration: TimeDuration,
774 /// The same same client-provided identifier as in the original [`ProcedureCall`] request.
775 ///
776 /// Clients use this to correlate the response with the original request.
777 pub request_id: u32,
778}
779
/// The status of a procedure call,
/// including the return value on success.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ProcedureStatus<F: WebsocketFormat> {
    /// The procedure ran and returned the enclosed value.
    ///
    /// All user error handling happens within here;
    /// the returned value may be a `Result` or `Option`,
    /// or any other type to which the user may ascribe arbitrary meaning.
    Returned(F::Single),
    /// The procedure was interrupted due to insufficient energy/funds.
    ///
    /// The procedure may have performed some observable side effects before being interrupted.
    OutOfEnergy,
    /// The call failed in the host, e.g. due to a type error or unknown procedure name.
    ///
    /// The enclosed string is the error message.
    InternalError(String),
}
798
/// Used whenever different formats need to coexist.
///
/// Holds a `B` when the BSATN format is in use and a `J` when the JSON format is in use.
#[derive(Debug, Clone)]
pub enum FormatSwitch<B, J> {
    /// The payload for the BSATN format.
    Bsatn(B),
    /// The payload for the JSON format.
    Json(J),
}

impl<B1, J1> FormatSwitch<B1, J1> {
    /// Zips together two switches,
    /// pairing a mutable borrow of the payload of `self` with the owned payload of `other`.
    ///
    /// # Panics
    ///
    /// Panics if `self` and `other` are not the same variant.
    pub fn zip_mut<B2, J2>(&mut self, other: FormatSwitch<B2, J2>) -> FormatSwitch<(&mut B1, B2), (&mut J1, J2)> {
        match self {
            Self::Bsatn(lhs) => match other {
                FormatSwitch::Bsatn(rhs) => FormatSwitch::Bsatn((lhs, rhs)),
                FormatSwitch::Json(_) => panic!("format should be the same for both sides of the zip"),
            },
            Self::Json(lhs) => match other {
                FormatSwitch::Json(rhs) => FormatSwitch::Json((lhs, rhs)),
                FormatSwitch::Bsatn(_) => panic!("format should be the same for both sides of the zip"),
            },
        }
    }
}
816
/// A unit type that selects the JSON flavor of [`WebsocketFormat`].
#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct JsonFormat;

impl WebsocketFormat for JsonFormat {
    /// A single value is carried as a [`ByteString`].
    type Single = ByteString;
    /// A list of rows is carried as one [`ByteString`] per row.
    type List = Vec<ByteString>;
    /// Query updates are carried as a plain [`QueryUpdate`].
    type QueryUpdate = QueryUpdate<Self>;
}
826
/// A unit type that selects the BSATN flavor of [`WebsocketFormat`].
#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct BsatnFormat;

impl WebsocketFormat for BsatnFormat {
    /// A single value is carried as a boxed byte slice.
    type Single = Box<[u8]>;
    /// A list of rows is carried as a packed [`BsatnRowList`].
    type List = BsatnRowList;
    /// Query updates are carried as a [`CompressableQueryUpdate`].
    type QueryUpdate = CompressableQueryUpdate<Self>;
}
836
/// A specification of either a desired or decided compression algorithm.
///
/// The [`Default`] is [`Compression::Brotli`].
#[derive(serde::Deserialize, Default, PartialEq, Eq, Clone, Copy, Hash, Debug)]
pub enum Compression {
    /// No compression ever.
    None,
    /// Compress using brotli if a certain size threshold was met.
    #[default]
    Brotli,
    /// Compress using gzip if a certain size threshold was met.
    Gzip,
}
848
/// The size, in bytes, of a single row, as stored in [`RowSizeHint::FixedSize`].
pub type RowSize = u16;
/// The byte offset at which a row starts, as stored in [`RowSizeHint::RowOffsets`].
pub type RowOffset = u64;
851
/// A packed list of BSATN-encoded rows.
///
/// The rows are stored back to back in `rows_data`,
/// while `size_hint` records where each row begins and ends.
#[derive(SpacetimeType, Debug, Clone, Default)]
#[sats(crate = spacetimedb_lib)]
pub struct BsatnRowList {
    /// A size hint about `rows_data`,
    /// intended to facilitate parallel decoding of large initial updates.
    size_hint: RowSizeHint,
    /// The flattened byte array for a list of rows.
    rows_data: Bytes,
}
862
863impl BsatnRowList {
864 /// Returns a new row list where `rows_data` is the flattened byte array
865 /// containing the BSATN of each row, without any markers for where a row begins and end.
866 ///
867 /// The `size_hint` encodes the boundaries of each row in `rows_data`.
868 /// See [`RowSizeHint`] for more details on the encoding.
869 pub fn new(size_hint: RowSizeHint, rows_data: Bytes) -> Self {
870 Self { size_hint, rows_data }
871 }
872}
873
/// A hint describing where each row begins and ends within the flattened `rows_data`.
///
/// NOTE(centril, 1.0): We might want to add a `None` variant to this
/// where the client has to decode in a loop until `rows_data` has been exhausted.
/// The use-case for this is clients who are bandwidth limited and where every byte counts.
#[derive(SpacetimeType, Debug, Clone)]
#[sats(crate = spacetimedb_lib)]
pub enum RowSizeHint {
    /// Each row in `rows_data` is of the same fixed size as specified here.
    FixedSize(RowSize),
    /// The offsets into `rows_data` defining the boundaries of each row.
    /// Only stores the offset to the start of each row.
    /// The end of each row is inferred from the start of the next row, or `rows_data.len()`.
    /// The behavior of this is identical to that of `PackedStr`.
    RowOffsets(Arc<[RowOffset]>),
}
888
889impl Default for RowSizeHint {
890 fn default() -> Self {
891 Self::RowOffsets([].into())
892 }
893}
894
895impl RowSizeHint {
896 fn index_to_range(&self, index: usize, data_end: usize) -> Option<Range<usize>> {
897 match self {
898 Self::FixedSize(size) => {
899 let size = *size as usize;
900 let start = index * size;
901 if start >= data_end {
902 // We've reached beyond `data_end`,
903 // so this is a row that doesn't exist, so we are beyond the count.
904 return None;
905 }
906 let end = (index + 1) * size;
907 Some(start..end)
908 }
909 Self::RowOffsets(offsets) => {
910 let offsets = offsets.as_ref();
911 let start = *offsets.get(index)? as usize;
912 // The end is either the start of the next element or the end.
913 let end = offsets.get(index + 1).map(|e| *e as usize).unwrap_or(data_end);
914 Some(start..end)
915 }
916 }
917 }
918}
919
920impl RowListLen for BsatnRowList {
921 fn len(&self) -> usize {
922 match &self.size_hint {
923 // `size != 0` is always the case for `FixedSize`.
924 RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize,
925 RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(),
926 }
927 }
928}
929
930impl ByteListLen for BsatnRowList {
931 /// Returns the uncompressed size of the list in bytes
932 fn num_bytes(&self) -> usize {
933 self.rows_data.as_ref().len()
934 }
935}
936
937impl BsatnRowList {
938 /// Returns the element at `index` in the list.
939 pub fn get(&self, index: usize) -> Option<Bytes> {
940 let data_end = self.rows_data.len();
941 let data_range = self.size_hint.index_to_range(index, data_end)?;
942 Some(self.rows_data.slice(data_range))
943 }
944}
945
/// An iterator over all the elements in a [`BsatnRowList`].
pub struct BsatnRowListIter<'a> {
    /// The list being iterated over.
    list: &'a BsatnRowList,
    /// The index of the row to yield on the next call to `next`.
    index: usize,
}
951
952impl<'a> IntoIterator for &'a BsatnRowList {
953 type IntoIter = BsatnRowListIter<'a>;
954 type Item = Bytes;
955 fn into_iter(self) -> Self::IntoIter {
956 BsatnRowListIter { list: self, index: 0 }
957 }
958}
959
960impl Iterator for BsatnRowListIter<'_> {
961 type Item = Bytes;
962 fn next(&mut self) -> Option<Self::Item> {
963 let index = self.index;
964 self.index += 1;
965 self.list.get(index)
966 }
967}