1use serde_json::{Map, Value};
2use syncular_runtime::protocol::{
3 snapshot_manifest_digest, BootstrapState, CombinedRequest, CombinedResponse, OperationResult,
4 PullResponse, PushBatchResponse, PushCommitResponse, ScopeValues, SnapshotChunkRef,
5 SnapshotManifest, SnapshotManifestChunkRef, SubscriptionResponse, SyncChange, SyncCommit,
6 SyncSnapshot,
7};
8
9pub fn scope_values(items: impl IntoIterator<Item = (impl Into<String>, Value)>) -> ScopeValues {
10 items
11 .into_iter()
12 .map(|(key, value)| (key.into(), value))
13 .collect::<Map<_, _>>()
14}
15
16pub fn actor_project_scopes(actor_id: &str, project_id: Option<&str>) -> ScopeValues {
17 let mut scopes = ScopeValues::new();
18 scopes.insert("user_id".to_string(), Value::String(actor_id.to_string()));
19 if let Some(project_id) = project_id {
20 scopes.insert(
21 "project_id".to_string(),
22 Value::String(project_id.to_string()),
23 );
24 }
25 scopes
26}
27
28pub fn schema_required_response(required_schema_version: i32) -> CombinedResponse {
29 CombinedResponse {
30 ok: true,
31 required_schema_version: Some(required_schema_version),
32 latest_schema_version: Some(required_schema_version),
33 push: None,
34 pull: None,
35 }
36}
37
38pub fn schema_latest_response(latest_schema_version: i32) -> CombinedResponse {
39 CombinedResponse {
40 ok: true,
41 required_schema_version: None,
42 latest_schema_version: Some(latest_schema_version),
43 push: None,
44 pull: None,
45 }
46}
47
48pub fn combined_not_ok_response() -> CombinedResponse {
49 CombinedResponse {
50 ok: false,
51 required_schema_version: None,
52 latest_schema_version: None,
53 push: None,
54 pull: None,
55 }
56}
57
58pub fn push_not_ok_response(request: &CombinedRequest) -> CombinedResponse {
59 CombinedResponse {
60 ok: true,
61 required_schema_version: None,
62 latest_schema_version: None,
63 push: request.push.as_ref().map(|_| PushBatchResponse {
64 ok: false,
65 commits: Vec::new(),
66 }),
67 pull: None,
68 }
69}
70
71pub fn pull_not_ok_response() -> CombinedResponse {
72 CombinedResponse {
73 ok: true,
74 required_schema_version: None,
75 latest_schema_version: None,
76 push: None,
77 pull: Some(PullResponse {
78 ok: false,
79 subscriptions: Vec::new(),
80 }),
81 }
82}
83
84pub fn snapshot_combined_response(
85 subscription_id: &str,
86 table: &str,
87 rows: Vec<Value>,
88 scopes: ScopeValues,
89 next_cursor: i64,
90) -> CombinedResponse {
91 CombinedResponse {
92 ok: true,
93 required_schema_version: None,
94 latest_schema_version: None,
95 push: None,
96 pull: Some(PullResponse {
97 ok: true,
98 subscriptions: vec![snapshot_subscription_response(
99 subscription_id,
100 table,
101 rows,
102 scopes,
103 next_cursor,
104 )],
105 }),
106 }
107}
108
109pub fn snapshot_page_combined_response(
110 subscription_id: &str,
111 table: &str,
112 rows: Vec<Value>,
113 scopes: ScopeValues,
114 next_cursor: i64,
115 is_first_page: bool,
116 is_last_page: bool,
117 bootstrap_state: Option<BootstrapState>,
118) -> CombinedResponse {
119 CombinedResponse {
120 ok: true,
121 required_schema_version: None,
122 latest_schema_version: None,
123 push: None,
124 pull: Some(PullResponse {
125 ok: true,
126 subscriptions: vec![SubscriptionResponse {
127 id: subscription_id.to_string(),
128 status: "active".to_string(),
129 scopes,
130 bootstrap: true,
131 bootstrap_state,
132 next_cursor,
133 integrity: None,
134 commits: Vec::new(),
135 snapshots: Some(vec![SyncSnapshot {
136 table: table.to_string(),
137 rows,
138 chunks: None,
139 artifacts: None,
140 manifest: None,
141 is_first_page,
142 is_last_page,
143 bootstrap_state_after: None,
144 }]),
145 }],
146 }),
147 }
148}
149
150pub fn snapshot_chunks_combined_response(
151 subscription_id: &str,
152 table: &str,
153 chunks: Vec<SnapshotChunkRef>,
154 scopes: ScopeValues,
155 next_cursor: i64,
156) -> CombinedResponse {
157 let manifest = snapshot_manifest_for_chunks(table, next_cursor, &chunks);
158 CombinedResponse {
159 ok: true,
160 required_schema_version: None,
161 latest_schema_version: None,
162 push: None,
163 pull: Some(PullResponse {
164 ok: true,
165 subscriptions: vec![SubscriptionResponse {
166 id: subscription_id.to_string(),
167 status: "active".to_string(),
168 scopes,
169 bootstrap: true,
170 bootstrap_state: None,
171 next_cursor,
172 integrity: None,
173 commits: Vec::new(),
174 snapshots: Some(vec![SyncSnapshot {
175 table: table.to_string(),
176 rows: Vec::new(),
177 chunks: Some(chunks),
178 artifacts: None,
179 manifest: Some(manifest),
180 is_first_page: true,
181 is_last_page: true,
182 bootstrap_state_after: None,
183 }]),
184 }],
185 }),
186 }
187}
188
189fn snapshot_manifest_for_chunks(
190 table: &str,
191 as_of_commit_seq: i64,
192 chunks: &[SnapshotChunkRef],
193) -> SnapshotManifest {
194 let mut manifest = SnapshotManifest {
195 version: 1,
196 digest: String::new(),
197 table: table.to_string(),
198 as_of_commit_seq,
199 scope_digest: "0".repeat(64),
200 row_cursor: None,
201 row_limit: 1000,
202 next_row_cursor: None,
203 is_first_page: true,
204 is_last_page: true,
205 chunks: chunks
206 .iter()
207 .map(|chunk| SnapshotManifestChunkRef {
208 id: chunk.id.clone(),
209 byte_length: chunk.byte_length,
210 sha256: chunk.sha256.clone(),
211 encoding: chunk.encoding.clone(),
212 compression: chunk.compression.clone(),
213 })
214 .collect(),
215 };
216 manifest.digest = snapshot_manifest_digest(&manifest).expect("test snapshot manifest digest");
217 manifest
218}
219
220pub fn snapshot_subscription_response(
221 subscription_id: &str,
222 table: &str,
223 rows: Vec<Value>,
224 scopes: ScopeValues,
225 next_cursor: i64,
226) -> SubscriptionResponse {
227 SubscriptionResponse {
228 id: subscription_id.to_string(),
229 status: "active".to_string(),
230 scopes,
231 bootstrap: true,
232 bootstrap_state: None,
233 next_cursor,
234 integrity: None,
235 commits: Vec::new(),
236 snapshots: Some(vec![SyncSnapshot {
237 table: table.to_string(),
238 rows,
239 chunks: None,
240 artifacts: None,
241 manifest: None,
242 is_first_page: true,
243 is_last_page: true,
244 bootstrap_state_after: None,
245 }]),
246 }
247}
248
249pub fn revoked_subscription_response(
250 subscription_id: &str,
251 scopes: ScopeValues,
252 next_cursor: i64,
253) -> CombinedResponse {
254 CombinedResponse {
255 ok: true,
256 required_schema_version: None,
257 latest_schema_version: None,
258 push: None,
259 pull: Some(PullResponse {
260 ok: true,
261 subscriptions: vec![SubscriptionResponse {
262 id: subscription_id.to_string(),
263 status: "revoked".to_string(),
264 scopes,
265 bootstrap: false,
266 bootstrap_state: None,
267 next_cursor,
268 integrity: None,
269 commits: Vec::new(),
270 snapshots: None,
271 }],
272 }),
273 }
274}
275
276pub fn commit_combined_response(
277 subscription_id: &str,
278 scopes: ScopeValues,
279 next_cursor: i64,
280 commit_seq: i64,
281 changes: Vec<SyncChange>,
282) -> CombinedResponse {
283 commits_combined_response(
284 subscription_id,
285 scopes,
286 next_cursor,
287 vec![SyncCommit {
288 commit_seq,
289 created_at: "2026-01-01T00:00:00.000Z".to_string(),
290 actor_id: "test-server".to_string(),
291 changes,
292 }],
293 )
294}
295
296pub fn commits_combined_response(
297 subscription_id: &str,
298 scopes: ScopeValues,
299 next_cursor: i64,
300 commits: Vec<SyncCommit>,
301) -> CombinedResponse {
302 CombinedResponse {
303 ok: true,
304 required_schema_version: None,
305 latest_schema_version: None,
306 push: None,
307 pull: Some(PullResponse {
308 ok: true,
309 subscriptions: vec![SubscriptionResponse {
310 id: subscription_id.to_string(),
311 status: "active".to_string(),
312 scopes,
313 bootstrap: false,
314 bootstrap_state: None,
315 next_cursor,
316 integrity: None,
317 commits,
318 snapshots: None,
319 }],
320 }),
321 }
322}
323
324pub fn push_conflict_response(
325 request: &CombinedRequest,
326 message: &str,
327 code: &str,
328 server_row: Value,
329 server_version: i64,
330) -> CombinedResponse {
331 CombinedResponse {
332 ok: true,
333 required_schema_version: None,
334 latest_schema_version: None,
335 push: request.push.as_ref().map(|push| PushBatchResponse {
336 ok: true,
337 commits: push
338 .commits
339 .iter()
340 .map(|commit| PushCommitResponse {
341 client_commit_id: commit.client_commit_id.clone(),
342 status: "rejected".to_string(),
343 commit_seq: None,
344 results: vec![OperationResult {
345 op_index: 0,
346 status: "conflict".to_string(),
347 message: Some(message.to_string()),
348 error: None,
349 code: Some(code.to_string()),
350 retriable: Some(false),
351 server_version: Some(server_version),
352 server_row: Some(server_row.clone()),
353 }],
354 })
355 .collect(),
356 }),
357 pull: Some(PullResponse {
358 ok: true,
359 subscriptions: Vec::new(),
360 }),
361 }
362}
363
364pub fn upsert_change(table: &str, row_id: &str, row: Value, row_version: i64) -> SyncChange {
365 SyncChange {
366 table: table.to_string(),
367 row_id: row_id.to_string(),
368 op: "upsert".to_string(),
369 row_json: Some(row),
370 row_version: Some(row_version),
371 scopes: ScopeValues::new(),
372 }
373}
374
375pub fn delete_change(table: &str, row_id: &str, row_version: i64) -> SyncChange {
376 SyncChange {
377 table: table.to_string(),
378 row_id: row_id.to_string(),
379 op: "delete".to_string(),
380 row_json: None,
381 row_version: Some(row_version),
382 scopes: ScopeValues::new(),
383 }
384}