1use crate::{OperationResult, ProtocolError, PushCommitRequest, PushCommitResponse, Result};
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4
5pub const REALTIME_CLIENT_MESSAGE_PUSH: &str = "push";
6pub const REALTIME_CLIENT_MESSAGE_PRESENCE: &str = "presence";
7pub const REALTIME_SERVER_EVENT_SYNC: &str = "sync";
8pub const REALTIME_SERVER_EVENT_PRESENCE: &str = "presence";
9pub const REALTIME_SERVER_EVENT_PUSH_RESPONSE: &str = "push-response";
10
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12#[serde(rename_all = "camelCase")]
13pub struct RealtimePresenceEntry {
14 pub client_id: String,
15 pub actor_id: String,
16 pub joined_at: i64,
17 #[serde(default, skip_serializing_if = "Option::is_none")]
18 pub metadata: Option<Value>,
19}
20
21#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
22#[serde(rename_all = "camelCase")]
23pub struct RealtimePresenceEvent {
24 pub action: String,
25 pub scope_key: String,
26 #[serde(default, skip_serializing_if = "Option::is_none")]
27 pub client_id: Option<String>,
28 #[serde(default, skip_serializing_if = "Option::is_none")]
29 pub actor_id: Option<String>,
30 #[serde(default, skip_serializing_if = "Option::is_none")]
31 pub metadata: Option<Value>,
32 #[serde(default, skip_serializing_if = "Vec::is_empty")]
33 pub entries: Vec<RealtimePresenceEntry>,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37#[serde(rename_all = "camelCase")]
38pub struct RealtimePushRequest {
39 #[serde(rename = "type")]
40 pub message_type: String,
41 pub request_id: String,
42 pub client_commit_id: String,
43 pub operations: Vec<crate::SyncOperation>,
44 pub schema_version: i32,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub auth_lease: Option<crate::AuthLeaseProvenance>,
47}
48
49impl RealtimePushRequest {
50 pub fn from_commit(request_id: impl Into<String>, commit: PushCommitRequest) -> Self {
51 Self {
52 message_type: REALTIME_CLIENT_MESSAGE_PUSH.to_string(),
53 request_id: request_id.into(),
54 client_commit_id: commit.client_commit_id,
55 operations: commit.operations,
56 schema_version: commit.schema_version,
57 auth_lease: commit.auth_lease,
58 }
59 }
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63#[serde(rename_all = "camelCase")]
64pub struct RealtimePresenceRequest {
65 #[serde(rename = "type")]
66 pub message_type: String,
67 pub action: String,
68 pub scope_key: String,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 pub metadata: Option<Value>,
71}
72
73impl RealtimePresenceRequest {
74 pub fn new(
75 action: impl Into<String>,
76 scope_key: impl Into<String>,
77 metadata: Option<Value>,
78 ) -> Self {
79 Self {
80 message_type: REALTIME_CLIENT_MESSAGE_PRESENCE.to_string(),
81 action: action.into(),
82 scope_key: scope_key.into(),
83 metadata,
84 }
85 }
86}
87
88#[derive(Debug, Clone, Deserialize)]
89pub struct RealtimeServerMessage {
90 pub event: String,
91 #[serde(default)]
92 pub data: Value,
93}
94
95#[derive(Debug, Clone, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct RealtimePushResponseData {
98 pub request_id: String,
99 #[serde(default)]
100 pub status: Option<String>,
101 #[serde(default)]
102 pub commit_seq: Option<i64>,
103 #[serde(default)]
104 pub results: Vec<OperationResult>,
105}
106
107pub fn realtime_push_response_from_value(
108 value: &Value,
109 expected_request_id: &str,
110 client_commit_id: &str,
111) -> Result<Option<PushCommitResponse>> {
112 let event = value.get("event").and_then(Value::as_str).unwrap_or("");
113 if event != REALTIME_SERVER_EVENT_PUSH_RESPONSE {
114 return Ok(None);
115 }
116 let data = value
117 .get("data")
118 .cloned()
119 .ok_or_else(|| ProtocolError::message("push-response missing data"))?;
120 let data: RealtimePushResponseData = serde_json::from_value(data)?;
121 if data.request_id != expected_request_id {
122 return Ok(None);
123 }
124 Ok(Some(PushCommitResponse {
125 client_commit_id: client_commit_id.to_string(),
126 status: data.status.unwrap_or_else(|| "rejected".to_string()),
127 commit_seq: data.commit_seq,
128 results: data.results,
129 }))
130}
131
132pub fn realtime_presence_event_from_value(value: &Value) -> Option<RealtimePresenceEvent> {
133 let presence = value
134 .get("data")
135 .and_then(|data| data.get("presence"))
136 .or_else(|| value.get("presence"))
137 .or_else(|| value.get("data"))?;
138 serde_json::from_value(presence.clone()).ok()
139}
140
141#[cfg(test)]
142mod tests {
143 use super::*;
144 use crate::SyncOperation;
145 use serde_json::json;
146
147 #[test]
148 fn encodes_push_and_presence_requests() {
149 let push = RealtimePushRequest::from_commit(
150 "req-1",
151 PushCommitRequest {
152 client_commit_id: "commit-1".to_string(),
153 operations: vec![SyncOperation {
154 table: "tasks".to_string(),
155 row_id: "task-1".to_string(),
156 op: "upsert".to_string(),
157 payload: None,
158 base_version: None,
159 }],
160 schema_version: 7,
161 auth_lease: Some(crate::AuthLeaseProvenance {
162 lease_id: "lease-1".to_string(),
163 lease_expires_at_ms: 1_779_446_400_000,
164 lease_status_at_enqueue: "active".to_string(),
165 lease_scope_summary_json: None,
166 lease_token: Some("lease-token".to_string()),
167 }),
168 },
169 );
170 assert_eq!(
171 serde_json::to_value(push).expect("push json"),
172 json!({
173 "type": "push",
174 "requestId": "req-1",
175 "clientCommitId": "commit-1",
176 "operations": [{
177 "table": "tasks",
178 "row_id": "task-1",
179 "op": "upsert",
180 "payload": null,
181 "base_version": null
182 }],
183 "schemaVersion": 7,
184 "authLease": {
185 "leaseId": "lease-1",
186 "leaseExpiresAtMs": 1_779_446_400_000_i64,
187 "leaseStatusAtEnqueue": "active",
188 "leaseToken": "lease-token"
189 }
190 })
191 );
192
193 let presence = RealtimePresenceRequest::new("join", "user:1", Some(json!({"doc": "a"})));
194 assert_eq!(
195 serde_json::to_value(presence).expect("presence json"),
196 json!({
197 "type": "presence",
198 "action": "join",
199 "scopeKey": "user:1",
200 "metadata": {"doc": "a"}
201 })
202 );
203 }
204
205 #[test]
206 fn decodes_matching_push_response() {
207 let response = realtime_push_response_from_value(
208 &json!({
209 "event": "push-response",
210 "data": {
211 "requestId": "req-1",
212 "status": "accepted",
213 "commitSeq": 42,
214 "results": [{"opIndex": 0, "status": "ok"}]
215 }
216 }),
217 "req-1",
218 "commit-1",
219 )
220 .expect("decode")
221 .expect("response");
222
223 assert_eq!(response.client_commit_id, "commit-1");
224 assert_eq!(response.status, "accepted");
225 assert_eq!(response.commit_seq, Some(42));
226 assert_eq!(response.results.len(), 1);
227 }
228}