spacetimedb_client_api_messages/
websocket.rs

1//! Messages sent over the SpacetimeDB WebSocket protocol.
2//!
3//! Client -> Server messages are encoded as [`ClientMessage`].
4//! Server -> Client messages are encoded as [`ServerMessage`].
5//!
6//! Any changes to this file must be paired with a change to the WebSocket protocol identifiers
7//! defined in `crates/client-api/src/routes/subscribe.rs`,
8//! and be paired with changes to all of:
9//!
10//! - The C# SDK.
11//! - The TypeScript SDK.
12//! - The SpacetimeDB website.
13//!
14//! Changes to the Rust SDK are not necessarily required, as it depends on this crate
15//! rather than using an external mirror of this schema.
16
17use crate::energy::EnergyQuanta;
18use bytes::Bytes;
19use bytestring::ByteString;
20use core::{
21    fmt::Debug,
22    ops::{Deref, Range},
23};
24use enum_as_inner::EnumAsInner;
25use smallvec::SmallVec;
26use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
27use spacetimedb_primitives::TableId;
28use spacetimedb_sats::{
29    bsatn::{self, ToBsatn},
30    de::{Deserialize, Error},
31    impl_deserialize, impl_serialize, impl_st,
32    ser::{serde::SerializeWrapper, Serialize},
33    AlgebraicType, SpacetimeType,
34};
35use std::{
36    io::{self, Read as _, Write as _},
37    sync::Arc,
38};
39
40pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
41pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
42
43pub trait RowListLen {
44    /// Returns the length of the list.
45    fn len(&self) -> usize;
46    /// Returns whether the list is empty or not.
47    fn is_empty(&self) -> bool {
48        self.len() == 0
49    }
50}
51
52impl<T, L: Deref<Target = [T]>> RowListLen for L {
53    fn len(&self) -> usize {
54        self.deref().len()
55    }
56    fn is_empty(&self) -> bool {
57        self.deref().is_empty()
58    }
59}
60
61pub trait ByteListLen {
62    /// Returns the uncompressed size of the list in bytes
63    fn num_bytes(&self) -> usize;
64}
65
66impl ByteListLen for Vec<ByteString> {
67    fn num_bytes(&self) -> usize {
68        self.iter().map(|str| str.len()).sum()
69    }
70}
71
72/// A format / codec used by the websocket API.
73///
74/// This can be e.g., BSATN, JSON.
75pub trait WebsocketFormat: Sized {
76    /// The type used for the encoding of a single item.
77    type Single: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone;
78
79    /// The type used for the encoding of a list of items.
80    type List: SpacetimeType
81        + for<'de> Deserialize<'de>
82        + Serialize
83        + RowListLen
84        + ByteListLen
85        + Debug
86        + Clone
87        + Default;
88
89    /// Encodes the `elems` to a list in the format and also returns the length of the list.
90    fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64);
91
92    /// The type used to encode query updates.
93    /// This type exists so that some formats, e.g., BSATN, can compress an update.
94    type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
95
96    /// Convert a `QueryUpdate` into `Self::QueryUpdate`.
97    /// This allows some formats to e.g., compress the update.
98    fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
99}
100
101/// Messages sent from the client to the server.
102///
103/// Parametric over the reducer argument type to enable [`ClientMessage::map_args`].
104#[derive(SpacetimeType)]
105#[sats(crate = spacetimedb_lib)]
106pub enum ClientMessage<Args> {
107    /// Request a reducer run.
108    CallReducer(CallReducer<Args>),
109    /// Register SQL queries on which to receive updates.
110    Subscribe(Subscribe),
111    /// Send a one-off SQL query without establishing a subscription.
112    OneOffQuery(OneOffQuery),
113    /// Register a SQL query to to subscribe to updates. This does not affect other subscriptions.
114    SubscribeSingle(SubscribeSingle),
115    SubscribeMulti(SubscribeMulti),
116    /// Remove a subscription to a SQL query that was added with SubscribeSingle.
117    Unsubscribe(Unsubscribe),
118    UnsubscribeMulti(UnsubscribeMulti),
119}
120
121impl<Args> ClientMessage<Args> {
122    pub fn map_args<Args2>(self, f: impl FnOnce(Args) -> Args2) -> ClientMessage<Args2> {
123        match self {
124            ClientMessage::CallReducer(CallReducer {
125                reducer,
126                args,
127                request_id,
128                flags,
129            }) => ClientMessage::CallReducer(CallReducer {
130                reducer,
131                args: f(args),
132                request_id,
133                flags,
134            }),
135            ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x),
136            ClientMessage::SubscribeSingle(x) => ClientMessage::SubscribeSingle(x),
137            ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x),
138            ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
139            ClientMessage::SubscribeMulti(x) => ClientMessage::SubscribeMulti(x),
140            ClientMessage::UnsubscribeMulti(x) => ClientMessage::UnsubscribeMulti(x),
141        }
142    }
143}
144
145/// Request a reducer run.
146///
147/// Parametric over the argument type to enable [`ClientMessage::map_args`].
148#[derive(SpacetimeType)]
149#[sats(crate = spacetimedb_lib)]
150pub struct CallReducer<Args> {
151    /// The name of the reducer to call.
152    pub reducer: Box<str>,
153    /// The arguments to the reducer.
154    ///
155    /// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
156    /// and the enclosing message format.
157    pub args: Args,
158    /// An identifier for a client request.
159    ///
160    /// The server will include the same ID in the response [`TransactionUpdate`].
161    pub request_id: u32,
162    /// Assorted flags that can be passed when calling a reducer.
163    ///
164    /// Currently accepts 0 or 1 where the latter means
165    /// that the caller does not want to be notified about the reducer
166    /// without being subscribed to any relevant queries.
167    pub flags: CallReducerFlags,
168}
169
170#[derive(Clone, Copy, Default, PartialEq, Eq)]
171pub enum CallReducerFlags {
172    /// The reducer's caller does want to be notified about the reducer completing successfully
173    /// regardless of whether the caller had subscribed to a relevant query.
174    ///
175    /// Note that updates to a reducer's caller are always sent as full updates
176    /// whether subscribed to a relevant query or not.
177    /// That is, the light tx mode setting does not apply to the reducer's caller.
178    ///
179    /// This is the default flag.
180    #[default]
181    FullUpdate,
182    /// The reducer's caller does not want to be notified about the reducer completing successfully
183    /// without having subscribed to any of the relevant queries.
184    NoSuccessNotify,
185}
186
187impl_st!([] CallReducerFlags, AlgebraicType::U8);
188impl_serialize!([] CallReducerFlags, (self, ser) => ser.serialize_u8(*self as u8));
189impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
190    0 => Ok(Self::FullUpdate),
191    1 => Ok(Self::NoSuccessNotify),
192    x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))),
193});
194
195/// An opaque id generated by the client to refer to a subscription.
196/// This is used in Unsubscribe messages and errors.
197#[derive(SpacetimeType, Copy, Clone, Debug, PartialEq, Eq, Hash)]
198#[sats(crate = spacetimedb_lib)]
199pub struct QueryId {
200    pub id: u32,
201}
202
203impl QueryId {
204    pub fn new(id: u32) -> Self {
205        Self { id }
206    }
207}
208
209/// Sent by client to database to register a set of queries, about which the client will
210/// receive `TransactionUpdate`s.
211///
212/// After issuing a `Subscribe` message, the client will receive a single
213/// `SubscriptionUpdate` message containing every current row of every table which matches
214/// the subscribed queries. Then, after each reducer run which updates one or more
215/// subscribed rows, the client will receive a `TransactionUpdate` containing the updates.
216///
217/// A `Subscribe` message sets or replaces the entire set of queries to which the client
218/// is subscribed. If the client is previously subscribed to some set of queries `A`, and
219/// then sends a `Subscribe` message to subscribe to a set `B`, afterwards, the client
220/// will be subscribed to `B` but not `A`. In this case, the client will receive a
221/// `SubscriptionUpdate` containing every existing row that matches `B`, even if some were
222/// already in `A`.
223#[derive(SpacetimeType)]
224#[sats(crate = spacetimedb_lib)]
225pub struct Subscribe {
226    /// A sequence of SQL queries.
227    pub query_strings: Box<[Box<str>]>,
228    pub request_id: u32,
229}
230
231/// Sent by client to register a subscription to single query, for which the client should receive
232/// receive relevant `TransactionUpdate`s.
233///
234/// After issuing a `SubscribeSingle` message, the client will receive a single
235/// `SubscribeApplied` message containing every current row which matches the query. Then, any
236/// time a reducer updates the query's results, the client will receive a `TransactionUpdate`
237/// containing the relevant updates.
238///
239/// If a client subscribes to queries with overlapping results, the client will receive
240/// multiple copies of rows that appear in multiple queries.
241#[derive(SpacetimeType)]
242#[sats(crate = spacetimedb_lib)]
243pub struct SubscribeSingle {
244    /// A single SQL `SELECT` query to subscribe to.
245    pub query: Box<str>,
246    /// An identifier for a client request.
247    pub request_id: u32,
248
249    /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
250    /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
251    /// These only have meaning given a ConnectionId.
252    pub query_id: QueryId,
253}
254
255#[derive(SpacetimeType)]
256#[sats(crate = spacetimedb_lib)]
257pub struct SubscribeMulti {
258    /// A single SQL `SELECT` query to subscribe to.
259    pub query_strings: Box<[Box<str>]>,
260    /// An identifier for a client request.
261    pub request_id: u32,
262
263    /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
264    /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
265    /// These only have meaning given a ConnectionId.
266    pub query_id: QueryId,
267}
268
269/// Client request for removing a query from a subscription.
270#[derive(SpacetimeType)]
271#[sats(crate = spacetimedb_lib)]
272pub struct Unsubscribe {
273    /// An identifier for a client request.
274    pub request_id: u32,
275
276    /// The ID used in the corresponding `SubscribeSingle` message.
277    pub query_id: QueryId,
278}
279
280/// Client request for removing a query from a subscription.
281#[derive(SpacetimeType)]
282#[sats(crate = spacetimedb_lib)]
283pub struct UnsubscribeMulti {
284    /// An identifier for a client request.
285    pub request_id: u32,
286
287    /// The ID used in the corresponding `SubscribeSingle` message.
288    pub query_id: QueryId,
289}
290
291/// A one-off query submission.
292///
293/// Query should be a "SELECT * FROM Table WHERE ...". Other types of queries will be rejected.
294/// Multiple such semicolon-delimited queries are allowed.
295///
296/// One-off queries are identified by a client-generated messageID.
297/// To avoid data leaks, the server will NOT cache responses to messages based on UUID!
298/// It also will not check for duplicate IDs. They are just a way to match responses to messages.
299#[derive(SpacetimeType)]
300#[sats(crate = spacetimedb_lib)]
301pub struct OneOffQuery {
302    pub message_id: Box<[u8]>,
303    pub query_string: Box<str>,
304}
305
306/// The tag recognized by the host and SDKs to mean no compression of a [`ServerMessage`].
307pub const SERVER_MSG_COMPRESSION_TAG_NONE: u8 = 0;
308
309/// The tag recognized by the host and SDKs to mean brotli compression  of a [`ServerMessage`].
310pub const SERVER_MSG_COMPRESSION_TAG_BROTLI: u8 = 1;
311
312/// The tag recognized by the host and SDKs to mean brotli compression  of a [`ServerMessage`].
313pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2;
314
315/// Messages sent from the server to the client.
316#[derive(SpacetimeType, derive_more::From)]
317#[sats(crate = spacetimedb_lib)]
318pub enum ServerMessage<F: WebsocketFormat> {
319    /// Informs of changes to subscribed rows.
320    /// This will be removed when we switch to `SubscribeSingle`.
321    InitialSubscription(InitialSubscription<F>),
322    /// Upon reducer run.
323    TransactionUpdate(TransactionUpdate<F>),
324    /// Upon reducer run, but limited to just the table updates.
325    TransactionUpdateLight(TransactionUpdateLight<F>),
326    /// After connecting, to inform client of its identity.
327    IdentityToken(IdentityToken),
328    /// Return results to a one off SQL query.
329    OneOffQueryResponse(OneOffQueryResponse<F>),
330    /// Sent in response to a `SubscribeSingle` message. This contains the initial matching rows.
331    SubscribeApplied(SubscribeApplied<F>),
332    /// Sent in response to an `Unsubscribe` message. This contains the matching rows.
333    UnsubscribeApplied(UnsubscribeApplied<F>),
334    /// Communicate an error in the subscription lifecycle.
335    SubscriptionError(SubscriptionError),
336    /// Sent in response to a `SubscribeMulti` message. This contains the initial matching rows.
337    SubscribeMultiApplied(SubscribeMultiApplied<F>),
338    /// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows.
339    UnsubscribeMultiApplied(UnsubscribeMultiApplied<F>),
340}
341
342/// The matching rows of a subscription query.
343#[derive(SpacetimeType)]
344#[sats(crate = spacetimedb_lib)]
345pub struct SubscribeRows<F: WebsocketFormat> {
346    /// The table ID of the query.
347    pub table_id: TableId,
348    /// The table name of the query.
349    pub table_name: Box<str>,
350    /// The BSATN row values.
351    pub table_rows: TableUpdate<F>,
352}
353
354/// Response to [`Subscribe`] containing the initial matching rows.
355#[derive(SpacetimeType)]
356#[sats(crate = spacetimedb_lib)]
357pub struct SubscribeApplied<F: WebsocketFormat> {
358    /// The request_id of the corresponding `SubscribeSingle` message.
359    pub request_id: u32,
360    /// The overall time between the server receiving a request and sending the response.
361    pub total_host_execution_duration_micros: u64,
362    /// An identifier for the subscribed query sent by the client.
363    pub query_id: QueryId,
364    /// The matching rows for this query.
365    pub rows: SubscribeRows<F>,
366}
367
368/// Server response to a client [`Unsubscribe`] request.
369#[derive(SpacetimeType)]
370#[sats(crate = spacetimedb_lib)]
371pub struct UnsubscribeApplied<F: WebsocketFormat> {
372    /// Provided by the client via the `Subscribe` message.
373    /// TODO: switch to subscription id?
374    pub request_id: u32,
375    /// The overall time between the server receiving a request and sending the response.
376    pub total_host_execution_duration_micros: u64,
377    /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
378    pub query_id: QueryId,
379    /// The matching rows for this query.
380    /// Note, this makes unsubscribing potentially very expensive.
381    /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
382    /// and we would need clients to track which rows exist in which queries.
383    pub rows: SubscribeRows<F>,
384}
385
386/// Server response to an error at any point of the subscription lifecycle.
387/// If this error doesn't have a request_id, the client should drop all subscriptions.
388#[derive(SpacetimeType)]
389#[sats(crate = spacetimedb_lib)]
390pub struct SubscriptionError {
391    /// The overall time between the server receiving a request and sending the response.
392    pub total_host_execution_duration_micros: u64,
393    /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
394    /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
395    pub request_id: Option<u32>,
396    /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
397    /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
398    pub query_id: Option<u32>,
399    /// The return table of the query in question.
400    /// The server is not required to set this field.
401    /// It has been added to avoid a breaking change post 1.0.
402    ///
403    /// If unset, an error results in the entire subscription being dropped.
404    /// Otherwise only queries of this table type must be dropped.
405    pub table_id: Option<TableId>,
406    /// An error message describing the failure.
407    ///
408    /// This should reference specific fragments of the query where applicable,
409    /// but should not include the full text of the query,
410    /// as the client can retrieve that from the `request_id`.
411    ///
412    /// This is intended for diagnostic purposes.
413    /// It need not have a predictable/parseable format.
414    pub error: Box<str>,
415}
416
417/// Response to [`Subscribe`] containing the initial matching rows.
418#[derive(SpacetimeType)]
419#[sats(crate = spacetimedb_lib)]
420pub struct SubscribeMultiApplied<F: WebsocketFormat> {
421    /// The request_id of the corresponding `SubscribeSingle` message.
422    pub request_id: u32,
423    /// The overall time between the server receiving a request and sending the response.
424    pub total_host_execution_duration_micros: u64,
425    /// An identifier for the subscribed query sent by the client.
426    pub query_id: QueryId,
427    /// The matching rows for this query.
428    pub update: DatabaseUpdate<F>,
429}
430
431/// Server response to a client [`Unsubscribe`] request.
432#[derive(SpacetimeType)]
433#[sats(crate = spacetimedb_lib)]
434pub struct UnsubscribeMultiApplied<F: WebsocketFormat> {
435    /// Provided by the client via the `Subscribe` message.
436    /// TODO: switch to subscription id?
437    pub request_id: u32,
438    /// The overall time between the server receiving a request and sending the response.
439    pub total_host_execution_duration_micros: u64,
440    /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
441    pub query_id: QueryId,
442    /// The matching rows for this query set.
443    /// Note, this makes unsubscribing potentially very expensive.
444    /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
445    /// and we would need clients to track which rows exist in which queries.
446    pub update: DatabaseUpdate<F>,
447}
448
449/// Response to [`Subscribe`] containing the initial matching rows.
450#[derive(SpacetimeType)]
451#[sats(crate = spacetimedb_lib)]
452pub struct SubscriptionUpdate<F: WebsocketFormat> {
453    /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
454    pub database_update: DatabaseUpdate<F>,
455    /// An identifier sent by the client in requests.
456    /// The server will include the same request_id in the response.
457    pub request_id: u32,
458    /// The overall time between the server receiving a request and sending the response.
459    pub total_host_execution_duration_micros: u64,
460}
461
462/// Response to [`Subscribe`] containing the initial matching rows.
463#[derive(SpacetimeType)]
464#[sats(crate = spacetimedb_lib)]
465pub struct InitialSubscription<F: WebsocketFormat> {
466    /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
467    pub database_update: DatabaseUpdate<F>,
468    /// An identifier sent by the client in requests.
469    /// The server will include the same request_id in the response.
470    pub request_id: u32,
471    /// The overall time between the server receiving a request and sending the response.
472    pub total_host_execution_duration: TimeDuration,
473}
474
475/// Received by database from client to inform of user's identity, token and client connection id.
476///
477/// The database will always send an `IdentityToken` message
478/// as the first message for a new WebSocket connection.
479/// If the client is re-connecting with existing credentials,
480/// the message will include those credentials.
481/// If the client connected anonymously,
482/// the database will generate new credentials to identify it.
483#[derive(SpacetimeType, Debug)]
484#[sats(crate = spacetimedb_lib)]
485pub struct IdentityToken {
486    pub identity: Identity,
487    pub token: Box<str>,
488    pub connection_id: ConnectionId,
489}
490
491/// Received by client from database upon a reducer run.
492///
493/// Clients receive `TransactionUpdate`s only for reducers
494/// which update at least one of their subscribed rows,
495/// or for their own `Failed` or `OutOfEnergy` reducer invocations.
496#[derive(SpacetimeType, Debug)]
497#[sats(crate = spacetimedb_lib)]
498pub struct TransactionUpdate<F: WebsocketFormat> {
499    /// The status of the transaction. Contains the updated rows, if successful.
500    pub status: UpdateStatus<F>,
501    /// The time when the reducer started.
502    ///
503    /// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
504    pub timestamp: Timestamp,
505    /// The identity of the user who requested the reducer run. For event-driven and
506    /// scheduled reducers, it is the identity of the database owner.
507    pub caller_identity: Identity,
508
509    /// The 16-byte [`ConnectionId`] of the user who requested the reducer run.
510    ///
511    /// The all-zeros id is a sentinel which denotes no meaningful value.
512    /// This can occur in the following situations:
513    /// - `init` and `update` reducers will have a `caller_connection_id`
514    ///   if and only if one was provided to the `publish` HTTP endpoint.
515    /// - Scheduled reducers will never have a `caller_connection_id`.
516    /// - Reducers invoked by WebSocket or the HTTP API will always have a `caller_connection_id`.
517    pub caller_connection_id: ConnectionId,
518    /// The original CallReducer request that triggered this reducer.
519    pub reducer_call: ReducerCallInfo<F>,
520    /// The amount of energy credits consumed by running the reducer.
521    pub energy_quanta_used: EnergyQuanta,
522    /// How long the reducer took to run.
523    pub total_host_execution_duration: TimeDuration,
524}
525
526/// Received by client from database upon a reducer run.
527///
528/// Clients receive `TransactionUpdateLight`s only for reducers
529/// which update at least one of their subscribed rows.
530/// Failed reducers result in full [`TransactionUpdate`]s
531#[derive(SpacetimeType, Debug)]
532#[sats(crate = spacetimedb_lib)]
533pub struct TransactionUpdateLight<F: WebsocketFormat> {
534    /// An identifier for a client request
535    pub request_id: u32,
536
537    /// The reducer ran successfully and its changes were committed to the database.
538    /// The rows altered in the database/ are recorded in this `DatabaseUpdate`.
539    pub update: DatabaseUpdate<F>,
540}
541
542/// Contained in a [`TransactionUpdate`], metadata about a reducer invocation.
543#[derive(SpacetimeType, Debug)]
544#[sats(crate = spacetimedb_lib)]
545pub struct ReducerCallInfo<F: WebsocketFormat> {
546    /// The name of the reducer that was called.
547    ///
548    /// NOTE(centril, 1.0): For bandwidth resource constrained clients
549    /// this can encourage them to have poor naming of reducers like `a`.
550    /// We should consider not sending this at all and instead
551    /// having a startup message where the name <-> id bindings
552    /// are established between the host and the client.
553    pub reducer_name: Box<str>,
554    /// The numerical id of the reducer that was called.
555    pub reducer_id: u32,
556    /// The arguments to the reducer, encoded as BSATN or JSON according to the reducer's argument schema
557    /// and the client's requested protocol.
558    pub args: F::Single,
559    /// An identifier for a client request
560    pub request_id: u32,
561}
562
563/// The status of a [`TransactionUpdate`].
564#[derive(SpacetimeType, Debug)]
565#[sats(crate = spacetimedb_lib)]
566pub enum UpdateStatus<F: WebsocketFormat> {
567    /// The reducer ran successfully and its changes were committed to the database.
568    /// The rows altered in the database/ will be recorded in the `DatabaseUpdate`.
569    Committed(DatabaseUpdate<F>),
570    /// The reducer errored, and any changes it attempted to were rolled back.
571    /// This is the error message.
572    Failed(Box<str>),
573    /// The reducer was interrupted due to insufficient energy/funds,
574    /// and any changes it attempted to make were rolled back.
575    OutOfEnergy,
576}
577
578/// A collection of inserted and deleted rows, contained in a [`TransactionUpdate`] or [`SubscriptionUpdate`].
579#[derive(SpacetimeType, Debug, Clone, Default)]
580#[sats(crate = spacetimedb_lib)]
581pub struct DatabaseUpdate<F: WebsocketFormat> {
582    pub tables: Vec<TableUpdate<F>>,
583}
584
585impl<F: WebsocketFormat> DatabaseUpdate<F> {
586    pub fn is_empty(&self) -> bool {
587        self.tables.is_empty()
588    }
589
590    pub fn num_rows(&self) -> usize {
591        self.tables.iter().map(|t| t.num_rows()).sum()
592    }
593}
594
595impl<F: WebsocketFormat> FromIterator<TableUpdate<F>> for DatabaseUpdate<F> {
596    fn from_iter<T: IntoIterator<Item = TableUpdate<F>>>(iter: T) -> Self {
597        DatabaseUpdate {
598            tables: iter.into_iter().collect(),
599        }
600    }
601}
602
603/// Part of a [`DatabaseUpdate`] received by client from database for alterations to a single table.
604///
605/// NOTE(centril): in 0.12 we added `num_rows` and `table_name` to the struct.
606/// These inflate the size of messages, which for some customers is the wrong default.
607/// We might want to consider `v1.spacetimedb.bsatn.lightweight`
608#[derive(SpacetimeType, Debug, Clone)]
609#[sats(crate = spacetimedb_lib)]
610pub struct TableUpdate<F: WebsocketFormat> {
611    /// The id of the table. Clients should prefer `table_name`, as it is a stable part of a module's API,
612    /// whereas `table_id` may change between runs.
613    pub table_id: TableId,
614    /// The name of the table.
615    ///
616    /// NOTE(centril, 1.0): we might want to remove this and instead
617    /// tell clients about changes to table_name <-> table_id mappings.
618    pub table_name: Box<str>,
619    /// The sum total of rows in `self.updates`,
620    pub num_rows: u64,
621    /// The actual insert and delete updates for this table.
622    pub updates: SmallVec<[F::QueryUpdate; 1]>,
623}
624
625/// Computed update for a single query, annotated with the number of matching rows.
626#[derive(Debug)]
627pub struct SingleQueryUpdate<F: WebsocketFormat> {
628    pub update: F::QueryUpdate,
629    pub num_rows: u64,
630}
631
632impl<F: WebsocketFormat> TableUpdate<F> {
633    pub fn new(table_id: TableId, table_name: Box<str>, update: SingleQueryUpdate<F>) -> Self {
634        Self {
635            table_id,
636            table_name,
637            num_rows: update.num_rows,
638            updates: [update.update].into(),
639        }
640    }
641
642    pub fn empty(table_id: TableId, table_name: Box<str>) -> Self {
643        Self {
644            table_id,
645            table_name,
646            num_rows: 0,
647            updates: SmallVec::new(),
648        }
649    }
650
651    pub fn push(&mut self, update: SingleQueryUpdate<F>) {
652        self.updates.push(update.update);
653        self.num_rows += update.num_rows;
654    }
655
656    pub fn num_rows(&self) -> usize {
657        self.num_rows as usize
658    }
659}
660
661#[derive(SpacetimeType, Debug, Clone, EnumAsInner)]
662#[sats(crate = spacetimedb_lib)]
663pub enum CompressableQueryUpdate<F: WebsocketFormat> {
664    Uncompressed(QueryUpdate<F>),
665    Brotli(Bytes),
666    Gzip(Bytes),
667}
668
669impl CompressableQueryUpdate<BsatnFormat> {
670    pub fn maybe_decompress(self) -> QueryUpdate<BsatnFormat> {
671        match self {
672            Self::Uncompressed(qu) => qu,
673            Self::Brotli(bytes) => {
674                let bytes = brotli_decompress(&bytes).unwrap();
675                bsatn::from_slice(&bytes).unwrap()
676            }
677            Self::Gzip(bytes) => {
678                let bytes = gzip_decompress(&bytes).unwrap();
679                bsatn::from_slice(&bytes).unwrap()
680            }
681        }
682    }
683}
684
685#[derive(SpacetimeType, Debug, Clone)]
686#[sats(crate = spacetimedb_lib)]
687pub struct QueryUpdate<F: WebsocketFormat> {
688    /// When in a [`TransactionUpdate`], the matching rows of this table deleted by the transaction.
689    ///
690    /// Rows are encoded as BSATN or JSON according to the table's schema
691    /// and the client's requested protocol.
692    ///
693    /// Always empty when in an [`InitialSubscription`].
694    pub deletes: F::List,
695    /// When in a [`TransactionUpdate`], the matching rows of this table inserted by the transaction.
696    /// When in an [`InitialSubscription`], the matching rows of this table in the entire committed state.
697    ///
698    /// Rows are encoded as BSATN or JSON according to the table's schema
699    /// and the client's requested protocol.
700    pub inserts: F::List,
701}
702
703/// A response to a [`OneOffQuery`].
704/// Will contain either one error or some number of response rows.
705/// At most one of these messages will be sent in reply to any query.
706///
707/// The messageId will be identical to the one sent in the original query.
708#[derive(SpacetimeType, Debug)]
709#[sats(crate = spacetimedb_lib)]
710pub struct OneOffQueryResponse<F: WebsocketFormat> {
711    pub message_id: Box<[u8]>,
712    /// If query compilation or evaluation errored, an error message.
713    pub error: Option<Box<str>>,
714
715    /// If query compilation and evaluation succeeded, a set of resulting rows, grouped by table.
716    pub tables: Box<[OneOffTable<F>]>,
717
718    /// The total duration of query compilation and evaluation on the server, in microseconds.
719    pub total_host_execution_duration: TimeDuration,
720}
721
722/// A table included as part of a [`OneOffQueryResponse`].
723#[derive(SpacetimeType, Debug)]
724#[sats(crate = spacetimedb_lib)]
725pub struct OneOffTable<F: WebsocketFormat> {
726    /// The name of the table.
727    pub table_name: Box<str>,
728    /// The set of rows which matched the query, encoded as BSATN or JSON according to the table's schema
729    /// and the client's requested protocol.
730    ///
731    /// TODO(centril, 1.0): Evaluate whether we want to conditionally compress these.
732    pub rows: F::List,
733}
734
735/// Used whenever different formats need to coexist.
736#[derive(Debug, Clone)]
737pub enum FormatSwitch<B, J> {
738    Bsatn(B),
739    Json(J),
740}
741
742impl<B1, J1> FormatSwitch<B1, J1> {
743    /// Zips together two switches.
744    pub fn zip_mut<B2, J2>(&mut self, other: FormatSwitch<B2, J2>) -> FormatSwitch<(&mut B1, B2), (&mut J1, J2)> {
745        match (self, other) {
746            (FormatSwitch::Bsatn(a), FormatSwitch::Bsatn(b)) => FormatSwitch::Bsatn((a, b)),
747            (FormatSwitch::Json(a), FormatSwitch::Json(b)) => FormatSwitch::Json((a, b)),
748            _ => panic!("format should be the same for both sides of the zip"),
749        }
750    }
751}
752
753#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
754#[sats(crate = spacetimedb_lib)]
755pub struct JsonFormat;
756
757impl WebsocketFormat for JsonFormat {
758    type Single = ByteString;
759
760    type List = Vec<ByteString>;
761
762    fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
763        let mut count = 0;
764        let list = elems
765            .map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into())
766            .inspect(|_| count += 1)
767            .collect();
768        (list, count)
769    }
770
771    type QueryUpdate = QueryUpdate<Self>;
772
773    fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
774        qu
775    }
776}
777
778#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
779#[sats(crate = spacetimedb_lib)]
780pub struct BsatnFormat;
781
782impl WebsocketFormat for BsatnFormat {
783    type Single = Box<[u8]>;
784
785    type List = BsatnRowList;
786
787    fn encode_list<R: ToBsatn + Serialize>(mut elems: impl Iterator<Item = R>) -> (Self::List, u64) {
788        // For an empty list, the size of a row is unknown, so use `RowOffsets`.
789        let Some(first) = elems.next() else {
790            return (BsatnRowList::row_offsets(), 0);
791        };
792        // We have at least one row. Determine the static size from that, if available.
793        let (mut list, mut scratch) = match first.static_bsatn_size() {
794            Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)),
795            None => (BsatnRowListBuilder::row_offsets(), Vec::new()),
796        };
797        // Add the first element and then the rest.
798        // We assume that the schema of rows yielded by `elems` stays the same,
799        // so once the size is fixed, it will stay that way.
800        let mut count = 0;
801        let mut push = |elem: R| {
802            elem.to_bsatn_extend(&mut scratch).unwrap();
803            list.push(&scratch);
804            scratch.clear();
805            count += 1;
806        };
807        push(first);
808        for elem in elems {
809            push(elem);
810        }
811        (list.finish(), count)
812    }
813
814    type QueryUpdate = CompressableQueryUpdate<Self>;
815
816    fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
817        let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();
818
819        match decide_compression(qu_len_would_have_been, compression) {
820            Compression::None => CompressableQueryUpdate::Uncompressed(qu),
821            Compression::Brotli => {
822                let bytes = bsatn::to_vec(&qu).unwrap();
823                let mut out = Vec::new();
824                brotli_compress(&bytes, &mut out);
825                CompressableQueryUpdate::Brotli(out.into())
826            }
827            Compression::Gzip => {
828                let bytes = bsatn::to_vec(&qu).unwrap();
829                let mut out = Vec::new();
830                gzip_compress(&bytes, &mut out);
831                CompressableQueryUpdate::Gzip(out.into())
832            }
833        }
834    }
835}
836
837/// A specification of either a desired or decided compression algorithm.
838#[derive(serde::Deserialize, Default, PartialEq, Eq, Clone, Copy, Hash, Debug)]
839pub enum Compression {
840    /// No compression ever.
841    None,
842    /// Compress using brotli if a certain size threshold was met.
843    #[default]
844    Brotli,
845    /// Compress using gzip if a certain size threshold was met.
846    Gzip,
847}
848
849pub fn decide_compression(len: usize, compression: Compression) -> Compression {
850    /// The threshold beyond which we start to compress messages.
851    /// 1KiB was chosen without measurement.
852    /// TODO(perf): measure!
853    const COMPRESS_THRESHOLD: usize = 1024;
854
855    if len > COMPRESS_THRESHOLD {
856        compression
857    } else {
858        Compression::None
859    }
860}
861
862pub fn brotli_compress(bytes: &[u8], out: &mut Vec<u8>) {
863    let reader = &mut &bytes[..];
864
865    // The default Brotli buffer size.
866    const BUFFER_SIZE: usize = 4096;
867    // We are optimizing for compression speed,
868    // so we choose the lowest (fastest) level of compression.
869    // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
870    // for large `SubscriptionUpdate` messages at this level.
871    const COMPRESSION_LEVEL: u32 = 1;
872    // The default value for an internal compression parameter.
873    // See `BrotliEncoderParams` for more details.
874    const LG_WIN: u32 = 22;
875
876    let mut encoder = brotli::CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);
877
878    encoder
879        .read_to_end(out)
880        .expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
881}
882
883pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
884    let mut decompressed = Vec::new();
885    brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?;
886    Ok(decompressed)
887}
888
889pub fn gzip_compress(bytes: &[u8], out: &mut Vec<u8>) {
890    let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast());
891    encoder.write_all(bytes).unwrap();
892    encoder.finish().expect("Failed to gzip compress `bytes`");
893}
894
895pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
896    let mut decompressed = Vec::new();
897    let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
898    Ok(decompressed)
899}
900
901type RowSize = u16;
902type RowOffset = u64;
903
904/// A packed list of BSATN-encoded rows.
905#[derive(SpacetimeType, Debug, Clone)]
906#[sats(crate = spacetimedb_lib)]
907pub struct BsatnRowList<B = Bytes, I = Arc<[RowOffset]>> {
908    /// A size hint about `rows_data`
909    /// intended to facilitate parallel decode purposes on large initial updates.
910    size_hint: RowSizeHint<I>,
911    /// The flattened byte array for a list of rows.
912    rows_data: B,
913}
914
915impl Default for BsatnRowList {
916    fn default() -> Self {
917        Self::row_offsets()
918    }
919}
920
921/// NOTE(centril, 1.0): We might want to add a `None` variant to this
922/// where the client has to decode in a loop until `rows_data` has been exhausted.
923/// The use-case for this is clients who are bandwidth limited and where every byte counts.
924#[derive(SpacetimeType, Debug, Clone)]
925#[sats(crate = spacetimedb_lib)]
926pub enum RowSizeHint<I> {
927    /// Each row in `rows_data` is of the same fixed size as specified here.
928    FixedSize(RowSize),
929    /// The offsets into `rows_data` defining the boundaries of each row.
930    /// Only stores the offset to the start of each row.
931    /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
932    /// The behavior of this is identical to that of `PackedStr`.
933    RowOffsets(I),
934}
935
936impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
937    fn index_to_range(&self, index: usize, data_end: usize) -> Option<Range<usize>> {
938        match self {
939            Self::FixedSize(size) => {
940                let size = *size as usize;
941                let start = index * size;
942                if start >= data_end {
943                    // We've reached beyond `data_end`,
944                    // so this is a row that doesn't exist, so we are beyond the count.
945                    return None;
946                }
947                let end = (index + 1) * size;
948                Some(start..end)
949            }
950            Self::RowOffsets(offsets) => {
951                let offsets = offsets.as_ref();
952                let start = *offsets.get(index)? as usize;
953                // The end is either the start of the next element or the end.
954                let end = offsets.get(index + 1).map(|e| *e as usize).unwrap_or(data_end);
955                Some(start..end)
956            }
957        }
958    }
959}
960
961impl<B: Default, I> BsatnRowList<B, I> {
962    pub fn fixed(row_size: RowSize) -> Self {
963        Self {
964            size_hint: RowSizeHint::FixedSize(row_size),
965            rows_data: <_>::default(),
966        }
967    }
968
969    /// Returns a new empty list using indices
970    pub fn row_offsets() -> Self
971    where
972        I: From<[RowOffset; 0]>,
973    {
974        Self {
975            size_hint: RowSizeHint::RowOffsets([].into()),
976            rows_data: <_>::default(),
977        }
978    }
979}
980
981impl<B: AsRef<[u8]>, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList<B, I> {
982    /// Returns the length of the row list.
983    fn len(&self) -> usize {
984        match &self.size_hint {
985            RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize,
986            RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(),
987        }
988    }
989}
990
991impl<B: AsRef<[u8]>, I> ByteListLen for BsatnRowList<B, I> {
992    /// Returns the uncompressed size of the list in bytes
993    fn num_bytes(&self) -> usize {
994        self.rows_data.as_ref().len()
995    }
996}
997
998impl BsatnRowList {
999    /// Returns the element at `index` in the list.
1000    pub fn get(&self, index: usize) -> Option<Bytes> {
1001        let data_end = self.rows_data.len();
1002        let data_range = self.size_hint.index_to_range(index, data_end)?;
1003        Some(self.rows_data.slice(data_range))
1004    }
1005}
1006
1007/// An iterator over all the elements in a [`BsatnRowList`].
1008pub struct BsatnRowListIter<'a> {
1009    list: &'a BsatnRowList,
1010    index: usize,
1011}
1012
1013impl<'a> IntoIterator for &'a BsatnRowList {
1014    type IntoIter = BsatnRowListIter<'a>;
1015    type Item = Bytes;
1016    fn into_iter(self) -> Self::IntoIter {
1017        BsatnRowListIter { list: self, index: 0 }
1018    }
1019}
1020
1021impl Iterator for BsatnRowListIter<'_> {
1022    type Item = Bytes;
1023    fn next(&mut self) -> Option<Self::Item> {
1024        let index = self.index;
1025        self.index += 1;
1026        self.list.get(index)
1027    }
1028}
1029
1030/// A [`BsatnRowList`] that can be added to.
1031pub type BsatnRowListBuilder = BsatnRowList<Vec<u8>, Vec<RowOffset>>;
1032
1033impl BsatnRowListBuilder {
1034    /// Adds `row`, BSATN-encoded to this list.
1035    #[inline]
1036    pub fn push(&mut self, row: &[u8]) {
1037        if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint {
1038            offsets.push(self.rows_data.len() as u64);
1039        }
1040        self.rows_data.extend_from_slice(row);
1041    }
1042
1043    /// Finish the in flight list, throwing away the capability to mutate.
1044    pub fn finish(self) -> BsatnRowList {
1045        let Self { size_hint, rows_data } = self;
1046        let rows_data = rows_data.into();
1047        let size_hint = match size_hint {
1048            RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs),
1049            RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()),
1050        };
1051        BsatnRowList { size_hint, rows_data }
1052    }
1053}