Skip to main content

syncular_protocol/
realtime.rs

1use crate::{OperationResult, ProtocolError, PushCommitRequest, PushCommitResponse, Result};
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4
5pub const REALTIME_CLIENT_MESSAGE_PUSH: &str = "push";
6pub const REALTIME_CLIENT_MESSAGE_PRESENCE: &str = "presence";
7pub const REALTIME_SERVER_EVENT_SYNC: &str = "sync";
8pub const REALTIME_SERVER_EVENT_PRESENCE: &str = "presence";
9pub const REALTIME_SERVER_EVENT_PUSH_RESPONSE: &str = "push-response";
10
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12#[serde(rename_all = "camelCase")]
13pub struct RealtimePresenceEntry {
14    pub client_id: String,
15    pub actor_id: String,
16    pub joined_at: i64,
17    #[serde(default, skip_serializing_if = "Option::is_none")]
18    pub metadata: Option<Value>,
19}
20
21#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
22#[serde(rename_all = "camelCase")]
23pub struct RealtimePresenceEvent {
24    pub action: String,
25    pub scope_key: String,
26    #[serde(default, skip_serializing_if = "Option::is_none")]
27    pub client_id: Option<String>,
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub actor_id: Option<String>,
30    #[serde(default, skip_serializing_if = "Option::is_none")]
31    pub metadata: Option<Value>,
32    #[serde(default, skip_serializing_if = "Vec::is_empty")]
33    pub entries: Vec<RealtimePresenceEntry>,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct RealtimePushRequest {
39    #[serde(rename = "type")]
40    pub message_type: String,
41    pub request_id: String,
42    pub client_commit_id: String,
43    pub operations: Vec<crate::SyncOperation>,
44    pub schema_version: i32,
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub auth_lease: Option<crate::AuthLeaseProvenance>,
47}
48
49impl RealtimePushRequest {
50    pub fn from_commit(request_id: impl Into<String>, commit: PushCommitRequest) -> Self {
51        Self {
52            message_type: REALTIME_CLIENT_MESSAGE_PUSH.to_string(),
53            request_id: request_id.into(),
54            client_commit_id: commit.client_commit_id,
55            operations: commit.operations,
56            schema_version: commit.schema_version,
57            auth_lease: commit.auth_lease,
58        }
59    }
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "camelCase")]
64pub struct RealtimePresenceRequest {
65    #[serde(rename = "type")]
66    pub message_type: String,
67    pub action: String,
68    pub scope_key: String,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub metadata: Option<Value>,
71}
72
73impl RealtimePresenceRequest {
74    pub fn new(
75        action: impl Into<String>,
76        scope_key: impl Into<String>,
77        metadata: Option<Value>,
78    ) -> Self {
79        Self {
80            message_type: REALTIME_CLIENT_MESSAGE_PRESENCE.to_string(),
81            action: action.into(),
82            scope_key: scope_key.into(),
83            metadata,
84        }
85    }
86}
87
88#[derive(Debug, Clone, Deserialize)]
89pub struct RealtimeServerMessage {
90    pub event: String,
91    #[serde(default)]
92    pub data: Value,
93}
94
95#[derive(Debug, Clone, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct RealtimePushResponseData {
98    pub request_id: String,
99    #[serde(default)]
100    pub status: Option<String>,
101    #[serde(default)]
102    pub commit_seq: Option<i64>,
103    #[serde(default)]
104    pub results: Vec<OperationResult>,
105}
106
107pub fn realtime_push_response_from_value(
108    value: &Value,
109    expected_request_id: &str,
110    client_commit_id: &str,
111) -> Result<Option<PushCommitResponse>> {
112    let event = value.get("event").and_then(Value::as_str).unwrap_or("");
113    if event != REALTIME_SERVER_EVENT_PUSH_RESPONSE {
114        return Ok(None);
115    }
116    let data = value
117        .get("data")
118        .cloned()
119        .ok_or_else(|| ProtocolError::message("push-response missing data"))?;
120    let data: RealtimePushResponseData = serde_json::from_value(data)?;
121    if data.request_id != expected_request_id {
122        return Ok(None);
123    }
124    Ok(Some(PushCommitResponse {
125        client_commit_id: client_commit_id.to_string(),
126        status: data.status.unwrap_or_else(|| "rejected".to_string()),
127        commit_seq: data.commit_seq,
128        results: data.results,
129    }))
130}
131
132pub fn realtime_presence_event_from_value(value: &Value) -> Option<RealtimePresenceEvent> {
133    let presence = value
134        .get("data")
135        .and_then(|data| data.get("presence"))
136        .or_else(|| value.get("presence"))
137        .or_else(|| value.get("data"))?;
138    serde_json::from_value(presence.clone()).ok()
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144    use crate::SyncOperation;
145    use serde_json::json;
146
147    #[test]
148    fn encodes_push_and_presence_requests() {
149        let push = RealtimePushRequest::from_commit(
150            "req-1",
151            PushCommitRequest {
152                client_commit_id: "commit-1".to_string(),
153                operations: vec![SyncOperation {
154                    table: "tasks".to_string(),
155                    row_id: "task-1".to_string(),
156                    op: "upsert".to_string(),
157                    payload: None,
158                    base_version: None,
159                }],
160                schema_version: 7,
161                auth_lease: Some(crate::AuthLeaseProvenance {
162                    lease_id: "lease-1".to_string(),
163                    lease_expires_at_ms: 1_779_446_400_000,
164                    lease_status_at_enqueue: "active".to_string(),
165                    lease_scope_summary_json: None,
166                    lease_token: Some("lease-token".to_string()),
167                }),
168            },
169        );
170        assert_eq!(
171            serde_json::to_value(push).expect("push json"),
172            json!({
173                "type": "push",
174                "requestId": "req-1",
175                "clientCommitId": "commit-1",
176                "operations": [{
177                    "table": "tasks",
178                    "row_id": "task-1",
179                    "op": "upsert",
180                    "payload": null,
181                    "base_version": null
182                }],
183                "schemaVersion": 7,
184                "authLease": {
185                    "leaseId": "lease-1",
186                    "leaseExpiresAtMs": 1_779_446_400_000_i64,
187                    "leaseStatusAtEnqueue": "active",
188                    "leaseToken": "lease-token"
189                }
190            })
191        );
192
193        let presence = RealtimePresenceRequest::new("join", "user:1", Some(json!({"doc": "a"})));
194        assert_eq!(
195            serde_json::to_value(presence).expect("presence json"),
196            json!({
197                "type": "presence",
198                "action": "join",
199                "scopeKey": "user:1",
200                "metadata": {"doc": "a"}
201            })
202        );
203    }
204
205    #[test]
206    fn decodes_matching_push_response() {
207        let response = realtime_push_response_from_value(
208            &json!({
209                "event": "push-response",
210                "data": {
211                    "requestId": "req-1",
212                    "status": "accepted",
213                    "commitSeq": 42,
214                    "results": [{"opIndex": 0, "status": "ok"}]
215                }
216            }),
217            "req-1",
218            "commit-1",
219        )
220        .expect("decode")
221        .expect("response");
222
223        assert_eq!(response.client_commit_id, "commit-1");
224        assert_eq!(response.status, "accepted");
225        assert_eq!(response.commit_seq, Some(42));
226        assert_eq!(response.results.len(), 1);
227    }
228}