crdb_core/
messages.rs

1use web_time::SystemTime;
2
3use crate::{
4    BinPtr, EventId, ObjectId, Query, QueryId, Session, SessionRef, SessionToken, TypeId,
5    Updatedness,
6};
7use std::{
8    collections::{BTreeMap, HashMap, HashSet},
9    sync::Arc,
10};
11
12#[derive(
13    Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize,
14)]
15pub struct RequestId(pub u64);
16
17// TODO(misc-med): review what all the (de)serialized JSON for all the types defined here looks like
18#[derive(Debug, serde::Deserialize, serde::Serialize)]
19pub struct ClientMessage {
20    pub request_id: RequestId,
21    pub request: Arc<Request>,
22}
23
24#[derive(Debug, serde::Deserialize, serde::Serialize)]
25pub enum Request {
26    SetToken(SessionToken),
27    RenameSession(String),
28    CurrentSession,
29    ListSessions,
30    Logout,
31    DisconnectSession(SessionRef),
32    GetTime,
33    // Map from object to the only_updated_since information we want on it
34    Get {
35        object_ids: HashMap<ObjectId, Option<Updatedness>>,
36        subscribe: bool,
37    },
38    AlreadyHave {
39        object_ids: HashMap<ObjectId, Updatedness>,
40    },
41    Query {
42        query_id: QueryId,
43        type_id: TypeId,
44        query: Arc<Query>,
45        only_updated_since: Option<Updatedness>,
46        subscribe: bool,
47    },
48    GetBinaries(HashSet<BinPtr>),
49    Unsubscribe(HashSet<ObjectId>),
50    UnsubscribeQuery(QueryId),
51    Upload(Upload),
52    UploadBinaries(usize), // There are N binaries is in the N websocket frames of type `Binary` just after this one
53}
54
55#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
56pub enum Upload {
57    Object {
58        object_id: ObjectId,
59        type_id: TypeId,
60        created_at: EventId,
61        snapshot_version: i32,
62        object: Arc<serde_json::Value>,
63        subscribe: bool,
64    },
65    Event {
66        object_id: ObjectId,
67        type_id: TypeId,
68        event_id: EventId,
69        event: Arc<serde_json::Value>,
70        subscribe: bool,
71    },
72}
73
74/// One ServerMessage is supposed to hold as much data as possible
75/// without delaying updates, but still avoiding going too far above
76/// than 1M / message, to allow for better resumability.
77///
78/// If the `last_response` field is set to `true`, then it means that
79/// all the previous `ServerMessage`s that answered this `request`,
80/// taken together, hold the answer to the request.
81///
82/// Any subsequent updates, obtained by subscribing to the object or
83/// query, will be pushed as `Update`s.
84#[derive(Debug, serde::Deserialize, serde::Serialize)]
85pub enum ServerMessage {
86    Response {
87        request_id: RequestId,
88        response: ResponsePart,
89        last_response: bool,
90    },
91    Updates(Updates),
92}
93
94#[derive(Debug, serde::Deserialize, serde::Serialize)]
95pub struct Updates {
96    pub data: Vec<Arc<Update>>,
97    // This is the updatedness for all the currently subscribed queries
98    pub now_have_all_until: Updatedness,
99}
100
101#[derive(Debug, serde::Deserialize, serde::Serialize)]
102pub enum ResponsePart {
103    Success,
104    Error(crate::SerializableError),
105    Sessions(Vec<Session>),
106    CurrentTime(SystemTime),
107    Objects {
108        data: Vec<MaybeObject>,
109        // Set only in answer to a Query, this is the max of the Updatedness of all the returned objects.
110        // This is only set in the last ResponsePart of the query request, to make sure if connection cuts
111        // the client will not wrongfully assume having already received everything.
112        // TODO(perf-low): Server would have better perf if this were actually the max updatedness it's guaranteed
113        // to have answered. This way, clients would ask queries with a higher only_updated_since, and thus
114        // postgresql would be able to filter more lines faster.
115        now_have_all_until: Option<Updatedness>,
116    },
117    Binaries(usize),
118    // Note: the server's answer to GetBinaries is a Binaries(x) message, followed by `x`
119    // websocket frames of type Binary. It can be split into multiple parts.
120}
121
122#[derive(Debug, serde::Deserialize, serde::Serialize)]
123pub enum MaybeObject {
124    AlreadySubscribed(ObjectId),
125    NotYetSubscribed(ObjectData),
126}
127
128#[derive(Debug, serde::Deserialize, serde::Serialize)]
129pub struct ObjectData {
130    pub object_id: ObjectId,
131    pub type_id: TypeId,
132    // TODO(misc-med): expose some API to make it easy for client writers to notice they're getting snapshots
133    // with versions higher than what their current code version supports, to suggest an upgrade
134    pub creation_snapshot: Option<(EventId, i32, Arc<serde_json::Value>)>,
135    pub events: BTreeMap<EventId, Arc<serde_json::Value>>,
136    pub now_have_all_until: Updatedness,
137}
138
139impl ObjectData {
140    pub fn into_updates(self) -> Vec<Arc<Update>> {
141        let mut res =
142            Vec::with_capacity(self.events.len() + self.creation_snapshot.is_some() as usize);
143        if let Some((created_at, snapshot_version, data)) = self.creation_snapshot {
144            res.push(Arc::new(Update {
145                object_id: self.object_id,
146                data: UpdateData::Creation {
147                    type_id: self.type_id,
148                    created_at,
149                    snapshot_version,
150                    data,
151                },
152            }));
153        }
154        for (event_id, data) in self.events.into_iter() {
155            res.push(Arc::new(Update {
156                object_id: self.object_id,
157                data: UpdateData::Event {
158                    type_id: self.type_id,
159                    event_id,
160                    data,
161                },
162            }));
163        }
164        res
165    }
166}
167
168#[derive(Debug, serde::Deserialize, serde::Serialize)]
169pub struct Update {
170    pub object_id: ObjectId,
171    pub data: UpdateData,
172}
173
174#[derive(Debug, serde::Deserialize, serde::Serialize)]
175pub enum UpdateData {
176    // Also used for re-creation events
177    Creation {
178        type_id: TypeId,
179        created_at: EventId,
180        snapshot_version: i32,
181        data: Arc<serde_json::Value>,
182    },
183    Event {
184        type_id: TypeId,
185        event_id: EventId,
186        data: Arc<serde_json::Value>,
187    },
188    LostReadRights,
189}