semantic-memory 0.5.1

Local-first hybrid semantic search (SQLite + FTS5 + usearch 2.25) with bitemporal truth and typed receipts
Documentation
//! JSON compatibility normalization for legacy projection batch payloads.

use crate::error::MemoryError;
use crate::projection_storage;
use forge_memory_bridge::{
    ContradictionStatus, MergeDecision, ProjectionImportBatchV2, ReviewState,
    PROJECTION_IMPORT_BATCH_V1_SCHEMA, PROJECTION_IMPORT_BATCH_V2_SCHEMA,
};
use stack_ids::ContentDigest;

/// Legacy `schema_version` tag of v1 export envelopes; the JSON-compat decoder rewrites it
/// to the v2 projection import batch schema.
pub(crate) const EXPORT_ENVELOPE_V1_JSON_COMPAT: &str = "export_envelope_v1";
/// Legacy `schema_version` tag of v2 export envelopes; the JSON-compat decoder rewrites it
/// to the v2 projection import batch schema.
pub(crate) const EXPORT_ENVELOPE_V2_JSON_COMPAT: &str = "export_envelope_v2";
/// Placeholder timestamp (the Unix epoch) filled in when a payload omits
/// `source_exported_at` or `transformed_at`.
pub(crate) const JSON_COMPAT_DEFAULT_TIMESTAMP: &str = "1970-01-01T00:00:00Z";

/// Builds a [`MemoryError::ImportInvalid`] whose reason is `reason` prefixed with
/// `invalid batch JSON: `.
pub(crate) fn json_compat_invalid(reason: impl Into<String>) -> MemoryError {
    let detail: String = reason.into();
    let reason = format!("invalid batch JSON: {detail}");
    MemoryError::ImportInvalid { reason }
}

pub(crate) fn build_json_compat_failure_log_row(
    batch_json: &str,
    imported_at: String,
) -> projection_storage::ProjectionImportLogRow {
    let parsed = serde_json::from_str::<serde_json::Value>(batch_json).ok();
    let root = parsed.as_ref().and_then(|value| value.as_object());
    let scope_key = root
        .and_then(|obj| obj.get("scope_key"))
        .and_then(|value| value.as_object());
    let export_meta = root
        .and_then(|obj| obj.get("export_meta"))
        .and_then(|value| value.as_object());
    let evidence_bundle = root.and_then(|obj| obj.get("evidence_bundle"));
    let content_digest = root
        .and_then(|obj| obj.get("content_digest"))
        .and_then(|value| value.as_str())
        .map(|value| value.to_string())
        .unwrap_or_else(|| ContentDigest::compute_str(batch_json).hex().to_string());
    let original_schema_version = root
        .and_then(|obj| obj.get("schema_version"))
        .and_then(|value| value.as_str())
        .map(|value| value.to_string());
    let export_schema_version = root
        .and_then(|obj| obj.get("export_schema_version"))
        .and_then(|value| value.as_str())
        .map(|value| value.to_string());
    let (schema_version, export_schema_version) = match original_schema_version.as_deref() {
        Some(EXPORT_ENVELOPE_V1_JSON_COMPAT) => (
            PROJECTION_IMPORT_BATCH_V2_SCHEMA.to_string(),
            Some(EXPORT_ENVELOPE_V1_JSON_COMPAT.to_string()),
        ),
        Some(EXPORT_ENVELOPE_V2_JSON_COMPAT) => (
            PROJECTION_IMPORT_BATCH_V2_SCHEMA.to_string(),
            Some(EXPORT_ENVELOPE_V2_JSON_COMPAT.to_string()),
        ),
        Some(PROJECTION_IMPORT_BATCH_V1_SCHEMA) => (
            PROJECTION_IMPORT_BATCH_V2_SCHEMA.to_string(),
            export_schema_version,
        ),
        Some(other) => (other.to_string(), export_schema_version),
        None => ("json_compat_unknown".into(), export_schema_version),
    };
    let source_envelope_id = root
        .and_then(|obj| obj.get("source_envelope_id"))
        .and_then(|value| value.as_str())
        .map(|value| value.to_string())
        .unwrap_or_else(|| format!("json-compat-invalid:{content_digest}"));

    projection_storage::ProjectionImportLogRow {
        batch_id: crate::projection_import_failure_id(
            &source_envelope_id,
            &schema_version,
            &content_digest,
        ),
        source_envelope_id,
        schema_version,
        export_schema_version,
        content_digest,
        source_authority: root
            .and_then(|obj| obj.get("source_authority"))
            .and_then(|value| value.as_str())
            .unwrap_or("unknown")
            .to_string(),
        scope_namespace: scope_key
            .and_then(|obj| obj.get("namespace"))
            .and_then(|value| value.as_str())
            .or_else(|| {
                root.and_then(|obj| obj.get("namespace"))
                    .and_then(|value| value.as_str())
            })
            .unwrap_or("json-compat-invalid")
            .to_string(),
        scope_domain: scope_key
            .and_then(|obj| obj.get("domain"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        scope_workspace_id: scope_key
            .and_then(|obj| obj.get("workspace_id"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        scope_repo_id: scope_key
            .and_then(|obj| obj.get("repo_id"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        trace_id: root
            .and_then(|obj| obj.get("trace_ctx"))
            .and_then(|value| value.get("trace_id"))
            .and_then(|value| value.as_str())
            .or_else(|| {
                root.and_then(|obj| obj.get("trace_id"))
                    .and_then(|value| value.as_str())
            })
            .map(|value| value.to_string()),
        record_count: root
            .and_then(|obj| obj.get("records"))
            .and_then(|value| value.as_array())
            .map(|records| records.len())
            .unwrap_or(0),
        claim_count: 0,
        relation_count: 0,
        episode_count: 0,
        alias_count: 0,
        evidence_count: 0,
        status: "failed".into(),
        source_exported_at: root
            .and_then(|obj| obj.get("source_exported_at"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        transformed_at: root
            .and_then(|obj| obj.get("transformed_at"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        imported_at,
        source_run_id: export_meta
            .and_then(|obj| obj.get("run_id"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        comparability_snapshot_version: export_meta
            .and_then(|obj| obj.get("comparability_snapshot_version"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        direct_write: export_meta
            .and_then(|obj| obj.get("direct_write"))
            .and_then(|value| value.as_bool())
            .unwrap_or(false),
        failure_reason: None,
        evidence_bundle_id: evidence_bundle
            .and_then(|value| value.get("id"))
            .and_then(|value| value.as_str())
            .map(|value| value.to_string()),
        evidence_bundle_json: evidence_bundle.map(|value| value.to_string()),
        episode_bundle_id: None,
        episode_bundle_json: None,
        execution_context_json: None,
        kernel_payload_json: None,
    }
}

pub(crate) fn decode_projection_batch_json_compat(
    batch_json: &str,
) -> Result<ProjectionImportBatchV2, MemoryError> {
    let mut value: serde_json::Value =
        serde_json::from_str(batch_json).map_err(|e| json_compat_invalid(e.to_string()))?;
    let root = value
        .as_object_mut()
        .ok_or_else(|| json_compat_invalid("top-level payload must be an object"))?;

    let original_schema_version = root
        .get("schema_version")
        .and_then(|value| value.as_str())
        .map(|value| value.to_string());
    match original_schema_version.as_deref() {
        Some(EXPORT_ENVELOPE_V1_JSON_COMPAT) => {
            root.entry("export_schema_version".to_string())
                .or_insert_with(|| serde_json::json!(EXPORT_ENVELOPE_V1_JSON_COMPAT));
            root.insert(
                "schema_version".to_string(),
                serde_json::json!(PROJECTION_IMPORT_BATCH_V2_SCHEMA),
            );
        }
        Some(EXPORT_ENVELOPE_V2_JSON_COMPAT) => {
            root.entry("export_schema_version".to_string())
                .or_insert_with(|| serde_json::json!(EXPORT_ENVELOPE_V2_JSON_COMPAT));
            root.insert(
                "schema_version".to_string(),
                serde_json::json!(PROJECTION_IMPORT_BATCH_V2_SCHEMA),
            );
        }
        Some(PROJECTION_IMPORT_BATCH_V1_SCHEMA) => {
            root.insert(
                "schema_version".to_string(),
                serde_json::json!(PROJECTION_IMPORT_BATCH_V2_SCHEMA),
            );
        }
        Some(PROJECTION_IMPORT_BATCH_V2_SCHEMA) | None => {}
        Some(other) => {
            return Err(MemoryError::ImportInvalid {
                reason: format!(
                    "unsupported schema_version: {}; expected {}, {}, {}, or {}",
                    other,
                    PROJECTION_IMPORT_BATCH_V2_SCHEMA,
                    PROJECTION_IMPORT_BATCH_V1_SCHEMA,
                    EXPORT_ENVELOPE_V2_JSON_COMPAT,
                    EXPORT_ENVELOPE_V1_JSON_COMPAT
                ),
            });
        }
    }

    root.entry("source_exported_at".to_string())
        .or_insert_with(|| serde_json::json!(JSON_COMPAT_DEFAULT_TIMESTAMP));
    root.entry("transformed_at".to_string())
        .or_insert_with(|| serde_json::json!(JSON_COMPAT_DEFAULT_TIMESTAMP));

    let default_source_envelope_id = root.get("source_envelope_id").cloned();
    let default_source_authority = root.get("source_authority").cloned();
    let default_scope_key = root.get("scope_key").cloned();
    let default_trace_ctx = root.get("trace_ctx").cloned();

    if let Some(records) = root
        .get_mut("records")
        .and_then(|value| value.as_array_mut())
    {
        for record in records {
            let Some(obj) = record.as_object_mut() else {
                continue;
            };

            match obj.get("kind").and_then(|value| value.as_str()) {
                Some("claim_version") => {
                    insert_default_json_field(obj, "scope_key", default_scope_key.as_ref());
                    insert_default_json_field(
                        obj,
                        "source_envelope_id",
                        default_source_envelope_id.as_ref(),
                    );
                    insert_default_json_field(
                        obj,
                        "source_authority",
                        default_source_authority.as_ref(),
                    );
                    insert_default_json_field(obj, "trace_ctx", default_trace_ctx.as_ref());
                    insert_default_json_field(
                        obj,
                        "contradiction_status",
                        Some(&serde_json::json!(ContradictionStatus::None)),
                    );
                }
                Some("relation_version") => {
                    insert_default_json_field(obj, "scope_key", default_scope_key.as_ref());
                    insert_default_json_field(
                        obj,
                        "source_envelope_id",
                        default_source_envelope_id.as_ref(),
                    );
                    insert_default_json_field(
                        obj,
                        "source_authority",
                        default_source_authority.as_ref(),
                    );
                    insert_default_json_field(obj, "trace_ctx", default_trace_ctx.as_ref());
                    insert_default_json_field(
                        obj,
                        "contradiction_status",
                        Some(&serde_json::json!(ContradictionStatus::None)),
                    );
                }
                Some("episode") => {
                    insert_default_json_field(obj, "cause_ids", Some(&serde_json::json!([])));
                    insert_default_json_field(
                        obj,
                        "source_envelope_id",
                        default_source_envelope_id.as_ref(),
                    );
                    insert_default_json_field(
                        obj,
                        "source_authority",
                        default_source_authority.as_ref(),
                    );
                    insert_default_json_field(obj, "trace_ctx", default_trace_ctx.as_ref());
                }
                Some("entity_alias") => {
                    insert_default_json_field(obj, "scope", default_scope_key.as_ref());
                    insert_default_json_field(
                        obj,
                        "source_envelope_id",
                        default_source_envelope_id.as_ref(),
                    );
                    insert_default_json_field(
                        obj,
                        "review_state",
                        Some(&serde_json::json!(ReviewState::Unreviewed)),
                    );
                    insert_default_json_field(
                        obj,
                        "is_human_confirmed",
                        Some(&serde_json::json!(false)),
                    );
                    insert_default_json_field(
                        obj,
                        "is_human_confirmed_final",
                        Some(&serde_json::json!(false)),
                    );
                    validate_json_compat_field::<MergeDecision>(obj, "merge_decision")?;
                    validate_json_compat_field::<ReviewState>(obj, "review_state")?;
                }
                Some("evidence_ref") => {
                    insert_default_json_field(
                        obj,
                        "source_envelope_id",
                        default_source_envelope_id.as_ref(),
                    );
                }
                _ => {}
            }
        }
    }

    serde_json::from_value(value).map_err(|e| json_compat_invalid(e.to_string()))
}

/// Sets `obj[key]` to a clone of `default` when `key` is absent and a default is supplied.
///
/// An existing entry is never overwritten, even if its value is JSON `null`. A `None`
/// default leaves `obj` untouched.
fn insert_default_json_field(
    obj: &mut serde_json::Map<String, serde_json::Value>,
    key: &str,
    default: Option<&serde_json::Value>,
) {
    let Some(fallback) = default else {
        return;
    };
    obj.entry(key).or_insert_with(|| fallback.clone());
}

/// Checks that `obj[field]`, when present, deserializes as `T`; the parsed value is discarded.
///
/// An absent field is accepted.
///
/// # Errors
///
/// Returns an invalid-batch error whose reason is `"{field}: {serde error}"` when the value
/// does not deserialize as `T`.
fn validate_json_compat_field<T>(
    obj: &serde_json::Map<String, serde_json::Value>,
    field: &str,
) -> Result<(), MemoryError>
where
    T: serde::de::DeserializeOwned,
{
    let Some(raw) = obj.get(field) else {
        return Ok(());
    };
    match serde_json::from_value::<T>(raw.clone()) {
        Ok(_) => Ok(()),
        Err(err) => Err(json_compat_invalid(format!("{field}: {err}"))),
    }
}