// syncular_runtime/storage/traits.rs

use crate::binary_snapshot::SnapshotChunkRows;
use crate::client::SubscriptionSpec;
use crate::error::{Result, SyncularError};
use crate::protocol::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[cfg(not(all(target_arch = "wasm32", feature = "web-transport")))]
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Compiled for every target except `wasm32` builds that enable the
/// `web-transport` feature. A system clock set before the epoch yields `0`
/// rather than an error, and a millisecond count that does not fit in an
/// `i64` saturates at `i64::MAX`.
#[cfg(not(all(target_arch = "wasm32", feature = "web-transport")))]
pub fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` yields a `u128`; clamp before narrowing so the cast cannot wrap.
    since_epoch.as_millis().min(i64::MAX as u128) as i64
}

/// Returns the current time in milliseconds as reported by `js_sys::Date::now()`.
///
/// Only compiled for `wasm32` targets with the `web-transport` feature
/// enabled. The floating-point value is narrowed to `i64` with an `as` cast,
/// which discards any fractional part.
#[cfg(all(target_arch = "wasm32", feature = "web-transport"))]
pub fn now_ms() -> i64 {
    js_sys::Date::now() as i64
}

// Shared tuning constants. Nothing in this file reads them; the notes below
// are derived from the names and should be confirmed against the call sites.

/// Retry ceiling for sync pushes. NOTE(review): presumably compared against
/// `OutboxCommit::attempt_count` — confirm.
pub const MAX_SYNC_RETRIES: i32 = 5;
/// Timeout, in milliseconds, associated with outbox rows in the sending state.
/// NOTE(review): presumably the staleness threshold for re-queueing — confirm.
pub const SYNC_SENDING_TIMEOUT_MS: i64 = 30_000;
/// Retry ceiling for blob uploads. NOTE(review): confirm how it is enforced.
pub const MAX_BLOB_UPLOAD_RETRIES: i32 = 3;
/// Timeout, in milliseconds, after which a blob upload is considered stale.
/// NOTE(review): inferred from the name — confirm.
pub const BLOB_UPLOAD_STALE_TIMEOUT_MS: i64 = 30_000;
/// SQLite busy timeout in milliseconds. NOTE(review): presumably passed to the
/// SQLite connection by the SQLite-backed store — confirm.
pub const SQLITE_BUSY_TIMEOUT_MS: i32 = 5_000;
/// Schema identifier used by the default `SyncStateStore::app_schema_state`.
pub const APP_SCHEMA_ID: &str = "syncular-app";

/// Delay before the first retry in the general schedule, in milliseconds.
const RETRY_BASE_DELAY_MS: i64 = 1_000;
/// Upper bound on the general retry delay, in milliseconds.
const RETRY_MAX_DELAY_MS: i64 = 30_000;
/// Delay before the first blob-upload retry, in milliseconds.
const BLOB_UPLOAD_RETRY_BASE_DELAY_MS: i64 = 100;
/// Upper bound on the blob-upload retry delay, in milliseconds.
const BLOB_UPLOAD_RETRY_MAX_DELAY_MS: i64 = 5_000;

/// Largest doubling exponent applied; both schedules hit their cap well before it.
const MAX_BACKOFF_EXPONENT: i32 = 12;

/// Capped exponential backoff: `base_ms * 2^(attempt_count - 1)`, limited to `max_ms`.
fn capped_backoff_delay_ms(base_ms: i64, max_ms: i64, attempt_count: i32) -> i64 {
    // Clamp at zero as well as at the ceiling: a non-positive `attempt_count`
    // would otherwise go negative, wrap to a huge `u32` when cast, and jump
    // straight to the maximum delay instead of the base delay.
    let exponent = attempt_count
        .saturating_sub(1)
        .clamp(0, MAX_BACKOFF_EXPONENT) as u32;
    base_ms.saturating_mul(1_i64 << exponent).min(max_ms)
}

/// Returns the general retry delay in milliseconds for the given attempt.
///
/// The delay is 1 000 ms for the first attempt and doubles per attempt up to
/// a 30 000 ms cap. Attempt counts of zero or below are treated like the
/// first attempt.
pub fn retry_backoff_delay_ms(attempt_count: i32) -> i64 {
    capped_backoff_delay_ms(RETRY_BASE_DELAY_MS, RETRY_MAX_DELAY_MS, attempt_count)
}

/// Returns `now` plus [`retry_backoff_delay_ms`] for `attempt_count`,
/// saturating instead of overflowing.
pub fn next_retry_at(now: i64, attempt_count: i32) -> i64 {
    now.saturating_add(retry_backoff_delay_ms(attempt_count))
}

/// Returns the blob-upload retry delay in milliseconds for the given attempt.
///
/// The delay is 100 ms for the first attempt and doubles per attempt up to a
/// 5 000 ms cap. Attempt counts of zero or below are treated like the first
/// attempt.
pub fn blob_upload_retry_backoff_delay_ms(attempt_count: i32) -> i64 {
    capped_backoff_delay_ms(
        BLOB_UPLOAD_RETRY_BASE_DELAY_MS,
        BLOB_UPLOAD_RETRY_MAX_DELAY_MS,
        attempt_count,
    )
}

/// Returns `now` plus [`blob_upload_retry_backoff_delay_ms`] for
/// `attempt_count`, saturating instead of overflowing.
pub fn next_blob_upload_retry_at(now: i64, attempt_count: i32) -> i64 {
    now.saturating_add(blob_upload_retry_backoff_delay_ms(attempt_count))
}

/// Task row used by the demo to-do fixture ([`DemoTaskStore::list_tasks`]).
///
/// Only compiled when the `demo-todo-fixture` feature is enabled.
#[cfg(feature = "demo-todo-fixture")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
    pub id: String,
    pub title: String,
    // NOTE(review): presumably a 0/1 flag stored as an integer — confirm.
    pub completed: i32,
    pub user_id: String,
    pub project_id: Option<String>,
    pub server_version: i64,
    pub image: Option<String>,
    // NOTE(review): encoding of the Yjs state string is not visible here — confirm.
    pub title_yjs_state: Option<String>,
}

70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct OutboxCommit {
72    pub id: String,
73    pub client_commit_id: String,
74    pub status: String,
75    pub operations_json: String,
76    pub last_response_json: Option<String>,
77    pub error: Option<String>,
78    pub created_at: i64,
79    pub updated_at: i64,
80    pub attempt_count: i32,
81    pub acked_commit_seq: Option<i64>,
82    pub schema_version: i32,
83    pub next_attempt_at: i64,
84    #[serde(default, skip_serializing_if = "Option::is_none")]
85    pub auth_lease: Option<AuthLeaseProvenance>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct SubscriptionState {
90    pub state_id: String,
91    pub subscription_id: String,
92    pub table: String,
93    pub scopes_json: String,
94    pub params_json: String,
95    pub cursor: i64,
96    pub bootstrap_state_json: Option<String>,
97    pub status: String,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct VerifiedRoot {
102    pub state_id: String,
103    pub subscription_id: String,
104    pub partition_id: String,
105    pub commit_seq: i64,
106    pub root: String,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct AppliedMigration {
111    pub version: String,
112    pub name: String,
113    pub checksum: String,
114    pub applied_at: i64,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118#[serde(rename_all = "camelCase")]
119pub struct AppSchemaState {
120    pub schema_id: String,
121    pub schema_version: Option<i32>,
122    pub current_schema_version: i32,
123    pub updated_at: Option<i64>,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct OutboxSummary {
128    pub client_commit_id: String,
129    pub status: String,
130    pub schema_version: i32,
131    #[serde(default, skip_serializing_if = "Option::is_none")]
132    pub auth_lease: Option<AuthLeaseProvenance>,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct AuthLeaseRecord {
138    pub lease_id: String,
139    pub kid: String,
140    pub actor_id: String,
141    pub issued_at_ms: i64,
142    pub not_before_ms: i64,
143    pub expires_at_ms: i64,
144    pub schema_version: i32,
145    pub payload_json: String,
146    pub token: String,
147    pub status: String,
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub last_validation_error: Option<String>,
150    pub created_at_ms: i64,
151    pub updated_at_ms: i64,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct ConflictSummary {
156    pub id: String,
157    pub client_commit_id: String,
158    pub op_index: i32,
159    pub result_status: String,
160    pub message: String,
161    pub code: Option<String>,
162    pub server_version: Option<i64>,
163    pub resolved_at: Option<i64>,
164    pub resolution: Option<String>,
165}
166
167#[derive(Debug, Clone, Default, Serialize, Deserialize)]
168#[serde(rename_all = "camelCase")]
169pub struct BlobHealthSummary {
170    pub cache_count: i64,
171    pub cache_bytes: i64,
172    pub upload_pending: i64,
173    pub upload_uploading: i64,
174    pub upload_failed: i64,
175    pub checked_references: i64,
176    pub invalid_references: i64,
177}
178
179#[derive(Debug, Clone, Default, Serialize, Deserialize)]
180#[serde(rename_all = "camelCase")]
181pub struct CrdtHealthSummary {
182    pub document_count: i64,
183    pub pending_updates: i64,
184    pub flushed_updates: i64,
185    pub acked_updates: i64,
186    pub log_updates: i64,
187    pub orphaned_documents: i64,
188    pub orphaned_log_entries: i64,
189}
190
191#[derive(Debug, Clone, Default, Serialize, Deserialize)]
192#[serde(rename_all = "camelCase")]
193pub struct ScopedRowsHealthSummary {
194    pub checked_synced_rows: i64,
195    pub orphaned_synced_rows: i64,
196    pub tables: Vec<ScopedRowsTableHealth>,
197}
198
199#[derive(Debug, Clone, Default, Serialize, Deserialize)]
200#[serde(rename_all = "camelCase")]
201pub struct ScopedRowsTableHealth {
202    pub table: String,
203    pub checked_synced_rows: i64,
204    pub orphaned_synced_rows: i64,
205}
206
207pub trait SyncStore {
208    type Tx<'a>: SyncStoreTx
209    where
210        Self: 'a;
211
212    fn transaction<T>(&mut self, f: impl FnOnce(&mut Self::Tx<'_>) -> Result<T>) -> Result<T>;
213
214    fn supports_sqlite_snapshot_artifacts(&self) -> bool {
215        false
216    }
217
218    fn decode_sqlite_snapshot_artifact_rows(
219        &self,
220        _table: &str,
221        _artifact_bytes: &[u8],
222    ) -> Result<Vec<Value>> {
223        Err(SyncularError::protocol_message(
224            "snapshot artifacts are not supported by this store",
225        ))
226    }
227}
228
229pub trait SyncStoreTx {
230    fn pending_outbox(&mut self, limit: i64) -> Result<Vec<OutboxCommit>>;
231    fn requeue_stale_outbox(&mut self) -> Result<()>;
232    fn mark_outbox_sending(&mut self, row_id: &str) -> Result<()>;
233    fn mark_pushed_operation_server_versions(
234        &mut self,
235        _outbox: &OutboxCommit,
236        _response: &PushCommitResponse,
237    ) -> Result<()> {
238        Ok(())
239    }
240    fn mark_outbox_acked(&mut self, row_id: &str, response: &PushCommitResponse) -> Result<()>;
241    fn mark_outbox_failed(
242        &mut self,
243        row_id: &str,
244        error: &str,
245        response: &PushCommitResponse,
246    ) -> Result<()>;
247    fn mark_outbox_retry(
248        &mut self,
249        row_id: &str,
250        error: &str,
251        next_attempt_at: i64,
252        failed: bool,
253    ) -> Result<()>;
254    fn insert_conflict(&mut self, outbox: &OutboxCommit, result: &OperationResult) -> Result<()>;
255
256    fn upsert_auth_lease(&mut self, _lease: &AuthLeaseRecord) -> Result<()> {
257        Err(SyncularError::storage(anyhow::anyhow!(
258            "auth lease storage is not supported by this store"
259        )))
260    }
261
262    fn auth_lease(&mut self, _lease_id: &str) -> Result<Option<AuthLeaseRecord>> {
263        Err(SyncularError::storage(anyhow::anyhow!(
264            "auth lease storage is not supported by this store"
265        )))
266    }
267
268    fn active_auth_leases(
269        &mut self,
270        _actor_id: Option<&str>,
271        _now_ms: i64,
272    ) -> Result<Vec<AuthLeaseRecord>> {
273        Err(SyncularError::storage(anyhow::anyhow!(
274            "auth lease storage is not supported by this store"
275        )))
276    }
277
278    fn set_outbox_auth_lease(
279        &mut self,
280        _client_commit_id: &str,
281        _provenance: Option<&AuthLeaseProvenance>,
282    ) -> Result<()> {
283        Err(SyncularError::storage(anyhow::anyhow!(
284            "outbox auth lease provenance is not supported by this store"
285        )))
286    }
287
288    fn subscription_state(
289        &mut self,
290        state_id: &str,
291        subscription_id: &str,
292    ) -> Result<Option<SubscriptionState>>;
293    fn subscription_states(&mut self, _state_id: &str) -> Result<Vec<SubscriptionState>> {
294        Ok(Vec::new())
295    }
296    fn upsert_subscription_state(&mut self, state: &SubscriptionState) -> Result<()>;
297    fn delete_subscription_state(&mut self, state_id: &str, subscription_id: &str) -> Result<()>;
298    fn verified_root(
299        &mut self,
300        _state_id: &str,
301        _subscription_id: &str,
302    ) -> Result<Option<VerifiedRoot>> {
303        Ok(None)
304    }
305    fn verified_roots(&mut self, _state_id: &str) -> Result<Vec<VerifiedRoot>> {
306        Ok(Vec::new())
307    }
308    fn upsert_verified_root(&mut self, _root: &VerifiedRoot) -> Result<()> {
309        Ok(())
310    }
311    fn delete_verified_root(&mut self, _state_id: &str, _subscription_id: &str) -> Result<()> {
312        Ok(())
313    }
314    fn crdt_state_vector_hints(
315        &mut self,
316        _table: &str,
317        _scopes: &ScopeValues,
318        _limit: i64,
319    ) -> Result<Vec<CrdtStateVectorHint>> {
320        Ok(Vec::new())
321    }
322
323    fn clear_table_for_scopes(&mut self, table: &str, scopes: &ScopeValues) -> Result<()>;
324    fn clear_synced_rows_for_scopes(&mut self, _table: &str, _scopes: &ScopeValues) -> Result<i64> {
325        Err(SyncularError::storage(anyhow::anyhow!(
326            "clearing synced rows is not supported by this store"
327        )))
328    }
329    fn clear_table_for_scopes_preserving_local_crdt(
330        &mut self,
331        table: &str,
332        scopes: &ScopeValues,
333    ) -> Result<()> {
334        self.clear_table_for_scopes(table, scopes)
335    }
336    fn current_row_json(&mut self, _table: &str, _row_id: &str) -> Result<Option<Value>> {
337        Ok(None)
338    }
339    fn upsert_row(&mut self, table: &str, row: &Value, fallback_version: Option<i64>)
340        -> Result<()>;
341    fn upsert_rows(
342        &mut self,
343        table: &str,
344        rows: &[Value],
345        fallback_version: Option<i64>,
346    ) -> Result<()> {
347        for row in rows {
348            self.upsert_row(table, row, fallback_version)?;
349        }
350        Ok(())
351    }
352    fn upsert_snapshot_chunk_rows(
353        &mut self,
354        table: &str,
355        rows: &SnapshotChunkRows,
356        fallback_version: Option<i64>,
357    ) -> Result<()> {
358        let rows = rows.clone().try_into_value_rows()?;
359        self.upsert_rows(table, &rows, fallback_version)
360    }
361    fn apply_change(&mut self, change: &SyncChange) -> Result<()>;
362}
363
364pub trait SyncStateStore {
365    fn applied_migrations(&mut self) -> Result<Vec<AppliedMigration>>;
366
367    fn app_schema_state(&mut self, current_schema_version: i32) -> Result<AppSchemaState> {
368        Ok(AppSchemaState {
369            schema_id: APP_SCHEMA_ID.to_string(),
370            schema_version: None,
371            current_schema_version,
372            updated_at: None,
373        })
374    }
375
376    fn outbox_summaries(&mut self) -> Result<Vec<OutboxSummary>>;
377
378    fn next_outbox_retry_at(&mut self) -> Result<Option<i64>> {
379        Ok(None)
380    }
381
382    fn next_blob_upload_retry_at(&mut self) -> Result<Option<i64>> {
383        Ok(None)
384    }
385
386    fn conflict_summaries(&mut self) -> Result<Vec<ConflictSummary>>;
387
388    fn blob_health_summary(&mut self) -> Result<Option<BlobHealthSummary>> {
389        Ok(None)
390    }
391
392    fn crdt_health_summary(&mut self) -> Result<Option<CrdtHealthSummary>> {
393        Ok(None)
394    }
395
396    fn scoped_rows_health_summary(
397        &mut self,
398        _subscriptions: &[SubscriptionSpec],
399    ) -> Result<Option<ScopedRowsHealthSummary>> {
400        Ok(None)
401    }
402
403    fn clear_orphaned_synced_rows(
404        &mut self,
405        _subscriptions: &[SubscriptionSpec],
406        _tables: &[String],
407    ) -> Result<ScopedRowsHealthSummary> {
408        Err(SyncularError::storage(anyhow::anyhow!(
409            "clearing orphaned synced rows is not supported by this store"
410        )))
411    }
412
413    fn resolve_conflict(&mut self, id: &str, resolution: &str) -> Result<()>;
414
415    fn retry_conflict_keep_local(&mut self, id: &str) -> Result<String>;
416}
417
/// Task operations for the demo to-do fixture.
///
/// Only compiled when the `demo-todo-fixture` feature is enabled.
#[cfg(feature = "demo-todo-fixture")]
pub trait DemoTaskStore {
    /// Adds a task with the given id and title for `actor_id`, optionally
    /// within a project.
    fn add_task(
        &mut self,
        actor_id: &str,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()>;

    /// Changes the title of the task `task_id` to `title_value`.
    fn patch_task_title(
        &mut self,
        project_id: Option<&str>,
        task_id: String,
        title_value: String,
    ) -> Result<()>;

    /// Lists the stored tasks.
    fn list_tasks(&mut self) -> Result<Vec<Task>>;
}