1use crate::client::SubscriptionSpec;
2use crate::error::{Result, SyncularError};
3use crate::protocol::{BootstrapState, ScopeValues, COMMIT_INTEGRITY_HEX_LENGTH};
4use crate::store::{
5 now_ms, AppSchemaState, BlobHealthSummary, ConflictSummary, CrdtHealthSummary, OutboxSummary,
6 ScopedRowsHealthSummary, SubscriptionState, SyncStore, SyncStoreTx, VerifiedRoot,
7};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::{BTreeMap, HashMap, HashSet};
11
12pub const LOCAL_SUPPORT_BUNDLE_FORMAT_VERSION: u32 = 1;
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "camelCase")]
16pub struct LocalHealthReport {
17 pub generated_at: i64,
18 pub ok: bool,
19 pub checked_subscriptions: usize,
20 pub checked_subscription_states: usize,
21 pub checked_verified_roots: usize,
22 pub checked_outbox_commits: usize,
23 pub checked_conflicts: usize,
24 pub checked_synced_rows: i64,
25 pub checked_blob_references: i64,
26 pub checked_crdt_documents: i64,
27 pub checked_crdt_update_log_entries: i64,
28 pub findings: Vec<LocalHealthFinding>,
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct LocalHealthFinding {
34 pub severity: LocalHealthSeverity,
35 pub code: String,
36 pub component: String,
37 pub message: String,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
39 pub subscription_id: Option<String>,
40 #[serde(default, skip_serializing_if = "Option::is_none")]
41 pub table: Option<String>,
42 #[serde(default, skip_serializing_if = "Option::is_none")]
43 pub repair_action: Option<LocalHealthRepairAction>,
44 #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
45 pub details: BTreeMap<String, Value>,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "camelCase")]
50pub struct LocalHealthRepairRequest {
51 pub action: LocalHealthRepairAction,
52 #[serde(default)]
53 pub subscription_ids: Vec<String>,
54 #[serde(default)]
55 pub tables: Vec<String>,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59#[serde(rename_all = "camelCase")]
60pub struct LocalHealthRepairReport {
61 pub action: LocalHealthRepairAction,
62 pub deleted_subscription_states: usize,
63 pub deleted_verified_roots: usize,
64 pub forced_rebootstrap_subscriptions: usize,
65 pub cleared_orphaned_synced_rows: i64,
66 pub cleared_tables: Vec<String>,
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(default, rename_all = "camelCase")]
71pub struct LocalSyncResetRequest {
72 pub subscription_ids: Vec<String>,
73 pub clear_synced_rows: bool,
74}
75
76impl Default for LocalSyncResetRequest {
77 fn default() -> Self {
78 Self {
79 subscription_ids: Vec::new(),
80 clear_synced_rows: false,
81 }
82 }
83}
84
85#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "camelCase")]
87pub struct LocalSyncResetReport {
88 pub reset_subscriptions: usize,
89 pub deleted_subscription_states: usize,
90 pub deleted_verified_roots: usize,
91 pub cleared_synced_rows: i64,
92 pub cleared_tables: Vec<String>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct LocalSupportBundle {
98 pub format_version: u32,
99 pub generated_at: i64,
100 pub redacted: bool,
101 pub source: String,
102 pub health: LocalHealthReport,
103 pub app_schema_state: AppSchemaState,
104 pub subscriptions: Vec<LocalSupportSubscription>,
105 pub subscription_states: Vec<LocalSupportSubscriptionState>,
106 pub verified_roots: Vec<LocalSupportVerifiedRoot>,
107 pub outbox: LocalSupportOutboxSummary,
108 pub conflicts: LocalSupportConflictSummary,
109 #[serde(default, skip_serializing_if = "Option::is_none")]
110 pub blob: Option<BlobHealthSummary>,
111 #[serde(default, skip_serializing_if = "Option::is_none")]
112 pub crdt: Option<CrdtHealthSummary>,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
116#[serde(rename_all = "camelCase")]
117pub struct LocalSupportSubscription {
118 pub id: String,
119 pub table: String,
120 pub scope_keys: Vec<String>,
121 pub scope_value_count: usize,
122 pub params_keys: Vec<String>,
123 pub params_value_count: usize,
124 pub bootstrap_phase: i64,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
128#[serde(rename_all = "camelCase")]
129pub struct LocalSupportSubscriptionState {
130 pub state_id: String,
131 pub subscription_id: String,
132 pub table: String,
133 pub scope_keys: Vec<String>,
134 pub scope_value_count: usize,
135 pub params_keys: Vec<String>,
136 pub params_value_count: usize,
137 pub cursor: i64,
138 pub status: String,
139 pub bootstrap_state_present: bool,
140 pub bootstrap_state_byte_len: usize,
141}
142
143#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144#[serde(rename_all = "camelCase")]
145pub struct LocalSupportVerifiedRoot {
146 pub state_id: String,
147 pub subscription_id: String,
148 pub partition_id_present: bool,
149 pub partition_id_byte_len: usize,
150 pub commit_seq: i64,
151 pub root_byte_len: usize,
152 pub root_is_canonical_hex: bool,
153}
154
155#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
156#[serde(rename_all = "camelCase")]
157pub struct LocalSupportOutboxSummary {
158 pub total: usize,
159 pub by_status: BTreeMap<String, usize>,
160 pub by_schema_version: BTreeMap<i32, usize>,
161}
162
163#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
164#[serde(rename_all = "camelCase")]
165pub struct LocalSupportConflictSummary {
166 pub total: usize,
167 pub unresolved: usize,
168 pub resolved: usize,
169 pub by_result_status: BTreeMap<String, usize>,
170 pub by_code: BTreeMap<String, usize>,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
174#[serde(rename_all = "camelCase")]
175pub struct LocalSupportBundleImportReport {
176 pub format_version: u32,
177 pub generated_at: i64,
178 pub redacted: bool,
179 pub source: String,
180 pub health_ok: bool,
181 pub finding_count: usize,
182 pub subscription_count: usize,
183 pub subscription_state_count: usize,
184 pub verified_root_count: usize,
185 pub checked_subscription_states: usize,
186 pub checked_verified_roots: usize,
187 pub checked_outbox_commits: usize,
188 pub checked_conflicts: usize,
189 pub checked_synced_rows: i64,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
193#[serde(rename_all = "camelCase")]
194pub enum LocalHealthSeverity {
195 Info,
196 Warning,
197 Error,
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
201#[serde(rename_all = "camelCase")]
202pub enum LocalHealthRepairAction {
203 ForceRebootstrap,
204 ClearOrphanedState,
205 ClearOrphanedSyncedRows,
206 ManualInspection,
207}
208
209impl LocalHealthReport {
210 fn new(checked_subscriptions: usize) -> Self {
211 Self {
212 generated_at: now_ms(),
213 ok: true,
214 checked_subscriptions,
215 checked_subscription_states: 0,
216 checked_verified_roots: 0,
217 checked_outbox_commits: 0,
218 checked_conflicts: 0,
219 checked_synced_rows: 0,
220 checked_blob_references: 0,
221 checked_crdt_documents: 0,
222 checked_crdt_update_log_entries: 0,
223 findings: Vec::new(),
224 }
225 }
226
227 fn add_finding(&mut self, finding: LocalHealthFinding) {
228 if finding.severity == LocalHealthSeverity::Error {
229 self.ok = false;
230 }
231 self.findings.push(finding);
232 }
233}
234
235pub fn check_local_health<S: SyncStore>(
236 store: &mut S,
237 state_id: &str,
238 subscriptions: &[SubscriptionSpec],
239) -> Result<LocalHealthReport> {
240 let mut states = Vec::new();
241 let mut roots = Vec::new();
242 store.transaction(|tx| {
243 states = tx.subscription_states(state_id)?;
244 roots = tx.verified_roots(state_id)?;
245 Ok(())
246 })?;
247 Ok(check_local_health_records(
248 state_id,
249 subscriptions,
250 &states,
251 &roots,
252 ))
253}
254
255pub fn check_local_health_records(
256 state_id: &str,
257 subscriptions: &[SubscriptionSpec],
258 states: &[SubscriptionState],
259 roots: &[VerifiedRoot],
260) -> LocalHealthReport {
261 let mut report = LocalHealthReport::new(subscriptions.len());
262 report.checked_subscription_states = states.len();
263 report.checked_verified_roots = roots.len();
264
265 let specs_by_id = subscriptions
266 .iter()
267 .map(|spec| (spec.id.as_str(), spec))
268 .collect::<HashMap<_, _>>();
269 let states_by_id = states
270 .iter()
271 .map(|state| (state.subscription_id.as_str(), state))
272 .collect::<HashMap<_, _>>();
273 let rooted_subscription_ids = roots
274 .iter()
275 .map(|root| root.subscription_id.as_str())
276 .collect::<HashSet<_>>();
277
278 for state in states {
279 if let Some(spec) = specs_by_id.get(state.subscription_id.as_str()) {
280 check_subscription_state(&mut report, spec, state);
281 } else {
282 check_orphaned_subscription_state(
283 &mut report,
284 state,
285 rooted_subscription_ids.contains(state.subscription_id.as_str()),
286 );
287 }
288 }
289
290 for root in roots {
291 if let Some(spec) = specs_by_id.get(root.subscription_id.as_str()) {
292 check_verified_root(
293 &mut report,
294 spec,
295 states_by_id.get(root.subscription_id.as_str()).copied(),
296 root,
297 state_id,
298 );
299 } else {
300 check_orphaned_verified_root(&mut report, root);
301 }
302 }
303
304 report
305}
306
307pub fn check_local_sync_state_health(
308 report: &mut LocalHealthReport,
309 current_schema_version: i32,
310 app_schema_state: &AppSchemaState,
311 outbox: &[OutboxSummary],
312 conflicts: &[ConflictSummary],
313 scoped_rows: Option<&ScopedRowsHealthSummary>,
314 blob: Option<&BlobHealthSummary>,
315 crdt: Option<&CrdtHealthSummary>,
316) {
317 check_app_schema_state(report, current_schema_version, app_schema_state);
318 check_outbox_summaries(report, current_schema_version, outbox);
319 check_conflict_summaries(report, conflicts);
320 if let Some(scoped_rows) = scoped_rows {
321 check_scoped_rows_health_summary(report, scoped_rows);
322 }
323 if let Some(blob) = blob {
324 check_blob_health_summary(report, blob);
325 }
326 if let Some(crdt) = crdt {
327 check_crdt_health_summary(report, crdt);
328 }
329}
330
331#[allow(clippy::too_many_arguments)]
332pub fn local_support_bundle_from_records(
333 source: impl Into<String>,
334 health: LocalHealthReport,
335 subscriptions: &[SubscriptionSpec],
336 states: &[SubscriptionState],
337 roots: &[VerifiedRoot],
338 app_schema_state: AppSchemaState,
339 outbox: &[OutboxSummary],
340 conflicts: &[ConflictSummary],
341 blob: Option<BlobHealthSummary>,
342 crdt: Option<CrdtHealthSummary>,
343) -> LocalSupportBundle {
344 LocalSupportBundle {
345 format_version: LOCAL_SUPPORT_BUNDLE_FORMAT_VERSION,
346 generated_at: now_ms(),
347 redacted: true,
348 source: source.into(),
349 health,
350 app_schema_state,
351 subscriptions: subscriptions
352 .iter()
353 .map(redacted_subscription)
354 .collect::<Vec<_>>(),
355 subscription_states: states
356 .iter()
357 .map(redacted_subscription_state)
358 .collect::<Vec<_>>(),
359 verified_roots: roots.iter().map(redacted_verified_root).collect::<Vec<_>>(),
360 outbox: redacted_outbox_summary(outbox),
361 conflicts: redacted_conflict_summary(conflicts),
362 blob,
363 crdt,
364 }
365}
366
367pub fn import_local_support_bundle_json(
368 bundle_json: &str,
369) -> Result<LocalSupportBundleImportReport> {
370 let bundle: LocalSupportBundle = serde_json::from_str(bundle_json)?;
371 if bundle.format_version != LOCAL_SUPPORT_BUNDLE_FORMAT_VERSION {
372 return Err(SyncularError::config(format!(
373 "unsupported local support bundle format version {}",
374 bundle.format_version
375 )));
376 }
377 if !bundle.redacted {
378 return Err(SyncularError::config(
379 "local support bundle import requires a redacted bundle",
380 ));
381 }
382 Ok(LocalSupportBundleImportReport {
383 format_version: bundle.format_version,
384 generated_at: bundle.generated_at,
385 redacted: bundle.redacted,
386 source: bundle.source,
387 health_ok: bundle.health.ok,
388 finding_count: bundle.health.findings.len(),
389 subscription_count: bundle.subscriptions.len(),
390 subscription_state_count: bundle.subscription_states.len(),
391 verified_root_count: bundle.verified_roots.len(),
392 checked_subscription_states: bundle.health.checked_subscription_states,
393 checked_verified_roots: bundle.health.checked_verified_roots,
394 checked_outbox_commits: bundle.health.checked_outbox_commits,
395 checked_conflicts: bundle.health.checked_conflicts,
396 checked_synced_rows: bundle.health.checked_synced_rows,
397 })
398}
399
400fn redacted_subscription(spec: &SubscriptionSpec) -> LocalSupportSubscription {
401 let mut scope_keys = spec.scopes.keys().cloned().collect::<Vec<_>>();
402 scope_keys.sort();
403 let mut params_keys = spec.params.keys().cloned().collect::<Vec<_>>();
404 params_keys.sort();
405 LocalSupportSubscription {
406 id: spec.id.clone(),
407 table: spec.table.clone(),
408 scope_keys,
409 scope_value_count: count_json_values(spec.scopes.values()),
410 params_keys,
411 params_value_count: count_json_values(spec.params.values()),
412 bootstrap_phase: spec.bootstrap_phase,
413 }
414}
415
416fn redacted_subscription_state(state: &SubscriptionState) -> LocalSupportSubscriptionState {
417 let (scope_keys, scope_value_count) = redacted_json_map_shape(&state.scopes_json);
418 let (params_keys, params_value_count) = redacted_json_map_shape(&state.params_json);
419 LocalSupportSubscriptionState {
420 state_id: state.state_id.clone(),
421 subscription_id: state.subscription_id.clone(),
422 table: state.table.clone(),
423 scope_keys,
424 scope_value_count,
425 params_keys,
426 params_value_count,
427 cursor: state.cursor,
428 status: state.status.clone(),
429 bootstrap_state_present: state.bootstrap_state_json.is_some(),
430 bootstrap_state_byte_len: state
431 .bootstrap_state_json
432 .as_ref()
433 .map_or(0, |value| value.len()),
434 }
435}
436
437fn redacted_verified_root(root: &VerifiedRoot) -> LocalSupportVerifiedRoot {
438 LocalSupportVerifiedRoot {
439 state_id: root.state_id.clone(),
440 subscription_id: root.subscription_id.clone(),
441 partition_id_present: !root.partition_id.is_empty(),
442 partition_id_byte_len: root.partition_id.len(),
443 commit_seq: root.commit_seq,
444 root_byte_len: root.root.len(),
445 root_is_canonical_hex: is_canonical_hex_root(&root.root),
446 }
447}
448
449fn redacted_outbox_summary(outbox: &[OutboxSummary]) -> LocalSupportOutboxSummary {
450 let mut by_status = BTreeMap::new();
451 let mut by_schema_version = BTreeMap::new();
452 for item in outbox {
453 *by_status.entry(item.status.clone()).or_insert(0) += 1;
454 *by_schema_version.entry(item.schema_version).or_insert(0) += 1;
455 }
456 LocalSupportOutboxSummary {
457 total: outbox.len(),
458 by_status,
459 by_schema_version,
460 }
461}
462
463fn redacted_conflict_summary(conflicts: &[ConflictSummary]) -> LocalSupportConflictSummary {
464 let mut by_result_status = BTreeMap::new();
465 let mut by_code = BTreeMap::new();
466 let mut resolved = 0usize;
467 for item in conflicts {
468 *by_result_status
469 .entry(item.result_status.clone())
470 .or_insert(0) += 1;
471 if let Some(code) = &item.code {
472 *by_code.entry(code.clone()).or_insert(0) += 1;
473 }
474 if item.resolved_at.is_some() || item.resolution.is_some() {
475 resolved += 1;
476 }
477 }
478 LocalSupportConflictSummary {
479 total: conflicts.len(),
480 unresolved: conflicts.len().saturating_sub(resolved),
481 resolved,
482 by_result_status,
483 by_code,
484 }
485}
486
487fn redacted_json_map_shape(json: &str) -> (Vec<String>, usize) {
488 match serde_json::from_str::<BTreeMap<String, Value>>(json) {
489 Ok(map) => {
490 let keys = map.keys().cloned().collect::<Vec<_>>();
491 let value_count = count_json_values(map.values());
492 (keys, value_count)
493 }
494 Err(_) => (Vec::new(), 0),
495 }
496}
497
498fn count_json_values<'a>(values: impl Iterator<Item = &'a Value>) -> usize {
499 values
500 .map(|value| value.as_array().map_or(1, Vec::len))
501 .sum()
502}
503
504fn check_app_schema_state(
505 report: &mut LocalHealthReport,
506 current_schema_version: i32,
507 app_schema_state: &AppSchemaState,
508) {
509 let Some(schema_version) = app_schema_state.schema_version else {
510 report.add_finding(finding(
511 LocalHealthSeverity::Error,
512 "local.app_schema_state_missing",
513 "appSchemaState",
514 "local app schema state has not been recorded",
515 None,
516 None,
517 Some(LocalHealthRepairAction::ManualInspection),
518 BTreeMap::new(),
519 ));
520 return;
521 };
522
523 if schema_version > current_schema_version {
524 let mut details = BTreeMap::new();
525 details.insert(
526 "localSchemaVersion".to_string(),
527 Value::from(schema_version),
528 );
529 details.insert(
530 "currentSchemaVersion".to_string(),
531 Value::from(current_schema_version),
532 );
533 report.add_finding(finding(
534 LocalHealthSeverity::Error,
535 "local.app_schema_state_future_version",
536 "appSchemaState",
537 "local app schema state was written by a newer generated client",
538 None,
539 None,
540 Some(LocalHealthRepairAction::ManualInspection),
541 details,
542 ));
543 } else if schema_version < current_schema_version {
544 let mut details = BTreeMap::new();
545 details.insert(
546 "localSchemaVersion".to_string(),
547 Value::from(schema_version),
548 );
549 details.insert(
550 "currentSchemaVersion".to_string(),
551 Value::from(current_schema_version),
552 );
553 report.add_finding(finding(
554 LocalHealthSeverity::Error,
555 "local.app_schema_state_stale_version",
556 "appSchemaState",
557 "local app schema state is older than the generated client",
558 None,
559 None,
560 Some(LocalHealthRepairAction::ManualInspection),
561 details,
562 ));
563 }
564}
565
566fn check_outbox_summaries(
567 report: &mut LocalHealthReport,
568 current_schema_version: i32,
569 outbox: &[OutboxSummary],
570) {
571 report.checked_outbox_commits = outbox.len();
572 let future_schema_count = outbox
573 .iter()
574 .filter(|item| item.schema_version > current_schema_version)
575 .count();
576 if future_schema_count > 0 {
577 let max_schema_version = outbox
578 .iter()
579 .map(|item| item.schema_version)
580 .max()
581 .unwrap_or(current_schema_version);
582 let mut details = BTreeMap::new();
583 details.insert("count".to_string(), Value::from(future_schema_count));
584 details.insert(
585 "currentSchemaVersion".to_string(),
586 Value::from(current_schema_version),
587 );
588 details.insert(
589 "maxSchemaVersion".to_string(),
590 Value::from(max_schema_version),
591 );
592 report.add_finding(finding(
593 LocalHealthSeverity::Error,
594 "local.outbox_future_schema_version",
595 "outbox",
596 "pending local outbox commits require a newer generated client",
597 None,
598 None,
599 Some(LocalHealthRepairAction::ManualInspection),
600 details,
601 ));
602 }
603
604 let failed_count = outbox.iter().filter(|item| item.status == "failed").count();
605 if failed_count > 0 {
606 let mut details = BTreeMap::new();
607 details.insert("count".to_string(), Value::from(failed_count));
608 report.add_finding(finding(
609 LocalHealthSeverity::Warning,
610 "local.outbox_failed_commits",
611 "outbox",
612 "local outbox contains failed commits that need app/user action",
613 None,
614 None,
615 Some(LocalHealthRepairAction::ManualInspection),
616 details,
617 ));
618 }
619}
620
621fn check_conflict_summaries(report: &mut LocalHealthReport, conflicts: &[ConflictSummary]) {
622 report.checked_conflicts = conflicts.len();
623 if conflicts.is_empty() {
624 return;
625 }
626
627 let mut details = BTreeMap::new();
628 details.insert("count".to_string(), Value::from(conflicts.len()));
629 report.add_finding(finding(
630 LocalHealthSeverity::Warning,
631 "local.conflicts_unresolved",
632 "conflicts",
633 "local store has unresolved sync conflicts",
634 None,
635 None,
636 Some(LocalHealthRepairAction::ManualInspection),
637 details,
638 ));
639}
640
641fn check_scoped_rows_health_summary(
642 report: &mut LocalHealthReport,
643 scoped_rows: &ScopedRowsHealthSummary,
644) {
645 report.checked_synced_rows = scoped_rows.checked_synced_rows;
646 for table in scoped_rows
647 .tables
648 .iter()
649 .filter(|table| table.orphaned_synced_rows > 0)
650 {
651 let mut details = BTreeMap::new();
652 details.insert("count".to_string(), Value::from(table.orphaned_synced_rows));
653 details.insert(
654 "checkedSyncedRows".to_string(),
655 Value::from(table.checked_synced_rows),
656 );
657 details.insert(
658 "totalOrphanedSyncedRows".to_string(),
659 Value::from(scoped_rows.orphaned_synced_rows),
660 );
661 report.add_finding(finding(
662 LocalHealthSeverity::Error,
663 "local.synced_rows_orphaned",
664 "appRows",
665 "local synced app rows are outside the configured subscription scopes",
666 None,
667 Some(&table.table),
668 Some(LocalHealthRepairAction::ClearOrphanedSyncedRows),
669 details,
670 ));
671 }
672}
673
674fn check_blob_health_summary(report: &mut LocalHealthReport, blob: &BlobHealthSummary) {
675 report.checked_blob_references = blob.checked_references;
676 if blob.invalid_references > 0 {
677 let mut details = BTreeMap::new();
678 details.insert("count".to_string(), Value::from(blob.invalid_references));
679 details.insert(
680 "checkedReferences".to_string(),
681 Value::from(blob.checked_references),
682 );
683 report.add_finding(finding(
684 LocalHealthSeverity::Error,
685 "local.blob_refs_invalid",
686 "blobs",
687 "local synced rows contain invalid blob references",
688 None,
689 None,
690 Some(LocalHealthRepairAction::ManualInspection),
691 details,
692 ));
693 }
694
695 if blob.upload_failed > 0 {
696 let mut details = BTreeMap::new();
697 details.insert("count".to_string(), Value::from(blob.upload_failed));
698 report.add_finding(finding(
699 LocalHealthSeverity::Warning,
700 "local.blob_uploads_failed",
701 "blobs",
702 "local blob upload queue contains failed uploads",
703 None,
704 None,
705 Some(LocalHealthRepairAction::ManualInspection),
706 details,
707 ));
708 }
709}
710
711fn check_crdt_health_summary(report: &mut LocalHealthReport, crdt: &CrdtHealthSummary) {
712 report.checked_crdt_documents = crdt.document_count;
713 report.checked_crdt_update_log_entries = crdt.log_updates;
714 if crdt.orphaned_documents > 0 {
715 let mut details = BTreeMap::new();
716 details.insert("count".to_string(), Value::from(crdt.orphaned_documents));
717 details.insert(
718 "documentCount".to_string(),
719 Value::from(crdt.document_count),
720 );
721 report.add_finding(finding(
722 LocalHealthSeverity::Error,
723 "local.crdt_documents_orphaned",
724 "crdt",
725 "local CRDT document metadata references missing app rows",
726 None,
727 None,
728 Some(LocalHealthRepairAction::ManualInspection),
729 details,
730 ));
731 }
732
733 if crdt.orphaned_log_entries > 0 {
734 let mut details = BTreeMap::new();
735 details.insert("count".to_string(), Value::from(crdt.orphaned_log_entries));
736 details.insert("logUpdates".to_string(), Value::from(crdt.log_updates));
737 report.add_finding(finding(
738 LocalHealthSeverity::Error,
739 "local.crdt_update_log_orphaned",
740 "crdt",
741 "local CRDT update log contains entries without document metadata",
742 None,
743 None,
744 Some(LocalHealthRepairAction::ManualInspection),
745 details,
746 ));
747 }
748}
749
750fn check_subscription_state(
751 report: &mut LocalHealthReport,
752 spec: &SubscriptionSpec,
753 state: &SubscriptionState,
754) {
755 if state.table != spec.table {
756 let mut details = BTreeMap::new();
757 details.insert(
758 "expectedTable".to_string(),
759 Value::String(spec.table.clone()),
760 );
761 details.insert(
762 "storedTable".to_string(),
763 Value::String(state.table.clone()),
764 );
765 report.add_finding(finding(
766 LocalHealthSeverity::Error,
767 "local.subscription_state_table_mismatch",
768 "subscriptionState",
769 "stored subscription state belongs to a different table",
770 Some(&spec.id),
771 Some(&spec.table),
772 Some(LocalHealthRepairAction::ForceRebootstrap),
773 details,
774 ));
775 }
776
777 if state.cursor < -1 {
778 let mut details = BTreeMap::new();
779 details.insert("cursor".to_string(), Value::from(state.cursor));
780 report.add_finding(finding(
781 LocalHealthSeverity::Error,
782 "local.subscription_state_invalid_cursor",
783 "subscriptionState",
784 "stored subscription cursor is below the bootstrap sentinel",
785 Some(&spec.id),
786 Some(&spec.table),
787 Some(LocalHealthRepairAction::ForceRebootstrap),
788 details,
789 ));
790 }
791
792 if let Err(error) = serde_json::from_str::<ScopeValues>(&state.scopes_json) {
793 report.add_finding(finding(
794 LocalHealthSeverity::Error,
795 "local.subscription_state_invalid_scopes_json",
796 "subscriptionState",
797 "stored subscription scopes are not valid JSON",
798 Some(&spec.id),
799 Some(&spec.table),
800 Some(LocalHealthRepairAction::ForceRebootstrap),
801 parse_error_details(error, state.scopes_json.len()),
802 ));
803 }
804
805 if let Err(error) = serde_json::from_str::<serde_json::Map<String, Value>>(&state.params_json) {
806 report.add_finding(finding(
807 LocalHealthSeverity::Error,
808 "local.subscription_state_invalid_params_json",
809 "subscriptionState",
810 "stored subscription params are not valid JSON",
811 Some(&spec.id),
812 Some(&spec.table),
813 Some(LocalHealthRepairAction::ForceRebootstrap),
814 parse_error_details(error, state.params_json.len()),
815 ));
816 }
817
818 if let Some(bootstrap_state_json) = state.bootstrap_state_json.as_deref() {
819 if let Err(error) = serde_json::from_str::<BootstrapState>(bootstrap_state_json) {
820 report.add_finding(finding(
821 LocalHealthSeverity::Error,
822 "local.subscription_state_invalid_bootstrap_json",
823 "subscriptionState",
824 "stored subscription bootstrap state is not valid",
825 Some(&spec.id),
826 Some(&spec.table),
827 Some(LocalHealthRepairAction::ForceRebootstrap),
828 parse_error_details(error, bootstrap_state_json.len()),
829 ));
830 }
831 }
832}
833
834fn check_orphaned_subscription_state(
835 report: &mut LocalHealthReport,
836 state: &SubscriptionState,
837 has_verified_root: bool,
838) {
839 let mut details = BTreeMap::new();
840 details.insert("status".to_string(), Value::String(state.status.clone()));
841 details.insert("cursor".to_string(), Value::from(state.cursor));
842 details.insert(
843 "hasVerifiedRoot".to_string(),
844 Value::from(has_verified_root),
845 );
846 report.add_finding(finding(
847 LocalHealthSeverity::Error,
848 "local.subscription_state_orphaned",
849 "subscriptionState",
850 "stored subscription state is not configured on this client",
851 Some(&state.subscription_id),
852 Some(&state.table),
853 Some(LocalHealthRepairAction::ClearOrphanedState),
854 details,
855 ));
856}
857
858fn check_orphaned_verified_root(report: &mut LocalHealthReport, root: &VerifiedRoot) {
859 let mut details = BTreeMap::new();
860 details.insert("commitSeq".to_string(), Value::from(root.commit_seq));
861 details.insert(
862 "partitionId".to_string(),
863 Value::String(root.partition_id.clone()),
864 );
865 report.add_finding(finding(
866 LocalHealthSeverity::Error,
867 "local.verified_root_orphaned",
868 "verifiedRoot",
869 "stored verified root is not configured on this client",
870 Some(&root.subscription_id),
871 None,
872 Some(LocalHealthRepairAction::ClearOrphanedState),
873 details,
874 ));
875}
876
877fn check_verified_root(
878 report: &mut LocalHealthReport,
879 spec: &SubscriptionSpec,
880 state: Option<&SubscriptionState>,
881 root: &VerifiedRoot,
882 state_id: &str,
883) {
884 if state.is_none() {
885 report.add_finding(finding(
886 LocalHealthSeverity::Error,
887 "local.verified_root_without_subscription_state",
888 "verifiedRoot",
889 "stored verified root has no matching subscription state",
890 Some(&spec.id),
891 Some(&spec.table),
892 Some(LocalHealthRepairAction::ForceRebootstrap),
893 BTreeMap::new(),
894 ));
895 }
896
897 if root.state_id != state_id {
898 let mut details = BTreeMap::new();
899 details.insert(
900 "expectedStateId".to_string(),
901 Value::String(state_id.to_string()),
902 );
903 details.insert(
904 "storedStateId".to_string(),
905 Value::String(root.state_id.clone()),
906 );
907 report.add_finding(finding(
908 LocalHealthSeverity::Error,
909 "local.verified_root_state_mismatch",
910 "verifiedRoot",
911 "stored verified root belongs to a different state",
912 Some(&spec.id),
913 Some(&spec.table),
914 Some(LocalHealthRepairAction::ForceRebootstrap),
915 details,
916 ));
917 }
918
919 if root.subscription_id != spec.id {
920 let mut details = BTreeMap::new();
921 details.insert(
922 "storedSubscriptionId".to_string(),
923 Value::String(root.subscription_id.clone()),
924 );
925 report.add_finding(finding(
926 LocalHealthSeverity::Error,
927 "local.verified_root_subscription_mismatch",
928 "verifiedRoot",
929 "stored verified root belongs to a different subscription",
930 Some(&spec.id),
931 Some(&spec.table),
932 Some(LocalHealthRepairAction::ForceRebootstrap),
933 details,
934 ));
935 }
936
937 if root.partition_id.is_empty() {
938 report.add_finding(finding(
939 LocalHealthSeverity::Error,
940 "local.verified_root_empty_partition",
941 "verifiedRoot",
942 "stored verified root has an empty partition",
943 Some(&spec.id),
944 Some(&spec.table),
945 Some(LocalHealthRepairAction::ForceRebootstrap),
946 BTreeMap::new(),
947 ));
948 }
949
950 if root.commit_seq < 0 {
951 let mut details = BTreeMap::new();
952 details.insert("commitSeq".to_string(), Value::from(root.commit_seq));
953 report.add_finding(finding(
954 LocalHealthSeverity::Error,
955 "local.verified_root_invalid_commit_seq",
956 "verifiedRoot",
957 "stored verified root has a negative commit sequence",
958 Some(&spec.id),
959 Some(&spec.table),
960 Some(LocalHealthRepairAction::ForceRebootstrap),
961 details,
962 ));
963 }
964
965 if !is_canonical_hex_root(&root.root) {
966 let mut details = BTreeMap::new();
967 details.insert("rootLength".to_string(), Value::from(root.root.len()));
968 details.insert(
969 "expectedRootLength".to_string(),
970 Value::from(COMMIT_INTEGRITY_HEX_LENGTH),
971 );
972 report.add_finding(finding(
973 LocalHealthSeverity::Error,
974 "local.verified_root_invalid_hex",
975 "verifiedRoot",
976 "stored verified root is not a canonical lowercase hex digest",
977 Some(&spec.id),
978 Some(&spec.table),
979 Some(LocalHealthRepairAction::ForceRebootstrap),
980 details,
981 ));
982 }
983
984 if let Some(state) = state {
985 if state.cursor >= 0 && root.commit_seq > state.cursor {
986 let mut details = BTreeMap::new();
987 details.insert("cursor".to_string(), Value::from(state.cursor));
988 details.insert("commitSeq".to_string(), Value::from(root.commit_seq));
989 report.add_finding(finding(
990 LocalHealthSeverity::Error,
991 "local.verified_root_ahead_of_cursor",
992 "verifiedRoot",
993 "stored verified root is ahead of the persisted subscription cursor",
994 Some(&spec.id),
995 Some(&spec.table),
996 Some(LocalHealthRepairAction::ForceRebootstrap),
997 details,
998 ));
999 }
1000 }
1001}
1002
1003fn finding(
1004 severity: LocalHealthSeverity,
1005 code: &str,
1006 component: &str,
1007 message: &str,
1008 subscription_id: Option<&str>,
1009 table: Option<&str>,
1010 repair_action: Option<LocalHealthRepairAction>,
1011 details: BTreeMap<String, Value>,
1012) -> LocalHealthFinding {
1013 LocalHealthFinding {
1014 severity,
1015 code: code.to_string(),
1016 component: component.to_string(),
1017 message: message.to_string(),
1018 subscription_id: subscription_id.map(str::to_string),
1019 table: table.map(str::to_string),
1020 repair_action,
1021 details,
1022 }
1023}
1024
1025fn parse_error_details(error: serde_json::Error, byte_len: usize) -> BTreeMap<String, Value> {
1026 let mut details = BTreeMap::new();
1027 details.insert("byteLength".to_string(), Value::from(byte_len));
1028 details.insert("line".to_string(), Value::from(error.line()));
1029 details.insert("column".to_string(), Value::from(error.column()));
1030 details.insert("error".to_string(), Value::String(error.to_string()));
1031 details
1032}
1033
1034fn is_canonical_hex_root(root: &str) -> bool {
1035 root.len() == COMMIT_INTEGRITY_HEX_LENGTH
1036 && root
1037 .bytes()
1038 .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
1039}