spacetimedb_client_api_messages/websocket.rs
1//! Messages sent over the SpacetimeDB WebSocket protocol.
2//!
3//! Client -> Server messages are encoded as [`ClientMessage`].
4//! Server -> Client messages are encoded as [`ServerMessage`].
5//!
6//! Any changes to this file must be paired with a change to the WebSocket protocol identifiers
7//! defined in `crates/client-api/src/routes/subscribe.rs`,
8//! and be paired with changes to all of:
9//!
10//! - The C# SDK.
11//! - The TypeScript SDK.
12//! - The SpacetimeDB website.
13//!
14//! Changes to the Rust SDK are not necessarily required, as it depends on this crate
15//! rather than using an external mirror of this schema.
16
17use crate::energy::EnergyQuanta;
18use bytes::Bytes;
19use bytestring::ByteString;
20use core::{
21 fmt::Debug,
22 ops::{Deref, Range},
23};
24use enum_as_inner::EnumAsInner;
25use smallvec::SmallVec;
26use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
27use spacetimedb_primitives::TableId;
28use spacetimedb_sats::{
29 bsatn::{self, ToBsatn},
30 de::{Deserialize, Error},
31 impl_deserialize, impl_serialize, impl_st,
32 ser::{serde::SerializeWrapper, Serialize},
33 AlgebraicType, SpacetimeType,
34};
35use std::{
36 io::{self, Read as _, Write as _},
37 sync::Arc,
38};
39
40pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
41pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
42
/// Length queries shared by every row-list representation.
pub trait RowListLen {
    /// Returns the length of the list.
    fn len(&self) -> usize;
    /// Returns whether the list is empty or not.
    ///
    /// The default implementation compares [`RowListLen::len`] against zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}

/// Blanket implementation for any type that dereferences to a slice,
/// delegating both queries to the slice's own methods.
impl<T, L: Deref<Target = [T]>> RowListLen for L {
    fn len(&self) -> usize {
        // `&L` deref-coerces to `&[T]` at the call site.
        <[T]>::len(self)
    }
    fn is_empty(&self) -> bool {
        <[T]>::is_empty(self)
    }
}
60
/// Byte-size query for a list of encoded rows.
pub trait ByteListLen {
    /// Returns the uncompressed size of the list in bytes.
    fn num_bytes(&self) -> usize;
}
65
66impl ByteListLen for Vec<ByteString> {
67 fn num_bytes(&self) -> usize {
68 self.iter().map(|str| str.len()).sum()
69 }
70}
71
/// A format / codec used by the websocket API.
///
/// This can be e.g., BSATN, JSON.
/// The implementations visible in this file are [`JsonFormat`] and [`BsatnFormat`].
pub trait WebsocketFormat: Sized {
    /// The type used for the encoding of a single item.
    type Single: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone;

    /// The type used for the encoding of a list of items.
    type List: SpacetimeType
        + for<'de> Deserialize<'de>
        + Serialize
        + RowListLen
        + ByteListLen
        + Debug
        + Clone
        + Default;

    /// Encodes the `elems` to a list in the format and also returns the length of the list,
    /// i.e., the number of elements consumed from `elems`.
    fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64);

    /// The type used to encode query updates.
    /// This type exists so that some formats, e.g., BSATN, can compress an update.
    type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;

    /// Convert a `QueryUpdate` into `Self::QueryUpdate`.
    /// This allows some formats to e.g., compress the update.
    ///
    /// `compression` is the requested algorithm; a format is free to ignore it.
    fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
}
100
/// Messages sent from the client to the server.
///
/// Parametric over the reducer argument type to enable [`ClientMessage::map_args`].
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub enum ClientMessage<Args> {
    /// Request a reducer run.
    CallReducer(CallReducer<Args>),
    /// Register SQL queries on which to receive updates.
    Subscribe(Subscribe),
    /// Send a one-off SQL query without establishing a subscription.
    OneOffQuery(OneOffQuery),
    /// Register a SQL query to subscribe to updates. This does not affect other subscriptions.
    SubscribeSingle(SubscribeSingle),
    /// Register a set of SQL queries under a single [`QueryId`]; see [`SubscribeMulti`].
    SubscribeMulti(SubscribeMulti),
    /// Remove a subscription to a SQL query that was added with SubscribeSingle.
    Unsubscribe(Unsubscribe),
    /// Remove a subscription identified by a [`QueryId`]; see [`UnsubscribeMulti`].
    UnsubscribeMulti(UnsubscribeMulti),
}
120
121impl<Args> ClientMessage<Args> {
122 pub fn map_args<Args2>(self, f: impl FnOnce(Args) -> Args2) -> ClientMessage<Args2> {
123 match self {
124 ClientMessage::CallReducer(CallReducer {
125 reducer,
126 args,
127 request_id,
128 flags,
129 }) => ClientMessage::CallReducer(CallReducer {
130 reducer,
131 args: f(args),
132 request_id,
133 flags,
134 }),
135 ClientMessage::OneOffQuery(x) => ClientMessage::OneOffQuery(x),
136 ClientMessage::SubscribeSingle(x) => ClientMessage::SubscribeSingle(x),
137 ClientMessage::Unsubscribe(x) => ClientMessage::Unsubscribe(x),
138 ClientMessage::Subscribe(x) => ClientMessage::Subscribe(x),
139 ClientMessage::SubscribeMulti(x) => ClientMessage::SubscribeMulti(x),
140 ClientMessage::UnsubscribeMulti(x) => ClientMessage::UnsubscribeMulti(x),
141 }
142 }
143}
144
/// Request a reducer run.
///
/// Parametric over the argument type to enable [`ClientMessage::map_args`].
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct CallReducer<Args> {
    /// The name of the reducer to call.
    pub reducer: Box<str>,
    /// The arguments to the reducer.
    ///
    /// In the wire format, this will be a [`Bytes`], BSATN or JSON encoded according to the reducer's argument schema
    /// and the enclosing message format.
    pub args: Args,
    /// An identifier for a client request.
    ///
    /// The server will include the same ID in the response [`TransactionUpdate`].
    pub request_id: u32,
    /// Assorted flags that can be passed when calling a reducer.
    ///
    /// Currently accepts 0 ([`CallReducerFlags::FullUpdate`]) or 1 ([`CallReducerFlags::NoSuccessNotify`]),
    /// where the latter means that the caller does not want to be notified about the reducer
    /// without being subscribed to any relevant queries.
    pub flags: CallReducerFlags,
}
169
/// Flags a client can attach to a [`CallReducer`] request.
///
/// Serialized as a single `u8` by the `impl_serialize!`/`impl_deserialize!` invocations
/// that follow this enum: `0` is `FullUpdate` and `1` is `NoSuccessNotify`.
#[derive(Clone, Copy, Default, PartialEq, Eq)]
pub enum CallReducerFlags {
    /// The reducer's caller does want to be notified about the reducer completing successfully
    /// regardless of whether the caller had subscribed to a relevant query.
    ///
    /// Note that updates to a reducer's caller are always sent as full updates
    /// whether subscribed to a relevant query or not.
    /// That is, the light tx mode setting does not apply to the reducer's caller.
    ///
    /// This is the default flag.
    #[default]
    FullUpdate,
    /// The reducer's caller does not want to be notified about the reducer completing successfully
    /// without having subscribed to any of the relevant queries.
    NoSuccessNotify,
}
186
// `CallReducerFlags` is described to SATS as a plain `u8`.
impl_st!([] CallReducerFlags, AlgebraicType::U8);
// Serialization writes the enum discriminant as that `u8`.
impl_serialize!([] CallReducerFlags, (self, ser) => ser.serialize_u8(*self as u8));
// Deserialization accepts only the two known discriminants;
// any other byte is rejected with a custom error naming the offending value.
impl_deserialize!([] CallReducerFlags, de => match de.deserialize_u8()? {
    0 => Ok(Self::FullUpdate),
    1 => Ok(Self::NoSuccessNotify),
    x => Err(D::Error::custom(format_args!("invalid call reducer flag {x}"))),
});
194
/// An opaque id generated by the client to refer to a subscription.
/// This is used in Unsubscribe messages and errors.
#[derive(SpacetimeType, Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[sats(crate = spacetimedb_lib)]
pub struct QueryId {
    /// The raw client-chosen identifier.
    pub id: u32,
}
202
203impl QueryId {
204 pub fn new(id: u32) -> Self {
205 Self { id }
206 }
207}
208
/// Sent by client to database to register a set of queries, about which the client will
/// receive `TransactionUpdate`s.
///
/// After issuing a `Subscribe` message, the client will receive a single
/// `SubscriptionUpdate` message containing every current row of every table which matches
/// the subscribed queries. Then, after each reducer run which updates one or more
/// subscribed rows, the client will receive a `TransactionUpdate` containing the updates.
///
/// A `Subscribe` message sets or replaces the entire set of queries to which the client
/// is subscribed. If the client is previously subscribed to some set of queries `A`, and
/// then sends a `Subscribe` message to subscribe to a set `B`, afterwards, the client
/// will be subscribed to `B` but not `A`. In this case, the client will receive a
/// `SubscriptionUpdate` containing every existing row that matches `B`, even if some were
/// already in `A`.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct Subscribe {
    /// A sequence of SQL queries.
    pub query_strings: Box<[Box<str>]>,
    /// An identifier for a client request.
    pub request_id: u32,
}
230
/// Sent by client to register a subscription to a single query, for which the client should
/// receive relevant `TransactionUpdate`s.
///
/// After issuing a `SubscribeSingle` message, the client will receive a single
/// `SubscribeApplied` message containing every current row which matches the query. Then, any
/// time a reducer updates the query's results, the client will receive a `TransactionUpdate`
/// containing the relevant updates.
///
/// If a client subscribes to queries with overlapping results, the client will receive
/// multiple copies of rows that appear in multiple queries.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeSingle {
    /// A single SQL `SELECT` query to subscribe to.
    pub query: Box<str>,
    /// An identifier for a client request.
    pub request_id: u32,

    /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
    /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
    /// These only have meaning given a ConnectionId.
    pub query_id: QueryId,
}
254
/// Sent by client to register a subscription to a set of queries under a single [`QueryId`].
///
/// The server answers with a `SubscribeMultiApplied` message containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeMulti {
    /// A sequence of SQL queries to subscribe to.
    pub query_strings: Box<[Box<str>]>,
    /// An identifier for a client request.
    pub request_id: u32,

    /// An identifier for this subscription, which should not be used for any other subscriptions on the same connection.
    /// This is used to refer to this subscription in Unsubscribe messages from the client and errors sent from the server.
    /// These only have meaning given a ConnectionId.
    pub query_id: QueryId,
}
268
/// Client request for removing a query from a subscription.
///
/// The server answers with an `UnsubscribeApplied` message.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct Unsubscribe {
    /// An identifier for a client request.
    pub request_id: u32,

    /// The ID used in the corresponding `SubscribeSingle` message.
    pub query_id: QueryId,
}
279
/// Client request for removing a set of queries from a subscription.
///
/// The server answers with an `UnsubscribeMultiApplied` message.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct UnsubscribeMulti {
    /// An identifier for a client request.
    pub request_id: u32,

    /// The ID used in the corresponding `SubscribeMulti` message.
    pub query_id: QueryId,
}
290
/// A one-off query submission.
///
/// Query should be a "SELECT * FROM Table WHERE ...". Other types of queries will be rejected.
/// Multiple such semicolon-delimited queries are allowed.
///
/// One-off queries are identified by a client-generated message ID.
/// To avoid data leaks, the server will NOT cache responses to messages based on that ID!
/// It also will not check for duplicate IDs. They are just a way to match responses to messages.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct OneOffQuery {
    /// The client-generated identifier, echoed back in the [`OneOffQueryResponse`].
    pub message_id: Box<[u8]>,
    /// The SQL text of the query.
    pub query_string: Box<str>,
}
305
/// The tag recognized by the host and SDKs to mean no compression of a [`ServerMessage`].
pub const SERVER_MSG_COMPRESSION_TAG_NONE: u8 = 0;

/// The tag recognized by the host and SDKs to mean brotli compression of a [`ServerMessage`].
pub const SERVER_MSG_COMPRESSION_TAG_BROTLI: u8 = 1;

/// The tag recognized by the host and SDKs to mean gzip compression of a [`ServerMessage`].
pub const SERVER_MSG_COMPRESSION_TAG_GZIP: u8 = 2;
314
/// Messages sent from the server to the client.
///
/// Parametric over the [`WebsocketFormat`] used to encode rows and reducer arguments.
#[derive(SpacetimeType, derive_more::From)]
#[sats(crate = spacetimedb_lib)]
pub enum ServerMessage<F: WebsocketFormat> {
    /// Informs of changes to subscribed rows.
    /// This will be removed when we switch to `SubscribeSingle`.
    InitialSubscription(InitialSubscription<F>),
    /// Upon reducer run.
    TransactionUpdate(TransactionUpdate<F>),
    /// Upon reducer run, but limited to just the table updates.
    TransactionUpdateLight(TransactionUpdateLight<F>),
    /// After connecting, to inform client of its identity.
    IdentityToken(IdentityToken),
    /// Return results to a one off SQL query.
    OneOffQueryResponse(OneOffQueryResponse<F>),
    /// Sent in response to a `SubscribeSingle` message. This contains the initial matching rows.
    SubscribeApplied(SubscribeApplied<F>),
    /// Sent in response to an `Unsubscribe` message. This contains the matching rows.
    UnsubscribeApplied(UnsubscribeApplied<F>),
    /// Communicate an error in the subscription lifecycle.
    SubscriptionError(SubscriptionError),
    /// Sent in response to a `SubscribeMulti` message. This contains the initial matching rows.
    SubscribeMultiApplied(SubscribeMultiApplied<F>),
    /// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows.
    UnsubscribeMultiApplied(UnsubscribeMultiApplied<F>),
}
341
/// The matching rows of a subscription query.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeRows<F: WebsocketFormat> {
    /// The table ID of the query.
    pub table_id: TableId,
    /// The table name of the query.
    pub table_name: Box<str>,
    /// The row values, encoded according to the format `F`.
    pub table_rows: TableUpdate<F>,
}
353
/// Response to [`SubscribeSingle`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeApplied<F: WebsocketFormat> {
    /// The request_id of the corresponding `SubscribeSingle` message.
    pub request_id: u32,
    /// The overall time between the server receiving a request and sending the response.
    pub total_host_execution_duration_micros: u64,
    /// An identifier for the subscribed query sent by the client.
    pub query_id: QueryId,
    /// The matching rows for this query.
    pub rows: SubscribeRows<F>,
}
367
/// Server response to a client [`Unsubscribe`] request.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct UnsubscribeApplied<F: WebsocketFormat> {
    /// Provided by the client via the `Subscribe` message.
    /// NOTE(review): `Unsubscribe` carries its own `request_id`; presumably that is the one echoed here — confirm.
    /// TODO: switch to subscription id?
    pub request_id: u32,
    /// The overall time between the server receiving a request and sending the response.
    pub total_host_execution_duration_micros: u64,
    /// The ID included in the `SubscribeApplied` and `Unsubscribe` messages.
    pub query_id: QueryId,
    /// The matching rows for this query.
    /// Note, this makes unsubscribing potentially very expensive.
    /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
    /// and we would need clients to track which rows exist in which queries.
    pub rows: SubscribeRows<F>,
}
385
/// Server response to an error at any point of the subscription lifecycle.
/// If this error doesn't have a request_id, the client should drop all subscriptions.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionError {
    /// The overall time between the server receiving a request and sending the response.
    pub total_host_execution_duration_micros: u64,
    /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
    /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
    pub request_id: Option<u32>,
    /// Provided by the client via a [`Subscribe`] or [`Unsubscribe`] message.
    /// [`None`] if this occurred as the result of a [`TransactionUpdate`].
    ///
    /// Note that this is a raw `u32` rather than a [`QueryId`].
    pub query_id: Option<u32>,
    /// The return table of the query in question.
    /// The server is not required to set this field.
    /// It has been added to avoid a breaking change post 1.0.
    ///
    /// If unset, an error results in the entire subscription being dropped.
    /// Otherwise only queries of this table type must be dropped.
    pub table_id: Option<TableId>,
    /// An error message describing the failure.
    ///
    /// This should reference specific fragments of the query where applicable,
    /// but should not include the full text of the query,
    /// as the client can retrieve that from the `request_id`.
    ///
    /// This is intended for diagnostic purposes.
    /// It need not have a predictable/parseable format.
    pub error: Box<str>,
}
416
/// Response to [`SubscribeMulti`] containing the initial matching rows.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscribeMultiApplied<F: WebsocketFormat> {
    /// The request_id of the corresponding `SubscribeMulti` message.
    pub request_id: u32,
    /// The overall time between the server receiving a request and sending the response.
    pub total_host_execution_duration_micros: u64,
    /// An identifier for the subscribed query set sent by the client.
    pub query_id: QueryId,
    /// The matching rows for this query set.
    pub update: DatabaseUpdate<F>,
}
430
/// Server response to a client [`UnsubscribeMulti`] request.
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct UnsubscribeMultiApplied<F: WebsocketFormat> {
    /// Provided by the client via the `Subscribe` message.
    /// NOTE(review): `UnsubscribeMulti` carries its own `request_id`; presumably that is the one echoed here — confirm.
    /// TODO: switch to subscription id?
    pub request_id: u32,
    /// The overall time between the server receiving a request and sending the response.
    pub total_host_execution_duration_micros: u64,
    /// The ID included in the `SubscribeMultiApplied` and `UnsubscribeMulti` messages.
    pub query_id: QueryId,
    /// The matching rows for this query set.
    /// Note, this makes unsubscribing potentially very expensive.
    /// To remove this in the future, we would need to send query_ids with rows in transaction updates,
    /// and we would need clients to track which rows exist in which queries.
    pub update: DatabaseUpdate<F>,
}
448
/// Response to [`Subscribe`] containing the initial matching rows.
///
/// Same fields as [`InitialSubscription`], except that the duration is a raw `u64` of microseconds
/// rather than a [`TimeDuration`].
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct SubscriptionUpdate<F: WebsocketFormat> {
    /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
    pub database_update: DatabaseUpdate<F>,
    /// An identifier sent by the client in requests.
    /// The server will include the same request_id in the response.
    pub request_id: u32,
    /// The overall time between the server receiving a request and sending the response.
    pub total_host_execution_duration_micros: u64,
}
461
/// Response to [`Subscribe`] containing the initial matching rows.
///
/// Carried by [`ServerMessage::InitialSubscription`].
#[derive(SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct InitialSubscription<F: WebsocketFormat> {
    /// A [`DatabaseUpdate`] containing only inserts, the rows which match the subscription queries.
    pub database_update: DatabaseUpdate<F>,
    /// An identifier sent by the client in requests.
    /// The server will include the same request_id in the response.
    pub request_id: u32,
    /// The overall time between the server receiving a request and sending the response.
    pub total_host_execution_duration: TimeDuration,
}
474
/// Received by client from database to inform of the user's identity, token and client connection id.
///
/// The database will always send an `IdentityToken` message
/// as the first message for a new WebSocket connection.
/// If the client is re-connecting with existing credentials,
/// the message will include those credentials.
/// If the client connected anonymously,
/// the database will generate new credentials to identify it.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct IdentityToken {
    /// The identity of the connected user.
    pub identity: Identity,
    /// The token belonging to `identity`.
    pub token: Box<str>,
    /// The id of this client connection.
    pub connection_id: ConnectionId,
}
490
/// Received by client from database upon a reducer run.
///
/// Clients receive `TransactionUpdate`s only for reducers
/// which update at least one of their subscribed rows,
/// or for their own `Failed` or `OutOfEnergy` reducer invocations.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct TransactionUpdate<F: WebsocketFormat> {
    /// The status of the transaction. Contains the updated rows, if successful.
    pub status: UpdateStatus<F>,
    /// The time when the reducer started.
    ///
    /// Note that [`Timestamp`] serializes as `i64` nanoseconds since the Unix epoch.
    pub timestamp: Timestamp,
    /// The identity of the user who requested the reducer run. For event-driven and
    /// scheduled reducers, it is the identity of the database owner.
    pub caller_identity: Identity,

    /// The 16-byte [`ConnectionId`] of the user who requested the reducer run.
    ///
    /// The all-zeros id is a sentinel which denotes no meaningful value.
    /// This can occur in the following situations:
    /// - `init` and `update` reducers will have a `caller_connection_id`
    ///   if and only if one was provided to the `publish` HTTP endpoint.
    /// - Scheduled reducers will never have a `caller_connection_id`.
    /// - Reducers invoked by WebSocket or the HTTP API will always have a `caller_connection_id`.
    pub caller_connection_id: ConnectionId,
    /// The original CallReducer request that triggered this reducer.
    pub reducer_call: ReducerCallInfo<F>,
    /// The amount of energy credits consumed by running the reducer.
    pub energy_quanta_used: EnergyQuanta,
    /// How long the reducer took to run.
    pub total_host_execution_duration: TimeDuration,
}
525
/// Received by client from database upon a reducer run.
///
/// Clients receive `TransactionUpdateLight`s only for reducers
/// which update at least one of their subscribed rows.
/// Failed reducers result in full [`TransactionUpdate`]s.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct TransactionUpdateLight<F: WebsocketFormat> {
    /// An identifier for a client request.
    pub request_id: u32,

    /// The reducer ran successfully and its changes were committed to the database.
    /// The rows altered in the database are recorded in this `DatabaseUpdate`.
    pub update: DatabaseUpdate<F>,
}
541
/// Contained in a [`TransactionUpdate`], metadata about a reducer invocation.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct ReducerCallInfo<F: WebsocketFormat> {
    /// The name of the reducer that was called.
    ///
    /// NOTE(centril, 1.0): For bandwidth resource constrained clients
    /// this can encourage them to have poor naming of reducers like `a`.
    /// We should consider not sending this at all and instead
    /// having a startup message where the name <-> id bindings
    /// are established between the host and the client.
    pub reducer_name: Box<str>,
    /// The numerical id of the reducer that was called.
    pub reducer_id: u32,
    /// The arguments to the reducer, encoded as BSATN or JSON according to the reducer's argument schema
    /// and the client's requested protocol.
    pub args: F::Single,
    /// An identifier for a client request.
    pub request_id: u32,
}
562
/// The status of a [`TransactionUpdate`].
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum UpdateStatus<F: WebsocketFormat> {
    /// The reducer ran successfully and its changes were committed to the database.
    /// The rows altered in the database will be recorded in the `DatabaseUpdate`.
    Committed(DatabaseUpdate<F>),
    /// The reducer errored, and any changes it attempted to make were rolled back.
    /// This is the error message.
    Failed(Box<str>),
    /// The reducer was interrupted due to insufficient energy/funds,
    /// and any changes it attempted to make were rolled back.
    OutOfEnergy,
}
577
/// A collection of inserted and deleted rows, contained in a [`TransactionUpdate`] or [`SubscriptionUpdate`].
#[derive(SpacetimeType, Debug, Clone, Default)]
#[sats(crate = spacetimedb_lib)]
pub struct DatabaseUpdate<F: WebsocketFormat> {
    /// The per-table updates that make up this database update.
    pub tables: Vec<TableUpdate<F>>,
}
584
585impl<F: WebsocketFormat> DatabaseUpdate<F> {
586 pub fn is_empty(&self) -> bool {
587 self.tables.is_empty()
588 }
589
590 pub fn num_rows(&self) -> usize {
591 self.tables.iter().map(|t| t.num_rows()).sum()
592 }
593}
594
595impl<F: WebsocketFormat> FromIterator<TableUpdate<F>> for DatabaseUpdate<F> {
596 fn from_iter<T: IntoIterator<Item = TableUpdate<F>>>(iter: T) -> Self {
597 DatabaseUpdate {
598 tables: iter.into_iter().collect(),
599 }
600 }
601}
602
/// Part of a [`DatabaseUpdate`] received by client from database for alterations to a single table.
///
/// NOTE(centril): in 0.12 we added `num_rows` and `table_name` to the struct.
/// These inflate the size of messages, which for some customers is the wrong default.
/// We might want to consider `v1.spacetimedb.bsatn.lightweight`
#[derive(SpacetimeType, Debug, Clone)]
#[sats(crate = spacetimedb_lib)]
pub struct TableUpdate<F: WebsocketFormat> {
    /// The id of the table. Clients should prefer `table_name`, as it is a stable part of a module's API,
    /// whereas `table_id` may change between runs.
    pub table_id: TableId,
    /// The name of the table.
    ///
    /// NOTE(centril, 1.0): we might want to remove this and instead
    /// tell clients about changes to table_name <-> table_id mappings.
    pub table_name: Box<str>,
    /// The sum total of rows in `self.updates`.
    pub num_rows: u64,
    /// The actual insert and delete updates for this table.
    pub updates: SmallVec<[F::QueryUpdate; 1]>,
}
624
/// Computed update for a single query, annotated with the number of matching rows.
#[derive(Debug)]
pub struct SingleQueryUpdate<F: WebsocketFormat> {
    /// The update itself, in the format's query-update representation.
    pub update: F::QueryUpdate,
    /// The number of rows in `update`.
    pub num_rows: u64,
}
631
632impl<F: WebsocketFormat> TableUpdate<F> {
633 pub fn new(table_id: TableId, table_name: Box<str>, update: SingleQueryUpdate<F>) -> Self {
634 Self {
635 table_id,
636 table_name,
637 num_rows: update.num_rows,
638 updates: [update.update].into(),
639 }
640 }
641
642 pub fn empty(table_id: TableId, table_name: Box<str>) -> Self {
643 Self {
644 table_id,
645 table_name,
646 num_rows: 0,
647 updates: SmallVec::new(),
648 }
649 }
650
651 pub fn push(&mut self, update: SingleQueryUpdate<F>) {
652 self.updates.push(update.update);
653 self.num_rows += update.num_rows;
654 }
655
656 pub fn num_rows(&self) -> usize {
657 self.num_rows as usize
658 }
659}
660
/// A [`QueryUpdate`] that is either stored as-is or as compressed bytes.
///
/// For the compressed variants, the payload is the compressed BSATN encoding
/// of a [`QueryUpdate`], as produced by `BsatnFormat::into_query_update`.
#[derive(SpacetimeType, Debug, Clone, EnumAsInner)]
#[sats(crate = spacetimedb_lib)]
pub enum CompressableQueryUpdate<F: WebsocketFormat> {
    /// The update, not compressed.
    Uncompressed(QueryUpdate<F>),
    /// The update, compressed with brotli.
    Brotli(Bytes),
    /// The update, compressed with gzip.
    Gzip(Bytes),
}
668
669impl CompressableQueryUpdate<BsatnFormat> {
670 pub fn maybe_decompress(self) -> QueryUpdate<BsatnFormat> {
671 match self {
672 Self::Uncompressed(qu) => qu,
673 Self::Brotli(bytes) => {
674 let bytes = brotli_decompress(&bytes).unwrap();
675 bsatn::from_slice(&bytes).unwrap()
676 }
677 Self::Gzip(bytes) => {
678 let bytes = gzip_decompress(&bytes).unwrap();
679 bsatn::from_slice(&bytes).unwrap()
680 }
681 }
682 }
683}
684
/// The rows deleted and inserted for one query, as lists in the format `F`.
#[derive(SpacetimeType, Debug, Clone)]
#[sats(crate = spacetimedb_lib)]
pub struct QueryUpdate<F: WebsocketFormat> {
    /// When in a [`TransactionUpdate`], the matching rows of this table deleted by the transaction.
    ///
    /// Rows are encoded as BSATN or JSON according to the table's schema
    /// and the client's requested protocol.
    ///
    /// Always empty when in an [`InitialSubscription`].
    pub deletes: F::List,
    /// When in a [`TransactionUpdate`], the matching rows of this table inserted by the transaction.
    /// When in an [`InitialSubscription`], the matching rows of this table in the entire committed state.
    ///
    /// Rows are encoded as BSATN or JSON according to the table's schema
    /// and the client's requested protocol.
    pub inserts: F::List,
}
702
/// A response to a [`OneOffQuery`].
/// Will contain either one error or some number of response rows.
/// At most one of these messages will be sent in reply to any query.
///
/// The `message_id` will be identical to the one sent in the original query.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct OneOffQueryResponse<F: WebsocketFormat> {
    /// The `message_id` of the [`OneOffQuery`] being answered.
    pub message_id: Box<[u8]>,
    /// If query compilation or evaluation errored, an error message.
    pub error: Option<Box<str>>,

    /// If query compilation and evaluation succeeded, a set of resulting rows, grouped by table.
    pub tables: Box<[OneOffTable<F>]>,

    /// The total duration of query compilation and evaluation on the server.
    pub total_host_execution_duration: TimeDuration,
}
721
/// A table included as part of a [`OneOffQueryResponse`].
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub struct OneOffTable<F: WebsocketFormat> {
    /// The name of the table.
    pub table_name: Box<str>,
    /// The set of rows which matched the query, encoded as BSATN or JSON according to the table's schema
    /// and the client's requested protocol.
    ///
    /// TODO(centril, 1.0): Evaluate whether we want to conditionally compress these.
    pub rows: F::List,
}
734
/// Used whenever different formats need to coexist.
#[derive(Debug, Clone)]
pub enum FormatSwitch<B, J> {
    Bsatn(B),
    Json(J),
}

impl<B1, J1> FormatSwitch<B1, J1> {
    /// Zips together two switches.
    ///
    /// The result borrows the payload of `self` mutably and takes the payload of `other` by value.
    ///
    /// # Panics
    ///
    /// Panics if `self` and `other` are not the same variant.
    pub fn zip_mut<B2, J2>(&mut self, other: FormatSwitch<B2, J2>) -> FormatSwitch<(&mut B1, B2), (&mut J1, J2)> {
        match self {
            FormatSwitch::Bsatn(lhs) => match other {
                FormatSwitch::Bsatn(rhs) => FormatSwitch::Bsatn((lhs, rhs)),
                FormatSwitch::Json(_) => panic!("format should be the same for both sides of the zip"),
            },
            FormatSwitch::Json(lhs) => match other {
                FormatSwitch::Json(rhs) => FormatSwitch::Json((lhs, rhs)),
                FormatSwitch::Bsatn(_) => panic!("format should be the same for both sides of the zip"),
            },
        }
    }
}
752
/// Marker type selecting the JSON [`WebsocketFormat`].
#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct JsonFormat;
756
757impl WebsocketFormat for JsonFormat {
758 type Single = ByteString;
759
760 type List = Vec<ByteString>;
761
762 fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
763 let mut count = 0;
764 let list = elems
765 .map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into())
766 .inspect(|_| count += 1)
767 .collect();
768 (list, count)
769 }
770
771 type QueryUpdate = QueryUpdate<Self>;
772
773 fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
774 qu
775 }
776}
777
/// Marker type selecting the BSATN [`WebsocketFormat`].
#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
#[sats(crate = spacetimedb_lib)]
pub struct BsatnFormat;
781
782impl WebsocketFormat for BsatnFormat {
783 type Single = Box<[u8]>;
784
785 type List = BsatnRowList;
786
787 fn encode_list<R: ToBsatn + Serialize>(mut elems: impl Iterator<Item = R>) -> (Self::List, u64) {
788 // For an empty list, the size of a row is unknown, so use `RowOffsets`.
789 let Some(first) = elems.next() else {
790 return (BsatnRowList::row_offsets(), 0);
791 };
792 // We have at least one row. Determine the static size from that, if available.
793 let (mut list, mut scratch) = match first.static_bsatn_size() {
794 Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)),
795 None => (BsatnRowListBuilder::row_offsets(), Vec::new()),
796 };
797 // Add the first element and then the rest.
798 // We assume that the schema of rows yielded by `elems` stays the same,
799 // so once the size is fixed, it will stay that way.
800 let mut count = 0;
801 let mut push = |elem: R| {
802 elem.to_bsatn_extend(&mut scratch).unwrap();
803 list.push(&scratch);
804 scratch.clear();
805 count += 1;
806 };
807 push(first);
808 for elem in elems {
809 push(elem);
810 }
811 (list.finish(), count)
812 }
813
814 type QueryUpdate = CompressableQueryUpdate<Self>;
815
816 fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
817 let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();
818
819 match decide_compression(qu_len_would_have_been, compression) {
820 Compression::None => CompressableQueryUpdate::Uncompressed(qu),
821 Compression::Brotli => {
822 let bytes = bsatn::to_vec(&qu).unwrap();
823 let mut out = Vec::new();
824 brotli_compress(&bytes, &mut out);
825 CompressableQueryUpdate::Brotli(out.into())
826 }
827 Compression::Gzip => {
828 let bytes = bsatn::to_vec(&qu).unwrap();
829 let mut out = Vec::new();
830 gzip_compress(&bytes, &mut out);
831 CompressableQueryUpdate::Gzip(out.into())
832 }
833 }
834 }
835}
836
837/// A specification of either a desired or decided compression algorithm.
838#[derive(serde::Deserialize, Default, PartialEq, Eq, Clone, Copy, Hash, Debug)]
839pub enum Compression {
840 /// No compression ever.
841 None,
842 /// Compress using brotli if a certain size threshold was met.
843 #[default]
844 Brotli,
845 /// Compress using gzip if a certain size threshold was met.
846 Gzip,
847}
848
849pub fn decide_compression(len: usize, compression: Compression) -> Compression {
850 /// The threshold beyond which we start to compress messages.
851 /// 1KiB was chosen without measurement.
852 /// TODO(perf): measure!
853 const COMPRESS_THRESHOLD: usize = 1024;
854
855 if len > COMPRESS_THRESHOLD {
856 compression
857 } else {
858 Compression::None
859 }
860}
861
862pub fn brotli_compress(bytes: &[u8], out: &mut Vec<u8>) {
863 let reader = &mut &bytes[..];
864
865 // The default Brotli buffer size.
866 const BUFFER_SIZE: usize = 4096;
867 // We are optimizing for compression speed,
868 // so we choose the lowest (fastest) level of compression.
869 // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
870 // for large `SubscriptionUpdate` messages at this level.
871 const COMPRESSION_LEVEL: u32 = 1;
872 // The default value for an internal compression parameter.
873 // See `BrotliEncoderParams` for more details.
874 const LG_WIN: u32 = 22;
875
876 let mut encoder = brotli::CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);
877
878 encoder
879 .read_to_end(out)
880 .expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
881}
882
883pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
884 let mut decompressed = Vec::new();
885 brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?;
886 Ok(decompressed)
887}
888
889pub fn gzip_compress(bytes: &[u8], out: &mut Vec<u8>) {
890 let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast());
891 encoder.write_all(bytes).unwrap();
892 encoder.finish().expect("Failed to gzip compress `bytes`");
893}
894
895pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
896 let mut decompressed = Vec::new();
897 let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
898 Ok(decompressed)
899}
900
/// The size, in bytes, of one row in a list where all rows have the same size.
type RowSize = u16;
/// A byte offset into the packed row data, marking where a row starts.
type RowOffset = u64;
903
904/// A packed list of BSATN-encoded rows.
905#[derive(SpacetimeType, Debug, Clone)]
906#[sats(crate = spacetimedb_lib)]
907pub struct BsatnRowList<B = Bytes, I = Arc<[RowOffset]>> {
908 /// A size hint about `rows_data`
909 /// intended to facilitate parallel decode purposes on large initial updates.
910 size_hint: RowSizeHint<I>,
911 /// The flattened byte array for a list of rows.
912 rows_data: B,
913}
914
915impl Default for BsatnRowList {
916 fn default() -> Self {
917 Self::row_offsets()
918 }
919}
920
921/// NOTE(centril, 1.0): We might want to add a `None` variant to this
922/// where the client has to decode in a loop until `rows_data` has been exhausted.
923/// The use-case for this is clients who are bandwidth limited and where every byte counts.
924#[derive(SpacetimeType, Debug, Clone)]
925#[sats(crate = spacetimedb_lib)]
926pub enum RowSizeHint<I> {
927 /// Each row in `rows_data` is of the same fixed size as specified here.
928 FixedSize(RowSize),
929 /// The offsets into `rows_data` defining the boundaries of each row.
930 /// Only stores the offset to the start of each row.
931 /// The ends of each row is inferred from the start of the next row, or `rows_data.len()`.
932 /// The behavior of this is identical to that of `PackedStr`.
933 RowOffsets(I),
934}
935
936impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
937 fn index_to_range(&self, index: usize, data_end: usize) -> Option<Range<usize>> {
938 match self {
939 Self::FixedSize(size) => {
940 let size = *size as usize;
941 let start = index * size;
942 if start >= data_end {
943 // We've reached beyond `data_end`,
944 // so this is a row that doesn't exist, so we are beyond the count.
945 return None;
946 }
947 let end = (index + 1) * size;
948 Some(start..end)
949 }
950 Self::RowOffsets(offsets) => {
951 let offsets = offsets.as_ref();
952 let start = *offsets.get(index)? as usize;
953 // The end is either the start of the next element or the end.
954 let end = offsets.get(index + 1).map(|e| *e as usize).unwrap_or(data_end);
955 Some(start..end)
956 }
957 }
958 }
959}
960
961impl<B: Default, I> BsatnRowList<B, I> {
962 pub fn fixed(row_size: RowSize) -> Self {
963 Self {
964 size_hint: RowSizeHint::FixedSize(row_size),
965 rows_data: <_>::default(),
966 }
967 }
968
969 /// Returns a new empty list using indices
970 pub fn row_offsets() -> Self
971 where
972 I: From<[RowOffset; 0]>,
973 {
974 Self {
975 size_hint: RowSizeHint::RowOffsets([].into()),
976 rows_data: <_>::default(),
977 }
978 }
979}
980
981impl<B: AsRef<[u8]>, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList<B, I> {
982 /// Returns the length of the row list.
983 fn len(&self) -> usize {
984 match &self.size_hint {
985 RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize,
986 RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(),
987 }
988 }
989}
990
991impl<B: AsRef<[u8]>, I> ByteListLen for BsatnRowList<B, I> {
992 /// Returns the uncompressed size of the list in bytes
993 fn num_bytes(&self) -> usize {
994 self.rows_data.as_ref().len()
995 }
996}
997
998impl BsatnRowList {
999 /// Returns the element at `index` in the list.
1000 pub fn get(&self, index: usize) -> Option<Bytes> {
1001 let data_end = self.rows_data.len();
1002 let data_range = self.size_hint.index_to_range(index, data_end)?;
1003 Some(self.rows_data.slice(data_range))
1004 }
1005}
1006
1007/// An iterator over all the elements in a [`BsatnRowList`].
1008pub struct BsatnRowListIter<'a> {
1009 list: &'a BsatnRowList,
1010 index: usize,
1011}
1012
1013impl<'a> IntoIterator for &'a BsatnRowList {
1014 type IntoIter = BsatnRowListIter<'a>;
1015 type Item = Bytes;
1016 fn into_iter(self) -> Self::IntoIter {
1017 BsatnRowListIter { list: self, index: 0 }
1018 }
1019}
1020
1021impl Iterator for BsatnRowListIter<'_> {
1022 type Item = Bytes;
1023 fn next(&mut self) -> Option<Self::Item> {
1024 let index = self.index;
1025 self.index += 1;
1026 self.list.get(index)
1027 }
1028}
1029
1030/// A [`BsatnRowList`] that can be added to.
1031pub type BsatnRowListBuilder = BsatnRowList<Vec<u8>, Vec<RowOffset>>;
1032
1033impl BsatnRowListBuilder {
1034 /// Adds `row`, BSATN-encoded to this list.
1035 #[inline]
1036 pub fn push(&mut self, row: &[u8]) {
1037 if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint {
1038 offsets.push(self.rows_data.len() as u64);
1039 }
1040 self.rows_data.extend_from_slice(row);
1041 }
1042
1043 /// Finish the in flight list, throwing away the capability to mutate.
1044 pub fn finish(self) -> BsatnRowList {
1045 let Self { size_hint, rows_data } = self;
1046 let rows_data = rows_data.into();
1047 let size_hint = match size_hint {
1048 RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs),
1049 RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()),
1050 };
1051 BsatnRowList { size_hint, rows_data }
1052 }
1053}