Skip to main content

syncular_runtime/core/
error.rs

1use anyhow::anyhow;
2use serde::{Deserialize, Serialize};
3use std::fmt;
4
5pub type Result<T> = std::result::Result<T, SyncularError>;
6
7pub const FULL_SNAPSHOT_RESYNC_REQUIRED: &str = "full snapshot resync required";
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "camelCase")]
11pub struct SyncularErrorClassification {
12    pub code: String,
13    pub category: String,
14    pub retryable: bool,
15    pub recommended_action: String,
16}
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19pub enum ErrorKind {
20    Busy,
21    Config,
22    Storage,
23    Transport,
24    Protocol,
25    Schema,
26    Codegen,
27    Internal,
28}
29
30#[derive(Debug)]
31pub struct SyncularError {
32    kind: ErrorKind,
33    source: anyhow::Error,
34}
35
36impl SyncularError {
37    pub fn new(kind: ErrorKind, source: impl Into<anyhow::Error>) -> Self {
38        Self {
39            kind,
40            source: source.into(),
41        }
42    }
43
44    pub fn message(kind: ErrorKind, message: impl fmt::Display) -> Self {
45        Self::new(kind, anyhow!(message.to_string()))
46    }
47
48    pub fn config(message: impl fmt::Display) -> Self {
49        Self::message(ErrorKind::Config, message)
50    }
51
52    pub fn busy(message: impl fmt::Display) -> Self {
53        Self::message(ErrorKind::Busy, message)
54    }
55
56    pub fn storage(source: impl Into<anyhow::Error>) -> Self {
57        Self::new(ErrorKind::Storage, source)
58    }
59
60    pub fn transport(source: impl Into<anyhow::Error>) -> Self {
61        Self::new(ErrorKind::Transport, source)
62    }
63
64    pub fn protocol(source: impl Into<anyhow::Error>) -> Self {
65        Self::new(ErrorKind::Protocol, source)
66    }
67
68    pub fn protocol_message(message: impl fmt::Display) -> Self {
69        Self::message(ErrorKind::Protocol, message)
70    }
71
72    pub fn schema(message: impl fmt::Display) -> Self {
73        Self::message(ErrorKind::Schema, message)
74    }
75
76    pub fn codegen(message: impl fmt::Display) -> Self {
77        Self::message(ErrorKind::Codegen, message)
78    }
79
80    pub fn limit_exceeded(
81        limit: &str,
82        observed: usize,
83        max: usize,
84        message: impl fmt::Display,
85    ) -> Self {
86        let payload = serde_json::json!({
87            "code": "runtime.limit_exceeded",
88            "category": "limit-exceeded",
89            "retryable": false,
90            "recommendedAction": "reduceInput",
91            "limit": limit,
92            "observed": observed,
93            "max": max,
94        });
95        Self::message(ErrorKind::Config, format!("{message}: {payload}"))
96    }
97
98    pub fn kind(&self) -> ErrorKind {
99        self.kind
100    }
101
102    pub fn message_text(&self) -> String {
103        self.source.to_string()
104    }
105
106    pub fn debug_text(&self) -> String {
107        self.to_string()
108    }
109
110    pub fn requires_full_snapshot_resync(&self) -> bool {
111        self.message_text().contains(FULL_SNAPSHOT_RESYNC_REQUIRED)
112    }
113
114    pub fn classification(&self) -> SyncularErrorClassification {
115        let message = self.message_text();
116        if let Some(classification) = classification_from_server_error(&message) {
117            return classification;
118        }
119
120        if http_status_from_message(&message) == Some(401) {
121            return syncular_error_classification(
122                "sync.auth_required",
123                "auth-required",
124                true,
125                "refreshAuth",
126            );
127        }
128
129        if http_status_from_message(&message) == Some(403) {
130            return syncular_error_classification(
131                "sync.forbidden",
132                "forbidden",
133                false,
134                "checkPermissions",
135            );
136        }
137
138        let haystack = format!("{message}\n{}", self.debug_text());
139        if self.kind == ErrorKind::Schema || haystack_contains_schema_mismatch(&haystack) {
140            return syncular_error_classification(
141                "sync.schema_mismatch",
142                "schema-mismatch",
143                false,
144                "regenerateClient",
145            );
146        }
147
148        if self.kind == ErrorKind::Transport && haystack_contains_offline(&haystack) {
149            return syncular_error_classification("sync.offline", "offline", true, "retryLater");
150        }
151
152        if self.kind == ErrorKind::Protocol
153            && (haystack_contains_integrity_rejection(&haystack)
154                || self.requires_full_snapshot_resync())
155        {
156            return syncular_error_classification(
157                "sync.integrity_rejected",
158                "integrity-rejected",
159                false,
160                "forceResync",
161            );
162        }
163
164        match self.kind {
165            ErrorKind::Busy => {
166                syncular_error_classification("runtime.busy", "rate-limited", true, "retryLater")
167            }
168            ErrorKind::Config => syncular_error_classification(
169                "runtime.config_invalid",
170                "invalid-request",
171                false,
172                "fixRequest",
173            ),
174            ErrorKind::Storage => {
175                syncular_error_classification("storage.failed", "storage", false, "inspectStorage")
176            }
177            ErrorKind::Transport => syncular_error_classification(
178                "sync.transport_failed",
179                "transport",
180                true,
181                "retryLater",
182            ),
183            ErrorKind::Protocol => syncular_error_classification(
184                "sync.invalid_request",
185                "invalid-request",
186                false,
187                "fixRequest",
188            ),
189            ErrorKind::Schema => unreachable!("schema errors are classified above"),
190            ErrorKind::Codegen => syncular_error_classification(
191                "runtime.codegen_mismatch",
192                "schema-mismatch",
193                false,
194                "regenerateClient",
195            ),
196            ErrorKind::Internal => syncular_error_classification(
197                "runtime.internal",
198                "internal",
199                false,
200                "inspectServer",
201            ),
202        }
203    }
204
205    pub fn context(self, context: impl fmt::Display) -> Self {
206        Self {
207            kind: self.kind,
208            source: self.source.context(context.to_string()),
209        }
210    }
211}
212
213fn syncular_error_classification(
214    code: &str,
215    category: &str,
216    retryable: bool,
217    recommended_action: &str,
218) -> SyncularErrorClassification {
219    SyncularErrorClassification {
220        code: code.to_string(),
221        category: category.to_string(),
222        retryable,
223        recommended_action: recommended_action.to_string(),
224    }
225}
226
227fn known_error_classification(code: &str) -> Option<SyncularErrorClassification> {
228    let (category, retryable, recommended_action) = match code {
229        "sync.auth_required" => ("auth-required", true, "refreshAuth"),
230        "sync.auth_lease_missing" => ("auth-required", true, "refreshAuth"),
231        "sync.auth_lease_invalid" => ("auth-required", true, "refreshAuth"),
232        "sync.auth_lease_expired" => ("auth-required", true, "refreshAuth"),
233        "sync.auth_lease_schema_mismatch" => ("schema-mismatch", false, "regenerateClient"),
234        "sync.auth_lease_scope_mismatch" => ("forbidden", false, "checkPermissions"),
235        "sync.auth_lease_scope_revoked" => ("scope-revoked", false, "checkPermissions"),
236        "sync.auth_lease_business_rejected" => ("conflict", false, "resolveConflict"),
237        "sync.forbidden" => ("forbidden", false, "checkPermissions"),
238        "sync.invalid_request" => ("invalid-request", false, "fixRequest"),
239        "sync.invalid_client_id" => ("invalid-request", false, "resetClientId"),
240        "sync.invalid_subscription" => ("invalid-request", false, "fixRequest"),
241        "sync.empty_commit" => ("invalid-request", false, "fixRequest"),
242        "sync.unknown_table" => ("schema-mismatch", false, "regenerateClient"),
243        "sync.unsupported_operation" => ("invalid-request", false, "fixRequest"),
244        "sync.row_missing" => ("not-found", false, "forceResync"),
245        "sync.version_conflict" => ("conflict", false, "resolveConflict"),
246        "sync.constraint_violation" => ("invalid-request", false, "fixRequest"),
247        "sync.missing_scopes" => ("internal", false, "inspectServer"),
248        "sync.idempotency_cache_miss" => ("internal", true, "retryLater"),
249        "sync.too_many_operations" => ("invalid-request", false, "splitBatch"),
250        "sync.not_found" => ("not-found", false, "forceResync"),
251        "sync.rate_limited" => ("rate-limited", true, "retryLater"),
252        "sync.schema_mismatch" => ("schema-mismatch", false, "regenerateClient"),
253        "sync.client_schema_unsupported" => ("schema-mismatch", false, "upgradeClient"),
254        "sync.integrity_rejected" => ("integrity-rejected", false, "forceResync"),
255        "sync.scope_revoked" => ("scope-revoked", false, "checkPermissions"),
256        "sync.offline" => ("offline", true, "retryLater"),
257        "sync.websocket_not_configured" => ("server", false, "inspectServer"),
258        "sync.websocket_connection_limit" => ("rate-limited", true, "retryLater"),
259        "sync.transport_failed" => ("transport", true, "retryLater"),
260        "runtime.busy" => ("rate-limited", true, "retryLater"),
261        "runtime.limit_exceeded" => ("limit-exceeded", false, "reduceInput"),
262        "runtime.config_invalid" => ("invalid-request", false, "fixRequest"),
263        "runtime.codegen_mismatch" => ("schema-mismatch", false, "regenerateClient"),
264        "runtime.internal" => ("internal", false, "inspectServer"),
265        "storage.failed" => ("storage", false, "inspectStorage"),
266        "worker.closed" => ("invalid-request", false, "fixRequest"),
267        "worker.not_open" => ("invalid-request", false, "fixRequest"),
268        "worker.protocol_mismatch" => ("schema-mismatch", false, "regenerateClient"),
269        "worker.request_timeout" => ("rate-limited", true, "retryLater"),
270        "worker.failed" => ("internal", false, "recreateClient"),
271        "worker.message_unreadable" => ("internal", false, "recreateClient"),
272        "console.auth_required" => ("auth-required", true, "refreshAuth"),
273        "console.forbidden_origin" => ("forbidden", false, "checkPermissions"),
274        "console.invalid_request" => ("invalid-request", false, "fixRequest"),
275        "console.schema_unavailable" => ("server", true, "retryLater"),
276        "console.not_found" => ("not-found", false, "inspectServer"),
277        "console.downstream_unavailable" => ("server", true, "retryLater"),
278        "console.downstream_invalid_response" => ("server", false, "inspectServer"),
279        "console.internal" => ("internal", false, "inspectServer"),
280        "proxy.auth_required" => ("auth-required", true, "refreshAuth"),
281        "proxy.forbidden_origin" => ("forbidden", false, "checkPermissions"),
282        "proxy.connection_limit" => ("rate-limited", true, "retryLater"),
283        "blob.invalid_request" => ("blob", false, "fixRequest"),
284        "blob.storage_not_configured" => ("blob", false, "inspectServer"),
285        "blob.too_large" => ("blob", false, "fixRequest"),
286        "blob.not_found" => ("blob", false, "fixRequest"),
287        "blob.forbidden" => ("forbidden", false, "checkPermissions"),
288        "blob.invalid_token" => ("auth-required", true, "refreshAuth"),
289        "blob.upload_failed" => ("blob", true, "retryLater"),
290        "blob.hash_mismatch" => ("integrity-rejected", false, "fixRequest"),
291        "blob.size_mismatch" => ("blob", false, "fixRequest"),
292        _ => return None,
293    };
294
295    Some(syncular_error_classification(
296        code,
297        category,
298        retryable,
299        recommended_action,
300    ))
301}
302
303fn classification_from_server_error(message: &str) -> Option<SyncularErrorClassification> {
304    let parsed = parse_json_object_suffix(message)?;
305    let code = parsed
306        .get("code")
307        .and_then(serde_json::Value::as_str)
308        .or_else(|| parsed.get("error").and_then(serde_json::Value::as_str))?;
309
310    let base = known_error_classification(code)
311        .unwrap_or_else(|| syncular_error_classification(code, "server", false, "inspectServer"));
312    Some(SyncularErrorClassification {
313        code: code.to_string(),
314        category: parsed
315            .get("category")
316            .and_then(serde_json::Value::as_str)
317            .unwrap_or(&base.category)
318            .to_string(),
319        retryable: parsed
320            .get("retryable")
321            .and_then(serde_json::Value::as_bool)
322            .unwrap_or(base.retryable),
323        recommended_action: parsed
324            .get("recommendedAction")
325            .and_then(serde_json::Value::as_str)
326            .unwrap_or(&base.recommended_action)
327            .to_string(),
328    })
329}
330
331fn parse_json_object_suffix(message: &str) -> Option<serde_json::Map<String, serde_json::Value>> {
332    let start = message.find('{')?;
333    let tail = &message[start..];
334    let parsed = match serde_json::from_str::<serde_json::Value>(tail) {
335        Ok(value) => value,
336        Err(_) => {
337            let end = tail.rfind('}')?;
338            serde_json::from_str::<serde_json::Value>(&tail[..=end]).ok()?
339        }
340    };
341    match parsed {
342        serde_json::Value::Object(object) => Some(object),
343        _ => None,
344    }
345}
346
347fn http_status_from_message(message: &str) -> Option<u16> {
348    let index = message.find("HTTP ")?;
349    let status = message.get(index + 5..index + 8)?;
350    status.parse::<u16>().ok()
351}
352
353fn haystack_contains_schema_mismatch(haystack: &str) -> bool {
354    haystack.to_ascii_lowercase().contains("schema version")
355}
356
357fn haystack_contains_offline(haystack: &str) -> bool {
358    let haystack = haystack.to_ascii_lowercase();
359    haystack.contains("offline") || haystack.contains("network is unreachable")
360}
361
362fn haystack_contains_integrity_rejection(haystack: &str) -> bool {
363    let haystack = haystack.to_ascii_lowercase();
364    [
365        "hash mismatch",
366        "sha256 mismatch",
367        "byte length mismatch",
368        "manifest ",
369        "integrity",
370        "chain root",
371        "commit root",
372        "verified root",
373    ]
374    .iter()
375    .any(|needle| haystack.contains(needle))
376}
377
378impl fmt::Display for SyncularError {
379    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380        write!(f, "{:?}: {}", self.kind, self.source)
381    }
382}
383
384impl std::error::Error for SyncularError {
385    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
386        Some(self.source.as_ref())
387    }
388}
389
390impl From<anyhow::Error> for SyncularError {
391    fn from(source: anyhow::Error) -> Self {
392        Self::new(ErrorKind::Internal, source)
393    }
394}
395
396#[cfg(feature = "native")]
397impl From<diesel::ConnectionError> for SyncularError {
398    fn from(source: diesel::ConnectionError) -> Self {
399        Self::storage(source)
400    }
401}
402
403#[cfg(feature = "native")]
404impl From<diesel::result::Error> for SyncularError {
405    fn from(source: diesel::result::Error) -> Self {
406        Self::storage(source)
407    }
408}
409
410#[cfg(feature = "native")]
411impl From<rusqlite::Error> for SyncularError {
412    fn from(source: rusqlite::Error) -> Self {
413        Self::storage(source)
414    }
415}
416
417#[cfg(feature = "native")]
418impl From<reqwest::Error> for SyncularError {
419    fn from(source: reqwest::Error) -> Self {
420        Self::transport(source)
421    }
422}
423
424#[cfg(feature = "native")]
425impl From<reqwest::header::InvalidHeaderValue> for SyncularError {
426    fn from(source: reqwest::header::InvalidHeaderValue) -> Self {
427        Self::transport(source)
428    }
429}
430
431#[cfg(feature = "native")]
432impl From<tungstenite::Error> for SyncularError {
433    fn from(source: tungstenite::Error) -> Self {
434        Self::transport(source)
435    }
436}
437
438impl From<serde_json::Error> for SyncularError {
439    fn from(source: serde_json::Error) -> Self {
440        Self::protocol(source)
441    }
442}
443
444impl From<syncular_protocol::ProtocolError> for SyncularError {
445    fn from(source: syncular_protocol::ProtocolError) -> Self {
446        Self::protocol(source)
447    }
448}
449
450impl From<std::io::Error> for SyncularError {
451    fn from(source: std::io::Error) -> Self {
452        Self::new(ErrorKind::Internal, source)
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use super::*;
459
460    #[test]
461    fn classification_prefers_server_error_envelope() {
462        let error = SyncularError::message(
463            ErrorKind::Transport,
464            r#"sync failed with HTTP 403: {"error":"sync.forbidden","code":"sync.forbidden","category":"forbidden","retryable":false,"recommendedAction":"checkPermissions","message":"Forbidden"}"#,
465        );
466
467        assert_eq!(
468            error.classification(),
469            SyncularErrorClassification {
470                code: "sync.forbidden".to_string(),
471                category: "forbidden".to_string(),
472                retryable: false,
473                recommended_action: "checkPermissions".to_string(),
474            }
475        );
476    }
477
478    #[test]
479    fn classification_knows_shared_taxonomy_codes_without_envelope_metadata() {
480        let conflict = SyncularError::message(
481            ErrorKind::Transport,
482            r#"sync failed with HTTP 409: {"error":"sync.version_conflict","code":"sync.version_conflict","message":"Version conflict"}"#,
483        );
484        let worker = SyncularError::message(
485            ErrorKind::Transport,
486            r#"worker failed: {"error":"worker.failed","code":"worker.failed","message":"Worker failed"}"#,
487        );
488
489        assert_eq!(conflict.classification().category, "conflict");
490        assert_eq!(
491            conflict.classification().recommended_action,
492            "resolveConflict"
493        );
494        assert_eq!(worker.classification().category, "internal");
495        assert_eq!(worker.classification().recommended_action, "recreateClient");
496    }
497
498    #[test]
499    fn classification_maps_http_auth_statuses_without_server_envelope() {
500        let auth = SyncularError::message(ErrorKind::Transport, "sync failed with HTTP 401");
501        let forbidden = SyncularError::message(ErrorKind::Transport, "sync failed with HTTP 403");
502
503        assert_eq!(auth.classification().code, "sync.auth_required");
504        assert_eq!(auth.classification().recommended_action, "refreshAuth");
505        assert_eq!(forbidden.classification().code, "sync.forbidden");
506        assert_eq!(
507            forbidden.classification().recommended_action,
508            "checkPermissions"
509        );
510    }
511
512    #[test]
513    fn classification_maps_schema_and_integrity_errors() {
514        let schema = SyncularError::schema("server schema version 12 is not compatible");
515        let integrity = SyncularError::protocol_message(
516            "snapshot chunk sha256 mismatch; full snapshot resync required",
517        );
518
519        assert_eq!(schema.classification().code, "sync.schema_mismatch");
520        assert_eq!(
521            schema.classification().recommended_action,
522            "regenerateClient"
523        );
524        assert_eq!(integrity.classification().code, "sync.integrity_rejected");
525        assert_eq!(integrity.classification().recommended_action, "forceResync");
526    }
527
528    #[test]
529    fn classification_maps_runtime_storage_failures() {
530        let error = SyncularError::message(ErrorKind::Storage, "database is locked");
531
532        assert_eq!(
533            error.classification(),
534            SyncularErrorClassification {
535                code: "storage.failed".to_string(),
536                category: "storage".to_string(),
537                retryable: false,
538                recommended_action: "inspectStorage".to_string(),
539            }
540        );
541    }
542
543    #[test]
544    fn classification_maps_offline_transport_failures() {
545        let error = SyncularError::message(ErrorKind::Transport, "browser fetch failed: offline");
546
547        assert_eq!(
548            error.classification(),
549            SyncularErrorClassification {
550                code: "sync.offline".to_string(),
551                category: "offline".to_string(),
552                retryable: true,
553                recommended_action: "retryLater".to_string(),
554            }
555        );
556    }
557}