1use anyhow::anyhow;
2use serde::{Deserialize, Serialize};
3use std::fmt;
4
/// Result alias used throughout this module: the error side is always [`SyncularError`].
pub type Result<T> = std::result::Result<T, SyncularError>;
6
/// Marker substring searched for in an error's message text by
/// `SyncularError::requires_full_snapshot_resync`.
pub const FULL_SNAPSHOT_RESYNC_REQUIRED: &str = "full snapshot resync required";
8
/// Machine-readable description of an error, as produced by
/// `SyncularError::classification`.
///
/// Serialized with camelCase field names, so `recommended_action` travels as
/// `recommendedAction`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SyncularErrorClassification {
    /// Dotted error code, e.g. `sync.forbidden` or `storage.failed`.
    pub code: String,
    /// Coarse category, e.g. `auth-required`, `schema-mismatch`, `transport`.
    pub category: String,
    /// Whether the classification marks the failure as retryable.
    pub retryable: bool,
    /// Suggested follow-up, e.g. `refreshAuth`, `retryLater`, `forceResync`.
    pub recommended_action: String,
}
17
/// Coarse origin of a [`SyncularError`]; drives the fallback classification
/// when the message itself carries no more specific information.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorKind {
    /// Falls back to `runtime.busy` (retryable).
    Busy,
    /// Falls back to `runtime.config_invalid`; also used by `limit_exceeded`.
    Config,
    /// Falls back to `storage.failed`; produced by the diesel/rusqlite conversions.
    Storage,
    /// Falls back to `sync.transport_failed`; produced by the reqwest/tungstenite conversions.
    Transport,
    /// Falls back to `sync.invalid_request`; produced by the serde_json/protocol conversions.
    Protocol,
    /// Always classified as `sync.schema_mismatch` unless a server envelope or
    /// HTTP 401/403 status takes precedence.
    Schema,
    /// Falls back to `runtime.codegen_mismatch`.
    Codegen,
    /// Falls back to `runtime.internal`; produced by the `anyhow`/`std::io` conversions.
    Internal,
}
29
/// Error type of this module: an [`ErrorKind`] tag plus the underlying
/// `anyhow::Error` that carries the message and any cause chain.
#[derive(Debug)]
pub struct SyncularError {
    // Coarse origin tag, exposed through `kind()`.
    kind: ErrorKind,
    // Underlying error; its `Display` output is what `message_text()` returns.
    source: anyhow::Error,
}
35
36impl SyncularError {
37 pub fn new(kind: ErrorKind, source: impl Into<anyhow::Error>) -> Self {
38 Self {
39 kind,
40 source: source.into(),
41 }
42 }
43
44 pub fn message(kind: ErrorKind, message: impl fmt::Display) -> Self {
45 Self::new(kind, anyhow!(message.to_string()))
46 }
47
48 pub fn config(message: impl fmt::Display) -> Self {
49 Self::message(ErrorKind::Config, message)
50 }
51
52 pub fn busy(message: impl fmt::Display) -> Self {
53 Self::message(ErrorKind::Busy, message)
54 }
55
56 pub fn storage(source: impl Into<anyhow::Error>) -> Self {
57 Self::new(ErrorKind::Storage, source)
58 }
59
60 pub fn transport(source: impl Into<anyhow::Error>) -> Self {
61 Self::new(ErrorKind::Transport, source)
62 }
63
64 pub fn protocol(source: impl Into<anyhow::Error>) -> Self {
65 Self::new(ErrorKind::Protocol, source)
66 }
67
68 pub fn protocol_message(message: impl fmt::Display) -> Self {
69 Self::message(ErrorKind::Protocol, message)
70 }
71
72 pub fn schema(message: impl fmt::Display) -> Self {
73 Self::message(ErrorKind::Schema, message)
74 }
75
76 pub fn codegen(message: impl fmt::Display) -> Self {
77 Self::message(ErrorKind::Codegen, message)
78 }
79
80 pub fn limit_exceeded(
81 limit: &str,
82 observed: usize,
83 max: usize,
84 message: impl fmt::Display,
85 ) -> Self {
86 let payload = serde_json::json!({
87 "code": "runtime.limit_exceeded",
88 "category": "limit-exceeded",
89 "retryable": false,
90 "recommendedAction": "reduceInput",
91 "limit": limit,
92 "observed": observed,
93 "max": max,
94 });
95 Self::message(ErrorKind::Config, format!("{message}: {payload}"))
96 }
97
98 pub fn kind(&self) -> ErrorKind {
99 self.kind
100 }
101
102 pub fn message_text(&self) -> String {
103 self.source.to_string()
104 }
105
106 pub fn debug_text(&self) -> String {
107 self.to_string()
108 }
109
110 pub fn requires_full_snapshot_resync(&self) -> bool {
111 self.message_text().contains(FULL_SNAPSHOT_RESYNC_REQUIRED)
112 }
113
114 pub fn classification(&self) -> SyncularErrorClassification {
115 let message = self.message_text();
116 if let Some(classification) = classification_from_server_error(&message) {
117 return classification;
118 }
119
120 if http_status_from_message(&message) == Some(401) {
121 return syncular_error_classification(
122 "sync.auth_required",
123 "auth-required",
124 true,
125 "refreshAuth",
126 );
127 }
128
129 if http_status_from_message(&message) == Some(403) {
130 return syncular_error_classification(
131 "sync.forbidden",
132 "forbidden",
133 false,
134 "checkPermissions",
135 );
136 }
137
138 let haystack = format!("{message}\n{}", self.debug_text());
139 if self.kind == ErrorKind::Schema || haystack_contains_schema_mismatch(&haystack) {
140 return syncular_error_classification(
141 "sync.schema_mismatch",
142 "schema-mismatch",
143 false,
144 "regenerateClient",
145 );
146 }
147
148 if self.kind == ErrorKind::Transport && haystack_contains_offline(&haystack) {
149 return syncular_error_classification("sync.offline", "offline", true, "retryLater");
150 }
151
152 if self.kind == ErrorKind::Protocol
153 && (haystack_contains_integrity_rejection(&haystack)
154 || self.requires_full_snapshot_resync())
155 {
156 return syncular_error_classification(
157 "sync.integrity_rejected",
158 "integrity-rejected",
159 false,
160 "forceResync",
161 );
162 }
163
164 match self.kind {
165 ErrorKind::Busy => {
166 syncular_error_classification("runtime.busy", "rate-limited", true, "retryLater")
167 }
168 ErrorKind::Config => syncular_error_classification(
169 "runtime.config_invalid",
170 "invalid-request",
171 false,
172 "fixRequest",
173 ),
174 ErrorKind::Storage => {
175 syncular_error_classification("storage.failed", "storage", false, "inspectStorage")
176 }
177 ErrorKind::Transport => syncular_error_classification(
178 "sync.transport_failed",
179 "transport",
180 true,
181 "retryLater",
182 ),
183 ErrorKind::Protocol => syncular_error_classification(
184 "sync.invalid_request",
185 "invalid-request",
186 false,
187 "fixRequest",
188 ),
189 ErrorKind::Schema => unreachable!("schema errors are classified above"),
190 ErrorKind::Codegen => syncular_error_classification(
191 "runtime.codegen_mismatch",
192 "schema-mismatch",
193 false,
194 "regenerateClient",
195 ),
196 ErrorKind::Internal => syncular_error_classification(
197 "runtime.internal",
198 "internal",
199 false,
200 "inspectServer",
201 ),
202 }
203 }
204
205 pub fn context(self, context: impl fmt::Display) -> Self {
206 Self {
207 kind: self.kind,
208 source: self.source.context(context.to_string()),
209 }
210 }
211}
212
213fn syncular_error_classification(
214 code: &str,
215 category: &str,
216 retryable: bool,
217 recommended_action: &str,
218) -> SyncularErrorClassification {
219 SyncularErrorClassification {
220 code: code.to_string(),
221 category: category.to_string(),
222 retryable,
223 recommended_action: recommended_action.to_string(),
224 }
225}
226
227fn known_error_classification(code: &str) -> Option<SyncularErrorClassification> {
228 let (category, retryable, recommended_action) = match code {
229 "sync.auth_required" => ("auth-required", true, "refreshAuth"),
230 "sync.auth_lease_missing" => ("auth-required", true, "refreshAuth"),
231 "sync.auth_lease_invalid" => ("auth-required", true, "refreshAuth"),
232 "sync.auth_lease_expired" => ("auth-required", true, "refreshAuth"),
233 "sync.auth_lease_schema_mismatch" => ("schema-mismatch", false, "regenerateClient"),
234 "sync.auth_lease_scope_mismatch" => ("forbidden", false, "checkPermissions"),
235 "sync.auth_lease_scope_revoked" => ("scope-revoked", false, "checkPermissions"),
236 "sync.auth_lease_business_rejected" => ("conflict", false, "resolveConflict"),
237 "sync.forbidden" => ("forbidden", false, "checkPermissions"),
238 "sync.invalid_request" => ("invalid-request", false, "fixRequest"),
239 "sync.invalid_client_id" => ("invalid-request", false, "resetClientId"),
240 "sync.invalid_subscription" => ("invalid-request", false, "fixRequest"),
241 "sync.empty_commit" => ("invalid-request", false, "fixRequest"),
242 "sync.unknown_table" => ("schema-mismatch", false, "regenerateClient"),
243 "sync.unsupported_operation" => ("invalid-request", false, "fixRequest"),
244 "sync.row_missing" => ("not-found", false, "forceResync"),
245 "sync.version_conflict" => ("conflict", false, "resolveConflict"),
246 "sync.constraint_violation" => ("invalid-request", false, "fixRequest"),
247 "sync.missing_scopes" => ("internal", false, "inspectServer"),
248 "sync.idempotency_cache_miss" => ("internal", true, "retryLater"),
249 "sync.too_many_operations" => ("invalid-request", false, "splitBatch"),
250 "sync.not_found" => ("not-found", false, "forceResync"),
251 "sync.rate_limited" => ("rate-limited", true, "retryLater"),
252 "sync.schema_mismatch" => ("schema-mismatch", false, "regenerateClient"),
253 "sync.client_schema_unsupported" => ("schema-mismatch", false, "upgradeClient"),
254 "sync.integrity_rejected" => ("integrity-rejected", false, "forceResync"),
255 "sync.scope_revoked" => ("scope-revoked", false, "checkPermissions"),
256 "sync.offline" => ("offline", true, "retryLater"),
257 "sync.websocket_not_configured" => ("server", false, "inspectServer"),
258 "sync.websocket_connection_limit" => ("rate-limited", true, "retryLater"),
259 "sync.transport_failed" => ("transport", true, "retryLater"),
260 "runtime.busy" => ("rate-limited", true, "retryLater"),
261 "runtime.limit_exceeded" => ("limit-exceeded", false, "reduceInput"),
262 "runtime.config_invalid" => ("invalid-request", false, "fixRequest"),
263 "runtime.codegen_mismatch" => ("schema-mismatch", false, "regenerateClient"),
264 "runtime.internal" => ("internal", false, "inspectServer"),
265 "storage.failed" => ("storage", false, "inspectStorage"),
266 "worker.closed" => ("invalid-request", false, "fixRequest"),
267 "worker.not_open" => ("invalid-request", false, "fixRequest"),
268 "worker.protocol_mismatch" => ("schema-mismatch", false, "regenerateClient"),
269 "worker.request_timeout" => ("rate-limited", true, "retryLater"),
270 "worker.failed" => ("internal", false, "recreateClient"),
271 "worker.message_unreadable" => ("internal", false, "recreateClient"),
272 "console.auth_required" => ("auth-required", true, "refreshAuth"),
273 "console.forbidden_origin" => ("forbidden", false, "checkPermissions"),
274 "console.invalid_request" => ("invalid-request", false, "fixRequest"),
275 "console.schema_unavailable" => ("server", true, "retryLater"),
276 "console.not_found" => ("not-found", false, "inspectServer"),
277 "console.downstream_unavailable" => ("server", true, "retryLater"),
278 "console.downstream_invalid_response" => ("server", false, "inspectServer"),
279 "console.internal" => ("internal", false, "inspectServer"),
280 "proxy.auth_required" => ("auth-required", true, "refreshAuth"),
281 "proxy.forbidden_origin" => ("forbidden", false, "checkPermissions"),
282 "proxy.connection_limit" => ("rate-limited", true, "retryLater"),
283 "blob.invalid_request" => ("blob", false, "fixRequest"),
284 "blob.storage_not_configured" => ("blob", false, "inspectServer"),
285 "blob.too_large" => ("blob", false, "fixRequest"),
286 "blob.not_found" => ("blob", false, "fixRequest"),
287 "blob.forbidden" => ("forbidden", false, "checkPermissions"),
288 "blob.invalid_token" => ("auth-required", true, "refreshAuth"),
289 "blob.upload_failed" => ("blob", true, "retryLater"),
290 "blob.hash_mismatch" => ("integrity-rejected", false, "fixRequest"),
291 "blob.size_mismatch" => ("blob", false, "fixRequest"),
292 _ => return None,
293 };
294
295 Some(syncular_error_classification(
296 code,
297 category,
298 retryable,
299 recommended_action,
300 ))
301}
302
303fn classification_from_server_error(message: &str) -> Option<SyncularErrorClassification> {
304 let parsed = parse_json_object_suffix(message)?;
305 let code = parsed
306 .get("code")
307 .and_then(serde_json::Value::as_str)
308 .or_else(|| parsed.get("error").and_then(serde_json::Value::as_str))?;
309
310 let base = known_error_classification(code)
311 .unwrap_or_else(|| syncular_error_classification(code, "server", false, "inspectServer"));
312 Some(SyncularErrorClassification {
313 code: code.to_string(),
314 category: parsed
315 .get("category")
316 .and_then(serde_json::Value::as_str)
317 .unwrap_or(&base.category)
318 .to_string(),
319 retryable: parsed
320 .get("retryable")
321 .and_then(serde_json::Value::as_bool)
322 .unwrap_or(base.retryable),
323 recommended_action: parsed
324 .get("recommendedAction")
325 .and_then(serde_json::Value::as_str)
326 .unwrap_or(&base.recommended_action)
327 .to_string(),
328 })
329}
330
331fn parse_json_object_suffix(message: &str) -> Option<serde_json::Map<String, serde_json::Value>> {
332 let start = message.find('{')?;
333 let tail = &message[start..];
334 let parsed = match serde_json::from_str::<serde_json::Value>(tail) {
335 Ok(value) => value,
336 Err(_) => {
337 let end = tail.rfind('}')?;
338 serde_json::from_str::<serde_json::Value>(&tail[..=end]).ok()?
339 }
340 };
341 match parsed {
342 serde_json::Value::Object(object) => Some(object),
343 _ => None,
344 }
345}
346
/// Extracts a three-digit HTTP status that follows the literal `"HTTP "` in
/// `message`.
///
/// Every occurrence of the marker is examined in order and the first one
/// followed by exactly three ASCII digits wins. Requiring digits (instead of
/// relying on integer parsing) rejects fragments such as `"+12"`, which
/// `str::parse::<u16>` would accept. Returns `None` when no occurrence
/// qualifies.
fn http_status_from_message(message: &str) -> Option<u16> {
    const MARKER: &str = "HTTP ";
    const STATUS_DIGITS: usize = 3;

    message.match_indices(MARKER).find_map(|(index, _)| {
        let start = index + MARKER.len();
        // Byte-level access: a status code is pure ASCII, and this never panics
        // on multi-byte characters following the marker.
        let digits = message.as_bytes().get(start..start + STATUS_DIGITS)?;
        if !digits.iter().all(u8::is_ascii_digit) {
            return None;
        }
        Some(
            digits
                .iter()
                .fold(0u16, |status, digit| status * 10 + u16::from(digit - b'0')),
        )
    })
}
352
/// Reports whether `haystack` mentions "schema version", ignoring ASCII case.
fn haystack_contains_schema_mismatch(haystack: &str) -> bool {
    let lowered = haystack.to_ascii_lowercase();
    lowered.find("schema version").is_some()
}
356
/// Reports whether `haystack` mentions being offline or an unreachable
/// network, ignoring ASCII case.
fn haystack_contains_offline(haystack: &str) -> bool {
    const NEEDLES: [&str; 2] = ["offline", "network is unreachable"];

    let lowered = haystack.to_ascii_lowercase();
    NEEDLES.iter().any(|needle| lowered.contains(*needle))
}
361
/// Reports whether `haystack` contains any of the integrity-related phrases
/// below, ignoring ASCII case.
fn haystack_contains_integrity_rejection(haystack: &str) -> bool {
    // Note that "manifest " deliberately keeps its trailing space.
    const NEEDLES: [&str; 8] = [
        "hash mismatch",
        "sha256 mismatch",
        "byte length mismatch",
        "manifest ",
        "integrity",
        "chain root",
        "commit root",
        "verified root",
    ];

    let lowered = haystack.to_ascii_lowercase();
    for needle in NEEDLES.iter() {
        if lowered.contains(*needle) {
            return true;
        }
    }
    false
}
377
378impl fmt::Display for SyncularError {
379 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380 write!(f, "{:?}: {}", self.kind, self.source)
381 }
382}
383
384impl std::error::Error for SyncularError {
385 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
386 Some(self.source.as_ref())
387 }
388}
389
390impl From<anyhow::Error> for SyncularError {
391 fn from(source: anyhow::Error) -> Self {
392 Self::new(ErrorKind::Internal, source)
393 }
394}
395
/// Diesel connection failures are storage errors (only with the `native` feature).
#[cfg(feature = "native")]
impl From<diesel::ConnectionError> for SyncularError {
    fn from(source: diesel::ConnectionError) -> Self {
        Self::new(ErrorKind::Storage, source)
    }
}
402
/// Diesel query failures are storage errors (only with the `native` feature).
#[cfg(feature = "native")]
impl From<diesel::result::Error> for SyncularError {
    fn from(source: diesel::result::Error) -> Self {
        Self::new(ErrorKind::Storage, source)
    }
}
409
/// SQLite failures are storage errors (only with the `native` feature).
#[cfg(feature = "native")]
impl From<rusqlite::Error> for SyncularError {
    fn from(source: rusqlite::Error) -> Self {
        Self::new(ErrorKind::Storage, source)
    }
}
416
/// HTTP client failures are transport errors (only with the `native` feature).
#[cfg(feature = "native")]
impl From<reqwest::Error> for SyncularError {
    fn from(source: reqwest::Error) -> Self {
        Self::new(ErrorKind::Transport, source)
    }
}
423
/// Invalid header values are transport errors (only with the `native` feature).
#[cfg(feature = "native")]
impl From<reqwest::header::InvalidHeaderValue> for SyncularError {
    fn from(source: reqwest::header::InvalidHeaderValue) -> Self {
        Self::new(ErrorKind::Transport, source)
    }
}
430
/// WebSocket failures are transport errors (only with the `native` feature).
#[cfg(feature = "native")]
impl From<tungstenite::Error> for SyncularError {
    fn from(source: tungstenite::Error) -> Self {
        Self::new(ErrorKind::Transport, source)
    }
}
437
438impl From<serde_json::Error> for SyncularError {
439 fn from(source: serde_json::Error) -> Self {
440 Self::protocol(source)
441 }
442}
443
444impl From<syncular_protocol::ProtocolError> for SyncularError {
445 fn from(source: syncular_protocol::ProtocolError) -> Self {
446 Self::protocol(source)
447 }
448}
449
450impl From<std::io::Error> for SyncularError {
451 fn from(source: std::io::Error) -> Self {
452 Self::new(ErrorKind::Internal, source)
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the expected classification for whole-value comparisons.
    fn expected(
        code: &str,
        category: &str,
        retryable: bool,
        recommended_action: &str,
    ) -> SyncularErrorClassification {
        SyncularErrorClassification {
            code: code.to_string(),
            category: category.to_string(),
            retryable,
            recommended_action: recommended_action.to_string(),
        }
    }

    /// Classifies a message-only error of the given kind.
    fn classify(kind: ErrorKind, message: &str) -> SyncularErrorClassification {
        SyncularError::message(kind, message).classification()
    }

    #[test]
    fn classification_prefers_server_error_envelope() {
        let classified = classify(
            ErrorKind::Transport,
            r#"sync failed with HTTP 403: {"error":"sync.forbidden","code":"sync.forbidden","category":"forbidden","retryable":false,"recommendedAction":"checkPermissions","message":"Forbidden"}"#,
        );

        assert_eq!(
            classified,
            expected("sync.forbidden", "forbidden", false, "checkPermissions")
        );
    }

    #[test]
    fn classification_knows_shared_taxonomy_codes_without_envelope_metadata() {
        let conflict = classify(
            ErrorKind::Transport,
            r#"sync failed with HTTP 409: {"error":"sync.version_conflict","code":"sync.version_conflict","message":"Version conflict"}"#,
        );
        let worker = classify(
            ErrorKind::Transport,
            r#"worker failed: {"error":"worker.failed","code":"worker.failed","message":"Worker failed"}"#,
        );

        assert_eq!(conflict.category, "conflict");
        assert_eq!(conflict.recommended_action, "resolveConflict");
        assert_eq!(worker.category, "internal");
        assert_eq!(worker.recommended_action, "recreateClient");
    }

    #[test]
    fn classification_maps_http_auth_statuses_without_server_envelope() {
        let unauthorized = classify(ErrorKind::Transport, "sync failed with HTTP 401");
        let forbidden = classify(ErrorKind::Transport, "sync failed with HTTP 403");

        assert_eq!(unauthorized.code, "sync.auth_required");
        assert_eq!(unauthorized.recommended_action, "refreshAuth");
        assert_eq!(forbidden.code, "sync.forbidden");
        assert_eq!(forbidden.recommended_action, "checkPermissions");
    }

    #[test]
    fn classification_maps_schema_and_integrity_errors() {
        let schema =
            SyncularError::schema("server schema version 12 is not compatible").classification();
        let integrity = SyncularError::protocol_message(
            "snapshot chunk sha256 mismatch; full snapshot resync required",
        )
        .classification();

        assert_eq!(schema.code, "sync.schema_mismatch");
        assert_eq!(schema.recommended_action, "regenerateClient");
        assert_eq!(integrity.code, "sync.integrity_rejected");
        assert_eq!(integrity.recommended_action, "forceResync");
    }

    #[test]
    fn classification_maps_runtime_storage_failures() {
        let classified = classify(ErrorKind::Storage, "database is locked");

        assert_eq!(
            classified,
            expected("storage.failed", "storage", false, "inspectStorage")
        );
    }

    #[test]
    fn classification_maps_offline_transport_failures() {
        let classified = classify(ErrorKind::Transport, "browser fetch failed: offline");

        assert_eq!(
            classified,
            expected("sync.offline", "offline", true, "retryLater")
        );
    }
}