Skip to main content

semantic_memory/
projection_lane.rs

1use crate::{
2    db, json_compat_import,
3    projection_batch::{encode_merge_decision, encode_review_state, ProjectionImportBatchLike},
4    projection_storage, MemoryError, MemoryStore,
5};
6use forge_memory_bridge::{
7    ImportProjectionRecord, ProjectionImportBatchV2, ProjectionImportBatchV3,
8    PROJECTION_IMPORT_BATCH_V2_SCHEMA,
9};
10use stack_ids::{DigestBuilder, ScopeKey};
11
12pub(crate) fn projection_import_failure_id(
13    source_envelope_id: &str,
14    schema_version: &str,
15    content_digest: &str,
16) -> String {
17    let mut builder = DigestBuilder::new();
18    builder
19        .update_str(source_envelope_id)
20        .separator()
21        .update_str(schema_version)
22        .separator()
23        .update_str(content_digest);
24    format!("projection-import-failure:{}", builder.finalize().hex())
25}
26
27fn serialized_import_evidence_bundle(
28    batch: &ProjectionImportBatchV2,
29) -> Result<(Option<String>, Option<String>), MemoryError> {
30    let Some(bundle) = batch.evidence_bundle.as_ref() else {
31        return Ok((None, None));
32    };
33
34    let bundle_json = serde_json::to_string(bundle).map_err(|err| {
35        MemoryError::Other(format!(
36            "failed to serialize canonical evidence bundle {} for import receipt: {}",
37            bundle.id, err
38        ))
39    })?;
40
41    Ok((Some(bundle.id.as_str().to_string()), Some(bundle_json)))
42}
43
44fn serialized_episode_bundle(
45    batch: &ProjectionImportBatchV2,
46) -> Result<(Option<String>, Option<String>), MemoryError> {
47    let Some(bundle) = batch.episode_bundle.as_ref() else {
48        return Ok((None, None));
49    };
50
51    let bundle_json = serde_json::to_string(bundle).map_err(|err| {
52        MemoryError::Other(format!(
53            "failed to serialize episode bundle {} for import receipt: {}",
54            bundle.bundle_id, err
55        ))
56    })?;
57
58    Ok((Some(bundle.bundle_id.clone()), Some(bundle_json)))
59}
60
61fn serialized_execution_context(
62    batch: &ProjectionImportBatchV2,
63) -> Result<Option<String>, MemoryError> {
64    batch
65        .execution_context
66        .as_ref()
67        .map(serde_json::to_string)
68        .transpose()
69        .map_err(|err| {
70            MemoryError::Other(format!(
71                "failed to serialize execution context for import receipt: {}",
72                err
73            ))
74        })
75}
76
77#[allow(clippy::too_many_arguments)]
78fn build_projection_import_log_row(
79    batch: &ProjectionImportBatchV2,
80    batch_id: String,
81    status: &str,
82    imported_at: String,
83    failure_reason: Option<String>,
84    kernel_payload_json: Option<String>,
85    record_count: usize,
86    claim_count: usize,
87    relation_count: usize,
88    episode_count: usize,
89    alias_count: usize,
90    evidence_count: usize,
91) -> Result<projection_storage::ProjectionImportLogRow, MemoryError> {
92    let (evidence_bundle_id, evidence_bundle_json) = serialized_import_evidence_bundle(batch)?;
93    let (episode_bundle_id, episode_bundle_json) = serialized_episode_bundle(batch)?;
94    let execution_context_json = serialized_execution_context(batch)?;
95
96    Ok(projection_storage::ProjectionImportLogRow {
97        batch_id,
98        source_envelope_id: batch.source_envelope_id.as_str().to_string(),
99        schema_version: batch.schema_version.clone(),
100        export_schema_version: batch.export_schema_version.clone(),
101        content_digest: batch.content_digest.hex().to_string(),
102        source_authority: batch.source_authority.clone(),
103        scope_namespace: batch.scope_key.namespace.clone(),
104        scope_domain: batch.scope_key.domain.clone(),
105        scope_workspace_id: batch.scope_key.workspace_id.clone(),
106        scope_repo_id: batch.scope_key.repo_id.clone(),
107        trace_id: batch.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
108        record_count,
109        claim_count,
110        relation_count,
111        episode_count,
112        alias_count,
113        evidence_count,
114        status: status.into(),
115        source_exported_at: Some(batch.source_exported_at.clone()),
116        transformed_at: Some(batch.transformed_at.clone()),
117        imported_at,
118        source_run_id: batch
119            .export_meta
120            .as_ref()
121            .and_then(|meta| meta.run_id.clone()),
122        comparability_snapshot_version: batch
123            .export_meta
124            .as_ref()
125            .and_then(|meta| meta.comparability_snapshot_version.clone()),
126        direct_write: batch
127            .export_meta
128            .as_ref()
129            .map(|meta| meta.direct_write)
130            .unwrap_or(false),
131        failure_reason,
132        evidence_bundle_id,
133        evidence_bundle_json,
134        episode_bundle_id,
135        episode_bundle_json,
136        execution_context_json,
137        kernel_payload_json,
138    })
139}
140
141fn projection_identity_conflict_error(
142    source_envelope_id: &str,
143    content_digest: &str,
144    record_kind: &str,
145    record_id: &str,
146    existing_source_envelope_id: &str,
147) -> MemoryError {
148    if existing_source_envelope_id == source_envelope_id {
149        MemoryError::ImportMigrationRequired {
150            source_envelope_id: source_envelope_id.to_string(),
151            detail: format!(
152                "incoming {record_kind} {record_id} already exists for source_envelope_id {source_envelope_id} but the import receipt does not match digest {content_digest}; this usually means a historical digest migration replay or projection_import_log drift. Repair or clear the import receipts instead of replaying the same authoritative rows"
153            ),
154        }
155    } else {
156        MemoryError::ImportInvalid {
157            reason: format!(
158                "incoming {record_kind} {record_id} would collide with existing imported data from source_envelope_id {existing_source_envelope_id}; refusing ambiguous overwrite"
159            ),
160        }
161    }
162}
163
164fn check_projection_identity_conflicts(
165    conn: &rusqlite::Connection,
166    source_envelope_id: &str,
167    content_digest: &str,
168    claim_rows: &[projection_storage::ClaimVersionRow],
169    relation_rows: &[projection_storage::RelationVersionRow],
170) -> Result<(), MemoryError> {
171    for cv in claim_rows {
172        if let Some(existing_source_envelope_id) =
173            projection_storage::claim_version_source_envelope(conn, &cv.claim_version_id)?
174        {
175            return Err(projection_identity_conflict_error(
176                source_envelope_id,
177                content_digest,
178                "claim_version_id",
179                &cv.claim_version_id,
180                &existing_source_envelope_id,
181            ));
182        }
183    }
184
185    for rv in relation_rows {
186        if let Some(existing_source_envelope_id) =
187            projection_storage::relation_version_source_envelope(conn, &rv.relation_version_id)?
188        {
189            return Err(projection_identity_conflict_error(
190                source_envelope_id,
191                content_digest,
192                "relation_version_id",
193                &rv.relation_version_id,
194                &existing_source_envelope_id,
195            ));
196        }
197    }
198
199    Ok(())
200}
201
202fn parse_optional_receipt_json(
203    raw: Option<String>,
204    field_name: &str,
205    row_id: &str,
206) -> Result<Option<serde_json::Value>, MemoryError> {
207    raw.map(|value| {
208        serde_json::from_str(&value).map_err(|err| MemoryError::CorruptData {
209            table: "projection_import_receipts",
210            row_id: row_id.to_string(),
211            detail: format!("invalid {field_name}: {err}"),
212        })
213    })
214    .transpose()
215}
216
217fn parse_rebuildable_kernel_batch_v3(
218    kernel_payload_json: Option<&serde_json::Value>,
219    table: &'static str,
220    row_id: &str,
221) -> Result<Option<ProjectionImportBatchV3>, MemoryError> {
222    kernel_payload_json
223        .cloned()
224        .map(|value| {
225            serde_json::from_value(value).map_err(|err| MemoryError::CorruptData {
226                table,
227                row_id: row_id.to_string(),
228                detail: format!("invalid rebuildable kernel batch v3 receipt: {err}"),
229            })
230        })
231        .transpose()
232}
233
234fn parse_batch_timestamp(
235    value: Option<&str>,
236    column: &str,
237    row_kind: &str,
238    row_id: &str,
239) -> Result<Option<chrono::DateTime<chrono::Utc>>, MemoryError> {
240    match value {
241        Some(value) => chrono::DateTime::parse_from_rfc3339(value)
242            .map(|t| Some(t.with_timezone(&chrono::Utc)))
243            .map_err(|err| MemoryError::ImportInvalid {
244                reason: format!(
245                    "invalid {row_kind} {row_id} {column}: {value}; expected RFC3339 timestamp ({err})"
246                ),
247            }),
248        None => Ok(None),
249    }
250}
251
252fn parse_stored_timestamp(
253    value: Option<&str>,
254    table: &'static str,
255    row_id: &str,
256    column: &str,
257) -> Result<Option<chrono::DateTime<chrono::Utc>>, MemoryError> {
258    match value {
259        Some(value) => chrono::DateTime::parse_from_rfc3339(value)
260            .map(|t| Some(t.with_timezone(&chrono::Utc)))
261            .map_err(|err| MemoryError::CorruptData {
262                table,
263                row_id: row_id.to_string(),
264                detail: format!("invalid {column} timestamp '{value}' ({err})"),
265            }),
266        None => Ok(None),
267    }
268}
269
270fn validate_temporal_order(
271    row_kind: &str,
272    row_id: &str,
273    valid_from: Option<chrono::DateTime<chrono::Utc>>,
274    valid_to: Option<chrono::DateTime<chrono::Utc>>,
275) -> Result<(), MemoryError> {
276    if let (Some(from), Some(to)) = (valid_from, valid_to) {
277        if from >= to {
278            return Err(MemoryError::ImportInvalid {
279                reason: format!(
280                    "{row_kind} {row_id} has invalid interval: valid_from ({from}) is not < valid_to ({to})"
281                ),
282            });
283        }
284    }
285    Ok(())
286}
287
288fn intervals_overlap(
289    first_from: Option<chrono::DateTime<chrono::Utc>>,
290    first_to: Option<chrono::DateTime<chrono::Utc>>,
291    second_from: Option<chrono::DateTime<chrono::Utc>>,
292    second_to: Option<chrono::DateTime<chrono::Utc>>,
293) -> bool {
294    if let (Some(a_to), Some(b_from)) = (first_to, second_from) {
295        if b_from >= a_to {
296            return false;
297        }
298    }
299    if let (Some(a_from), Some(b_to)) = (first_from, second_to) {
300        if a_from >= b_to {
301            return false;
302        }
303    }
304    true
305}
306
307/// Result of a projection batch import (V11+).
308#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
309pub struct ProjectionImportResult {
310    /// Source envelope ID.
311    pub source_envelope_id: String,
312    /// Import status: "complete" or "already_imported".
313    pub status: String,
314    /// Number of records in the batch.
315    pub record_count: usize,
316    /// Whether this was a duplicate (idempotent no-op).
317    pub was_duplicate: bool,
318}
319
320/// Public view of a V11 projection import log entry.
321#[derive(Debug, Clone, schemars::JsonSchema, serde::Serialize, serde::Deserialize)]
322#[schemars(title = "ProjectionImportLogEntryV1")]
323pub struct ProjectionImportLogEntry {
324    pub batch_id: String,
325    pub source_envelope_id: String,
326    /// Import-side batch schema version recorded at the memory boundary.
327    pub schema_version: String,
328    /// Source export schema version preserved as provenance when provided.
329    pub export_schema_version: Option<String>,
330    pub content_digest: String,
331    pub source_authority: String,
332    pub scope_namespace: String,
333    pub scope_domain: Option<String>,
334    pub scope_workspace_id: Option<String>,
335    pub scope_repo_id: Option<String>,
336    pub record_count: usize,
337    pub claim_count: usize,
338    pub relation_count: usize,
339    pub episode_count: usize,
340    pub alias_count: usize,
341    pub evidence_count: usize,
342    pub status: String,
343    pub source_exported_at: Option<String>,
344    pub transformed_at: Option<String>,
345    pub imported_at: String,
346    pub source_run_id: Option<String>,
347    pub comparability_snapshot_version: Option<String>,
348    pub direct_write: bool,
349    pub failure_reason: Option<String>,
350    pub evidence_bundle_id: Option<String>,
351    pub evidence_bundle_json: Option<serde_json::Value>,
352    pub episode_bundle_id: Option<String>,
353    pub episode_bundle_json: Option<serde_json::Value>,
354    pub execution_context_json: Option<serde_json::Value>,
355    pub kernel_payload_json: Option<serde_json::Value>,
356}
357
358impl ProjectionImportLogEntry {
359    pub fn scope_key(&self) -> ScopeKey {
360        ScopeKey {
361            namespace: self.scope_namespace.clone(),
362            domain: self.scope_domain.clone(),
363            workspace_id: self.scope_workspace_id.clone(),
364            repo_id: self.scope_repo_id.clone(),
365        }
366    }
367
368    pub fn rebuildable_kernel_batch_v3(
369        &self,
370    ) -> Result<Option<ProjectionImportBatchV3>, MemoryError> {
371        parse_rebuildable_kernel_batch_v3(
372            self.kernel_payload_json.as_ref(),
373            "projection_import_log",
374            &self.batch_id,
375        )
376    }
377}
378
379/// Public view of a durable failed projection import receipt.
380#[derive(Debug, Clone, schemars::JsonSchema, serde::Serialize, serde::Deserialize)]
381#[schemars(title = "ImportFailureRecordV1")]
382pub struct ProjectionImportFailureReceiptEntry {
383    pub failure_id: String,
384    pub source_envelope_id: String,
385    pub schema_version: String,
386    pub export_schema_version: Option<String>,
387    pub content_digest: String,
388    pub source_authority: String,
389    pub scope_namespace: String,
390    pub scope_domain: Option<String>,
391    pub scope_workspace_id: Option<String>,
392    pub scope_repo_id: Option<String>,
393    pub record_count: usize,
394    pub error_kind: String,
395    pub error_message: String,
396    pub source_exported_at: Option<String>,
397    pub transformed_at: Option<String>,
398    pub failed_at: String,
399    pub source_run_id: Option<String>,
400    pub comparability_snapshot_version: Option<String>,
401    pub direct_write: bool,
402    pub evidence_bundle_id: Option<String>,
403    pub evidence_bundle_json: Option<serde_json::Value>,
404    pub episode_bundle_id: Option<String>,
405    pub episode_bundle_json: Option<serde_json::Value>,
406    pub execution_context_json: Option<serde_json::Value>,
407    pub kernel_payload_json: Option<serde_json::Value>,
408}
409
410impl ProjectionImportFailureReceiptEntry {
411    pub fn scope_key(&self) -> ScopeKey {
412        ScopeKey {
413            namespace: self.scope_namespace.clone(),
414            domain: self.scope_domain.clone(),
415            workspace_id: self.scope_workspace_id.clone(),
416            repo_id: self.scope_repo_id.clone(),
417        }
418    }
419
420    pub fn rebuildable_kernel_batch_v3(
421        &self,
422    ) -> Result<Option<ProjectionImportBatchV3>, MemoryError> {
423        parse_rebuildable_kernel_batch_v3(
424            self.kernel_payload_json.as_ref(),
425            "projection_import_failures",
426            &self.failure_id,
427        )
428    }
429}
430
431fn projection_import_log_entry_from_row(
432    r: projection_storage::ProjectionImportLogRow,
433) -> Result<ProjectionImportLogEntry, MemoryError> {
434    let row_id = r.batch_id.clone();
435    Ok(ProjectionImportLogEntry {
436        batch_id: r.batch_id,
437        source_envelope_id: r.source_envelope_id,
438        schema_version: r.schema_version,
439        export_schema_version: r.export_schema_version,
440        content_digest: r.content_digest,
441        source_authority: r.source_authority,
442        scope_namespace: r.scope_namespace,
443        scope_domain: r.scope_domain,
444        scope_workspace_id: r.scope_workspace_id,
445        scope_repo_id: r.scope_repo_id,
446        record_count: r.record_count,
447        claim_count: r.claim_count,
448        relation_count: r.relation_count,
449        episode_count: r.episode_count,
450        alias_count: r.alias_count,
451        evidence_count: r.evidence_count,
452        status: r.status,
453        source_exported_at: r.source_exported_at,
454        transformed_at: r.transformed_at,
455        imported_at: r.imported_at,
456        source_run_id: r.source_run_id,
457        comparability_snapshot_version: r.comparability_snapshot_version,
458        direct_write: r.direct_write,
459        failure_reason: r.failure_reason,
460        evidence_bundle_id: r.evidence_bundle_id,
461        evidence_bundle_json: parse_optional_receipt_json(
462            r.evidence_bundle_json,
463            "evidence_bundle_json",
464            &row_id,
465        )?,
466        episode_bundle_id: r.episode_bundle_id,
467        episode_bundle_json: parse_optional_receipt_json(
468            r.episode_bundle_json,
469            "episode_bundle_json",
470            &row_id,
471        )?,
472        execution_context_json: parse_optional_receipt_json(
473            r.execution_context_json,
474            "execution_context_json",
475            &row_id,
476        )?,
477        kernel_payload_json: parse_optional_receipt_json(
478            r.kernel_payload_json,
479            "kernel_payload_json",
480            &row_id,
481        )?,
482    })
483}
484
485fn projection_import_failure_entry_from_row(
486    r: projection_storage::ProjectionImportFailureRow,
487) -> Result<ProjectionImportFailureReceiptEntry, MemoryError> {
488    let row_id = r.failure_id.clone();
489    Ok(ProjectionImportFailureReceiptEntry {
490        failure_id: r.failure_id,
491        source_envelope_id: r.source_envelope_id,
492        schema_version: r.schema_version,
493        export_schema_version: r.export_schema_version,
494        content_digest: r.content_digest,
495        source_authority: r.source_authority,
496        scope_namespace: r.scope_namespace,
497        scope_domain: r.scope_domain,
498        scope_workspace_id: r.scope_workspace_id,
499        scope_repo_id: r.scope_repo_id,
500        record_count: r.record_count,
501        error_kind: r.error_kind,
502        error_message: r.error_message,
503        source_exported_at: r.source_exported_at,
504        transformed_at: r.transformed_at,
505        failed_at: r.failed_at,
506        source_run_id: r.source_run_id,
507        comparability_snapshot_version: r.comparability_snapshot_version,
508        direct_write: r.direct_write,
509        evidence_bundle_id: r.evidence_bundle_id,
510        evidence_bundle_json: parse_optional_receipt_json(
511            r.evidence_bundle_json,
512            "evidence_bundle_json",
513            &row_id,
514        )?,
515        episode_bundle_id: r.episode_bundle_id,
516        episode_bundle_json: parse_optional_receipt_json(
517            r.episode_bundle_json,
518            "episode_bundle_json",
519            &row_id,
520        )?,
521        execution_context_json: parse_optional_receipt_json(
522            r.execution_context_json,
523            "execution_context_json",
524            &row_id,
525        )?,
526        kernel_payload_json: parse_optional_receipt_json(
527            r.kernel_payload_json,
528            "kernel_payload_json",
529            &row_id,
530        )?,
531    })
532}
533
534impl MemoryStore {
535    async fn persist_projection_import_failure_receipt(
536        &self,
537        log_row: projection_storage::ProjectionImportLogRow,
538        error: &MemoryError,
539    ) {
540        let failed_log = projection_storage::ProjectionImportLogRow {
541            status: "failed".into(),
542            failure_reason: Some(error.to_string()),
543            ..log_row.clone()
544        };
545        let failure_row = projection_storage::ProjectionImportFailureRow {
546            failure_id: projection_import_failure_id(
547                &failed_log.source_envelope_id,
548                &failed_log.schema_version,
549                &failed_log.content_digest,
550            ),
551            source_envelope_id: failed_log.source_envelope_id.clone(),
552            schema_version: failed_log.schema_version.clone(),
553            export_schema_version: failed_log.export_schema_version.clone(),
554            content_digest: failed_log.content_digest.clone(),
555            source_authority: failed_log.source_authority.clone(),
556            scope_namespace: failed_log.scope_namespace.clone(),
557            scope_domain: failed_log.scope_domain.clone(),
558            scope_workspace_id: failed_log.scope_workspace_id.clone(),
559            scope_repo_id: failed_log.scope_repo_id.clone(),
560            trace_id: failed_log.trace_id.clone(),
561            record_count: failed_log.record_count,
562            error_kind: error.kind().into(),
563            error_message: error.to_string(),
564            source_exported_at: failed_log.source_exported_at.clone(),
565            transformed_at: failed_log.transformed_at.clone(),
566            failed_at: failed_log.imported_at.clone(),
567            source_run_id: failed_log.source_run_id.clone(),
568            comparability_snapshot_version: failed_log.comparability_snapshot_version.clone(),
569            direct_write: failed_log.direct_write,
570            evidence_bundle_id: failed_log.evidence_bundle_id.clone(),
571            evidence_bundle_json: failed_log.evidence_bundle_json.clone(),
572            episode_bundle_id: failed_log.episode_bundle_id.clone(),
573            episode_bundle_json: failed_log.episode_bundle_json.clone(),
574            execution_context_json: failed_log.execution_context_json.clone(),
575            kernel_payload_json: failed_log.kernel_payload_json.clone(),
576        };
577
578        let result = self
579            .with_write_conn(move |conn| {
580                projection_storage::upsert_projection_import_log_conn(conn, &failed_log)?;
581                projection_storage::insert_projection_import_failure(conn, &failure_row)?;
582                Ok(())
583            })
584            .await;
585
586        if let Err(log_error) = result {
587            tracing::warn!(
588                error = %log_error,
589                "failed to persist projection import failure receipt"
590            );
591        }
592    }
593
594    /// Import a projection batch from `forge-memory-bridge`.
595    ///
596    /// This is the canonical in-process import path for the stack:
597    /// `ExportEnvelopeV3 -> transform_envelope_v3() -> ProjectionImportBatchV3
598    /// -> semantic-memory import transaction`.
599    ///
600    /// V2 remains supported as a compatibility-normalized import batch shape.
601    /// The old `import_envelope()` method remains functional only during the
602    /// migration cycle. JSON parsing is retained only via
603    /// [`import_projection_batch_json_compat()`](Self::import_projection_batch_json_compat).
604    pub async fn import_projection_batch<B: ProjectionImportBatchLike>(
605        &self,
606        batch: &B,
607    ) -> Result<ProjectionImportResult, MemoryError> {
608        let kernel_payload_json = batch.kernel_payload_json()?;
609        let batch = batch.to_projection_import_batch_v2();
610
611        if batch.schema_version != PROJECTION_IMPORT_BATCH_V2_SCHEMA {
612            return Err(MemoryError::ImportInvalid {
613                reason: format!(
614                    "unsupported schema_version: {}; expected {}",
615                    batch.schema_version, PROJECTION_IMPORT_BATCH_V2_SCHEMA
616                ),
617            });
618        }
619
620        let source_envelope_id = batch.source_envelope_id.as_str().to_string();
621        let schema_version = batch.schema_version.clone();
622        let export_schema_version = batch.export_schema_version.clone();
623        let content_digest = batch.content_digest.hex().to_string();
624        let source_authority = batch.source_authority.clone();
625        let scope_namespace = batch.scope_key.namespace.clone();
626        let scope_domain = batch.scope_key.domain.clone();
627        let scope_workspace_id = batch.scope_key.workspace_id.clone();
628        let scope_repo_id = batch.scope_key.repo_id.clone();
629        let trace_id = batch.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone());
630        let source_exported_at = Some(batch.source_exported_at.clone());
631        let transformed_at = Some(batch.transformed_at.clone());
632        let source_run_id = batch
633            .export_meta
634            .as_ref()
635            .and_then(|meta| meta.run_id.clone());
636        let comparability_snapshot_version = batch
637            .export_meta
638            .as_ref()
639            .and_then(|meta| meta.comparability_snapshot_version.clone());
640        let direct_write = batch
641            .export_meta
642            .as_ref()
643            .map(|meta| meta.direct_write)
644            .unwrap_or(false);
645        let (evidence_bundle_id, evidence_bundle_json) = serialized_import_evidence_bundle(&batch)?;
646        let (episode_bundle_id, episode_bundle_json) = serialized_episode_bundle(&batch)?;
647        let execution_context_json = serialized_execution_context(&batch)?;
648        let record_len = batch.records.len();
649        let base_failure_log_row = projection_storage::ProjectionImportLogRow {
650            batch_id: projection_import_failure_id(
651                &source_envelope_id,
652                &schema_version,
653                &content_digest,
654            ),
655            source_envelope_id: source_envelope_id.clone(),
656            schema_version: schema_version.clone(),
657            export_schema_version: export_schema_version.clone(),
658            content_digest: content_digest.clone(),
659            source_authority: source_authority.clone(),
660            scope_namespace: scope_namespace.clone(),
661            scope_domain: scope_domain.clone(),
662            scope_workspace_id: scope_workspace_id.clone(),
663            scope_repo_id: scope_repo_id.clone(),
664            trace_id: trace_id.clone(),
665            record_count: record_len,
666            claim_count: 0,
667            relation_count: 0,
668            episode_count: 0,
669            alias_count: 0,
670            evidence_count: 0,
671            status: "failed".into(),
672            source_exported_at: source_exported_at.clone(),
673            transformed_at: transformed_at.clone(),
674            imported_at: chrono::Utc::now().to_rfc3339(),
675            source_run_id: source_run_id.clone(),
676            comparability_snapshot_version: comparability_snapshot_version.clone(),
677            direct_write,
678            failure_reason: None,
679            evidence_bundle_id,
680            evidence_bundle_json,
681            episode_bundle_id,
682            episode_bundle_json,
683            execution_context_json,
684            kernel_payload_json: kernel_payload_json.clone(),
685        };
686
687        if batch.schema_version != PROJECTION_IMPORT_BATCH_V2_SCHEMA {
688            let error = MemoryError::ImportInvalid {
689                reason: format!(
690                    "unsupported schema_version: {}; expected {}",
691                    batch.schema_version, PROJECTION_IMPORT_BATCH_V2_SCHEMA
692                ),
693            };
694            self.persist_projection_import_failure_receipt(base_failure_log_row, &error)
695                .await;
696            return Err(error);
697        }
698
699        let sei_c = source_envelope_id.clone();
700        let sv_c = schema_version.clone();
701        let cd_c = content_digest.clone();
702        let already = self
703            .with_read_conn(move |conn| {
704                projection_storage::check_projection_import_exists(conn, &sei_c, &sv_c, &cd_c)
705            })
706            .await?;
707
708        if already {
709            return Ok(ProjectionImportResult {
710                source_envelope_id,
711                status: "already_imported".into(),
712                record_count: record_len,
713                was_duplicate: true,
714            });
715        }
716
717        let batch_id = uuid::Uuid::new_v4().to_string();
718
719        let mut claim_count = 0usize;
720        let mut relation_count = 0usize;
721        let mut episode_count = 0usize;
722        let mut alias_count = 0usize;
723        let mut evidence_count = 0usize;
724
725        let mut claim_rows = Vec::new();
726        let mut relation_rows = Vec::new();
727        let mut alias_rows = Vec::new();
728        let mut evidence_rows = Vec::new();
729        let mut episode_rows = Vec::new();
730        let mut preferred_claim_intervals = Vec::new();
731        let mut preferred_relation_intervals = Vec::new();
732
733        for record in &batch.records {
734            match record {
735                ImportProjectionRecord::ClaimVersion(cv) => {
736                    let valid_from = parse_batch_timestamp(
737                        cv.valid_from.as_deref(),
738                        "valid_from",
739                        "claim_version",
740                        cv.claim_version_id.as_str(),
741                    )?;
742                    let valid_to = parse_batch_timestamp(
743                        cv.valid_to.as_deref(),
744                        "valid_to",
745                        "claim_version",
746                        cv.claim_version_id.as_str(),
747                    )?;
748                    validate_temporal_order(
749                        "claim_version",
750                        cv.claim_version_id.as_str(),
751                        valid_from,
752                        valid_to,
753                    )?;
754
755                    claim_count += 1;
756                    claim_rows.push(projection_storage::ClaimVersionRow {
757                        claim_version_id: cv.claim_version_id.as_str().to_string(),
758                        claim_id: cv.claim_id.as_str().to_string(),
759                        claim_state: cv.claim_state.as_str().to_string(),
760                        projection_family: cv.projection_family.clone(),
761                        subject_entity_id: cv.subject_entity_id.as_str().to_string(),
762                        predicate: cv.predicate.clone(),
763                        object_anchor: cv.object_anchor.to_string(),
764                        scope_namespace: cv.scope_key.namespace.clone(),
765                        scope_domain: cv.scope_key.domain.clone(),
766                        scope_workspace_id: cv.scope_key.workspace_id.clone(),
767                        scope_repo_id: cv.scope_key.repo_id.clone(),
768                        valid_from: cv.valid_from.clone(),
769                        valid_to: cv.valid_to.clone(),
770                        recorded_at: String::new(),
771                        preferred_open: cv.preferred_open,
772                        source_envelope_id: cv.source_envelope_id.as_str().to_string(),
773                        source_authority: cv.source_authority.clone(),
774                        trace_id: cv.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
775                        freshness: cv.freshness.as_str().to_string(),
776                        contradiction_status: serde_json::to_string(&cv.contradiction_status)
777                            .unwrap_or_else(|_| "\"none\"".into()),
778                        supersedes_claim_version_id: cv
779                            .supersedes_claim_version_id
780                            .as_ref()
781                            .map(|id| id.as_str().to_string()),
782                        content: cv.content.clone(),
783                        confidence: cv.confidence,
784                        content_digest: Some(batch.content_digest.hex().to_string()),
785                        metadata: cv.metadata.as_ref().map(|v| v.to_string()),
786                    });
787
788                    if cv.preferred_open {
789                        preferred_claim_intervals.push((
790                            cv.claim_id.as_str().to_string(),
791                            cv.claim_version_id.as_str().to_string(),
792                            valid_from,
793                            valid_to,
794                        ));
795                    }
796                }
797                ImportProjectionRecord::RelationVersion(rv) => {
798                    let valid_from = parse_batch_timestamp(
799                        rv.valid_from.as_deref(),
800                        "valid_from",
801                        "relation_version",
802                        rv.relation_version_id.as_str(),
803                    )?;
804                    let valid_to = parse_batch_timestamp(
805                        rv.valid_to.as_deref(),
806                        "valid_to",
807                        "relation_version",
808                        rv.relation_version_id.as_str(),
809                    )?;
810                    validate_temporal_order(
811                        "relation_version",
812                        rv.relation_version_id.as_str(),
813                        valid_from,
814                        valid_to,
815                    )?;
816
817                    relation_count += 1;
818                    relation_rows.push(projection_storage::RelationVersionRow {
819                        relation_version_id: rv.relation_version_id.as_str().to_string(),
820                        subject_entity_id: rv.subject_entity_id.as_str().to_string(),
821                        predicate: rv.predicate.clone(),
822                        object_anchor: rv.object_anchor.to_string(),
823                        scope_namespace: rv.scope_key.namespace.clone(),
824                        scope_domain: rv.scope_key.domain.clone(),
825                        scope_workspace_id: rv.scope_key.workspace_id.clone(),
826                        scope_repo_id: rv.scope_key.repo_id.clone(),
827                        claim_id: rv.claim_id.as_ref().map(|id| id.as_str().to_string()),
828                        source_episode_id: rv
829                            .source_episode_id
830                            .as_ref()
831                            .map(|id| id.as_str().to_string()),
832                        valid_from: rv.valid_from.clone(),
833                        valid_to: rv.valid_to.clone(),
834                        recorded_at: String::new(),
835                        preferred_open: rv.preferred_open,
836                        supersedes_relation_version_id: rv
837                            .supersedes_relation_version_id
838                            .as_ref()
839                            .map(|id| id.as_str().to_string()),
840                        contradiction_status: serde_json::to_string(&rv.contradiction_status)
841                            .unwrap_or_else(|_| "\"none\"".into()),
842                        source_confidence: rv.source_confidence,
843                        projection_family: rv.projection_family.clone(),
844                        source_envelope_id: rv.source_envelope_id.as_str().to_string(),
845                        source_authority: rv.source_authority.clone(),
846                        trace_id: rv.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
847                        freshness: rv.freshness.as_str().to_string(),
848                        metadata: rv.metadata.as_ref().map(|v| v.to_string()),
849                    });
850
851                    if rv.preferred_open {
852                        preferred_relation_intervals.push((
853                            rv.subject_entity_id.as_str().to_string(),
854                            rv.predicate.clone(),
855                            rv.object_anchor.to_string(),
856                            rv.scope_key.namespace.clone(),
857                            rv.scope_key.domain.clone(),
858                            rv.scope_key.workspace_id.clone(),
859                            rv.scope_key.repo_id.clone(),
860                            rv.projection_family.clone(),
861                            rv.relation_version_id.as_str().to_string(),
862                            valid_from,
863                            valid_to,
864                        ));
865                    }
866                }
867                ImportProjectionRecord::EntityAlias(ea) => {
868                    alias_count += 1;
869                    alias_rows.push(projection_storage::EntityAliasRow {
870                        canonical_entity_id: ea.canonical_entity_id.as_str().to_string(),
871                        alias_text: ea.alias_text.clone(),
872                        alias_source: ea.alias_source.clone(),
873                        match_evidence: ea.match_evidence.as_ref().map(|v| v.to_string()),
874                        confidence: ea.confidence,
875                        merge_decision: encode_merge_decision(&ea.merge_decision),
876                        scope_namespace: ea.scope.namespace.clone(),
877                        scope_domain: ea.scope.domain.clone(),
878                        scope_workspace_id: ea.scope.workspace_id.clone(),
879                        scope_repo_id: ea.scope.repo_id.clone(),
880                        review_state: encode_review_state(&ea.review_state),
881                        is_human_confirmed: ea.is_human_confirmed,
882                        is_human_confirmed_final: ea.is_human_confirmed_final,
883                        superseded_by_entity_id: ea
884                            .superseded_by_entity_id
885                            .as_ref()
886                            .map(|id| id.as_str().to_string()),
887                        split_from_entity_id: ea
888                            .split_from_entity_id
889                            .as_ref()
890                            .map(|id| id.as_str().to_string()),
891                        source_envelope_id: ea.source_envelope_id.as_str().to_string(),
892                        recorded_at: String::new(),
893                    });
894                }
895                ImportProjectionRecord::EvidenceRef(er) => {
896                    evidence_count += 1;
897                    evidence_rows.push(projection_storage::EvidenceRefRow {
898                        claim_id: er.claim_id.as_str().to_string(),
899                        claim_version_id: er
900                            .claim_version_id
901                            .as_ref()
902                            .map(|id| id.as_str().to_string()),
903                        fetch_handle: er.fetch_handle.clone(),
904                        source_authority: er.source_authority.clone(),
905                        source_envelope_id: er.source_envelope_id.as_str().to_string(),
906                        recorded_at: String::new(),
907                        metadata: er.metadata.as_ref().map(|v| v.to_string()),
908                    });
909                }
910                ImportProjectionRecord::Episode(ep) => {
911                    episode_count += 1;
912                    episode_rows.push(projection_storage::EpisodeLinkRow {
913                        episode_id: ep.episode_id.as_str().to_string(),
914                        document_id: ep.document_id.clone(),
915                        cause_ids: serde_json::to_string(&ep.cause_ids)
916                            .unwrap_or_else(|_| "[]".into()),
917                        effect_type: ep.effect_type.clone(),
918                        outcome: ep.outcome.clone(),
919                        confidence: ep.confidence,
920                        experiment_id: ep.experiment_id.clone(),
921                        source_envelope_id: ep.source_envelope_id.as_str().to_string(),
922                        source_authority: ep.source_authority.clone(),
923                        trace_id: ep.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
924                        recorded_at: String::new(),
925                        metadata: ep.metadata.as_ref().map(|v| v.to_string()),
926                    });
927                }
928            }
929        }
930
931        for i in 0..preferred_claim_intervals.len() {
932            let (left_claim_id, left_version_id, left_from, left_to) =
933                &preferred_claim_intervals[i];
934            for (right_claim_id, right_version_id, right_from, right_to) in
935                preferred_claim_intervals.iter().skip(i + 1)
936            {
937                if left_claim_id != right_claim_id {
938                    continue;
939                }
940                if intervals_overlap(*left_from, *left_to, *right_from, *right_to) {
941                    let err = MemoryError::ImportInvalid {
942                        reason: format!(
943                            "preferred-open claim interval conflict for claim_id {left_claim_id}: \
944                             {left_version_id} ({left_from:?}, {left_to:?}) overlaps \
945                             {right_version_id} ({right_from:?}, {right_to:?})",
946                        ),
947                    };
948                    let failure_log_row = build_projection_import_log_row(
949                        &batch,
950                        batch_id.clone(),
951                        "failed",
952                        chrono::Utc::now().to_rfc3339(),
953                        Some(err.to_string()),
954                        kernel_payload_json.clone(),
955                        record_len,
956                        claim_count,
957                        relation_count,
958                        episode_count,
959                        alias_count,
960                        evidence_count,
961                    )?;
962                    self.persist_projection_import_failure_receipt(failure_log_row, &err)
963                        .await;
964                    return Err(err);
965                }
966            }
967        }
968
969        for i in 0..preferred_relation_intervals.len() {
970            let (
971                left_subject_entity_id,
972                left_predicate,
973                left_object_anchor,
974                left_scope_namespace,
975                left_scope_domain,
976                left_scope_workspace_id,
977                left_scope_repo_id,
978                left_projection_family,
979                left_relation_version_id,
980                left_from,
981                left_to,
982            ) = &preferred_relation_intervals[i];
983            for (
984                right_subject_entity_id,
985                right_predicate,
986                right_object_anchor,
987                right_scope_namespace,
988                right_scope_domain,
989                right_scope_workspace_id,
990                right_scope_repo_id,
991                right_projection_family,
992                right_relation_version_id,
993                right_from,
994                right_to,
995            ) in preferred_relation_intervals.iter().skip(i + 1)
996            {
997                if left_subject_entity_id != right_subject_entity_id
998                    || left_predicate != right_predicate
999                    || left_object_anchor != right_object_anchor
1000                    || left_scope_namespace != right_scope_namespace
1001                    || left_scope_domain != right_scope_domain
1002                    || left_scope_workspace_id != right_scope_workspace_id
1003                    || left_scope_repo_id != right_scope_repo_id
1004                    || left_projection_family != right_projection_family
1005                {
1006                    continue;
1007                }
1008
1009                if intervals_overlap(*left_from, *left_to, *right_from, *right_to) {
1010                    let err = MemoryError::ImportInvalid {
1011                        reason: format!(
1012                            "preferred-open relation interval conflict for relation key \
1013                             ({left_subject_entity_id}, {left_predicate}, {left_object_anchor}, \
1014                             {left_scope_namespace}/{left_scope_domain:?}/{left_scope_workspace_id:?}/{left_scope_repo_id:?}, \
1015                             {left_projection_family}): {left_relation_version_id} \
1016                             ({left_from:?}, {left_to:?}) overlaps {right_relation_version_id} \
1017                             ({right_from:?}, {right_to:?})",
1018                        ),
1019                    };
1020                    let failure_log_row = build_projection_import_log_row(
1021                        &batch,
1022                        batch_id.clone(),
1023                        "failed",
1024                        chrono::Utc::now().to_rfc3339(),
1025                        Some(err.to_string()),
1026                        kernel_payload_json.clone(),
1027                        record_len,
1028                        claim_count,
1029                        relation_count,
1030                        episode_count,
1031                        alias_count,
1032                        evidence_count,
1033                    )?;
1034                    self.persist_projection_import_failure_receipt(failure_log_row, &err)
1035                        .await;
1036                    return Err(err);
1037                }
1038            }
1039        }
1040
1041        let total_count = record_len;
1042        let success_log_row = build_projection_import_log_row(
1043            &batch,
1044            batch_id.clone(),
1045            "complete",
1046            chrono::Utc::now().to_rfc3339(),
1047            None,
1048            kernel_payload_json,
1049            total_count,
1050            claim_count,
1051            relation_count,
1052            episode_count,
1053            alias_count,
1054            evidence_count,
1055        )?;
1056
1057        let import_result = self
1058            .with_write_conn(move |conn| {
1059                let mut claim_rows = claim_rows;
1060                let mut relation_rows = relation_rows;
1061                let mut alias_rows = alias_rows;
1062                let mut evidence_rows = evidence_rows;
1063                let mut episode_rows = episode_rows;
1064                let preferred_claim_intervals = preferred_claim_intervals;
1065                let preferred_relation_intervals = preferred_relation_intervals;
1066                let mut log_row = success_log_row;
1067                db::with_transaction(conn, |tx| {
1068                    if projection_storage::check_projection_import_exists(
1069                        tx,
1070                        &source_envelope_id,
1071                        &schema_version,
1072                        &content_digest,
1073                    )? {
1074                        return Ok(ProjectionImportResult {
1075                            source_envelope_id,
1076                            status: "already_imported".into(),
1077                            record_count: total_count,
1078                            was_duplicate: true,
1079                        });
1080                    }
1081
1082                    check_projection_identity_conflicts(
1083                        tx,
1084                        &source_envelope_id,
1085                        &content_digest,
1086                        &claim_rows,
1087                        &relation_rows,
1088                    )?;
1089
1090                    let imported_at = chrono::Utc::now().to_rfc3339();
1091                    for cv in &mut claim_rows {
1092                        cv.recorded_at = imported_at.clone();
1093                    }
1094                    for rv in &mut relation_rows {
1095                        rv.recorded_at = imported_at.clone();
1096                    }
1097                    for ea in &mut alias_rows {
1098                        ea.recorded_at = imported_at.clone();
1099                    }
1100                    for er in &mut evidence_rows {
1101                        er.recorded_at = imported_at.clone();
1102                    }
1103                    for el in &mut episode_rows {
1104                        el.recorded_at = imported_at.clone();
1105                    }
1106
1107                    log_row.imported_at = imported_at.clone();
1108
1109                    for (claim_id, claim_version_id, interval_from, interval_to) in
1110                        preferred_claim_intervals.iter()
1111                    {
1112                        for (existing_claim_version_id, existing_valid_from, existing_valid_to) in
1113                            projection_storage::query_preferred_claim_intervals(tx, claim_id)?
1114                        {
1115                            let existing_valid_from = parse_stored_timestamp(
1116                                existing_valid_from.as_deref(),
1117                                "claim_versions",
1118                                &existing_claim_version_id,
1119                                "valid_from",
1120                            )?;
1121                            let existing_valid_to = parse_stored_timestamp(
1122                                existing_valid_to.as_deref(),
1123                                "claim_versions",
1124                                &existing_claim_version_id,
1125                                "valid_to",
1126                            )?;
1127                            if intervals_overlap(
1128                                *interval_from,
1129                                *interval_to,
1130                                existing_valid_from,
1131                                existing_valid_to,
1132                            ) {
1133                                return Err(MemoryError::ImportInvalid {
1134                                    reason: format!(
1135                                        "preferred-open claim interval conflict for claim_id {claim_id}: \
1136                                         incoming {claim_version_id} ({interval_from:?}, {interval_to:?}) \
1137                                         overlaps existing {existing_claim_version_id} \
1138                                         ({existing_valid_from:?}, {existing_valid_to:?})"
1139                                    ),
1140                                });
1141                            }
1142                        }
1143                    }
1144
1145                    for (
1146                        subject_entity_id,
1147                        predicate,
1148                        object_anchor,
1149                        scope_namespace,
1150                        scope_domain,
1151                        scope_workspace_id,
1152                        scope_repo_id,
1153                        projection_family,
1154                        relation_version_id,
1155                        interval_from,
1156                        interval_to,
1157                    ) in preferred_relation_intervals.iter()
1158                    {
1159                        for (
1160                            existing_relation_version_id,
1161                            existing_valid_from,
1162                            existing_valid_to,
1163                        ) in projection_storage::query_preferred_relation_intervals(
1164                            tx,
1165                            subject_entity_id,
1166                            predicate,
1167                            object_anchor,
1168                            scope_namespace,
1169                            scope_domain.as_deref(),
1170                            scope_workspace_id.as_deref(),
1171                            scope_repo_id.as_deref(),
1172                            projection_family,
1173                        )? {
1174                            let existing_valid_from = parse_stored_timestamp(
1175                                existing_valid_from.as_deref(),
1176                                "relation_versions",
1177                                &existing_relation_version_id,
1178                                "valid_from",
1179                            )?;
1180                            let existing_valid_to = parse_stored_timestamp(
1181                                existing_valid_to.as_deref(),
1182                                "relation_versions",
1183                                &existing_relation_version_id,
1184                                "valid_to",
1185                            )?;
1186                            if intervals_overlap(
1187                                *interval_from,
1188                                *interval_to,
1189                                existing_valid_from,
1190                                existing_valid_to,
1191                            ) {
1192                                return Err(MemoryError::ImportInvalid {
1193                                    reason: format!(
1194                                        "preferred-open relation interval conflict for relation key \
1195                                         ({subject_entity_id}, {predicate}, {object_anchor}, \
1196                                         {scope_namespace}/{scope_domain:?}/{scope_workspace_id:?}/{scope_repo_id:?}, \
1197                                         {projection_family}): incoming {relation_version_id} \
1198                                         ({interval_from:?}, {interval_to:?}) overlaps existing \
1199                                         {existing_relation_version_id} ({existing_valid_from:?}, \
1200                                         {existing_valid_to:?})"
1201                                    ),
1202                                });
1203                            }
1204                        }
1205                    }
1206
1207                    for cv in &claim_rows {
1208                        projection_storage::insert_claim_version(tx, cv)?;
1209                    }
1210                    for rv in &relation_rows {
1211                        projection_storage::insert_relation_version(tx, rv)?;
1212                    }
1213                    for ea in &alias_rows {
1214                        projection_storage::insert_entity_alias(tx, ea)?;
1215                    }
1216                    for er in &evidence_rows {
1217                        projection_storage::insert_evidence_ref(tx, er)?;
1218                    }
1219                    for el in &episode_rows {
1220                        projection_storage::insert_episode_link(tx, el)?;
1221                    }
1222
1223                    let add_edge = |source_kind: &str,
1224                                    source_id: &str,
1225                                    target_kind: &str,
1226                                    target_id: &str,
1227                                    derivation_type: &str,
1228                                    invalidation_mode: &str| {
1229                        projection_storage::insert_derivation_edge(
1230                            tx,
1231                            source_kind,
1232                            source_id,
1233                            target_kind,
1234                            target_id,
1235                            derivation_type,
1236                            invalidation_mode,
1237                        )
1238                    };
1239
1240                    for cv in &claim_rows {
1241                        add_edge(
1242                            "claim",
1243                            cv.claim_id.as_str(),
1244                            "claim_version",
1245                            cv.claim_version_id.as_str(),
1246                            "claim_version_of",
1247                            "on_source_change",
1248                        )?;
1249
1250                        if let Some(previous_version_id) = &cv.supersedes_claim_version_id {
1251                            add_edge(
1252                                "claim_version",
1253                                previous_version_id.as_str(),
1254                                "claim_version",
1255                                cv.claim_version_id.as_str(),
1256                                "supersedes",
1257                                "on_supersession",
1258                            )?;
1259                        }
1260                    }
1261
1262                    for rv in &relation_rows {
1263                        if let Some(claim_id) = &rv.claim_id {
1264                            add_edge(
1265                                "claim",
1266                                claim_id.as_str(),
1267                                "relation_version",
1268                                rv.relation_version_id.as_str(),
1269                                "supports_claim",
1270                                "on_source_change",
1271                            )?;
1272                        }
1273
1274                        if let Some(episode_id) = &rv.source_episode_id {
1275                            add_edge(
1276                                "episode",
1277                                episode_id.as_str(),
1278                                "relation_version",
1279                                rv.relation_version_id.as_str(),
1280                                "supports_episode",
1281                                "on_source_change",
1282                            )?;
1283                        }
1284
1285                        if let Some(previous_relation_id) = &rv.supersedes_relation_version_id {
1286                            add_edge(
1287                                "relation_version",
1288                                previous_relation_id.as_str(),
1289                                "relation_version",
1290                                rv.relation_version_id.as_str(),
1291                                "supersedes",
1292                                "on_supersession",
1293                            )?;
1294                        }
1295                    }
1296
1297                    for ea in &alias_rows {
1298                        add_edge(
1299                            "entity",
1300                            ea.canonical_entity_id.as_str(),
1301                            "entity_alias",
1302                            ea.canonical_entity_id.as_str(),
1303                            "canonical_alias",
1304                            "on_alias_split",
1305                        )?;
1306
1307                        if let Some(split_from_entity_id) = &ea.split_from_entity_id {
1308                            add_edge(
1309                                "entity",
1310                                split_from_entity_id.as_str(),
1311                                "entity_alias",
1312                                ea.canonical_entity_id.as_str(),
1313                                "alias_split_from",
1314                                "on_alias_split",
1315                            )?;
1316                        }
1317
1318                        if let Some(superseded_by_entity_id) = &ea.superseded_by_entity_id {
1319                            add_edge(
1320                                "entity",
1321                                superseded_by_entity_id.as_str(),
1322                                "entity_alias",
1323                                ea.canonical_entity_id.as_str(),
1324                                "alias_superseded_by",
1325                                "on_supersession",
1326                            )?;
1327                        }
1328                    }
1329
1330                    for er in &evidence_rows {
1331                        projection_storage::insert_derivation_edge(
1332                            tx,
1333                            "claim",
1334                            er.claim_id.as_str(),
1335                            "evidence_ref",
1336                            &er.fetch_handle,
1337                            "supports",
1338                            "on_source_change",
1339                        )?;
1340
1341                        if let Some(cvid) = &er.claim_version_id {
1342                            projection_storage::insert_derivation_edge(
1343                                tx,
1344                                "claim_version",
1345                                cvid.as_str(),
1346                                "evidence_ref",
1347                                &er.fetch_handle,
1348                                "supports",
1349                                "on_source_change",
1350                            )?;
1351                        }
1352                    }
1353
1354                    for el in &episode_rows {
1355                        let cause_ids: Vec<String> =
1356                            serde_json::from_str(&el.cause_ids).map_err(|err| {
1357                                MemoryError::ImportInvalid {
1358                                    reason: format!(
1359                                        "episode {} has invalid cause_ids JSON: {}",
1360                                        el.episode_id, err
1361                                    ),
1362                                }
1363                            })?;
1364
1365                        for cause_id in &cause_ids {
1366                            add_edge(
1367                                "claim",
1368                                cause_id,
1369                                "episode",
1370                                el.episode_id.as_str(),
1371                                "caused_by_claim",
1372                                "on_source_change",
1373                            )?;
1374                        }
1375                    }
1376
1377                    projection_storage::insert_projection_import_log(tx, &log_row)?;
1378
1379                    Ok(ProjectionImportResult {
1380                        source_envelope_id,
1381                        status: "complete".into(),
1382                        record_count: total_count,
1383                        was_duplicate: false,
1384                    })
1385                })
1386            })
1387            .await;
1388
1389        if let Err(ref error) = import_result {
1390            self.persist_projection_import_failure_receipt(base_failure_log_row, error)
1391                .await;
1392        }
1393
1394        import_result
1395    }
1396
1397    /// Deserialize and import a projection batch from JSON.
1398    ///
1399    /// This is a compatibility boundary for callers that still cross the
1400    /// in-process seam as serialized JSON. New code should pass
1401    /// `ProjectionImportBatchV3` directly to `import_projection_batch()`.
1402    pub async fn import_projection_batch_json_compat(
1403        &self,
1404        batch_json: &str,
1405    ) -> Result<ProjectionImportResult, MemoryError> {
1406        let batch = match json_compat_import::decode_projection_batch_json_compat(batch_json) {
1407            Ok(batch) => batch,
1408            Err(error) => {
1409                self.persist_projection_import_failure_receipt(
1410                    json_compat_import::build_json_compat_failure_log_row(
1411                        batch_json,
1412                        chrono::Utc::now().to_rfc3339(),
1413                    ),
1414                    &error,
1415                )
1416                .await;
1417                return Err(error);
1418            }
1419        };
1420        self.import_projection_batch(&batch).await
1421    }
1422
1423    /// Query the V11 projection import log.
1424    pub async fn query_projection_imports(
1425        &self,
1426        scope_namespace: Option<&str>,
1427        limit: usize,
1428    ) -> Result<Vec<ProjectionImportLogEntry>, MemoryError> {
1429        let ns = scope_namespace.map(|s| s.to_string());
1430        self.with_read_conn(move |conn| {
1431            let rows = projection_storage::query_projection_import_log(conn, ns.as_deref(), limit)?;
1432            rows.into_iter()
1433                .map(projection_import_log_entry_from_row)
1434                .collect::<Result<Vec<_>, MemoryError>>()
1435        })
1436        .await
1437    }
1438
1439    /// Return the most recent exact-scope import receipt carrying a rebuildable
1440    /// kernel V3 batch.
1441    pub async fn latest_rebuildable_kernel_projection_import_for_scope(
1442        &self,
1443        scope_key: &ScopeKey,
1444    ) -> Result<Option<ProjectionImportLogEntry>, MemoryError> {
1445        let scope_key = scope_key.clone();
1446        self.with_read_conn(move |conn| {
1447            projection_storage::latest_rebuildable_kernel_projection_import(conn, &scope_key)?
1448                .map(projection_import_log_entry_from_row)
1449                .transpose()
1450        })
1451        .await
1452    }
1453
1454    /// Query durable failed projection import receipts.
1455    pub async fn query_projection_import_failures(
1456        &self,
1457        scope_namespace: Option<&str>,
1458        limit: usize,
1459    ) -> Result<Vec<ProjectionImportFailureReceiptEntry>, MemoryError> {
1460        let ns = scope_namespace.map(|s| s.to_string());
1461        self.with_read_conn(move |conn| {
1462            let rows =
1463                projection_storage::query_projection_import_failures(conn, ns.as_deref(), limit)?;
1464            rows.into_iter()
1465                .map(projection_import_failure_entry_from_row)
1466                .collect::<Result<Vec<_>, MemoryError>>()
1467        })
1468        .await
1469    }
1470}