//! `syncular_runtime/core/health.rs`
//!
//! Local sync health checks, repair/reset request and report types, and redacted support bundles.

1use crate::client::SubscriptionSpec;
2use crate::error::{Result, SyncularError};
3use crate::protocol::{BootstrapState, ScopeValues, COMMIT_INTEGRITY_HEX_LENGTH};
4use crate::store::{
5    now_ms, AppSchemaState, BlobHealthSummary, ConflictSummary, CrdtHealthSummary, OutboxSummary,
6    ScopedRowsHealthSummary, SubscriptionState, SyncStore, SyncStoreTx, VerifiedRoot,
7};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::{BTreeMap, HashMap, HashSet};
11
/// Format version stamped into every [`LocalSupportBundle`].
///
/// [`import_local_support_bundle_json`] rejects any bundle whose `format_version` differs from
/// this value.
pub const LOCAL_SUPPORT_BUNDLE_FORMAT_VERSION: u32 = 1;
13
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "camelCase")]
16pub struct LocalHealthReport {
17    pub generated_at: i64,
18    pub ok: bool,
19    pub checked_subscriptions: usize,
20    pub checked_subscription_states: usize,
21    pub checked_verified_roots: usize,
22    pub checked_outbox_commits: usize,
23    pub checked_conflicts: usize,
24    pub checked_synced_rows: i64,
25    pub checked_blob_references: i64,
26    pub checked_crdt_documents: i64,
27    pub checked_crdt_update_log_entries: i64,
28    pub findings: Vec<LocalHealthFinding>,
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub struct LocalHealthFinding {
34    pub severity: LocalHealthSeverity,
35    pub code: String,
36    pub component: String,
37    pub message: String,
38    #[serde(default, skip_serializing_if = "Option::is_none")]
39    pub subscription_id: Option<String>,
40    #[serde(default, skip_serializing_if = "Option::is_none")]
41    pub table: Option<String>,
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub repair_action: Option<LocalHealthRepairAction>,
44    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
45    pub details: BTreeMap<String, Value>,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49#[serde(rename_all = "camelCase")]
50pub struct LocalHealthRepairRequest {
51    pub action: LocalHealthRepairAction,
52    #[serde(default)]
53    pub subscription_ids: Vec<String>,
54    #[serde(default)]
55    pub tables: Vec<String>,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59#[serde(rename_all = "camelCase")]
60pub struct LocalHealthRepairReport {
61    pub action: LocalHealthRepairAction,
62    pub deleted_subscription_states: usize,
63    pub deleted_verified_roots: usize,
64    pub forced_rebootstrap_subscriptions: usize,
65    pub cleared_orphaned_synced_rows: i64,
66    pub cleared_tables: Vec<String>,
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(default, rename_all = "camelCase")]
71pub struct LocalSyncResetRequest {
72    pub subscription_ids: Vec<String>,
73    pub clear_synced_rows: bool,
74}
75
76impl Default for LocalSyncResetRequest {
77    fn default() -> Self {
78        Self {
79            subscription_ids: Vec::new(),
80            clear_synced_rows: false,
81        }
82    }
83}
84
85#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "camelCase")]
87pub struct LocalSyncResetReport {
88    pub reset_subscriptions: usize,
89    pub deleted_subscription_states: usize,
90    pub deleted_verified_roots: usize,
91    pub cleared_synced_rows: i64,
92    pub cleared_tables: Vec<String>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct LocalSupportBundle {
98    pub format_version: u32,
99    pub generated_at: i64,
100    pub redacted: bool,
101    pub source: String,
102    pub health: LocalHealthReport,
103    pub app_schema_state: AppSchemaState,
104    pub subscriptions: Vec<LocalSupportSubscription>,
105    pub subscription_states: Vec<LocalSupportSubscriptionState>,
106    pub verified_roots: Vec<LocalSupportVerifiedRoot>,
107    pub outbox: LocalSupportOutboxSummary,
108    pub conflicts: LocalSupportConflictSummary,
109    #[serde(default, skip_serializing_if = "Option::is_none")]
110    pub blob: Option<BlobHealthSummary>,
111    #[serde(default, skip_serializing_if = "Option::is_none")]
112    pub crdt: Option<CrdtHealthSummary>,
113}
114
115#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
116#[serde(rename_all = "camelCase")]
117pub struct LocalSupportSubscription {
118    pub id: String,
119    pub table: String,
120    pub scope_keys: Vec<String>,
121    pub scope_value_count: usize,
122    pub params_keys: Vec<String>,
123    pub params_value_count: usize,
124    pub bootstrap_phase: i64,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
128#[serde(rename_all = "camelCase")]
129pub struct LocalSupportSubscriptionState {
130    pub state_id: String,
131    pub subscription_id: String,
132    pub table: String,
133    pub scope_keys: Vec<String>,
134    pub scope_value_count: usize,
135    pub params_keys: Vec<String>,
136    pub params_value_count: usize,
137    pub cursor: i64,
138    pub status: String,
139    pub bootstrap_state_present: bool,
140    pub bootstrap_state_byte_len: usize,
141}
142
143#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144#[serde(rename_all = "camelCase")]
145pub struct LocalSupportVerifiedRoot {
146    pub state_id: String,
147    pub subscription_id: String,
148    pub partition_id_present: bool,
149    pub partition_id_byte_len: usize,
150    pub commit_seq: i64,
151    pub root_byte_len: usize,
152    pub root_is_canonical_hex: bool,
153}
154
155#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
156#[serde(rename_all = "camelCase")]
157pub struct LocalSupportOutboxSummary {
158    pub total: usize,
159    pub by_status: BTreeMap<String, usize>,
160    pub by_schema_version: BTreeMap<i32, usize>,
161}
162
163#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
164#[serde(rename_all = "camelCase")]
165pub struct LocalSupportConflictSummary {
166    pub total: usize,
167    pub unresolved: usize,
168    pub resolved: usize,
169    pub by_result_status: BTreeMap<String, usize>,
170    pub by_code: BTreeMap<String, usize>,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
174#[serde(rename_all = "camelCase")]
175pub struct LocalSupportBundleImportReport {
176    pub format_version: u32,
177    pub generated_at: i64,
178    pub redacted: bool,
179    pub source: String,
180    pub health_ok: bool,
181    pub finding_count: usize,
182    pub subscription_count: usize,
183    pub subscription_state_count: usize,
184    pub verified_root_count: usize,
185    pub checked_subscription_states: usize,
186    pub checked_verified_roots: usize,
187    pub checked_outbox_commits: usize,
188    pub checked_conflicts: usize,
189    pub checked_synced_rows: i64,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
193#[serde(rename_all = "camelCase")]
194pub enum LocalHealthSeverity {
195    Info,
196    Warning,
197    Error,
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
201#[serde(rename_all = "camelCase")]
202pub enum LocalHealthRepairAction {
203    ForceRebootstrap,
204    ClearOrphanedState,
205    ClearOrphanedSyncedRows,
206    ManualInspection,
207}
208
209impl LocalHealthReport {
210    fn new(checked_subscriptions: usize) -> Self {
211        Self {
212            generated_at: now_ms(),
213            ok: true,
214            checked_subscriptions,
215            checked_subscription_states: 0,
216            checked_verified_roots: 0,
217            checked_outbox_commits: 0,
218            checked_conflicts: 0,
219            checked_synced_rows: 0,
220            checked_blob_references: 0,
221            checked_crdt_documents: 0,
222            checked_crdt_update_log_entries: 0,
223            findings: Vec::new(),
224        }
225    }
226
227    fn add_finding(&mut self, finding: LocalHealthFinding) {
228        if finding.severity == LocalHealthSeverity::Error {
229            self.ok = false;
230        }
231        self.findings.push(finding);
232    }
233}
234
235pub fn check_local_health<S: SyncStore>(
236    store: &mut S,
237    state_id: &str,
238    subscriptions: &[SubscriptionSpec],
239) -> Result<LocalHealthReport> {
240    let mut states = Vec::new();
241    let mut roots = Vec::new();
242    store.transaction(|tx| {
243        states = tx.subscription_states(state_id)?;
244        roots = tx.verified_roots(state_id)?;
245        Ok(())
246    })?;
247    Ok(check_local_health_records(
248        state_id,
249        subscriptions,
250        &states,
251        &roots,
252    ))
253}
254
255pub fn check_local_health_records(
256    state_id: &str,
257    subscriptions: &[SubscriptionSpec],
258    states: &[SubscriptionState],
259    roots: &[VerifiedRoot],
260) -> LocalHealthReport {
261    let mut report = LocalHealthReport::new(subscriptions.len());
262    report.checked_subscription_states = states.len();
263    report.checked_verified_roots = roots.len();
264
265    let specs_by_id = subscriptions
266        .iter()
267        .map(|spec| (spec.id.as_str(), spec))
268        .collect::<HashMap<_, _>>();
269    let states_by_id = states
270        .iter()
271        .map(|state| (state.subscription_id.as_str(), state))
272        .collect::<HashMap<_, _>>();
273    let rooted_subscription_ids = roots
274        .iter()
275        .map(|root| root.subscription_id.as_str())
276        .collect::<HashSet<_>>();
277
278    for state in states {
279        if let Some(spec) = specs_by_id.get(state.subscription_id.as_str()) {
280            check_subscription_state(&mut report, spec, state);
281        } else {
282            check_orphaned_subscription_state(
283                &mut report,
284                state,
285                rooted_subscription_ids.contains(state.subscription_id.as_str()),
286            );
287        }
288    }
289
290    for root in roots {
291        if let Some(spec) = specs_by_id.get(root.subscription_id.as_str()) {
292            check_verified_root(
293                &mut report,
294                spec,
295                states_by_id.get(root.subscription_id.as_str()).copied(),
296                root,
297                state_id,
298            );
299        } else {
300            check_orphaned_verified_root(&mut report, root);
301        }
302    }
303
304    report
305}
306
307pub fn check_local_sync_state_health(
308    report: &mut LocalHealthReport,
309    current_schema_version: i32,
310    app_schema_state: &AppSchemaState,
311    outbox: &[OutboxSummary],
312    conflicts: &[ConflictSummary],
313    scoped_rows: Option<&ScopedRowsHealthSummary>,
314    blob: Option<&BlobHealthSummary>,
315    crdt: Option<&CrdtHealthSummary>,
316) {
317    check_app_schema_state(report, current_schema_version, app_schema_state);
318    check_outbox_summaries(report, current_schema_version, outbox);
319    check_conflict_summaries(report, conflicts);
320    if let Some(scoped_rows) = scoped_rows {
321        check_scoped_rows_health_summary(report, scoped_rows);
322    }
323    if let Some(blob) = blob {
324        check_blob_health_summary(report, blob);
325    }
326    if let Some(crdt) = crdt {
327        check_crdt_health_summary(report, crdt);
328    }
329}
330
331#[allow(clippy::too_many_arguments)]
332pub fn local_support_bundle_from_records(
333    source: impl Into<String>,
334    health: LocalHealthReport,
335    subscriptions: &[SubscriptionSpec],
336    states: &[SubscriptionState],
337    roots: &[VerifiedRoot],
338    app_schema_state: AppSchemaState,
339    outbox: &[OutboxSummary],
340    conflicts: &[ConflictSummary],
341    blob: Option<BlobHealthSummary>,
342    crdt: Option<CrdtHealthSummary>,
343) -> LocalSupportBundle {
344    LocalSupportBundle {
345        format_version: LOCAL_SUPPORT_BUNDLE_FORMAT_VERSION,
346        generated_at: now_ms(),
347        redacted: true,
348        source: source.into(),
349        health,
350        app_schema_state,
351        subscriptions: subscriptions
352            .iter()
353            .map(redacted_subscription)
354            .collect::<Vec<_>>(),
355        subscription_states: states
356            .iter()
357            .map(redacted_subscription_state)
358            .collect::<Vec<_>>(),
359        verified_roots: roots.iter().map(redacted_verified_root).collect::<Vec<_>>(),
360        outbox: redacted_outbox_summary(outbox),
361        conflicts: redacted_conflict_summary(conflicts),
362        blob,
363        crdt,
364    }
365}
366
367pub fn import_local_support_bundle_json(
368    bundle_json: &str,
369) -> Result<LocalSupportBundleImportReport> {
370    let bundle: LocalSupportBundle = serde_json::from_str(bundle_json)?;
371    if bundle.format_version != LOCAL_SUPPORT_BUNDLE_FORMAT_VERSION {
372        return Err(SyncularError::config(format!(
373            "unsupported local support bundle format version {}",
374            bundle.format_version
375        )));
376    }
377    if !bundle.redacted {
378        return Err(SyncularError::config(
379            "local support bundle import requires a redacted bundle",
380        ));
381    }
382    Ok(LocalSupportBundleImportReport {
383        format_version: bundle.format_version,
384        generated_at: bundle.generated_at,
385        redacted: bundle.redacted,
386        source: bundle.source,
387        health_ok: bundle.health.ok,
388        finding_count: bundle.health.findings.len(),
389        subscription_count: bundle.subscriptions.len(),
390        subscription_state_count: bundle.subscription_states.len(),
391        verified_root_count: bundle.verified_roots.len(),
392        checked_subscription_states: bundle.health.checked_subscription_states,
393        checked_verified_roots: bundle.health.checked_verified_roots,
394        checked_outbox_commits: bundle.health.checked_outbox_commits,
395        checked_conflicts: bundle.health.checked_conflicts,
396        checked_synced_rows: bundle.health.checked_synced_rows,
397    })
398}
399
400fn redacted_subscription(spec: &SubscriptionSpec) -> LocalSupportSubscription {
401    let mut scope_keys = spec.scopes.keys().cloned().collect::<Vec<_>>();
402    scope_keys.sort();
403    let mut params_keys = spec.params.keys().cloned().collect::<Vec<_>>();
404    params_keys.sort();
405    LocalSupportSubscription {
406        id: spec.id.clone(),
407        table: spec.table.clone(),
408        scope_keys,
409        scope_value_count: count_json_values(spec.scopes.values()),
410        params_keys,
411        params_value_count: count_json_values(spec.params.values()),
412        bootstrap_phase: spec.bootstrap_phase,
413    }
414}
415
416fn redacted_subscription_state(state: &SubscriptionState) -> LocalSupportSubscriptionState {
417    let (scope_keys, scope_value_count) = redacted_json_map_shape(&state.scopes_json);
418    let (params_keys, params_value_count) = redacted_json_map_shape(&state.params_json);
419    LocalSupportSubscriptionState {
420        state_id: state.state_id.clone(),
421        subscription_id: state.subscription_id.clone(),
422        table: state.table.clone(),
423        scope_keys,
424        scope_value_count,
425        params_keys,
426        params_value_count,
427        cursor: state.cursor,
428        status: state.status.clone(),
429        bootstrap_state_present: state.bootstrap_state_json.is_some(),
430        bootstrap_state_byte_len: state
431            .bootstrap_state_json
432            .as_ref()
433            .map_or(0, |value| value.len()),
434    }
435}
436
437fn redacted_verified_root(root: &VerifiedRoot) -> LocalSupportVerifiedRoot {
438    LocalSupportVerifiedRoot {
439        state_id: root.state_id.clone(),
440        subscription_id: root.subscription_id.clone(),
441        partition_id_present: !root.partition_id.is_empty(),
442        partition_id_byte_len: root.partition_id.len(),
443        commit_seq: root.commit_seq,
444        root_byte_len: root.root.len(),
445        root_is_canonical_hex: is_canonical_hex_root(&root.root),
446    }
447}
448
449fn redacted_outbox_summary(outbox: &[OutboxSummary]) -> LocalSupportOutboxSummary {
450    let mut by_status = BTreeMap::new();
451    let mut by_schema_version = BTreeMap::new();
452    for item in outbox {
453        *by_status.entry(item.status.clone()).or_insert(0) += 1;
454        *by_schema_version.entry(item.schema_version).or_insert(0) += 1;
455    }
456    LocalSupportOutboxSummary {
457        total: outbox.len(),
458        by_status,
459        by_schema_version,
460    }
461}
462
463fn redacted_conflict_summary(conflicts: &[ConflictSummary]) -> LocalSupportConflictSummary {
464    let mut by_result_status = BTreeMap::new();
465    let mut by_code = BTreeMap::new();
466    let mut resolved = 0usize;
467    for item in conflicts {
468        *by_result_status
469            .entry(item.result_status.clone())
470            .or_insert(0) += 1;
471        if let Some(code) = &item.code {
472            *by_code.entry(code.clone()).or_insert(0) += 1;
473        }
474        if item.resolved_at.is_some() || item.resolution.is_some() {
475            resolved += 1;
476        }
477    }
478    LocalSupportConflictSummary {
479        total: conflicts.len(),
480        unresolved: conflicts.len().saturating_sub(resolved),
481        resolved,
482        by_result_status,
483        by_code,
484    }
485}
486
487fn redacted_json_map_shape(json: &str) -> (Vec<String>, usize) {
488    match serde_json::from_str::<BTreeMap<String, Value>>(json) {
489        Ok(map) => {
490            let keys = map.keys().cloned().collect::<Vec<_>>();
491            let value_count = count_json_values(map.values());
492            (keys, value_count)
493        }
494        Err(_) => (Vec::new(), 0),
495    }
496}
497
498fn count_json_values<'a>(values: impl Iterator<Item = &'a Value>) -> usize {
499    values
500        .map(|value| value.as_array().map_or(1, Vec::len))
501        .sum()
502}
503
504fn check_app_schema_state(
505    report: &mut LocalHealthReport,
506    current_schema_version: i32,
507    app_schema_state: &AppSchemaState,
508) {
509    let Some(schema_version) = app_schema_state.schema_version else {
510        report.add_finding(finding(
511            LocalHealthSeverity::Error,
512            "local.app_schema_state_missing",
513            "appSchemaState",
514            "local app schema state has not been recorded",
515            None,
516            None,
517            Some(LocalHealthRepairAction::ManualInspection),
518            BTreeMap::new(),
519        ));
520        return;
521    };
522
523    if schema_version > current_schema_version {
524        let mut details = BTreeMap::new();
525        details.insert(
526            "localSchemaVersion".to_string(),
527            Value::from(schema_version),
528        );
529        details.insert(
530            "currentSchemaVersion".to_string(),
531            Value::from(current_schema_version),
532        );
533        report.add_finding(finding(
534            LocalHealthSeverity::Error,
535            "local.app_schema_state_future_version",
536            "appSchemaState",
537            "local app schema state was written by a newer generated client",
538            None,
539            None,
540            Some(LocalHealthRepairAction::ManualInspection),
541            details,
542        ));
543    } else if schema_version < current_schema_version {
544        let mut details = BTreeMap::new();
545        details.insert(
546            "localSchemaVersion".to_string(),
547            Value::from(schema_version),
548        );
549        details.insert(
550            "currentSchemaVersion".to_string(),
551            Value::from(current_schema_version),
552        );
553        report.add_finding(finding(
554            LocalHealthSeverity::Error,
555            "local.app_schema_state_stale_version",
556            "appSchemaState",
557            "local app schema state is older than the generated client",
558            None,
559            None,
560            Some(LocalHealthRepairAction::ManualInspection),
561            details,
562        ));
563    }
564}
565
566fn check_outbox_summaries(
567    report: &mut LocalHealthReport,
568    current_schema_version: i32,
569    outbox: &[OutboxSummary],
570) {
571    report.checked_outbox_commits = outbox.len();
572    let future_schema_count = outbox
573        .iter()
574        .filter(|item| item.schema_version > current_schema_version)
575        .count();
576    if future_schema_count > 0 {
577        let max_schema_version = outbox
578            .iter()
579            .map(|item| item.schema_version)
580            .max()
581            .unwrap_or(current_schema_version);
582        let mut details = BTreeMap::new();
583        details.insert("count".to_string(), Value::from(future_schema_count));
584        details.insert(
585            "currentSchemaVersion".to_string(),
586            Value::from(current_schema_version),
587        );
588        details.insert(
589            "maxSchemaVersion".to_string(),
590            Value::from(max_schema_version),
591        );
592        report.add_finding(finding(
593            LocalHealthSeverity::Error,
594            "local.outbox_future_schema_version",
595            "outbox",
596            "pending local outbox commits require a newer generated client",
597            None,
598            None,
599            Some(LocalHealthRepairAction::ManualInspection),
600            details,
601        ));
602    }
603
604    let failed_count = outbox.iter().filter(|item| item.status == "failed").count();
605    if failed_count > 0 {
606        let mut details = BTreeMap::new();
607        details.insert("count".to_string(), Value::from(failed_count));
608        report.add_finding(finding(
609            LocalHealthSeverity::Warning,
610            "local.outbox_failed_commits",
611            "outbox",
612            "local outbox contains failed commits that need app/user action",
613            None,
614            None,
615            Some(LocalHealthRepairAction::ManualInspection),
616            details,
617        ));
618    }
619}
620
621fn check_conflict_summaries(report: &mut LocalHealthReport, conflicts: &[ConflictSummary]) {
622    report.checked_conflicts = conflicts.len();
623    if conflicts.is_empty() {
624        return;
625    }
626
627    let mut details = BTreeMap::new();
628    details.insert("count".to_string(), Value::from(conflicts.len()));
629    report.add_finding(finding(
630        LocalHealthSeverity::Warning,
631        "local.conflicts_unresolved",
632        "conflicts",
633        "local store has unresolved sync conflicts",
634        None,
635        None,
636        Some(LocalHealthRepairAction::ManualInspection),
637        details,
638    ));
639}
640
641fn check_scoped_rows_health_summary(
642    report: &mut LocalHealthReport,
643    scoped_rows: &ScopedRowsHealthSummary,
644) {
645    report.checked_synced_rows = scoped_rows.checked_synced_rows;
646    for table in scoped_rows
647        .tables
648        .iter()
649        .filter(|table| table.orphaned_synced_rows > 0)
650    {
651        let mut details = BTreeMap::new();
652        details.insert("count".to_string(), Value::from(table.orphaned_synced_rows));
653        details.insert(
654            "checkedSyncedRows".to_string(),
655            Value::from(table.checked_synced_rows),
656        );
657        details.insert(
658            "totalOrphanedSyncedRows".to_string(),
659            Value::from(scoped_rows.orphaned_synced_rows),
660        );
661        report.add_finding(finding(
662            LocalHealthSeverity::Error,
663            "local.synced_rows_orphaned",
664            "appRows",
665            "local synced app rows are outside the configured subscription scopes",
666            None,
667            Some(&table.table),
668            Some(LocalHealthRepairAction::ClearOrphanedSyncedRows),
669            details,
670        ));
671    }
672}
673
674fn check_blob_health_summary(report: &mut LocalHealthReport, blob: &BlobHealthSummary) {
675    report.checked_blob_references = blob.checked_references;
676    if blob.invalid_references > 0 {
677        let mut details = BTreeMap::new();
678        details.insert("count".to_string(), Value::from(blob.invalid_references));
679        details.insert(
680            "checkedReferences".to_string(),
681            Value::from(blob.checked_references),
682        );
683        report.add_finding(finding(
684            LocalHealthSeverity::Error,
685            "local.blob_refs_invalid",
686            "blobs",
687            "local synced rows contain invalid blob references",
688            None,
689            None,
690            Some(LocalHealthRepairAction::ManualInspection),
691            details,
692        ));
693    }
694
695    if blob.upload_failed > 0 {
696        let mut details = BTreeMap::new();
697        details.insert("count".to_string(), Value::from(blob.upload_failed));
698        report.add_finding(finding(
699            LocalHealthSeverity::Warning,
700            "local.blob_uploads_failed",
701            "blobs",
702            "local blob upload queue contains failed uploads",
703            None,
704            None,
705            Some(LocalHealthRepairAction::ManualInspection),
706            details,
707        ));
708    }
709}
710
711fn check_crdt_health_summary(report: &mut LocalHealthReport, crdt: &CrdtHealthSummary) {
712    report.checked_crdt_documents = crdt.document_count;
713    report.checked_crdt_update_log_entries = crdt.log_updates;
714    if crdt.orphaned_documents > 0 {
715        let mut details = BTreeMap::new();
716        details.insert("count".to_string(), Value::from(crdt.orphaned_documents));
717        details.insert(
718            "documentCount".to_string(),
719            Value::from(crdt.document_count),
720        );
721        report.add_finding(finding(
722            LocalHealthSeverity::Error,
723            "local.crdt_documents_orphaned",
724            "crdt",
725            "local CRDT document metadata references missing app rows",
726            None,
727            None,
728            Some(LocalHealthRepairAction::ManualInspection),
729            details,
730        ));
731    }
732
733    if crdt.orphaned_log_entries > 0 {
734        let mut details = BTreeMap::new();
735        details.insert("count".to_string(), Value::from(crdt.orphaned_log_entries));
736        details.insert("logUpdates".to_string(), Value::from(crdt.log_updates));
737        report.add_finding(finding(
738            LocalHealthSeverity::Error,
739            "local.crdt_update_log_orphaned",
740            "crdt",
741            "local CRDT update log contains entries without document metadata",
742            None,
743            None,
744            Some(LocalHealthRepairAction::ManualInspection),
745            details,
746        ));
747    }
748}
749
750fn check_subscription_state(
751    report: &mut LocalHealthReport,
752    spec: &SubscriptionSpec,
753    state: &SubscriptionState,
754) {
755    if state.table != spec.table {
756        let mut details = BTreeMap::new();
757        details.insert(
758            "expectedTable".to_string(),
759            Value::String(spec.table.clone()),
760        );
761        details.insert(
762            "storedTable".to_string(),
763            Value::String(state.table.clone()),
764        );
765        report.add_finding(finding(
766            LocalHealthSeverity::Error,
767            "local.subscription_state_table_mismatch",
768            "subscriptionState",
769            "stored subscription state belongs to a different table",
770            Some(&spec.id),
771            Some(&spec.table),
772            Some(LocalHealthRepairAction::ForceRebootstrap),
773            details,
774        ));
775    }
776
777    if state.cursor < -1 {
778        let mut details = BTreeMap::new();
779        details.insert("cursor".to_string(), Value::from(state.cursor));
780        report.add_finding(finding(
781            LocalHealthSeverity::Error,
782            "local.subscription_state_invalid_cursor",
783            "subscriptionState",
784            "stored subscription cursor is below the bootstrap sentinel",
785            Some(&spec.id),
786            Some(&spec.table),
787            Some(LocalHealthRepairAction::ForceRebootstrap),
788            details,
789        ));
790    }
791
792    if let Err(error) = serde_json::from_str::<ScopeValues>(&state.scopes_json) {
793        report.add_finding(finding(
794            LocalHealthSeverity::Error,
795            "local.subscription_state_invalid_scopes_json",
796            "subscriptionState",
797            "stored subscription scopes are not valid JSON",
798            Some(&spec.id),
799            Some(&spec.table),
800            Some(LocalHealthRepairAction::ForceRebootstrap),
801            parse_error_details(error, state.scopes_json.len()),
802        ));
803    }
804
805    if let Err(error) = serde_json::from_str::<serde_json::Map<String, Value>>(&state.params_json) {
806        report.add_finding(finding(
807            LocalHealthSeverity::Error,
808            "local.subscription_state_invalid_params_json",
809            "subscriptionState",
810            "stored subscription params are not valid JSON",
811            Some(&spec.id),
812            Some(&spec.table),
813            Some(LocalHealthRepairAction::ForceRebootstrap),
814            parse_error_details(error, state.params_json.len()),
815        ));
816    }
817
818    if let Some(bootstrap_state_json) = state.bootstrap_state_json.as_deref() {
819        if let Err(error) = serde_json::from_str::<BootstrapState>(bootstrap_state_json) {
820            report.add_finding(finding(
821                LocalHealthSeverity::Error,
822                "local.subscription_state_invalid_bootstrap_json",
823                "subscriptionState",
824                "stored subscription bootstrap state is not valid",
825                Some(&spec.id),
826                Some(&spec.table),
827                Some(LocalHealthRepairAction::ForceRebootstrap),
828                parse_error_details(error, bootstrap_state_json.len()),
829            ));
830        }
831    }
832}
833
834fn check_orphaned_subscription_state(
835    report: &mut LocalHealthReport,
836    state: &SubscriptionState,
837    has_verified_root: bool,
838) {
839    let mut details = BTreeMap::new();
840    details.insert("status".to_string(), Value::String(state.status.clone()));
841    details.insert("cursor".to_string(), Value::from(state.cursor));
842    details.insert(
843        "hasVerifiedRoot".to_string(),
844        Value::from(has_verified_root),
845    );
846    report.add_finding(finding(
847        LocalHealthSeverity::Error,
848        "local.subscription_state_orphaned",
849        "subscriptionState",
850        "stored subscription state is not configured on this client",
851        Some(&state.subscription_id),
852        Some(&state.table),
853        Some(LocalHealthRepairAction::ClearOrphanedState),
854        details,
855    ));
856}
857
858fn check_orphaned_verified_root(report: &mut LocalHealthReport, root: &VerifiedRoot) {
859    let mut details = BTreeMap::new();
860    details.insert("commitSeq".to_string(), Value::from(root.commit_seq));
861    details.insert(
862        "partitionId".to_string(),
863        Value::String(root.partition_id.clone()),
864    );
865    report.add_finding(finding(
866        LocalHealthSeverity::Error,
867        "local.verified_root_orphaned",
868        "verifiedRoot",
869        "stored verified root is not configured on this client",
870        Some(&root.subscription_id),
871        None,
872        Some(LocalHealthRepairAction::ClearOrphanedState),
873        details,
874    ));
875}
876
877fn check_verified_root(
878    report: &mut LocalHealthReport,
879    spec: &SubscriptionSpec,
880    state: Option<&SubscriptionState>,
881    root: &VerifiedRoot,
882    state_id: &str,
883) {
884    if state.is_none() {
885        report.add_finding(finding(
886            LocalHealthSeverity::Error,
887            "local.verified_root_without_subscription_state",
888            "verifiedRoot",
889            "stored verified root has no matching subscription state",
890            Some(&spec.id),
891            Some(&spec.table),
892            Some(LocalHealthRepairAction::ForceRebootstrap),
893            BTreeMap::new(),
894        ));
895    }
896
897    if root.state_id != state_id {
898        let mut details = BTreeMap::new();
899        details.insert(
900            "expectedStateId".to_string(),
901            Value::String(state_id.to_string()),
902        );
903        details.insert(
904            "storedStateId".to_string(),
905            Value::String(root.state_id.clone()),
906        );
907        report.add_finding(finding(
908            LocalHealthSeverity::Error,
909            "local.verified_root_state_mismatch",
910            "verifiedRoot",
911            "stored verified root belongs to a different state",
912            Some(&spec.id),
913            Some(&spec.table),
914            Some(LocalHealthRepairAction::ForceRebootstrap),
915            details,
916        ));
917    }
918
919    if root.subscription_id != spec.id {
920        let mut details = BTreeMap::new();
921        details.insert(
922            "storedSubscriptionId".to_string(),
923            Value::String(root.subscription_id.clone()),
924        );
925        report.add_finding(finding(
926            LocalHealthSeverity::Error,
927            "local.verified_root_subscription_mismatch",
928            "verifiedRoot",
929            "stored verified root belongs to a different subscription",
930            Some(&spec.id),
931            Some(&spec.table),
932            Some(LocalHealthRepairAction::ForceRebootstrap),
933            details,
934        ));
935    }
936
937    if root.partition_id.is_empty() {
938        report.add_finding(finding(
939            LocalHealthSeverity::Error,
940            "local.verified_root_empty_partition",
941            "verifiedRoot",
942            "stored verified root has an empty partition",
943            Some(&spec.id),
944            Some(&spec.table),
945            Some(LocalHealthRepairAction::ForceRebootstrap),
946            BTreeMap::new(),
947        ));
948    }
949
950    if root.commit_seq < 0 {
951        let mut details = BTreeMap::new();
952        details.insert("commitSeq".to_string(), Value::from(root.commit_seq));
953        report.add_finding(finding(
954            LocalHealthSeverity::Error,
955            "local.verified_root_invalid_commit_seq",
956            "verifiedRoot",
957            "stored verified root has a negative commit sequence",
958            Some(&spec.id),
959            Some(&spec.table),
960            Some(LocalHealthRepairAction::ForceRebootstrap),
961            details,
962        ));
963    }
964
965    if !is_canonical_hex_root(&root.root) {
966        let mut details = BTreeMap::new();
967        details.insert("rootLength".to_string(), Value::from(root.root.len()));
968        details.insert(
969            "expectedRootLength".to_string(),
970            Value::from(COMMIT_INTEGRITY_HEX_LENGTH),
971        );
972        report.add_finding(finding(
973            LocalHealthSeverity::Error,
974            "local.verified_root_invalid_hex",
975            "verifiedRoot",
976            "stored verified root is not a canonical lowercase hex digest",
977            Some(&spec.id),
978            Some(&spec.table),
979            Some(LocalHealthRepairAction::ForceRebootstrap),
980            details,
981        ));
982    }
983
984    if let Some(state) = state {
985        if state.cursor >= 0 && root.commit_seq > state.cursor {
986            let mut details = BTreeMap::new();
987            details.insert("cursor".to_string(), Value::from(state.cursor));
988            details.insert("commitSeq".to_string(), Value::from(root.commit_seq));
989            report.add_finding(finding(
990                LocalHealthSeverity::Error,
991                "local.verified_root_ahead_of_cursor",
992                "verifiedRoot",
993                "stored verified root is ahead of the persisted subscription cursor",
994                Some(&spec.id),
995                Some(&spec.table),
996                Some(LocalHealthRepairAction::ForceRebootstrap),
997                details,
998            ));
999        }
1000    }
1001}
1002
1003fn finding(
1004    severity: LocalHealthSeverity,
1005    code: &str,
1006    component: &str,
1007    message: &str,
1008    subscription_id: Option<&str>,
1009    table: Option<&str>,
1010    repair_action: Option<LocalHealthRepairAction>,
1011    details: BTreeMap<String, Value>,
1012) -> LocalHealthFinding {
1013    LocalHealthFinding {
1014        severity,
1015        code: code.to_string(),
1016        component: component.to_string(),
1017        message: message.to_string(),
1018        subscription_id: subscription_id.map(str::to_string),
1019        table: table.map(str::to_string),
1020        repair_action,
1021        details,
1022    }
1023}
1024
1025fn parse_error_details(error: serde_json::Error, byte_len: usize) -> BTreeMap<String, Value> {
1026    let mut details = BTreeMap::new();
1027    details.insert("byteLength".to_string(), Value::from(byte_len));
1028    details.insert("line".to_string(), Value::from(error.line()));
1029    details.insert("column".to_string(), Value::from(error.column()));
1030    details.insert("error".to_string(), Value::String(error.to_string()));
1031    details
1032}
1033
1034fn is_canonical_hex_root(root: &str) -> bool {
1035    root.len() == COMMIT_INTEGRITY_HEX_LENGTH
1036        && root
1037            .bytes()
1038            .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
1039}