spacetimedb_client_api_messages/
websocket.rs

1//! Messages sent over the SpacetimeDB WebSocket protocol.
2//!
3//! Client -> Server messages are encoded as [`ClientMessage`].
4//! Server -> Client messages are encoded as [`ServerMessage`].
5//!
6//! Any changes to this file must be paired with a change to the WebSocket protocol identifiers
7//! defined in `crates/client-api/src/routes/subscribe.rs`,
8//! and be paired with changes to all of:
9//!
10//! - The C# SDK.
11//! - The TypeScript SDK.
12//! - The SpacetimeDB website.
13//!
14//! Changes to the Rust SDK are not necessarily required, as it depends on this crate
15//! rather than using an external mirror of this schema.
16
17use crate::energy::EnergyQuanta;
18use bytes::Bytes;
19use bytestring::ByteString;
20use core::{
21    fmt::Debug,
22    ops::{Deref, Range},
23};
24use enum_as_inner::EnumAsInner;
25use smallvec::SmallVec;
26use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
27use spacetimedb_primitives::TableId;
28use spacetimedb_sats::{
29    de::{Deserialize, Error},
30    impl_deserialize, impl_serialize, impl_st,
31    ser::Serialize,
32    AlgebraicType, SpacetimeType,
33};
34use std::sync::Arc;
35
36pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
37pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
38
39pub trait RowListLen {
40    /// Returns the length, in number of rows, not bytes, of the row list.
41    fn len(&self) -> usize;
42    /// Returns whether the list is empty or not.
43    fn is_empty(&self) -> bool {
44        self.len() == 0
45    }
46}
47
48impl<T, L: Deref<Target = [T]>> RowListLen for L {
49    fn len(&self) -> usize {
50        self.deref().len()
51    }
52    fn is_empty(&self) -> bool {
53        self.deref().is_empty()
54    }
55}
56
57pub trait ByteListLen {
58    /// Returns the uncompressed size of the list in bytes
59    fn num_bytes(&self) -> usize;
60}
61
62impl ByteListLen for Vec<ByteString> {
63    fn num_bytes(&self) -> usize {
64        self.iter().map(|str| str.len()).sum()
65    }
66}
67
68/// A format / codec used by the websocket API.
69///
70/// This can be e.g., BSATN, JSON.
71pub trait WebsocketFormat: Sized {
72    /// The type used for the encoding of a single item.
73    type Single: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone;
74
75    /// The type used for the encoding of a list of items.
76    type List: SpacetimeType
77        + for<'de> Deserialize<'de>
78        + Serialize
79        + RowListLen
80        + ByteListLen
81        + Debug
82        + Clone
83        + Default;
84
85    /// The type used to encode query updates.
86    /// This type exists so that some formats, e.g., BSATN, can compress an update.
87    type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
88}
89
90/// Messages sent from the client to the server.
91///
92/// Parametric over the reducer argument type to enable [`ClientMessage::map_args`].
93#[derive(SpacetimeType)]
94#[sats(crate = spacetimedb_lib)]
95pub enum ClientMessage<Args> {
96    /// Request a reducer run.
97    CallReducer(CallReducer<Args>),
98    /// Register SQL queries on which to receive updates.
99    Subscribe(Subscribe),
100    /// Send a one-off SQL query without establishing a subscription.
101    OneOffQuery(OneOffQuery),
102    /// Register a SQL query to to subscribe to updates. This does not affect other subscriptions.
103    SubscribeSingle(SubscribeSingle),
104    SubscribeMulti(SubscribeMulti),
105    /// Remove a subscription to a SQL query that was added with SubscribeSingle.
106    Unsubscribe(Unsubscribe),
107    UnsubscribeMulti(UnsubscribeMulti),
108    /// Request a procedure run.
109    CallProcedure(CallProcedure<Args>),
110}
111
112impl<Args> ClientMessage<Args> {
113    pub fn map_args<Args2>(self, f: impl FnOnce(Args) -> Args2) -> ClientMessage<Args2> {
114        match self {
115            ClientMessage::CallReducer(CallReducer {
116                reducer,
117                args,
118                request_id,
119                flags,
120            }) => ClientMessage::CallReducer(CallReducer {
121                reducer,
122                args: f(args),
123                request_id,
124                flags,
125            }),
126            ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x),
127            ClientMessage::SubscribeSingle(x) => ClientMessage::SubscribeSingle(x),
128            ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x),
129            ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
130            ClientMessage::SubscribeMulti(x) => ClientMessage::SubscribeMulti(x),
131            ClientMessage::UnsubscribeMulti(x) => ClientMessage::UnsubscribeMulti(x),
132            ClientMessage::CallProcedure(CallProcedure {
133                procedure,
134                args,
135                request_id,
136                flags,
137            }) => ClientMessage::CallProcedure(CallProcedure {
138                procedure,
139                args: f(args),
140                request_id,
141                flags,
142            }),
143        }
144    }
145}
146
147/// Request a reducer run.
148///
149/// Parametric over the argument type to enable [`ClientMessage::map_args`].
150#[derive(SpacetimeType)]
151#[sats(crate = spacetimedb_lib)]
152pub struct CallReducer<Args> {
153    /// The name of the reducer to call.
154    pub reducer: Box<str>,
155    /// The arguments to the reducer.
156    ///
157    /// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
158    /// and the enclosing message format.
159    pub args: Args,
160    /// An identifier for a client request.
161    ///
162    /// The server will include the same ID in the response [`TransactionUpdate`].
163    pub request_id: u32,
164    /// Assorted flags that can be passed when calling a reducer.
165    ///
166    /// Currently accepts 0 or 1 where the latter means
167    /// that the caller does not want to be notified about the reducer
168    /// without being subscribed to any relevant queries.
169    pub flags: CallReducerFlags,
170}
171
172#[derive(Clone, Copy, Default, PartialEq, Eq)]
173pub enum CallReducerFlags {
174    /// The reducer's caller does want to be notified about the reducer completing successfully
175    /// regardless of whether the caller had subscribed to a relevant query.
176    ///
177    /// Note that updates to a reducer's caller are always sent as full updates
178    /// whether subscribed to a relevant query or not.
179    /// That is, the light tx mode setting does not apply to the reducer's caller.
180    ///
181    /// This is the default flag.
182    #[default]
183    FullUpdate,
184    /// The reducer's caller does not want to be notified about the reducer completing successfully
185    /// without having subscribed to any of the relevant queries.
186    NoSuccessNotify,
187}
188
189impl_st!([] CallReducerFlags, AlgebraicType::U8);
190impl_serialize!([] CallReducerFlags, (self, ser) => ser.serialize_u8(*self as u8));
191impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
192    0 => Ok(Self::FullUpdate),
193    1 => Ok(Self::NoSuccessNotify),
194    x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))),
195});
196
197/// An opaque id generated by the client to refer to a subscription.
198/// This is used in Unsubscribe messages and errors.
199#[derive(SpacetimeType, Copy, Clone, Debug, PartialEq, Eq, Hash)]
200#[sats(crate = spacetimedb_lib)]
201pub struct QueryId {
202    pub id: u32,
203}
204
205impl QueryId {
206    pub fn new(id: u32) -> Self {
207        Self { id }
208    }
209}
210
211/// Sent by client to database to register a set of queries, about which the client will
212/// receive `TransactionUpdate`s.
213///
214/// After issuing a `Subscribe` message, the client will receive a single
215/// `SubscriptionUpdate` message containing every current row of every table which matches
216/// the subscribed queries. Then, after each reducer run which updates one or more
217/// subscribed rows, the client will receive a `TransactionUpdate` containing the updates.
218///
219/// A `Subscribe` message sets or replaces the entire set of queries to which the client
220/// is subscribed. If the client is previously subscribed to some set of queries `A`, and
221/// then sends a `Subscribe` message to subscribe to a set `B`, afterwards, the client
222/// will be subscribed to `B` but not `A`. In this case, the client will receive a
223/// `SubscriptionUpdate` containing every existing row that matches `B`, even if some were
224/// already in `A`.
225#[derive(SpacetimeType)]
226#[sats(crate = spacetimedb_lib)]
227pub struct Subscribe {
228    /// A sequence of SQL queries.
229    pub query_strings: Box<[Box<str>]>,
230    pub request_id: u32,
231}
232
233/// Sent by client to register a subscription to single query, for which the client should receive
234/// receive relevant `TransactionUpdate`s.
235///
236/// After issuing a `SubscribeSingle` message, the client will receive a single
237/// `SubscribeApplied` message containing every current row which matches the query. Then, any
238/// time a reducer updates the query's results, the client will receive a `TransactionUpdate`
239/// containing the relevant updates.
240///
241/// If a client subscribes to queries with overlapping results, the client will receive
242/// multiple copies of rows that appear in multiple queries.
243#[derive(SpacetimeType)]
244#[sats(crate = spacetimedb_lib)]
245pub struct SubscribeSingle {
246    /// A single SQL `SELECT` query to subscribe to.
247    pub query: Box<str>,
248    /// An identifier for a client request.
249    pub request_id: u32,
250
251    /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
252    /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
253    /// These only have meaning given a ConnectionId.
254    pub query_id: QueryId,
255}
256
257#[derive(SpacetimeType)]
258#[sats(crate = spacetimedb_lib)]
259pub struct SubscribeMulti {
260    /// A single SQL `SELECT` query to subscribe to.
261    pub query_strings: Box<[Box<str>]>,
262    /// An identifier for a client request.
263    pub request_id: u32,
264
265    /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
266    /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
267    /// These only have meaning given a ConnectionId.
268    pub query_id: QueryId,
269}
270
271/// Client request for removing a query from a subscription.
272#[derive(SpacetimeType)]
273#[sats(crate = spacetimedb_lib)]
274pub struct Unsubscribe {
275    /// An identifier for a client request.
276    pub request_id: u32,
277
278    /// The ID used in the corresponding `SubscribeSingle` message.
279    pub query_id: QueryId,
280}
281
282/// Client request for removing a query from a subscription.
283#[derive(SpacetimeType)]
284#[sats(crate = spacetimedb_lib)]
285pub struct UnsubscribeMulti {
286    /// An identifier for a client request.
287    pub request_id: u32,
288
289    /// The ID used in the corresponding `SubscribeSingle` message.
290    pub query_id: QueryId,
291}
292
293/// A one-off query submission.
294///
295/// Query should be a "SELECT * FROM Table WHERE ...". Other types of queries will be rejected.
296/// Multiple such semicolon-delimited queries are allowed.
297///
298/// One-off queries are identified by a client-generated messageID.
299/// To avoid data leaks, the server will NOT cache responses to messages based on UUID!
300/// It also will not check for duplicate IDs. They are just a way to match responses to messages.
301#[derive(SpacetimeType)]
302#[sats(crate = spacetimedb_lib)]
303pub struct OneOffQuery {
304    pub message_id: Box<[u8]>,
305    pub query_string: Box<str>,
306}
307
308#[derive(SpacetimeType)]
309#[sats(crate = spacetimedb_lib)]
310/// Request a procedure run.
311///
312/// Parametric over the argument type to enable [`ClientMessage::map_args`].
313pub struct CallProcedure<Args> {
314    /// The name of the procedure to call.
315    pub procedure: Box<str>,
316    /// The arguments to the procedure.
317    ///
318    /// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
319    /// and the enclosing message format.
320    pub args: Args,
321    /// An identifier for a client request.
322    ///
323    /// The server will include the same ID in the response [`ProcedureResult`].
324    pub request_id: u32,
325    /// Reserved space for future extensions.
326    pub flags: CallProcedureFlags,
327}
328
329#[derive(Clone, Copy, Default, PartialEq, Eq)]
330pub enum CallProcedureFlags {
331    #[default]
332    Default,
333}
334
335impl_st!([] CallProcedureFlags, AlgebraicType::U8);
336impl_serialize!([] CallProcedureFlags, (self, ser) => ser.serialize_u8(*self as u8));
337impl_deserialize!([] CallProcedureFlags, de => match de.deserialize_u8()? {
338    0 => Ok(Self::Default),
339    x => Err(D::Error::custom(format_args!("invalid call procedure flag {x}"))),
340});
341
342/// The tag recognized by the host and SDKs to mean no compression of a [`ServerMessage`].
343pub const SERVER_MSG_COMPRESSION_TAG_NONE: u8 = 0;
344
345/// The tag recognized by the host and SDKs to mean brotli compression  of a [`ServerMessage`].
346pub const SERVER_MSG_COMPRESSION_TAG_BROTLI: u8 = 1;
347
348/// The tag recognized by the host and SDKs to mean brotli compression  of a [`ServerMessage`].
349pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2;
350
351/// Messages sent from the server to the client.
352#[derive(SpacetimeType, derive_more::From)]
353#[sats(crate = spacetimedb_lib)]
354pub enum ServerMessage<F: WebsocketFormat> {
355    /// Informs of changes to subscribed rows.
356    /// This will be removed when we switch to `SubscribeSingle`.
357    InitialSubscription(InitialSubscription<F>),
358    /// Upon reducer run.
359    TransactionUpdate(TransactionUpdate<F>),
360    /// Upon reducer run, but limited to just the table updates.
361    TransactionUpdateLight(TransactionUpdateLight<F>),
362    /// After connecting, to inform client of its identity.
363    IdentityToken(IdentityToken),
364    /// Return results to a one off SQL query.
365    OneOffQueryResponse(OneOffQueryResponse<F>),
366    /// Sent in response to a `SubscribeSingle` message. This contains the initial matching rows.
367    SubscribeApplied(SubscribeApplied<F>),
368    /// Sent in response to an `Unsubscribe` message. This contains the matching rows.
369    UnsubscribeApplied(UnsubscribeApplied<F>),
370    /// Communicate an error in the subscription lifecycle.
371    SubscriptionError(SubscriptionError),
372    /// Sent in response to a `SubscribeMulti` message. This contains the initial matching rows.
373    SubscribeMultiApplied(SubscribeMultiApplied<F>),
374    /// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows.
375    UnsubscribeMultiApplied(UnsubscribeMultiApplied<F>),
376    /// Sent in response to a [`CallProcedure`] message. This contains the return value.
377    ProcedureResult(ProcedureResult<F>),
378}
379
380/// The matching rows of a subscription query.
381#[derive(SpacetimeType)]
382#[sats(crate = spacetimedb_lib)]
383pub struct SubscribeRows<F: WebsocketFormat> {
384    /// The table ID of the query.
385    pub table_id: TableId,
386    /// The table name of the query.
387    pub table_name: Box<str>,
388    /// The BSATN row values.
389    pub table_rows: TableUpdate<F>,
390}
391
392/// Response to [`Subscribe`] containing the initial matching rows.
393#[derive(SpacetimeType)]
394#[sats(crate = spacetimedb_lib)]
395pub struct SubscribeApplied<F: WebsocketFormat> {
396    /// The request_id of the corresponding `SubscribeSingle` message.
397    pub request_id: u32,
398    /// The overall time between the server receiving a request and sending the response.
399    pub total_host_execution_duration_micros: u64,
400    /// An identifier for the subscribed query sent by the client.
401    pub query_id: QueryId,
402    /// The matching rows for this query.
403    pub rows: SubscribeRows<F>,
404}
405
406/// Server response to a client [`Unsubscribe`] request.
407#[derive(SpacetimeType)]
408#[sats(crate = spacetimedb_lib)]
409pub struct UnsubscribeApplied<F: WebsocketFormat> {
410    /// Provided by the client via the `Subscribe` message.
411    /// TODO: switch to subscription id?
412    pub request_id: u32,
413    /// The overall time between the server receiving a request and sending the response.
414    pub total_host_execution_duration_micros: u64,
415    /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
416    pub query_id: QueryId,
417    /// The matching rows for this query.
418    /// Note, this makes unsubscribing potentially very expensive.
419    /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
420    /// and we would need clients to track which rows exist in which queries.
421    pub rows: SubscribeRows<F>,
422}
423
424/// Server response to an error at any point of the subscription lifecycle.
425/// If this error doesn't have a request_id, the client should drop all subscriptions.
426#[derive(SpacetimeType)]
427#[sats(crate = spacetimedb_lib)]
428pub struct SubscriptionError {
429    /// The overall time between the server receiving a request and sending the response.
430    pub total_host_execution_duration_micros: u64,
431    /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
432    /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
433    pub request_id: Option<u32>,
434    /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
435    /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
436    pub query_id: Option<u32>,
437    /// The return table of the query in question.
438    /// The server is not required to set this field.
439    /// It has been added to avoid a breaking change post 1.0.
440    ///
441    /// If unset, an error results in the entire subscription being dropped.
442    /// Otherwise only queries of this table type must be dropped.
443    pub table_id: Option<TableId>,
444    /// An error message describing the failure.
445    ///
446    /// This should reference specific fragments of the query where applicable,
447    /// but should not include the full text of the query,
448    /// as the client can retrieve that from the `request_id`.
449    ///
450    /// This is intended for diagnostic purposes.
451    /// It need not have a predictable/parseable format.
452    pub error: Box<str>,
453}
454
455/// Response to [`Subscribe`] containing the initial matching rows.
456#[derive(SpacetimeType)]
457#[sats(crate = spacetimedb_lib)]
458pub struct SubscribeMultiApplied<F: WebsocketFormat> {
459    /// The request_id of the corresponding `SubscribeSingle` message.
460    pub request_id: u32,
461    /// The overall time between the server receiving a request and sending the response.
462    pub total_host_execution_duration_micros: u64,
463    /// An identifier for the subscribed query sent by the client.
464    pub query_id: QueryId,
465    /// The matching rows for this query.
466    pub update: DatabaseUpdate<F>,
467}
468
469/// Server response to a client [`Unsubscribe`] request.
470#[derive(SpacetimeType)]
471#[sats(crate = spacetimedb_lib)]
472pub struct UnsubscribeMultiApplied<F: WebsocketFormat> {
473    /// Provided by the client via the `Subscribe` message.
474    /// TODO: switch to subscription id?
475    pub request_id: u32,
476    /// The overall time between the server receiving a request and sending the response.
477    pub total_host_execution_duration_micros: u64,
478    /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
479    pub query_id: QueryId,
480    /// The matching rows for this query set.
481    /// Note, this makes unsubscribing potentially very expensive.
482    /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
483    /// and we would need clients to track which rows exist in which queries.
484    pub update: DatabaseUpdate<F>,
485}
486
487/// Response to [`Subscribe`] containing the initial matching rows.
488#[derive(SpacetimeType)]
489#[sats(crate = spacetimedb_lib)]
490pub struct SubscriptionUpdate<F: WebsocketFormat> {
491    /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
492    pub database_update: DatabaseUpdate<F>,
493    /// An identifier sent by the client in requests.
494    /// The server will include the same request_id in the response.
495    pub request_id: u32,
496    /// The overall time between the server receiving a request and sending the response.
497    pub total_host_execution_duration_micros: u64,
498}
499
500/// Response to [`Subscribe`] containing the initial matching rows.
501#[derive(SpacetimeType)]
502#[sats(crate = spacetimedb_lib)]
503pub struct InitialSubscription<F: WebsocketFormat> {
504    /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
505    pub database_update: DatabaseUpdate<F>,
506    /// An identifier sent by the client in requests.
507    /// The server will include the same request_id in the response.
508    pub request_id: u32,
509    /// The overall time between the server receiving a request and sending the response.
510    pub total_host_execution_duration: TimeDuration,
511}
512
513/// Received by database from client to inform of user's identity, token and client connection id.
514///
515/// The database will always send an `IdentityToken` message
516/// as the first message for a new WebSocket connection.
517/// If the client is re-connecting with existing credentials,
518/// the message will include those credentials.
519/// If the client connected anonymously,
520/// the database will generate new credentials to identify it.
521#[derive(SpacetimeType, Debug)]
522#[sats(crate = spacetimedb_lib)]
523pub struct IdentityToken {
524    pub identity: Identity,
525    pub token: Box<str>,
526    pub connection_id: ConnectionId,
527}
528
529/// Received by client from database upon a reducer run.
530///
531/// Clients receive `TransactionUpdate`s only for reducers
532/// which update at least one of their subscribed rows,
533/// or for their own `Failed` or `OutOfEnergy` reducer invocations.
534#[derive(SpacetimeType, Debug)]
535#[sats(crate = spacetimedb_lib)]
536pub struct TransactionUpdate<F: WebsocketFormat> {
537    /// The status of the transaction. Contains the updated rows, if successful.
538    pub status: UpdateStatus<F>,
539    /// The time when the reducer started.
540    ///
541    /// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
542    pub timestamp: Timestamp,
543    /// The identity of the user who requested the reducer run. For event-driven and
544    /// scheduled reducers, it is the identity of the database owner.
545    pub caller_identity: Identity,
546
547    /// The 16-byte [`ConnectionId`] of the user who requested the reducer run.
548    ///
549    /// The all-zeros id is a sentinel which denotes no meaningful value.
550    /// This can occur in the following situations:
551    /// - `init` and `update` reducers will have a `caller_connection_id`
552    ///   if and only if one was provided to the `publish` HTTP endpoint.
553    /// - Scheduled reducers will never have a `caller_connection_id`.
554    /// - Reducers invoked by WebSocket or the HTTP API will always have a `caller_connection_id`.
555    pub caller_connection_id: ConnectionId,
556    /// The original CallReducer request that triggered this reducer.
557    pub reducer_call: ReducerCallInfo<F>,
558    /// The amount of energy credits consumed by running the reducer.
559    pub energy_quanta_used: EnergyQuanta,
560    /// How long the reducer took to run.
561    pub total_host_execution_duration: TimeDuration,
562}
563
564/// Received by client from database upon a reducer run.
565///
566/// Clients receive `TransactionUpdateLight`s only for reducers
567/// which update at least one of their subscribed rows.
568/// Failed reducers result in full [`TransactionUpdate`]s
569#[derive(SpacetimeType, Debug)]
570#[sats(crate = spacetimedb_lib)]
571pub struct TransactionUpdateLight<F: WebsocketFormat> {
572    /// An identifier for a client request
573    pub request_id: u32,
574
575    /// The reducer ran successfully and its changes were committed to the database.
576    /// The rows altered in the database/ are recorded in this `DatabaseUpdate`.
577    pub update: DatabaseUpdate<F>,
578}
579
580/// Contained in a [`TransactionUpdate`], metadata about a reducer invocation.
581#[derive(SpacetimeType, Debug)]
582#[sats(crate = spacetimedb_lib)]
583pub struct ReducerCallInfo<F: WebsocketFormat> {
584    /// The name of the reducer that was called.
585    ///
586    /// NOTE(centril, 1.0): For bandwidth resource constrained clients
587    /// this can encourage them to have poor naming of reducers like `a`.
588    /// We should consider not sending this at all and instead
589    /// having a startup message where the name <-> id bindings
590    /// are established between the host and the client.
591    pub reducer_name: Box<str>,
592    /// The numerical id of the reducer that was called.
593    pub reducer_id: u32,
594    /// The arguments to the reducer, encoded as BSATN or JSON according to the reducer's argument schema
595    /// and the client's requested protocol.
596    pub args: F::Single,
597    /// An identifier for a client request
598    pub request_id: u32,
599}
600
601/// The status of a [`TransactionUpdate`].
602#[derive(SpacetimeType, Debug)]
603#[sats(crate = spacetimedb_lib)]
604pub enum UpdateStatus<F: WebsocketFormat> {
605    /// The reducer ran successfully and its changes were committed to the database.
606    /// The rows altered in the database/ will be recorded in the `DatabaseUpdate`.
607    Committed(DatabaseUpdate<F>),
608    /// The reducer errored, and any changes it attempted to were rolled back.
609    /// This is the error message.
610    Failed(Box<str>),
611    /// The reducer was interrupted due to insufficient energy/funds,
612    /// and any changes it attempted to make were rolled back.
613    OutOfEnergy,
614}
615
616/// A collection of inserted and deleted rows, contained in a [`TransactionUpdate`] or [`SubscriptionUpdate`].
617#[derive(SpacetimeType, Debug, Clone, Default)]
618#[sats(crate = spacetimedb_lib)]
619pub struct DatabaseUpdate<F: WebsocketFormat> {
620    pub tables: Vec<TableUpdate<F>>,
621}
622
623impl<F: WebsocketFormat> DatabaseUpdate<F> {
624    pub fn is_empty(&self) -> bool {
625        self.tables.is_empty()
626    }
627
628    pub fn num_rows(&self) -> usize {
629        self.tables.iter().map(|t| t.num_rows()).sum()
630    }
631}
632
633impl<F: WebsocketFormat> FromIterator<TableUpdate<F>> for DatabaseUpdate<F> {
634    fn from_iter<T: IntoIterator<Item = TableUpdate<F>>>(iter: T) -> Self {
635        DatabaseUpdate {
636            tables: iter.into_iter().collect(),
637        }
638    }
639}
640
641/// Part of a [`DatabaseUpdate`] received by client from database for alterations to a single table.
642///
643/// NOTE(centril): in 0.12 we added `num_rows` and `table_name` to the struct.
644/// These inflate the size of messages, which for some customers is the wrong default.
645/// We might want to consider `v1.spacetimedb.bsatn.lightweight`
646#[derive(SpacetimeType, Debug, Clone)]
647#[sats(crate = spacetimedb_lib)]
648pub struct TableUpdate<F: WebsocketFormat> {
649    /// The id of the table. Clients should prefer `table_name`, as it is a stable part of a module's API,
650    /// whereas `table_id` may change between runs.
651    pub table_id: TableId,
652    /// The name of the table.
653    ///
654    /// NOTE(centril, 1.0): we might want to remove this and instead
655    /// tell clients about changes to table_name <-> table_id mappings.
656    pub table_name: Box<str>,
657    /// The sum total of rows in `self.updates`,
658    pub num_rows: u64,
659    /// The actual insert and delete updates for this table.
660    pub updates: SmallVec<[F::QueryUpdate; 1]>,
661}
662
663/// Computed update for a single query, annotated with the number of matching rows.
664#[derive(Debug)]
665pub struct SingleQueryUpdate<F: WebsocketFormat> {
666    pub update: F::QueryUpdate,
667    pub num_rows: u64,
668}
669
670impl<F: WebsocketFormat> TableUpdate<F> {
671    pub fn new(table_id: TableId, table_name: Box<str>, update: SingleQueryUpdate<F>) -> Self {
672        Self {
673            table_id,
674            table_name,
675            num_rows: update.num_rows,
676            updates: [update.update].into(),
677        }
678    }
679
680    pub fn empty(table_id: TableId, table_name: Box<str>) -> Self {
681        Self {
682            table_id,
683            table_name,
684            num_rows: 0,
685            updates: SmallVec::new(),
686        }
687    }
688
689    pub fn push(&mut self, update: SingleQueryUpdate<F>) {
690        self.updates.push(update.update);
691        self.num_rows += update.num_rows;
692    }
693
694    pub fn num_rows(&self) -> usize {
695        self.num_rows as usize
696    }
697}
698
699#[derive(SpacetimeType, Debug, Clone, EnumAsInner)]
700#[sats(crate = spacetimedb_lib)]
701pub enum CompressableQueryUpdate<F: WebsocketFormat> {
702    Uncompressed(QueryUpdate<F>),
703    Brotli(Bytes),
704    Gzip(Bytes),
705}
706
707#[derive(SpacetimeType, Debug, Clone)]
708#[sats(crate = spacetimedb_lib)]
709pub struct QueryUpdate<F: WebsocketFormat> {
710    /// When in a [`TransactionUpdate`], the matching rows of this table deleted by the transaction.
711    ///
712    /// Rows are encoded as BSATN or JSON according to the table's schema
713    /// and the client's requested protocol.
714    ///
715    /// Always empty when in an [`InitialSubscription`].
716    pub deletes: F::List,
717    /// When in a [`TransactionUpdate`], the matching rows of this table inserted by the transaction.
718    /// When in an [`InitialSubscription`], the matching rows of this table in the entire committed state.
719    ///
720    /// Rows are encoded as BSATN or JSON according to the table's schema
721    /// and the client's requested protocol.
722    pub inserts: F::List,
723}
724
725/// A response to a [`OneOffQuery`].
726/// Will contain either one error or some number of response rows.
727/// At most one of these messages will be sent in reply to any query.
728///
729/// The messageId will be identical to the one sent in the original query.
730#[derive(SpacetimeType, Debug)]
731#[sats(crate = spacetimedb_lib)]
732pub struct OneOffQueryResponse<F: WebsocketFormat> {
733    pub message_id: Box<[u8]>,
734    /// If query compilation or evaluation errored, an error message.
735    pub error: Option<Box<str>>,
736
737    /// If query compilation and evaluation succeeded, a set of resulting rows, grouped by table.
738    pub tables: Box<[OneOffTable<F>]>,
739
740    /// The total duration of query compilation and evaluation on the server, in microseconds.
741    pub total_host_execution_duration: TimeDuration,
742}
743
744/// A table included as part of a [`OneOffQueryResponse`].
745#[derive(SpacetimeType, Debug)]
746#[sats(crate = spacetimedb_lib)]
747pub struct OneOffTable<F: WebsocketFormat> {
748    /// The name of the table.
749    pub table_name: Box<str>,
750    /// The set of rows which matched the query, encoded as BSATN or JSON according to the table's schema
751    /// and the client's requested protocol.
752    ///
753    /// TODO(centril, 1.0): Evaluate whether we want to conditionally compress these.
754    pub rows: F::List,
755}
756
757/// The result of running a procedure,
758/// including the return value of the procedure on success.
759///
760/// Sent in response to a [`CallProcedure`] message.
761#[derive(SpacetimeType, Debug)]
762#[sats(crate = spacetimedb_lib)]
763pub struct ProcedureResult<F: WebsocketFormat> {
764    /// The status of the procedure run.
765    ///
766    /// Contains the return value if successful, or the error message if not.
767    pub status: ProcedureStatus<F>,
768    /// The time when the reducer started.
769    ///
770    /// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
771    pub timestamp: Timestamp,
772    /// The time the procedure took to run.
773    pub total_host_execution_duration: TimeDuration,
774    /// The same same client-provided identifier as in the original [`ProcedureCall`] request.
775    ///
776    /// Clients use this to correlate the response with the original request.
777    pub request_id: u32,
778}
779
780/// The status of a procedure call,
781/// including the return value on success.
782#[derive(SpacetimeType, Debug)]
783#[sats(crate = spacetimedb_lib)]
784pub enum ProcedureStatus<F: WebsocketFormat> {
785    /// The procedure ran and returned the enclosed value.
786    ///
787    /// All user error handling happens within here;
788    /// the returned value may be a `Result` or `Option`,
789    /// or any other type to which the user may ascribe arbitrary meaning.
790    Returned(F::Single),
791    /// The reducer was interrupted due to insufficient energy/funds.
792    ///
793    /// The procedure may have performed some observable side effects before being interrupted.
794    OutOfEnergy,
795    /// The call failed in the host, e.g. due to a type error or unknown procedure name.
796    InternalError(String),
797}
798
799/// Used whenever different formats need to coexist.
800#[derive(Debug, Clone)]
801pub enum FormatSwitch<B, J> {
802    Bsatn(B),
803    Json(J),
804}
805
806impl<B1, J1> FormatSwitch<B1, J1> {
807    /// Zips together two switches.
808    pub fn zip_mut<B2, J2>(&mut self, other: FormatSwitch<B2, J2>) -> FormatSwitch<(&mut B1, B2), (&mut J1, J2)> {
809        match (self, other) {
810            (FormatSwitch::Bsatn(a), FormatSwitch::Bsatn(b)) => FormatSwitch::Bsatn((a, b)),
811            (FormatSwitch::Json(a), FormatSwitch::Json(b)) => FormatSwitch::Json((a, b)),
812            _ => panic!("format should be the same for both sides of the zip"),
813        }
814    }
815}
816
817#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
818#[sats(crate = spacetimedb_lib)]
819pub struct JsonFormat;
820
821impl WebsocketFormat for JsonFormat {
822    type Single = ByteString;
823    type List = Vec<ByteString>;
824    type QueryUpdate = QueryUpdate<Self>;
825}
826
827#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
828#[sats(crate = spacetimedb_lib)]
829pub struct BsatnFormat;
830
831impl WebsocketFormat for BsatnFormat {
832    type Single = Box<[u8]>;
833    type List = BsatnRowList;
834    type QueryUpdate = CompressableQueryUpdate<Self>;
835}
836
837/// A specification of either a desired or decided compression algorithm.
838#[derive(serde::Deserialize, Default, PartialEq, Eq, Clone, Copy, Hash, Debug)]
839pub enum Compression {
840    /// No compression ever.
841    None,
842    /// Compress using brotli if a certain size threshold was met.
843    #[default]
844    Brotli,
845    /// Compress using gzip if a certain size threshold was met.
846    Gzip,
847}
848
849pub type RowSize = u16;
850pub type RowOffset = u64;
851
852/// A packed list of BSATN-encoded rows.
853#[derive(SpacetimeType, Debug, Clone, Default)]
854#[sats(crate = spacetimedb_lib)]
855pub struct BsatnRowList {
856    /// A size hint about `rows_data`
857    /// intended to facilitate parallel decode purposes on large initial updates.
858    size_hint: RowSizeHint,
859    /// The flattened byte array for a list of rows.
860    rows_data: Bytes,
861}
862
863impl BsatnRowList {
864    /// Returns a new row list where `rows_data` is the flattened byte array
865    /// containing the BSATN of each row, without any markers for where a row begins and end.
866    ///
867    /// The `size_hint` encodes the boundaries of each row in `rows_data`.
868    /// See [`RowSizeHint`] for more details on the encoding.
869    pub fn new(size_hint: RowSizeHint, rows_data: Bytes) -> Self {
870        Self { size_hint, rows_data }
871    }
872}
873
874/// NOTE(centril, 1.0): We might want to add a `None` variant to this
875/// where the client has to decode in a loop until `rows_data` has been exhausted.
876/// The use-case for this is clients who are bandwidth limited and where every byte counts.
877#[derive(SpacetimeType, Debug, Clone)]
878#[sats(crate = spacetimedb_lib)]
879pub enum RowSizeHint {
880    /// Each row in `rows_data` is of the same fixed size as specified here.
881    FixedSize(RowSize),
882    /// The offsets into `rows_data` defining the boundaries of each row.
883    /// Only stores the offset to the start of each row.
884    /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
885    /// The behavior of this is identical to that of `PackedStr`.
886    RowOffsets(Arc<[RowOffset]>),
887}
888
889impl Default for RowSizeHint {
890    fn default() -> Self {
891        Self::RowOffsets([].into())
892    }
893}
894
895impl RowSizeHint {
896    fn index_to_range(&self, index: usize, data_end: usize) -> Option<Range<usize>> {
897        match self {
898            Self::FixedSize(size) => {
899                let size = *size as usize;
900                let start = index * size;
901                if start >= data_end {
902                    // We've reached beyond `data_end`,
903                    // so this is a row that doesn't exist, so we are beyond the count.
904                    return None;
905                }
906                let end = (index + 1) * size;
907                Some(start..end)
908            }
909            Self::RowOffsets(offsets) => {
910                let offsets = offsets.as_ref();
911                let start = *offsets.get(index)? as usize;
912                // The end is either the start of the next element or the end.
913                let end = offsets.get(index + 1).map(|e| *e as usize).unwrap_or(data_end);
914                Some(start..end)
915            }
916        }
917    }
918}
919
920impl RowListLen for BsatnRowList {
921    fn len(&self) -> usize {
922        match &self.size_hint {
923            // `size != 0` is always the case for `FixedSize`.
924            RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize,
925            RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(),
926        }
927    }
928}
929
930impl ByteListLen for BsatnRowList {
931    /// Returns the uncompressed size of the list in bytes
932    fn num_bytes(&self) -> usize {
933        self.rows_data.as_ref().len()
934    }
935}
936
937impl BsatnRowList {
938    /// Returns the element at `index` in the list.
939    pub fn get(&self, index: usize) -> Option<Bytes> {
940        let data_end = self.rows_data.len();
941        let data_range = self.size_hint.index_to_range(index, data_end)?;
942        Some(self.rows_data.slice(data_range))
943    }
944}
945
946/// An iterator over all the elements in a [`BsatnRowList`].
947pub struct BsatnRowListIter<'a> {
948    list: &'a BsatnRowList,
949    index: usize,
950}
951
952impl<'a> IntoIterator for &'a BsatnRowList {
953    type IntoIter = BsatnRowListIter<'a>;
954    type Item = Bytes;
955    fn into_iter(self) -> Self::IntoIter {
956        BsatnRowListIter { list: self, index: 0 }
957    }
958}
959
960impl Iterator for BsatnRowListIter<'_> {
961    type Item = Bytes;
962    fn next(&mut self) -> Option<Self::Item> {
963        let index = self.index;
964        self.index += 1;
965        self.list.get(index)
966    }
967}