Skip to main content

syncular_testkit/
protocol.rs

1use serde_json::{Map, Value};
2use syncular_runtime::protocol::{
3    snapshot_manifest_digest, BootstrapState, CombinedRequest, CombinedResponse, OperationResult,
4    PullResponse, PushBatchResponse, PushCommitResponse, ScopeValues, SnapshotChunkRef,
5    SnapshotManifest, SnapshotManifestChunkRef, SubscriptionResponse, SyncChange, SyncCommit,
6    SyncSnapshot,
7};
8
9pub fn scope_values(items: impl IntoIterator<Item = (impl Into<String>, Value)>) -> ScopeValues {
10    items
11        .into_iter()
12        .map(|(key, value)| (key.into(), value))
13        .collect::<Map<_, _>>()
14}
15
16pub fn actor_project_scopes(actor_id: &str, project_id: Option<&str>) -> ScopeValues {
17    let mut scopes = ScopeValues::new();
18    scopes.insert("user_id".to_string(), Value::String(actor_id.to_string()));
19    if let Some(project_id) = project_id {
20        scopes.insert(
21            "project_id".to_string(),
22            Value::String(project_id.to_string()),
23        );
24    }
25    scopes
26}
27
28pub fn schema_required_response(required_schema_version: i32) -> CombinedResponse {
29    CombinedResponse {
30        ok: true,
31        required_schema_version: Some(required_schema_version),
32        latest_schema_version: Some(required_schema_version),
33        push: None,
34        pull: None,
35    }
36}
37
38pub fn schema_latest_response(latest_schema_version: i32) -> CombinedResponse {
39    CombinedResponse {
40        ok: true,
41        required_schema_version: None,
42        latest_schema_version: Some(latest_schema_version),
43        push: None,
44        pull: None,
45    }
46}
47
48pub fn combined_not_ok_response() -> CombinedResponse {
49    CombinedResponse {
50        ok: false,
51        required_schema_version: None,
52        latest_schema_version: None,
53        push: None,
54        pull: None,
55    }
56}
57
58pub fn push_not_ok_response(request: &CombinedRequest) -> CombinedResponse {
59    CombinedResponse {
60        ok: true,
61        required_schema_version: None,
62        latest_schema_version: None,
63        push: request.push.as_ref().map(|_| PushBatchResponse {
64            ok: false,
65            commits: Vec::new(),
66        }),
67        pull: None,
68    }
69}
70
71pub fn pull_not_ok_response() -> CombinedResponse {
72    CombinedResponse {
73        ok: true,
74        required_schema_version: None,
75        latest_schema_version: None,
76        push: None,
77        pull: Some(PullResponse {
78            ok: false,
79            subscriptions: Vec::new(),
80        }),
81    }
82}
83
84pub fn snapshot_combined_response(
85    subscription_id: &str,
86    table: &str,
87    rows: Vec<Value>,
88    scopes: ScopeValues,
89    next_cursor: i64,
90) -> CombinedResponse {
91    CombinedResponse {
92        ok: true,
93        required_schema_version: None,
94        latest_schema_version: None,
95        push: None,
96        pull: Some(PullResponse {
97            ok: true,
98            subscriptions: vec![snapshot_subscription_response(
99                subscription_id,
100                table,
101                rows,
102                scopes,
103                next_cursor,
104            )],
105        }),
106    }
107}
108
109pub fn snapshot_page_combined_response(
110    subscription_id: &str,
111    table: &str,
112    rows: Vec<Value>,
113    scopes: ScopeValues,
114    next_cursor: i64,
115    is_first_page: bool,
116    is_last_page: bool,
117    bootstrap_state: Option<BootstrapState>,
118) -> CombinedResponse {
119    CombinedResponse {
120        ok: true,
121        required_schema_version: None,
122        latest_schema_version: None,
123        push: None,
124        pull: Some(PullResponse {
125            ok: true,
126            subscriptions: vec![SubscriptionResponse {
127                id: subscription_id.to_string(),
128                status: "active".to_string(),
129                scopes,
130                bootstrap: true,
131                bootstrap_state,
132                next_cursor,
133                integrity: None,
134                commits: Vec::new(),
135                snapshots: Some(vec![SyncSnapshot {
136                    table: table.to_string(),
137                    rows,
138                    chunks: None,
139                    artifacts: None,
140                    manifest: None,
141                    is_first_page,
142                    is_last_page,
143                    bootstrap_state_after: None,
144                }]),
145            }],
146        }),
147    }
148}
149
150pub fn snapshot_chunks_combined_response(
151    subscription_id: &str,
152    table: &str,
153    chunks: Vec<SnapshotChunkRef>,
154    scopes: ScopeValues,
155    next_cursor: i64,
156) -> CombinedResponse {
157    let manifest = snapshot_manifest_for_chunks(table, next_cursor, &chunks);
158    CombinedResponse {
159        ok: true,
160        required_schema_version: None,
161        latest_schema_version: None,
162        push: None,
163        pull: Some(PullResponse {
164            ok: true,
165            subscriptions: vec![SubscriptionResponse {
166                id: subscription_id.to_string(),
167                status: "active".to_string(),
168                scopes,
169                bootstrap: true,
170                bootstrap_state: None,
171                next_cursor,
172                integrity: None,
173                commits: Vec::new(),
174                snapshots: Some(vec![SyncSnapshot {
175                    table: table.to_string(),
176                    rows: Vec::new(),
177                    chunks: Some(chunks),
178                    artifacts: None,
179                    manifest: Some(manifest),
180                    is_first_page: true,
181                    is_last_page: true,
182                    bootstrap_state_after: None,
183                }]),
184            }],
185        }),
186    }
187}
188
189fn snapshot_manifest_for_chunks(
190    table: &str,
191    as_of_commit_seq: i64,
192    chunks: &[SnapshotChunkRef],
193) -> SnapshotManifest {
194    let mut manifest = SnapshotManifest {
195        version: 1,
196        digest: String::new(),
197        table: table.to_string(),
198        as_of_commit_seq,
199        scope_digest: "0".repeat(64),
200        row_cursor: None,
201        row_limit: 1000,
202        next_row_cursor: None,
203        is_first_page: true,
204        is_last_page: true,
205        chunks: chunks
206            .iter()
207            .map(|chunk| SnapshotManifestChunkRef {
208                id: chunk.id.clone(),
209                byte_length: chunk.byte_length,
210                sha256: chunk.sha256.clone(),
211                encoding: chunk.encoding.clone(),
212                compression: chunk.compression.clone(),
213            })
214            .collect(),
215    };
216    manifest.digest = snapshot_manifest_digest(&manifest).expect("test snapshot manifest digest");
217    manifest
218}
219
220pub fn snapshot_subscription_response(
221    subscription_id: &str,
222    table: &str,
223    rows: Vec<Value>,
224    scopes: ScopeValues,
225    next_cursor: i64,
226) -> SubscriptionResponse {
227    SubscriptionResponse {
228        id: subscription_id.to_string(),
229        status: "active".to_string(),
230        scopes,
231        bootstrap: true,
232        bootstrap_state: None,
233        next_cursor,
234        integrity: None,
235        commits: Vec::new(),
236        snapshots: Some(vec![SyncSnapshot {
237            table: table.to_string(),
238            rows,
239            chunks: None,
240            artifacts: None,
241            manifest: None,
242            is_first_page: true,
243            is_last_page: true,
244            bootstrap_state_after: None,
245        }]),
246    }
247}
248
249pub fn revoked_subscription_response(
250    subscription_id: &str,
251    scopes: ScopeValues,
252    next_cursor: i64,
253) -> CombinedResponse {
254    CombinedResponse {
255        ok: true,
256        required_schema_version: None,
257        latest_schema_version: None,
258        push: None,
259        pull: Some(PullResponse {
260            ok: true,
261            subscriptions: vec![SubscriptionResponse {
262                id: subscription_id.to_string(),
263                status: "revoked".to_string(),
264                scopes,
265                bootstrap: false,
266                bootstrap_state: None,
267                next_cursor,
268                integrity: None,
269                commits: Vec::new(),
270                snapshots: None,
271            }],
272        }),
273    }
274}
275
276pub fn commit_combined_response(
277    subscription_id: &str,
278    scopes: ScopeValues,
279    next_cursor: i64,
280    commit_seq: i64,
281    changes: Vec<SyncChange>,
282) -> CombinedResponse {
283    commits_combined_response(
284        subscription_id,
285        scopes,
286        next_cursor,
287        vec![SyncCommit {
288            commit_seq,
289            created_at: "2026-01-01T00:00:00.000Z".to_string(),
290            actor_id: "test-server".to_string(),
291            changes,
292        }],
293    )
294}
295
296pub fn commits_combined_response(
297    subscription_id: &str,
298    scopes: ScopeValues,
299    next_cursor: i64,
300    commits: Vec<SyncCommit>,
301) -> CombinedResponse {
302    CombinedResponse {
303        ok: true,
304        required_schema_version: None,
305        latest_schema_version: None,
306        push: None,
307        pull: Some(PullResponse {
308            ok: true,
309            subscriptions: vec![SubscriptionResponse {
310                id: subscription_id.to_string(),
311                status: "active".to_string(),
312                scopes,
313                bootstrap: false,
314                bootstrap_state: None,
315                next_cursor,
316                integrity: None,
317                commits,
318                snapshots: None,
319            }],
320        }),
321    }
322}
323
324pub fn push_conflict_response(
325    request: &CombinedRequest,
326    message: &str,
327    code: &str,
328    server_row: Value,
329    server_version: i64,
330) -> CombinedResponse {
331    CombinedResponse {
332        ok: true,
333        required_schema_version: None,
334        latest_schema_version: None,
335        push: request.push.as_ref().map(|push| PushBatchResponse {
336            ok: true,
337            commits: push
338                .commits
339                .iter()
340                .map(|commit| PushCommitResponse {
341                    client_commit_id: commit.client_commit_id.clone(),
342                    status: "rejected".to_string(),
343                    commit_seq: None,
344                    results: vec![OperationResult {
345                        op_index: 0,
346                        status: "conflict".to_string(),
347                        message: Some(message.to_string()),
348                        error: None,
349                        code: Some(code.to_string()),
350                        retriable: Some(false),
351                        server_version: Some(server_version),
352                        server_row: Some(server_row.clone()),
353                    }],
354                })
355                .collect(),
356        }),
357        pull: Some(PullResponse {
358            ok: true,
359            subscriptions: Vec::new(),
360        }),
361    }
362}
363
364pub fn upsert_change(table: &str, row_id: &str, row: Value, row_version: i64) -> SyncChange {
365    SyncChange {
366        table: table.to_string(),
367        row_id: row_id.to_string(),
368        op: "upsert".to_string(),
369        row_json: Some(row),
370        row_version: Some(row_version),
371        scopes: ScopeValues::new(),
372    }
373}
374
375pub fn delete_change(table: &str, row_id: &str, row_version: i64) -> SyncChange {
376    SyncChange {
377        table: table.to_string(),
378        row_id: row_id.to_string(),
379        op: "delete".to_string(),
380        row_json: None,
381        row_version: Some(row_version),
382        scopes: ScopeValues::new(),
383    }
384}