1use crate::binary_snapshot::SnapshotChunkRows;
2use crate::client::SubscriptionSpec;
3use crate::error::{Result, SyncularError};
4use crate::protocol::*;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7#[cfg(not(all(target_arch = "wasm32", feature = "web-transport")))]
8use std::time::{SystemTime, UNIX_EPOCH};
9
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Compiled for every target except `wasm32` built with the `web-transport` feature.
///
/// A system clock set before the epoch yields `0` (the failed duration lookup falls back
/// to a zero duration). A millisecond count that does not fit in `i64` saturates at
/// `i64::MAX` instead of being truncated.
#[cfg(not(all(target_arch = "wasm32", feature = "web-transport")))]
pub fn now_ms() -> i64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` is a `u128`; clamp first so the narrowing cast below is lossless.
    let millis = elapsed.as_millis().min(i64::MAX as u128);
    millis as i64
}
17
/// Returns the current time in milliseconds, read from JavaScript's `Date.now()`.
///
/// Compiled only for `wasm32` targets built with the `web-transport` feature. The
/// floating-point value is converted to `i64` with an `as` cast, which drops any
/// fractional part.
#[cfg(all(target_arch = "wasm32", feature = "web-transport"))]
pub fn now_ms() -> i64 {
    let millis: f64 = js_sys::Date::now();
    millis as i64
}
22
// NOTE(review): the public limits below (except `APP_SCHEMA_ID`) are not referenced in the
// visible part of this file; their descriptions are inferred from the names and should be
// confirmed against the call sites.

/// Limit on sync retries (presumably per outbox commit).
pub const MAX_SYNC_RETRIES: i32 = 5;
/// Timeout in milliseconds, presumably for outbox rows left in a "sending" state.
pub const SYNC_SENDING_TIMEOUT_MS: i64 = 30_000;
/// Limit on blob upload retries (presumably per upload).
pub const MAX_BLOB_UPLOAD_RETRIES: i32 = 3;
/// Timeout in milliseconds, presumably after which an in-flight blob upload counts as stale.
pub const BLOB_UPLOAD_STALE_TIMEOUT_MS: i64 = 30_000;
/// Timeout in milliseconds, presumably handed to SQLite as its busy timeout.
pub const SQLITE_BUSY_TIMEOUT_MS: i32 = 5_000;
/// Schema identifier reported by the default `SyncStateStore::app_schema_state`.
pub const APP_SCHEMA_ID: &str = "syncular-app";

/// Delay in milliseconds that `retry_backoff_delay_ms` starts from before doubling.
const RETRY_BASE_DELAY_MS: i64 = 1_000;
/// Upper bound in milliseconds on the value returned by `retry_backoff_delay_ms`.
const RETRY_MAX_DELAY_MS: i64 = 30_000;
/// Delay in milliseconds that `blob_upload_retry_backoff_delay_ms` starts from before doubling.
const BLOB_UPLOAD_RETRY_BASE_DELAY_MS: i64 = 100;
/// Upper bound in milliseconds on the value returned by `blob_upload_retry_backoff_delay_ms`.
const BLOB_UPLOAD_RETRY_MAX_DELAY_MS: i64 = 5_000;
34
35pub fn retry_backoff_delay_ms(attempt_count: i32) -> i64 {
36 let exponent = attempt_count.saturating_sub(1).min(12) as u32;
37 RETRY_BASE_DELAY_MS
38 .saturating_mul(2_i64.saturating_pow(exponent))
39 .min(RETRY_MAX_DELAY_MS)
40}
41
42pub fn next_retry_at(now: i64, attempt_count: i32) -> i64 {
43 now.saturating_add(retry_backoff_delay_ms(attempt_count))
44}
45
46pub fn blob_upload_retry_backoff_delay_ms(attempt_count: i32) -> i64 {
47 let exponent = attempt_count.saturating_sub(1).min(12) as u32;
48 BLOB_UPLOAD_RETRY_BASE_DELAY_MS
49 .saturating_mul(2_i64.saturating_pow(exponent))
50 .min(BLOB_UPLOAD_RETRY_MAX_DELAY_MS)
51}
52
53pub fn next_blob_upload_retry_at(now: i64, attempt_count: i32) -> i64 {
54 now.saturating_add(blob_upload_retry_backoff_delay_ms(attempt_count))
55}
56
/// Task record for the demo todo fixture; compiled only with the `demo-todo-fixture`
/// feature.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg(feature = "demo-todo-fixture")]
pub struct Task {
    /// Task identifier.
    pub id: String,
    /// Task title text.
    pub title: String,
    /// Completion state as an integer (presumably 0 or 1 — confirm against the table schema).
    pub completed: i32,
    /// Identifier of the user associated with the task.
    pub user_id: String,
    /// Project the task is filed under, if any.
    pub project_id: Option<String>,
    /// Row version (presumably assigned by the server — confirm).
    pub server_version: i64,
    /// Optional image value. NOTE(review): its encoding is not visible in this file.
    pub image: Option<String>,
    /// Optional Yjs state for the title. NOTE(review): its string encoding is not visible
    /// in this file.
    pub title_yjs_state: Option<String>,
}
69
/// One outbox row describing a client commit, as returned by
/// `SyncStoreTx::pending_outbox`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxCommit {
    /// Row identifier (presumably the `row_id` the `mark_outbox_*` methods take — confirm).
    pub id: String,
    /// Client-generated commit identifier.
    pub client_commit_id: String,
    /// Status as free-form text; the allowed values are not defined in this file.
    pub status: String,
    /// The commit's operations as JSON text.
    pub operations_json: String,
    /// JSON text of the last response recorded for this commit, if any.
    pub last_response_json: Option<String>,
    /// Last error message recorded for this commit, if any.
    pub error: Option<String>,
    /// Creation timestamp (presumably epoch milliseconds as produced by `now_ms` — confirm).
    pub created_at: i64,
    /// Last-update timestamp (presumably the same unit as `created_at`).
    pub updated_at: i64,
    /// Number of attempts recorded so far.
    pub attempt_count: i32,
    /// Acknowledged commit sequence, if one has been recorded.
    pub acked_commit_seq: Option<i64>,
    /// Schema version associated with the commit.
    pub schema_version: i32,
    /// Timestamp of the next attempt (presumably the same unit as `created_at`).
    pub next_attempt_at: i64,
    /// Auth lease provenance. Defaults to `None` when missing on input and is left out of
    /// the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_lease: Option<AuthLeaseProvenance>,
}
87
/// Stored state of one subscription, identified by the `state_id` / `subscription_id`
/// pair that `SyncStoreTx::subscription_state` looks rows up by.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionState {
    /// Identifier of the state this row belongs to.
    pub state_id: String,
    /// Identifier of the subscription.
    pub subscription_id: String,
    /// Name of the table the subscription refers to.
    pub table: String,
    /// Subscription scopes as JSON text.
    pub scopes_json: String,
    /// Subscription parameters as JSON text.
    pub params_json: String,
    /// Cursor value for the subscription (presumably the last applied commit sequence —
    /// confirm).
    pub cursor: i64,
    /// Bootstrap state as JSON text, if any is recorded.
    pub bootstrap_state_json: Option<String>,
    /// Status as free-form text; the allowed values are not defined in this file.
    pub status: String,
}
99
/// A verified root recorded for a subscription, as read and written by the
/// `verified_root*` methods of `SyncStoreTx`.
///
/// NOTE(review): what the root value commits to is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedRoot {
    /// Identifier of the state this root belongs to.
    pub state_id: String,
    /// Identifier of the subscription the root was recorded for.
    pub subscription_id: String,
    /// Identifier of the partition the root covers.
    pub partition_id: String,
    /// Commit sequence the root is associated with.
    pub commit_seq: i64,
    /// The root value as text (encoding not visible here).
    pub root: String,
}
108
/// One migration record, as listed by `SyncStateStore::applied_migrations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppliedMigration {
    /// Migration version label.
    pub version: String,
    /// Migration name.
    pub name: String,
    /// Checksum recorded for the migration (algorithm not visible here).
    pub checksum: String,
    /// Timestamp at which the migration was applied (presumably epoch milliseconds — confirm).
    pub applied_at: i64,
}
116
/// Application schema state reported by `SyncStateStore::app_schema_state`.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSchemaState {
    /// Schema identifier; the default `app_schema_state` fills in `APP_SCHEMA_ID`.
    pub schema_id: String,
    /// Stored schema version, if known; the default `app_schema_state` leaves it `None`.
    pub schema_version: Option<i32>,
    /// Schema version passed in by the caller of `app_schema_state`.
    pub current_schema_version: i32,
    /// Last-update timestamp, if known; the default `app_schema_state` leaves it `None`.
    pub updated_at: Option<i64>,
}
125
/// Condensed view of an outbox commit, as listed by `SyncStateStore::outbox_summaries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxSummary {
    /// Client-generated commit identifier.
    pub client_commit_id: String,
    /// Status as free-form text; the allowed values are not defined in this file.
    pub status: String,
    /// Schema version associated with the commit.
    pub schema_version: i32,
    /// Auth lease provenance. Defaults to `None` when missing on input and is left out of
    /// the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub auth_lease: Option<AuthLeaseProvenance>,
}
134
/// A stored auth lease, as written by `SyncStoreTx::upsert_auth_lease` and read by
/// `SyncStoreTx::auth_lease` / `SyncStoreTx::active_auth_leases`.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthLeaseRecord {
    /// Lease identifier.
    pub lease_id: String,
    /// Key identifier (presumably of the key that signed the lease — confirm).
    pub kid: String,
    /// Identifier of the actor the lease is for.
    pub actor_id: String,
    /// Issue time in milliseconds.
    pub issued_at_ms: i64,
    /// Start of validity in milliseconds.
    pub not_before_ms: i64,
    /// Expiry time in milliseconds.
    pub expires_at_ms: i64,
    /// Schema version associated with the lease.
    pub schema_version: i32,
    /// Lease payload as JSON text.
    pub payload_json: String,
    /// The lease token as text (format not visible here).
    pub token: String,
    /// Status as free-form text; the allowed values are not defined in this file.
    pub status: String,
    /// Last validation error, if any; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_validation_error: Option<String>,
    /// Record creation time in milliseconds.
    pub created_at_ms: i64,
    /// Record last-update time in milliseconds.
    pub updated_at_ms: i64,
}
153
/// Summary of one recorded conflict, as listed by `SyncStateStore::conflict_summaries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictSummary {
    /// Conflict identifier (presumably the `id` taken by `SyncStateStore::resolve_conflict`).
    pub id: String,
    /// Client commit the conflict belongs to.
    pub client_commit_id: String,
    /// Index of the affected operation (presumably within the commit's operation list).
    pub op_index: i32,
    /// Result status as free-form text; the allowed values are not defined in this file.
    pub result_status: String,
    /// Conflict message.
    pub message: String,
    /// Optional conflict code.
    pub code: Option<String>,
    /// Server version reported with the conflict, if any.
    pub server_version: Option<i64>,
    /// Resolution timestamp, if the conflict has been resolved (unit not visible here).
    pub resolved_at: Option<i64>,
    /// Resolution recorded for the conflict, if any.
    pub resolution: Option<String>,
}
166
/// Blob-related health counters, as returned by `SyncStateStore::blob_health_summary`.
///
/// Field names are serialized in camelCase; the derived `Default` sets every counter to 0.
/// NOTE(review): the per-field descriptions are inferred from the names — confirm against
/// the store that fills them in.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlobHealthSummary {
    /// Number of cached blobs.
    pub cache_count: i64,
    /// Total size of cached blobs in bytes.
    pub cache_bytes: i64,
    /// Number of uploads in a pending state.
    pub upload_pending: i64,
    /// Number of uploads in an uploading state.
    pub upload_uploading: i64,
    /// Number of uploads in a failed state.
    pub upload_failed: i64,
    /// Number of blob references that were checked.
    pub checked_references: i64,
    /// Number of checked blob references found invalid.
    pub invalid_references: i64,
}
178
/// CRDT-related health counters, as returned by `SyncStateStore::crdt_health_summary`.
///
/// Field names are serialized in camelCase; the derived `Default` sets every counter to 0.
/// NOTE(review): the per-field descriptions are inferred from the names — confirm against
/// the store that fills them in.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtHealthSummary {
    /// Number of CRDT documents.
    pub document_count: i64,
    /// Number of updates in a pending state.
    pub pending_updates: i64,
    /// Number of updates in a flushed state.
    pub flushed_updates: i64,
    /// Number of updates in an acked state.
    pub acked_updates: i64,
    /// Number of updates held in the log.
    pub log_updates: i64,
    /// Number of documents considered orphaned.
    pub orphaned_documents: i64,
    /// Number of log entries considered orphaned.
    pub orphaned_log_entries: i64,
}
190
/// Health counters for synced rows, as returned by
/// `SyncStateStore::scoped_rows_health_summary` and
/// `SyncStateStore::clear_orphaned_synced_rows`.
///
/// Field names are serialized in camelCase; the derived `Default` yields zero counters
/// and an empty table list.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ScopedRowsHealthSummary {
    /// Number of synced rows checked (presumably summed over `tables` — confirm).
    pub checked_synced_rows: i64,
    /// Number of synced rows found orphaned (presumably summed over `tables` — confirm).
    pub orphaned_synced_rows: i64,
    /// Per-table breakdown.
    pub tables: Vec<ScopedRowsTableHealth>,
}
198
/// Per-table entry of `ScopedRowsHealthSummary::tables`.
///
/// Field names are serialized in camelCase; the derived `Default` yields an empty table
/// name and zero counters.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ScopedRowsTableHealth {
    /// Table name.
    pub table: String,
    /// Number of synced rows checked in this table.
    pub checked_synced_rows: i64,
    /// Number of synced rows found orphaned in this table.
    pub orphaned_synced_rows: i64,
}
206
/// A store that hands out transactions implementing `SyncStoreTx`.
pub trait SyncStore {
    /// Transaction handle type passed to the closure given to `transaction`.
    type Tx<'a>: SyncStoreTx
    where
        Self: 'a;

    /// Calls `f` with a transaction handle and returns a `Result` of the same type.
    ///
    /// NOTE(review): commit and rollback behavior is decided by each implementation and
    /// is not visible in this file.
    fn transaction<T>(&mut self, f: impl FnOnce(&mut Self::Tx<'_>) -> Result<T>) -> Result<T>;

    /// Reports whether this store handles SQLite snapshot artifacts. Defaults to `false`.
    fn supports_sqlite_snapshot_artifacts(&self) -> bool {
        false
    }

    /// Decodes the rows for a table from the bytes of a SQLite snapshot artifact.
    ///
    /// # Errors
    ///
    /// The default implementation ignores its arguments and always returns a protocol
    /// error saying snapshot artifacts are not supported.
    fn decode_sqlite_snapshot_artifact_rows(
        &self,
        _table: &str,
        _artifact_bytes: &[u8],
    ) -> Result<Vec<Value>> {
        Err(SyncularError::protocol_message(
            "snapshot artifacts are not supported by this store",
        ))
    }
}
228
229pub trait SyncStoreTx {
230 fn pending_outbox(&mut self, limit: i64) -> Result<Vec<OutboxCommit>>;
231 fn requeue_stale_outbox(&mut self) -> Result<()>;
232 fn mark_outbox_sending(&mut self, row_id: &str) -> Result<()>;
233 fn mark_pushed_operation_server_versions(
234 &mut self,
235 _outbox: &OutboxCommit,
236 _response: &PushCommitResponse,
237 ) -> Result<()> {
238 Ok(())
239 }
240 fn mark_outbox_acked(&mut self, row_id: &str, response: &PushCommitResponse) -> Result<()>;
241 fn mark_outbox_failed(
242 &mut self,
243 row_id: &str,
244 error: &str,
245 response: &PushCommitResponse,
246 ) -> Result<()>;
247 fn mark_outbox_retry(
248 &mut self,
249 row_id: &str,
250 error: &str,
251 next_attempt_at: i64,
252 failed: bool,
253 ) -> Result<()>;
254 fn insert_conflict(&mut self, outbox: &OutboxCommit, result: &OperationResult) -> Result<()>;
255
256 fn upsert_auth_lease(&mut self, _lease: &AuthLeaseRecord) -> Result<()> {
257 Err(SyncularError::storage(anyhow::anyhow!(
258 "auth lease storage is not supported by this store"
259 )))
260 }
261
262 fn auth_lease(&mut self, _lease_id: &str) -> Result<Option<AuthLeaseRecord>> {
263 Err(SyncularError::storage(anyhow::anyhow!(
264 "auth lease storage is not supported by this store"
265 )))
266 }
267
268 fn active_auth_leases(
269 &mut self,
270 _actor_id: Option<&str>,
271 _now_ms: i64,
272 ) -> Result<Vec<AuthLeaseRecord>> {
273 Err(SyncularError::storage(anyhow::anyhow!(
274 "auth lease storage is not supported by this store"
275 )))
276 }
277
278 fn set_outbox_auth_lease(
279 &mut self,
280 _client_commit_id: &str,
281 _provenance: Option<&AuthLeaseProvenance>,
282 ) -> Result<()> {
283 Err(SyncularError::storage(anyhow::anyhow!(
284 "outbox auth lease provenance is not supported by this store"
285 )))
286 }
287
288 fn subscription_state(
289 &mut self,
290 state_id: &str,
291 subscription_id: &str,
292 ) -> Result<Option<SubscriptionState>>;
293 fn subscription_states(&mut self, _state_id: &str) -> Result<Vec<SubscriptionState>> {
294 Ok(Vec::new())
295 }
296 fn upsert_subscription_state(&mut self, state: &SubscriptionState) -> Result<()>;
297 fn delete_subscription_state(&mut self, state_id: &str, subscription_id: &str) -> Result<()>;
298 fn verified_root(
299 &mut self,
300 _state_id: &str,
301 _subscription_id: &str,
302 ) -> Result<Option<VerifiedRoot>> {
303 Ok(None)
304 }
305 fn verified_roots(&mut self, _state_id: &str) -> Result<Vec<VerifiedRoot>> {
306 Ok(Vec::new())
307 }
308 fn upsert_verified_root(&mut self, _root: &VerifiedRoot) -> Result<()> {
309 Ok(())
310 }
311 fn delete_verified_root(&mut self, _state_id: &str, _subscription_id: &str) -> Result<()> {
312 Ok(())
313 }
314 fn crdt_state_vector_hints(
315 &mut self,
316 _table: &str,
317 _scopes: &ScopeValues,
318 _limit: i64,
319 ) -> Result<Vec<CrdtStateVectorHint>> {
320 Ok(Vec::new())
321 }
322
323 fn clear_table_for_scopes(&mut self, table: &str, scopes: &ScopeValues) -> Result<()>;
324 fn clear_synced_rows_for_scopes(&mut self, _table: &str, _scopes: &ScopeValues) -> Result<i64> {
325 Err(SyncularError::storage(anyhow::anyhow!(
326 "clearing synced rows is not supported by this store"
327 )))
328 }
329 fn clear_table_for_scopes_preserving_local_crdt(
330 &mut self,
331 table: &str,
332 scopes: &ScopeValues,
333 ) -> Result<()> {
334 self.clear_table_for_scopes(table, scopes)
335 }
336 fn current_row_json(&mut self, _table: &str, _row_id: &str) -> Result<Option<Value>> {
337 Ok(None)
338 }
339 fn upsert_row(&mut self, table: &str, row: &Value, fallback_version: Option<i64>)
340 -> Result<()>;
341 fn upsert_rows(
342 &mut self,
343 table: &str,
344 rows: &[Value],
345 fallback_version: Option<i64>,
346 ) -> Result<()> {
347 for row in rows {
348 self.upsert_row(table, row, fallback_version)?;
349 }
350 Ok(())
351 }
352 fn upsert_snapshot_chunk_rows(
353 &mut self,
354 table: &str,
355 rows: &SnapshotChunkRows,
356 fallback_version: Option<i64>,
357 ) -> Result<()> {
358 let rows = rows.clone().try_into_value_rows()?;
359 self.upsert_rows(table, &rows, fallback_version)
360 }
361 fn apply_change(&mut self, change: &SyncChange) -> Result<()>;
362}
363
/// Read and maintenance operations over a store's sync state.
///
/// Methods without a body must be supplied by every store; their descriptions are
/// inferred from names and signatures only and should be confirmed against the concrete
/// store. Methods with a body document exactly what the default does.
pub trait SyncStateStore {
    /// Lists the migrations recorded as applied.
    fn applied_migrations(&mut self) -> Result<Vec<AppliedMigration>>;

    /// Reports the application schema state.
    ///
    /// The default reports `APP_SCHEMA_ID` with the given `current_schema_version` and
    /// leaves the stored version and update time as `None`.
    fn app_schema_state(&mut self, current_schema_version: i32) -> Result<AppSchemaState> {
        Ok(AppSchemaState {
            schema_id: APP_SCHEMA_ID.to_string(),
            schema_version: None,
            current_schema_version,
            updated_at: None,
        })
    }

    /// Lists summaries of the outbox commits.
    fn outbox_summaries(&mut self) -> Result<Vec<OutboxSummary>>;

    /// Returns the time of the next outbox retry, if any. The default returns `Ok(None)`.
    fn next_outbox_retry_at(&mut self) -> Result<Option<i64>> {
        Ok(None)
    }

    /// Returns the time of the next blob upload retry, if any. The default returns
    /// `Ok(None)`.
    fn next_blob_upload_retry_at(&mut self) -> Result<Option<i64>> {
        Ok(None)
    }

    /// Lists summaries of the recorded conflicts.
    fn conflict_summaries(&mut self) -> Result<Vec<ConflictSummary>>;

    /// Returns blob health counters, if the store tracks them. The default returns
    /// `Ok(None)`.
    fn blob_health_summary(&mut self) -> Result<Option<BlobHealthSummary>> {
        Ok(None)
    }

    /// Returns CRDT health counters, if the store tracks them. The default returns
    /// `Ok(None)`.
    fn crdt_health_summary(&mut self) -> Result<Option<CrdtHealthSummary>> {
        Ok(None)
    }

    /// Returns synced-row health counters for the given subscriptions, if the store
    /// tracks them. The default ignores the subscriptions and returns `Ok(None)`.
    fn scoped_rows_health_summary(
        &mut self,
        _subscriptions: &[SubscriptionSpec],
    ) -> Result<Option<ScopedRowsHealthSummary>> {
        Ok(None)
    }

    /// Clears orphaned synced rows and returns a health summary.
    ///
    /// # Errors
    ///
    /// The default ignores its arguments and always returns a storage error saying the
    /// operation is not supported.
    fn clear_orphaned_synced_rows(
        &mut self,
        _subscriptions: &[SubscriptionSpec],
        _tables: &[String],
    ) -> Result<ScopedRowsHealthSummary> {
        Err(SyncularError::storage(anyhow::anyhow!(
            "clearing orphaned synced rows is not supported by this store"
        )))
    }

    /// Records `resolution` for the conflict `id`.
    fn resolve_conflict(&mut self, id: &str, resolution: &str) -> Result<()>;

    /// Retries the conflict `id` keeping the local change.
    ///
    /// NOTE(review): the meaning of the returned `String` is not visible in this file
    /// (presumably an identifier of the retried commit — confirm).
    fn retry_conflict_keep_local(&mut self, id: &str) -> Result<String>;
}
417
/// Task operations for the demo todo fixture; compiled only with the
/// `demo-todo-fixture` feature.
///
/// NOTE(review): all methods are signatures only; the descriptions are inferred from
/// names and parameters — confirm against the implementing store.
#[cfg(feature = "demo-todo-fixture")]
pub trait DemoTaskStore {
    /// Adds a task with the given id and title for `actor_id`, optionally within a project.
    fn add_task(
        &mut self,
        actor_id: &str,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()>;

    /// Changes the title of the task `task_id`, optionally within a project.
    fn patch_task_title(
        &mut self,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()>;

    /// Lists the stored tasks.
    fn list_tasks(&mut self) -> Result<Vec<Task>>;
}