1use crate::{
2 db, json_compat_import,
3 projection_batch::{encode_merge_decision, encode_review_state, ProjectionImportBatchLike},
4 projection_storage, MemoryError, MemoryStore,
5};
6use forge_memory_bridge::{
7 ImportProjectionRecord, ProjectionImportBatchV2, ProjectionImportBatchV3,
8 PROJECTION_IMPORT_BATCH_V2_SCHEMA,
9};
10use stack_ids::{DigestBuilder, ScopeKey};
11
12pub(crate) fn projection_import_failure_id(
13 source_envelope_id: &str,
14 schema_version: &str,
15 content_digest: &str,
16) -> String {
17 let mut builder = DigestBuilder::new();
18 builder
19 .update_str(source_envelope_id)
20 .separator()
21 .update_str(schema_version)
22 .separator()
23 .update_str(content_digest);
24 format!("projection-import-failure:{}", builder.finalize().hex())
25}
26
27fn serialized_import_evidence_bundle(
28 batch: &ProjectionImportBatchV2,
29) -> Result<(Option<String>, Option<String>), MemoryError> {
30 let Some(bundle) = batch.evidence_bundle.as_ref() else {
31 return Ok((None, None));
32 };
33
34 let bundle_json = serde_json::to_string(bundle).map_err(|err| {
35 MemoryError::Other(format!(
36 "failed to serialize canonical evidence bundle {} for import receipt: {}",
37 bundle.id, err
38 ))
39 })?;
40
41 Ok((Some(bundle.id.as_str().to_string()), Some(bundle_json)))
42}
43
44fn serialized_episode_bundle(
45 batch: &ProjectionImportBatchV2,
46) -> Result<(Option<String>, Option<String>), MemoryError> {
47 let Some(bundle) = batch.episode_bundle.as_ref() else {
48 return Ok((None, None));
49 };
50
51 let bundle_json = serde_json::to_string(bundle).map_err(|err| {
52 MemoryError::Other(format!(
53 "failed to serialize episode bundle {} for import receipt: {}",
54 bundle.bundle_id, err
55 ))
56 })?;
57
58 Ok((Some(bundle.bundle_id.clone()), Some(bundle_json)))
59}
60
61fn serialized_execution_context(
62 batch: &ProjectionImportBatchV2,
63) -> Result<Option<String>, MemoryError> {
64 batch
65 .execution_context
66 .as_ref()
67 .map(serde_json::to_string)
68 .transpose()
69 .map_err(|err| {
70 MemoryError::Other(format!(
71 "failed to serialize execution context for import receipt: {}",
72 err
73 ))
74 })
75}
76
77#[allow(clippy::too_many_arguments)]
78fn build_projection_import_log_row(
79 batch: &ProjectionImportBatchV2,
80 batch_id: String,
81 status: &str,
82 imported_at: String,
83 failure_reason: Option<String>,
84 kernel_payload_json: Option<String>,
85 record_count: usize,
86 claim_count: usize,
87 relation_count: usize,
88 episode_count: usize,
89 alias_count: usize,
90 evidence_count: usize,
91) -> Result<projection_storage::ProjectionImportLogRow, MemoryError> {
92 let (evidence_bundle_id, evidence_bundle_json) = serialized_import_evidence_bundle(batch)?;
93 let (episode_bundle_id, episode_bundle_json) = serialized_episode_bundle(batch)?;
94 let execution_context_json = serialized_execution_context(batch)?;
95
96 Ok(projection_storage::ProjectionImportLogRow {
97 batch_id,
98 source_envelope_id: batch.source_envelope_id.as_str().to_string(),
99 schema_version: batch.schema_version.clone(),
100 export_schema_version: batch.export_schema_version.clone(),
101 content_digest: batch.content_digest.hex().to_string(),
102 source_authority: batch.source_authority.clone(),
103 scope_namespace: batch.scope_key.namespace.clone(),
104 scope_domain: batch.scope_key.domain.clone(),
105 scope_workspace_id: batch.scope_key.workspace_id.clone(),
106 scope_repo_id: batch.scope_key.repo_id.clone(),
107 trace_id: batch.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
108 record_count,
109 claim_count,
110 relation_count,
111 episode_count,
112 alias_count,
113 evidence_count,
114 status: status.into(),
115 source_exported_at: Some(batch.source_exported_at.clone()),
116 transformed_at: Some(batch.transformed_at.clone()),
117 imported_at,
118 source_run_id: batch
119 .export_meta
120 .as_ref()
121 .and_then(|meta| meta.run_id.clone()),
122 comparability_snapshot_version: batch
123 .export_meta
124 .as_ref()
125 .and_then(|meta| meta.comparability_snapshot_version.clone()),
126 direct_write: batch
127 .export_meta
128 .as_ref()
129 .map(|meta| meta.direct_write)
130 .unwrap_or(false),
131 failure_reason,
132 evidence_bundle_id,
133 evidence_bundle_json,
134 episode_bundle_id,
135 episode_bundle_json,
136 execution_context_json,
137 kernel_payload_json,
138 })
139}
140
141fn projection_identity_conflict_error(
142 source_envelope_id: &str,
143 content_digest: &str,
144 record_kind: &str,
145 record_id: &str,
146 existing_source_envelope_id: &str,
147) -> MemoryError {
148 if existing_source_envelope_id == source_envelope_id {
149 MemoryError::ImportMigrationRequired {
150 source_envelope_id: source_envelope_id.to_string(),
151 detail: format!(
152 "incoming {record_kind} {record_id} already exists for source_envelope_id {source_envelope_id} but the import receipt does not match digest {content_digest}; this usually means a historical digest migration replay or projection_import_log drift. Repair or clear the import receipts instead of replaying the same authoritative rows"
153 ),
154 }
155 } else {
156 MemoryError::ImportInvalid {
157 reason: format!(
158 "incoming {record_kind} {record_id} would collide with existing imported data from source_envelope_id {existing_source_envelope_id}; refusing ambiguous overwrite"
159 ),
160 }
161 }
162}
163
164fn check_projection_identity_conflicts(
165 conn: &rusqlite::Connection,
166 source_envelope_id: &str,
167 content_digest: &str,
168 claim_rows: &[projection_storage::ClaimVersionRow],
169 relation_rows: &[projection_storage::RelationVersionRow],
170) -> Result<(), MemoryError> {
171 for cv in claim_rows {
172 if let Some(existing_source_envelope_id) =
173 projection_storage::claim_version_source_envelope(conn, &cv.claim_version_id)?
174 {
175 return Err(projection_identity_conflict_error(
176 source_envelope_id,
177 content_digest,
178 "claim_version_id",
179 &cv.claim_version_id,
180 &existing_source_envelope_id,
181 ));
182 }
183 }
184
185 for rv in relation_rows {
186 if let Some(existing_source_envelope_id) =
187 projection_storage::relation_version_source_envelope(conn, &rv.relation_version_id)?
188 {
189 return Err(projection_identity_conflict_error(
190 source_envelope_id,
191 content_digest,
192 "relation_version_id",
193 &rv.relation_version_id,
194 &existing_source_envelope_id,
195 ));
196 }
197 }
198
199 Ok(())
200}
201
202fn parse_optional_receipt_json(
203 raw: Option<String>,
204 field_name: &str,
205 row_id: &str,
206) -> Result<Option<serde_json::Value>, MemoryError> {
207 raw.map(|value| {
208 serde_json::from_str(&value).map_err(|err| MemoryError::CorruptData {
209 table: "projection_import_receipts",
210 row_id: row_id.to_string(),
211 detail: format!("invalid {field_name}: {err}"),
212 })
213 })
214 .transpose()
215}
216
217fn parse_rebuildable_kernel_batch_v3(
218 kernel_payload_json: Option<&serde_json::Value>,
219 table: &'static str,
220 row_id: &str,
221) -> Result<Option<ProjectionImportBatchV3>, MemoryError> {
222 kernel_payload_json
223 .cloned()
224 .map(|value| {
225 serde_json::from_value(value).map_err(|err| MemoryError::CorruptData {
226 table,
227 row_id: row_id.to_string(),
228 detail: format!("invalid rebuildable kernel batch v3 receipt: {err}"),
229 })
230 })
231 .transpose()
232}
233
234fn parse_batch_timestamp(
235 value: Option<&str>,
236 column: &str,
237 row_kind: &str,
238 row_id: &str,
239) -> Result<Option<chrono::DateTime<chrono::Utc>>, MemoryError> {
240 match value {
241 Some(value) => chrono::DateTime::parse_from_rfc3339(value)
242 .map(|t| Some(t.with_timezone(&chrono::Utc)))
243 .map_err(|err| MemoryError::ImportInvalid {
244 reason: format!(
245 "invalid {row_kind} {row_id} {column}: {value}; expected RFC3339 timestamp ({err})"
246 ),
247 }),
248 None => Ok(None),
249 }
250}
251
252fn parse_stored_timestamp(
253 value: Option<&str>,
254 table: &'static str,
255 row_id: &str,
256 column: &str,
257) -> Result<Option<chrono::DateTime<chrono::Utc>>, MemoryError> {
258 match value {
259 Some(value) => chrono::DateTime::parse_from_rfc3339(value)
260 .map(|t| Some(t.with_timezone(&chrono::Utc)))
261 .map_err(|err| MemoryError::CorruptData {
262 table,
263 row_id: row_id.to_string(),
264 detail: format!("invalid {column} timestamp '{value}' ({err})"),
265 }),
266 None => Ok(None),
267 }
268}
269
270fn validate_temporal_order(
271 row_kind: &str,
272 row_id: &str,
273 valid_from: Option<chrono::DateTime<chrono::Utc>>,
274 valid_to: Option<chrono::DateTime<chrono::Utc>>,
275) -> Result<(), MemoryError> {
276 if let (Some(from), Some(to)) = (valid_from, valid_to) {
277 if from >= to {
278 return Err(MemoryError::ImportInvalid {
279 reason: format!(
280 "{row_kind} {row_id} has invalid interval: valid_from ({from}) is not < valid_to ({to})"
281 ),
282 });
283 }
284 }
285 Ok(())
286}
287
288fn intervals_overlap(
289 first_from: Option<chrono::DateTime<chrono::Utc>>,
290 first_to: Option<chrono::DateTime<chrono::Utc>>,
291 second_from: Option<chrono::DateTime<chrono::Utc>>,
292 second_to: Option<chrono::DateTime<chrono::Utc>>,
293) -> bool {
294 if let (Some(a_to), Some(b_from)) = (first_to, second_from) {
295 if b_from >= a_to {
296 return false;
297 }
298 }
299 if let (Some(a_from), Some(b_to)) = (first_from, second_to) {
300 if a_from >= b_to {
301 return false;
302 }
303 }
304 true
305}
306
307#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
309pub struct ProjectionImportResult {
310 pub source_envelope_id: String,
312 pub status: String,
314 pub record_count: usize,
316 pub was_duplicate: bool,
318}
319
320#[derive(Debug, Clone, schemars::JsonSchema, serde::Serialize, serde::Deserialize)]
322#[schemars(title = "ProjectionImportLogEntryV1")]
323pub struct ProjectionImportLogEntry {
324 pub batch_id: String,
325 pub source_envelope_id: String,
326 pub schema_version: String,
328 pub export_schema_version: Option<String>,
330 pub content_digest: String,
331 pub source_authority: String,
332 pub scope_namespace: String,
333 pub scope_domain: Option<String>,
334 pub scope_workspace_id: Option<String>,
335 pub scope_repo_id: Option<String>,
336 pub record_count: usize,
337 pub claim_count: usize,
338 pub relation_count: usize,
339 pub episode_count: usize,
340 pub alias_count: usize,
341 pub evidence_count: usize,
342 pub status: String,
343 pub source_exported_at: Option<String>,
344 pub transformed_at: Option<String>,
345 pub imported_at: String,
346 pub source_run_id: Option<String>,
347 pub comparability_snapshot_version: Option<String>,
348 pub direct_write: bool,
349 pub failure_reason: Option<String>,
350 pub evidence_bundle_id: Option<String>,
351 pub evidence_bundle_json: Option<serde_json::Value>,
352 pub episode_bundle_id: Option<String>,
353 pub episode_bundle_json: Option<serde_json::Value>,
354 pub execution_context_json: Option<serde_json::Value>,
355 pub kernel_payload_json: Option<serde_json::Value>,
356}
357
358impl ProjectionImportLogEntry {
359 pub fn scope_key(&self) -> ScopeKey {
360 ScopeKey {
361 namespace: self.scope_namespace.clone(),
362 domain: self.scope_domain.clone(),
363 workspace_id: self.scope_workspace_id.clone(),
364 repo_id: self.scope_repo_id.clone(),
365 }
366 }
367
368 pub fn rebuildable_kernel_batch_v3(
369 &self,
370 ) -> Result<Option<ProjectionImportBatchV3>, MemoryError> {
371 parse_rebuildable_kernel_batch_v3(
372 self.kernel_payload_json.as_ref(),
373 "projection_import_log",
374 &self.batch_id,
375 )
376 }
377}
378
379#[derive(Debug, Clone, schemars::JsonSchema, serde::Serialize, serde::Deserialize)]
381#[schemars(title = "ImportFailureRecordV1")]
382pub struct ProjectionImportFailureReceiptEntry {
383 pub failure_id: String,
384 pub source_envelope_id: String,
385 pub schema_version: String,
386 pub export_schema_version: Option<String>,
387 pub content_digest: String,
388 pub source_authority: String,
389 pub scope_namespace: String,
390 pub scope_domain: Option<String>,
391 pub scope_workspace_id: Option<String>,
392 pub scope_repo_id: Option<String>,
393 pub record_count: usize,
394 pub error_kind: String,
395 pub error_message: String,
396 pub source_exported_at: Option<String>,
397 pub transformed_at: Option<String>,
398 pub failed_at: String,
399 pub source_run_id: Option<String>,
400 pub comparability_snapshot_version: Option<String>,
401 pub direct_write: bool,
402 pub evidence_bundle_id: Option<String>,
403 pub evidence_bundle_json: Option<serde_json::Value>,
404 pub episode_bundle_id: Option<String>,
405 pub episode_bundle_json: Option<serde_json::Value>,
406 pub execution_context_json: Option<serde_json::Value>,
407 pub kernel_payload_json: Option<serde_json::Value>,
408}
409
410impl ProjectionImportFailureReceiptEntry {
411 pub fn scope_key(&self) -> ScopeKey {
412 ScopeKey {
413 namespace: self.scope_namespace.clone(),
414 domain: self.scope_domain.clone(),
415 workspace_id: self.scope_workspace_id.clone(),
416 repo_id: self.scope_repo_id.clone(),
417 }
418 }
419
420 pub fn rebuildable_kernel_batch_v3(
421 &self,
422 ) -> Result<Option<ProjectionImportBatchV3>, MemoryError> {
423 parse_rebuildable_kernel_batch_v3(
424 self.kernel_payload_json.as_ref(),
425 "projection_import_failures",
426 &self.failure_id,
427 )
428 }
429}
430
431fn projection_import_log_entry_from_row(
432 r: projection_storage::ProjectionImportLogRow,
433) -> Result<ProjectionImportLogEntry, MemoryError> {
434 let row_id = r.batch_id.clone();
435 Ok(ProjectionImportLogEntry {
436 batch_id: r.batch_id,
437 source_envelope_id: r.source_envelope_id,
438 schema_version: r.schema_version,
439 export_schema_version: r.export_schema_version,
440 content_digest: r.content_digest,
441 source_authority: r.source_authority,
442 scope_namespace: r.scope_namespace,
443 scope_domain: r.scope_domain,
444 scope_workspace_id: r.scope_workspace_id,
445 scope_repo_id: r.scope_repo_id,
446 record_count: r.record_count,
447 claim_count: r.claim_count,
448 relation_count: r.relation_count,
449 episode_count: r.episode_count,
450 alias_count: r.alias_count,
451 evidence_count: r.evidence_count,
452 status: r.status,
453 source_exported_at: r.source_exported_at,
454 transformed_at: r.transformed_at,
455 imported_at: r.imported_at,
456 source_run_id: r.source_run_id,
457 comparability_snapshot_version: r.comparability_snapshot_version,
458 direct_write: r.direct_write,
459 failure_reason: r.failure_reason,
460 evidence_bundle_id: r.evidence_bundle_id,
461 evidence_bundle_json: parse_optional_receipt_json(
462 r.evidence_bundle_json,
463 "evidence_bundle_json",
464 &row_id,
465 )?,
466 episode_bundle_id: r.episode_bundle_id,
467 episode_bundle_json: parse_optional_receipt_json(
468 r.episode_bundle_json,
469 "episode_bundle_json",
470 &row_id,
471 )?,
472 execution_context_json: parse_optional_receipt_json(
473 r.execution_context_json,
474 "execution_context_json",
475 &row_id,
476 )?,
477 kernel_payload_json: parse_optional_receipt_json(
478 r.kernel_payload_json,
479 "kernel_payload_json",
480 &row_id,
481 )?,
482 })
483}
484
485fn projection_import_failure_entry_from_row(
486 r: projection_storage::ProjectionImportFailureRow,
487) -> Result<ProjectionImportFailureReceiptEntry, MemoryError> {
488 let row_id = r.failure_id.clone();
489 Ok(ProjectionImportFailureReceiptEntry {
490 failure_id: r.failure_id,
491 source_envelope_id: r.source_envelope_id,
492 schema_version: r.schema_version,
493 export_schema_version: r.export_schema_version,
494 content_digest: r.content_digest,
495 source_authority: r.source_authority,
496 scope_namespace: r.scope_namespace,
497 scope_domain: r.scope_domain,
498 scope_workspace_id: r.scope_workspace_id,
499 scope_repo_id: r.scope_repo_id,
500 record_count: r.record_count,
501 error_kind: r.error_kind,
502 error_message: r.error_message,
503 source_exported_at: r.source_exported_at,
504 transformed_at: r.transformed_at,
505 failed_at: r.failed_at,
506 source_run_id: r.source_run_id,
507 comparability_snapshot_version: r.comparability_snapshot_version,
508 direct_write: r.direct_write,
509 evidence_bundle_id: r.evidence_bundle_id,
510 evidence_bundle_json: parse_optional_receipt_json(
511 r.evidence_bundle_json,
512 "evidence_bundle_json",
513 &row_id,
514 )?,
515 episode_bundle_id: r.episode_bundle_id,
516 episode_bundle_json: parse_optional_receipt_json(
517 r.episode_bundle_json,
518 "episode_bundle_json",
519 &row_id,
520 )?,
521 execution_context_json: parse_optional_receipt_json(
522 r.execution_context_json,
523 "execution_context_json",
524 &row_id,
525 )?,
526 kernel_payload_json: parse_optional_receipt_json(
527 r.kernel_payload_json,
528 "kernel_payload_json",
529 &row_id,
530 )?,
531 })
532}
533
534impl MemoryStore {
535 async fn persist_projection_import_failure_receipt(
536 &self,
537 log_row: projection_storage::ProjectionImportLogRow,
538 error: &MemoryError,
539 ) {
540 let failed_log = projection_storage::ProjectionImportLogRow {
541 status: "failed".into(),
542 failure_reason: Some(error.to_string()),
543 ..log_row.clone()
544 };
545 let failure_row = projection_storage::ProjectionImportFailureRow {
546 failure_id: projection_import_failure_id(
547 &failed_log.source_envelope_id,
548 &failed_log.schema_version,
549 &failed_log.content_digest,
550 ),
551 source_envelope_id: failed_log.source_envelope_id.clone(),
552 schema_version: failed_log.schema_version.clone(),
553 export_schema_version: failed_log.export_schema_version.clone(),
554 content_digest: failed_log.content_digest.clone(),
555 source_authority: failed_log.source_authority.clone(),
556 scope_namespace: failed_log.scope_namespace.clone(),
557 scope_domain: failed_log.scope_domain.clone(),
558 scope_workspace_id: failed_log.scope_workspace_id.clone(),
559 scope_repo_id: failed_log.scope_repo_id.clone(),
560 trace_id: failed_log.trace_id.clone(),
561 record_count: failed_log.record_count,
562 error_kind: error.kind().into(),
563 error_message: error.to_string(),
564 source_exported_at: failed_log.source_exported_at.clone(),
565 transformed_at: failed_log.transformed_at.clone(),
566 failed_at: failed_log.imported_at.clone(),
567 source_run_id: failed_log.source_run_id.clone(),
568 comparability_snapshot_version: failed_log.comparability_snapshot_version.clone(),
569 direct_write: failed_log.direct_write,
570 evidence_bundle_id: failed_log.evidence_bundle_id.clone(),
571 evidence_bundle_json: failed_log.evidence_bundle_json.clone(),
572 episode_bundle_id: failed_log.episode_bundle_id.clone(),
573 episode_bundle_json: failed_log.episode_bundle_json.clone(),
574 execution_context_json: failed_log.execution_context_json.clone(),
575 kernel_payload_json: failed_log.kernel_payload_json.clone(),
576 };
577
578 let result = self
579 .with_write_conn(move |conn| {
580 projection_storage::upsert_projection_import_log_conn(conn, &failed_log)?;
581 projection_storage::insert_projection_import_failure(conn, &failure_row)?;
582 Ok(())
583 })
584 .await;
585
586 if let Err(log_error) = result {
587 tracing::warn!(
588 error = %log_error,
589 "failed to persist projection import failure receipt"
590 );
591 }
592 }
593
594 pub async fn import_projection_batch<B: ProjectionImportBatchLike>(
605 &self,
606 batch: &B,
607 ) -> Result<ProjectionImportResult, MemoryError> {
608 let kernel_payload_json = batch.kernel_payload_json()?;
609 let batch = batch.to_projection_import_batch_v2();
610
611 if batch.schema_version != PROJECTION_IMPORT_BATCH_V2_SCHEMA {
612 return Err(MemoryError::ImportInvalid {
613 reason: format!(
614 "unsupported schema_version: {}; expected {}",
615 batch.schema_version, PROJECTION_IMPORT_BATCH_V2_SCHEMA
616 ),
617 });
618 }
619
620 let source_envelope_id = batch.source_envelope_id.as_str().to_string();
621 let schema_version = batch.schema_version.clone();
622 let export_schema_version = batch.export_schema_version.clone();
623 let content_digest = batch.content_digest.hex().to_string();
624 let source_authority = batch.source_authority.clone();
625 let scope_namespace = batch.scope_key.namespace.clone();
626 let scope_domain = batch.scope_key.domain.clone();
627 let scope_workspace_id = batch.scope_key.workspace_id.clone();
628 let scope_repo_id = batch.scope_key.repo_id.clone();
629 let trace_id = batch.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone());
630 let source_exported_at = Some(batch.source_exported_at.clone());
631 let transformed_at = Some(batch.transformed_at.clone());
632 let source_run_id = batch
633 .export_meta
634 .as_ref()
635 .and_then(|meta| meta.run_id.clone());
636 let comparability_snapshot_version = batch
637 .export_meta
638 .as_ref()
639 .and_then(|meta| meta.comparability_snapshot_version.clone());
640 let direct_write = batch
641 .export_meta
642 .as_ref()
643 .map(|meta| meta.direct_write)
644 .unwrap_or(false);
645 let (evidence_bundle_id, evidence_bundle_json) = serialized_import_evidence_bundle(&batch)?;
646 let (episode_bundle_id, episode_bundle_json) = serialized_episode_bundle(&batch)?;
647 let execution_context_json = serialized_execution_context(&batch)?;
648 let record_len = batch.records.len();
649 let base_failure_log_row = projection_storage::ProjectionImportLogRow {
650 batch_id: projection_import_failure_id(
651 &source_envelope_id,
652 &schema_version,
653 &content_digest,
654 ),
655 source_envelope_id: source_envelope_id.clone(),
656 schema_version: schema_version.clone(),
657 export_schema_version: export_schema_version.clone(),
658 content_digest: content_digest.clone(),
659 source_authority: source_authority.clone(),
660 scope_namespace: scope_namespace.clone(),
661 scope_domain: scope_domain.clone(),
662 scope_workspace_id: scope_workspace_id.clone(),
663 scope_repo_id: scope_repo_id.clone(),
664 trace_id: trace_id.clone(),
665 record_count: record_len,
666 claim_count: 0,
667 relation_count: 0,
668 episode_count: 0,
669 alias_count: 0,
670 evidence_count: 0,
671 status: "failed".into(),
672 source_exported_at: source_exported_at.clone(),
673 transformed_at: transformed_at.clone(),
674 imported_at: chrono::Utc::now().to_rfc3339(),
675 source_run_id: source_run_id.clone(),
676 comparability_snapshot_version: comparability_snapshot_version.clone(),
677 direct_write,
678 failure_reason: None,
679 evidence_bundle_id,
680 evidence_bundle_json,
681 episode_bundle_id,
682 episode_bundle_json,
683 execution_context_json,
684 kernel_payload_json: kernel_payload_json.clone(),
685 };
686
687 if batch.schema_version != PROJECTION_IMPORT_BATCH_V2_SCHEMA {
688 let error = MemoryError::ImportInvalid {
689 reason: format!(
690 "unsupported schema_version: {}; expected {}",
691 batch.schema_version, PROJECTION_IMPORT_BATCH_V2_SCHEMA
692 ),
693 };
694 self.persist_projection_import_failure_receipt(base_failure_log_row, &error)
695 .await;
696 return Err(error);
697 }
698
699 let sei_c = source_envelope_id.clone();
700 let sv_c = schema_version.clone();
701 let cd_c = content_digest.clone();
702 let already = self
703 .with_read_conn(move |conn| {
704 projection_storage::check_projection_import_exists(conn, &sei_c, &sv_c, &cd_c)
705 })
706 .await?;
707
708 if already {
709 return Ok(ProjectionImportResult {
710 source_envelope_id,
711 status: "already_imported".into(),
712 record_count: record_len,
713 was_duplicate: true,
714 });
715 }
716
717 let batch_id = uuid::Uuid::new_v4().to_string();
718
719 let mut claim_count = 0usize;
720 let mut relation_count = 0usize;
721 let mut episode_count = 0usize;
722 let mut alias_count = 0usize;
723 let mut evidence_count = 0usize;
724
725 let mut claim_rows = Vec::new();
726 let mut relation_rows = Vec::new();
727 let mut alias_rows = Vec::new();
728 let mut evidence_rows = Vec::new();
729 let mut episode_rows = Vec::new();
730 let mut preferred_claim_intervals = Vec::new();
731 let mut preferred_relation_intervals = Vec::new();
732
733 for record in &batch.records {
734 match record {
735 ImportProjectionRecord::ClaimVersion(cv) => {
736 let valid_from = parse_batch_timestamp(
737 cv.valid_from.as_deref(),
738 "valid_from",
739 "claim_version",
740 cv.claim_version_id.as_str(),
741 )?;
742 let valid_to = parse_batch_timestamp(
743 cv.valid_to.as_deref(),
744 "valid_to",
745 "claim_version",
746 cv.claim_version_id.as_str(),
747 )?;
748 validate_temporal_order(
749 "claim_version",
750 cv.claim_version_id.as_str(),
751 valid_from,
752 valid_to,
753 )?;
754
755 claim_count += 1;
756 claim_rows.push(projection_storage::ClaimVersionRow {
757 claim_version_id: cv.claim_version_id.as_str().to_string(),
758 claim_id: cv.claim_id.as_str().to_string(),
759 claim_state: cv.claim_state.as_str().to_string(),
760 projection_family: cv.projection_family.clone(),
761 subject_entity_id: cv.subject_entity_id.as_str().to_string(),
762 predicate: cv.predicate.clone(),
763 object_anchor: cv.object_anchor.to_string(),
764 scope_namespace: cv.scope_key.namespace.clone(),
765 scope_domain: cv.scope_key.domain.clone(),
766 scope_workspace_id: cv.scope_key.workspace_id.clone(),
767 scope_repo_id: cv.scope_key.repo_id.clone(),
768 valid_from: cv.valid_from.clone(),
769 valid_to: cv.valid_to.clone(),
770 recorded_at: String::new(),
771 preferred_open: cv.preferred_open,
772 source_envelope_id: cv.source_envelope_id.as_str().to_string(),
773 source_authority: cv.source_authority.clone(),
774 trace_id: cv.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
775 freshness: cv.freshness.as_str().to_string(),
776 contradiction_status: serde_json::to_string(&cv.contradiction_status)
777 .unwrap_or_else(|_| "\"none\"".into()),
778 supersedes_claim_version_id: cv
779 .supersedes_claim_version_id
780 .as_ref()
781 .map(|id| id.as_str().to_string()),
782 content: cv.content.clone(),
783 confidence: cv.confidence,
784 content_digest: Some(batch.content_digest.hex().to_string()),
785 metadata: cv.metadata.as_ref().map(|v| v.to_string()),
786 });
787
788 if cv.preferred_open {
789 preferred_claim_intervals.push((
790 cv.claim_id.as_str().to_string(),
791 cv.claim_version_id.as_str().to_string(),
792 valid_from,
793 valid_to,
794 ));
795 }
796 }
797 ImportProjectionRecord::RelationVersion(rv) => {
798 let valid_from = parse_batch_timestamp(
799 rv.valid_from.as_deref(),
800 "valid_from",
801 "relation_version",
802 rv.relation_version_id.as_str(),
803 )?;
804 let valid_to = parse_batch_timestamp(
805 rv.valid_to.as_deref(),
806 "valid_to",
807 "relation_version",
808 rv.relation_version_id.as_str(),
809 )?;
810 validate_temporal_order(
811 "relation_version",
812 rv.relation_version_id.as_str(),
813 valid_from,
814 valid_to,
815 )?;
816
817 relation_count += 1;
818 relation_rows.push(projection_storage::RelationVersionRow {
819 relation_version_id: rv.relation_version_id.as_str().to_string(),
820 subject_entity_id: rv.subject_entity_id.as_str().to_string(),
821 predicate: rv.predicate.clone(),
822 object_anchor: rv.object_anchor.to_string(),
823 scope_namespace: rv.scope_key.namespace.clone(),
824 scope_domain: rv.scope_key.domain.clone(),
825 scope_workspace_id: rv.scope_key.workspace_id.clone(),
826 scope_repo_id: rv.scope_key.repo_id.clone(),
827 claim_id: rv.claim_id.as_ref().map(|id| id.as_str().to_string()),
828 source_episode_id: rv
829 .source_episode_id
830 .as_ref()
831 .map(|id| id.as_str().to_string()),
832 valid_from: rv.valid_from.clone(),
833 valid_to: rv.valid_to.clone(),
834 recorded_at: String::new(),
835 preferred_open: rv.preferred_open,
836 supersedes_relation_version_id: rv
837 .supersedes_relation_version_id
838 .as_ref()
839 .map(|id| id.as_str().to_string()),
840 contradiction_status: serde_json::to_string(&rv.contradiction_status)
841 .unwrap_or_else(|_| "\"none\"".into()),
842 source_confidence: rv.source_confidence,
843 projection_family: rv.projection_family.clone(),
844 source_envelope_id: rv.source_envelope_id.as_str().to_string(),
845 source_authority: rv.source_authority.clone(),
846 trace_id: rv.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
847 freshness: rv.freshness.as_str().to_string(),
848 metadata: rv.metadata.as_ref().map(|v| v.to_string()),
849 });
850
851 if rv.preferred_open {
852 preferred_relation_intervals.push((
853 rv.subject_entity_id.as_str().to_string(),
854 rv.predicate.clone(),
855 rv.object_anchor.to_string(),
856 rv.scope_key.namespace.clone(),
857 rv.scope_key.domain.clone(),
858 rv.scope_key.workspace_id.clone(),
859 rv.scope_key.repo_id.clone(),
860 rv.projection_family.clone(),
861 rv.relation_version_id.as_str().to_string(),
862 valid_from,
863 valid_to,
864 ));
865 }
866 }
867 ImportProjectionRecord::EntityAlias(ea) => {
868 alias_count += 1;
869 alias_rows.push(projection_storage::EntityAliasRow {
870 canonical_entity_id: ea.canonical_entity_id.as_str().to_string(),
871 alias_text: ea.alias_text.clone(),
872 alias_source: ea.alias_source.clone(),
873 match_evidence: ea.match_evidence.as_ref().map(|v| v.to_string()),
874 confidence: ea.confidence,
875 merge_decision: encode_merge_decision(&ea.merge_decision),
876 scope_namespace: ea.scope.namespace.clone(),
877 scope_domain: ea.scope.domain.clone(),
878 scope_workspace_id: ea.scope.workspace_id.clone(),
879 scope_repo_id: ea.scope.repo_id.clone(),
880 review_state: encode_review_state(&ea.review_state),
881 is_human_confirmed: ea.is_human_confirmed,
882 is_human_confirmed_final: ea.is_human_confirmed_final,
883 superseded_by_entity_id: ea
884 .superseded_by_entity_id
885 .as_ref()
886 .map(|id| id.as_str().to_string()),
887 split_from_entity_id: ea
888 .split_from_entity_id
889 .as_ref()
890 .map(|id| id.as_str().to_string()),
891 source_envelope_id: ea.source_envelope_id.as_str().to_string(),
892 recorded_at: String::new(),
893 });
894 }
895 ImportProjectionRecord::EvidenceRef(er) => {
896 evidence_count += 1;
897 evidence_rows.push(projection_storage::EvidenceRefRow {
898 claim_id: er.claim_id.as_str().to_string(),
899 claim_version_id: er
900 .claim_version_id
901 .as_ref()
902 .map(|id| id.as_str().to_string()),
903 fetch_handle: er.fetch_handle.clone(),
904 source_authority: er.source_authority.clone(),
905 source_envelope_id: er.source_envelope_id.as_str().to_string(),
906 recorded_at: String::new(),
907 metadata: er.metadata.as_ref().map(|v| v.to_string()),
908 });
909 }
910 ImportProjectionRecord::Episode(ep) => {
911 episode_count += 1;
912 episode_rows.push(projection_storage::EpisodeLinkRow {
913 episode_id: ep.episode_id.as_str().to_string(),
914 document_id: ep.document_id.clone(),
915 cause_ids: serde_json::to_string(&ep.cause_ids)
916 .unwrap_or_else(|_| "[]".into()),
917 effect_type: ep.effect_type.clone(),
918 outcome: ep.outcome.clone(),
919 confidence: ep.confidence,
920 experiment_id: ep.experiment_id.clone(),
921 source_envelope_id: ep.source_envelope_id.as_str().to_string(),
922 source_authority: ep.source_authority.clone(),
923 trace_id: ep.trace_ctx.as_ref().map(|ctx| ctx.trace_id.clone()),
924 recorded_at: String::new(),
925 metadata: ep.metadata.as_ref().map(|v| v.to_string()),
926 });
927 }
928 }
929 }
930
931 for i in 0..preferred_claim_intervals.len() {
932 let (left_claim_id, left_version_id, left_from, left_to) =
933 &preferred_claim_intervals[i];
934 for (right_claim_id, right_version_id, right_from, right_to) in
935 preferred_claim_intervals.iter().skip(i + 1)
936 {
937 if left_claim_id != right_claim_id {
938 continue;
939 }
940 if intervals_overlap(*left_from, *left_to, *right_from, *right_to) {
941 let err = MemoryError::ImportInvalid {
942 reason: format!(
943 "preferred-open claim interval conflict for claim_id {left_claim_id}: \
944 {left_version_id} ({left_from:?}, {left_to:?}) overlaps \
945 {right_version_id} ({right_from:?}, {right_to:?})",
946 ),
947 };
948 let failure_log_row = build_projection_import_log_row(
949 &batch,
950 batch_id.clone(),
951 "failed",
952 chrono::Utc::now().to_rfc3339(),
953 Some(err.to_string()),
954 kernel_payload_json.clone(),
955 record_len,
956 claim_count,
957 relation_count,
958 episode_count,
959 alias_count,
960 evidence_count,
961 )?;
962 self.persist_projection_import_failure_receipt(failure_log_row, &err)
963 .await;
964 return Err(err);
965 }
966 }
967 }
968
969 for i in 0..preferred_relation_intervals.len() {
970 let (
971 left_subject_entity_id,
972 left_predicate,
973 left_object_anchor,
974 left_scope_namespace,
975 left_scope_domain,
976 left_scope_workspace_id,
977 left_scope_repo_id,
978 left_projection_family,
979 left_relation_version_id,
980 left_from,
981 left_to,
982 ) = &preferred_relation_intervals[i];
983 for (
984 right_subject_entity_id,
985 right_predicate,
986 right_object_anchor,
987 right_scope_namespace,
988 right_scope_domain,
989 right_scope_workspace_id,
990 right_scope_repo_id,
991 right_projection_family,
992 right_relation_version_id,
993 right_from,
994 right_to,
995 ) in preferred_relation_intervals.iter().skip(i + 1)
996 {
997 if left_subject_entity_id != right_subject_entity_id
998 || left_predicate != right_predicate
999 || left_object_anchor != right_object_anchor
1000 || left_scope_namespace != right_scope_namespace
1001 || left_scope_domain != right_scope_domain
1002 || left_scope_workspace_id != right_scope_workspace_id
1003 || left_scope_repo_id != right_scope_repo_id
1004 || left_projection_family != right_projection_family
1005 {
1006 continue;
1007 }
1008
1009 if intervals_overlap(*left_from, *left_to, *right_from, *right_to) {
1010 let err = MemoryError::ImportInvalid {
1011 reason: format!(
1012 "preferred-open relation interval conflict for relation key \
1013 ({left_subject_entity_id}, {left_predicate}, {left_object_anchor}, \
1014 {left_scope_namespace}/{left_scope_domain:?}/{left_scope_workspace_id:?}/{left_scope_repo_id:?}, \
1015 {left_projection_family}): {left_relation_version_id} \
1016 ({left_from:?}, {left_to:?}) overlaps {right_relation_version_id} \
1017 ({right_from:?}, {right_to:?})",
1018 ),
1019 };
1020 let failure_log_row = build_projection_import_log_row(
1021 &batch,
1022 batch_id.clone(),
1023 "failed",
1024 chrono::Utc::now().to_rfc3339(),
1025 Some(err.to_string()),
1026 kernel_payload_json.clone(),
1027 record_len,
1028 claim_count,
1029 relation_count,
1030 episode_count,
1031 alias_count,
1032 evidence_count,
1033 )?;
1034 self.persist_projection_import_failure_receipt(failure_log_row, &err)
1035 .await;
1036 return Err(err);
1037 }
1038 }
1039 }
1040
1041 let total_count = record_len;
1042 let success_log_row = build_projection_import_log_row(
1043 &batch,
1044 batch_id.clone(),
1045 "complete",
1046 chrono::Utc::now().to_rfc3339(),
1047 None,
1048 kernel_payload_json,
1049 total_count,
1050 claim_count,
1051 relation_count,
1052 episode_count,
1053 alias_count,
1054 evidence_count,
1055 )?;
1056
1057 let import_result = self
1058 .with_write_conn(move |conn| {
1059 let mut claim_rows = claim_rows;
1060 let mut relation_rows = relation_rows;
1061 let mut alias_rows = alias_rows;
1062 let mut evidence_rows = evidence_rows;
1063 let mut episode_rows = episode_rows;
1064 let preferred_claim_intervals = preferred_claim_intervals;
1065 let preferred_relation_intervals = preferred_relation_intervals;
1066 let mut log_row = success_log_row;
1067 db::with_transaction(conn, |tx| {
1068 if projection_storage::check_projection_import_exists(
1069 tx,
1070 &source_envelope_id,
1071 &schema_version,
1072 &content_digest,
1073 )? {
1074 return Ok(ProjectionImportResult {
1075 source_envelope_id,
1076 status: "already_imported".into(),
1077 record_count: total_count,
1078 was_duplicate: true,
1079 });
1080 }
1081
1082 check_projection_identity_conflicts(
1083 tx,
1084 &source_envelope_id,
1085 &content_digest,
1086 &claim_rows,
1087 &relation_rows,
1088 )?;
1089
1090 let imported_at = chrono::Utc::now().to_rfc3339();
1091 for cv in &mut claim_rows {
1092 cv.recorded_at = imported_at.clone();
1093 }
1094 for rv in &mut relation_rows {
1095 rv.recorded_at = imported_at.clone();
1096 }
1097 for ea in &mut alias_rows {
1098 ea.recorded_at = imported_at.clone();
1099 }
1100 for er in &mut evidence_rows {
1101 er.recorded_at = imported_at.clone();
1102 }
1103 for el in &mut episode_rows {
1104 el.recorded_at = imported_at.clone();
1105 }
1106
1107 log_row.imported_at = imported_at.clone();
1108
1109 for (claim_id, claim_version_id, interval_from, interval_to) in
1110 preferred_claim_intervals.iter()
1111 {
1112 for (existing_claim_version_id, existing_valid_from, existing_valid_to) in
1113 projection_storage::query_preferred_claim_intervals(tx, claim_id)?
1114 {
1115 let existing_valid_from = parse_stored_timestamp(
1116 existing_valid_from.as_deref(),
1117 "claim_versions",
1118 &existing_claim_version_id,
1119 "valid_from",
1120 )?;
1121 let existing_valid_to = parse_stored_timestamp(
1122 existing_valid_to.as_deref(),
1123 "claim_versions",
1124 &existing_claim_version_id,
1125 "valid_to",
1126 )?;
1127 if intervals_overlap(
1128 *interval_from,
1129 *interval_to,
1130 existing_valid_from,
1131 existing_valid_to,
1132 ) {
1133 return Err(MemoryError::ImportInvalid {
1134 reason: format!(
1135 "preferred-open claim interval conflict for claim_id {claim_id}: \
1136 incoming {claim_version_id} ({interval_from:?}, {interval_to:?}) \
1137 overlaps existing {existing_claim_version_id} \
1138 ({existing_valid_from:?}, {existing_valid_to:?})"
1139 ),
1140 });
1141 }
1142 }
1143 }
1144
1145 for (
1146 subject_entity_id,
1147 predicate,
1148 object_anchor,
1149 scope_namespace,
1150 scope_domain,
1151 scope_workspace_id,
1152 scope_repo_id,
1153 projection_family,
1154 relation_version_id,
1155 interval_from,
1156 interval_to,
1157 ) in preferred_relation_intervals.iter()
1158 {
1159 for (
1160 existing_relation_version_id,
1161 existing_valid_from,
1162 existing_valid_to,
1163 ) in projection_storage::query_preferred_relation_intervals(
1164 tx,
1165 subject_entity_id,
1166 predicate,
1167 object_anchor,
1168 scope_namespace,
1169 scope_domain.as_deref(),
1170 scope_workspace_id.as_deref(),
1171 scope_repo_id.as_deref(),
1172 projection_family,
1173 )? {
1174 let existing_valid_from = parse_stored_timestamp(
1175 existing_valid_from.as_deref(),
1176 "relation_versions",
1177 &existing_relation_version_id,
1178 "valid_from",
1179 )?;
1180 let existing_valid_to = parse_stored_timestamp(
1181 existing_valid_to.as_deref(),
1182 "relation_versions",
1183 &existing_relation_version_id,
1184 "valid_to",
1185 )?;
1186 if intervals_overlap(
1187 *interval_from,
1188 *interval_to,
1189 existing_valid_from,
1190 existing_valid_to,
1191 ) {
1192 return Err(MemoryError::ImportInvalid {
1193 reason: format!(
1194 "preferred-open relation interval conflict for relation key \
1195 ({subject_entity_id}, {predicate}, {object_anchor}, \
1196 {scope_namespace}/{scope_domain:?}/{scope_workspace_id:?}/{scope_repo_id:?}, \
1197 {projection_family}): incoming {relation_version_id} \
1198 ({interval_from:?}, {interval_to:?}) overlaps existing \
1199 {existing_relation_version_id} ({existing_valid_from:?}, \
1200 {existing_valid_to:?})"
1201 ),
1202 });
1203 }
1204 }
1205 }
1206
1207 for cv in &claim_rows {
1208 projection_storage::insert_claim_version(tx, cv)?;
1209 }
1210 for rv in &relation_rows {
1211 projection_storage::insert_relation_version(tx, rv)?;
1212 }
1213 for ea in &alias_rows {
1214 projection_storage::insert_entity_alias(tx, ea)?;
1215 }
1216 for er in &evidence_rows {
1217 projection_storage::insert_evidence_ref(tx, er)?;
1218 }
1219 for el in &episode_rows {
1220 projection_storage::insert_episode_link(tx, el)?;
1221 }
1222
1223 let add_edge = |source_kind: &str,
1224 source_id: &str,
1225 target_kind: &str,
1226 target_id: &str,
1227 derivation_type: &str,
1228 invalidation_mode: &str| {
1229 projection_storage::insert_derivation_edge(
1230 tx,
1231 source_kind,
1232 source_id,
1233 target_kind,
1234 target_id,
1235 derivation_type,
1236 invalidation_mode,
1237 )
1238 };
1239
1240 for cv in &claim_rows {
1241 add_edge(
1242 "claim",
1243 cv.claim_id.as_str(),
1244 "claim_version",
1245 cv.claim_version_id.as_str(),
1246 "claim_version_of",
1247 "on_source_change",
1248 )?;
1249
1250 if let Some(previous_version_id) = &cv.supersedes_claim_version_id {
1251 add_edge(
1252 "claim_version",
1253 previous_version_id.as_str(),
1254 "claim_version",
1255 cv.claim_version_id.as_str(),
1256 "supersedes",
1257 "on_supersession",
1258 )?;
1259 }
1260 }
1261
1262 for rv in &relation_rows {
1263 if let Some(claim_id) = &rv.claim_id {
1264 add_edge(
1265 "claim",
1266 claim_id.as_str(),
1267 "relation_version",
1268 rv.relation_version_id.as_str(),
1269 "supports_claim",
1270 "on_source_change",
1271 )?;
1272 }
1273
1274 if let Some(episode_id) = &rv.source_episode_id {
1275 add_edge(
1276 "episode",
1277 episode_id.as_str(),
1278 "relation_version",
1279 rv.relation_version_id.as_str(),
1280 "supports_episode",
1281 "on_source_change",
1282 )?;
1283 }
1284
1285 if let Some(previous_relation_id) = &rv.supersedes_relation_version_id {
1286 add_edge(
1287 "relation_version",
1288 previous_relation_id.as_str(),
1289 "relation_version",
1290 rv.relation_version_id.as_str(),
1291 "supersedes",
1292 "on_supersession",
1293 )?;
1294 }
1295 }
1296
1297 for ea in &alias_rows {
1298 add_edge(
1299 "entity",
1300 ea.canonical_entity_id.as_str(),
1301 "entity_alias",
1302 ea.canonical_entity_id.as_str(),
1303 "canonical_alias",
1304 "on_alias_split",
1305 )?;
1306
1307 if let Some(split_from_entity_id) = &ea.split_from_entity_id {
1308 add_edge(
1309 "entity",
1310 split_from_entity_id.as_str(),
1311 "entity_alias",
1312 ea.canonical_entity_id.as_str(),
1313 "alias_split_from",
1314 "on_alias_split",
1315 )?;
1316 }
1317
1318 if let Some(superseded_by_entity_id) = &ea.superseded_by_entity_id {
1319 add_edge(
1320 "entity",
1321 superseded_by_entity_id.as_str(),
1322 "entity_alias",
1323 ea.canonical_entity_id.as_str(),
1324 "alias_superseded_by",
1325 "on_supersession",
1326 )?;
1327 }
1328 }
1329
1330 for er in &evidence_rows {
1331 projection_storage::insert_derivation_edge(
1332 tx,
1333 "claim",
1334 er.claim_id.as_str(),
1335 "evidence_ref",
1336 &er.fetch_handle,
1337 "supports",
1338 "on_source_change",
1339 )?;
1340
1341 if let Some(cvid) = &er.claim_version_id {
1342 projection_storage::insert_derivation_edge(
1343 tx,
1344 "claim_version",
1345 cvid.as_str(),
1346 "evidence_ref",
1347 &er.fetch_handle,
1348 "supports",
1349 "on_source_change",
1350 )?;
1351 }
1352 }
1353
1354 for el in &episode_rows {
1355 let cause_ids: Vec<String> =
1356 serde_json::from_str(&el.cause_ids).map_err(|err| {
1357 MemoryError::ImportInvalid {
1358 reason: format!(
1359 "episode {} has invalid cause_ids JSON: {}",
1360 el.episode_id, err
1361 ),
1362 }
1363 })?;
1364
1365 for cause_id in &cause_ids {
1366 add_edge(
1367 "claim",
1368 cause_id,
1369 "episode",
1370 el.episode_id.as_str(),
1371 "caused_by_claim",
1372 "on_source_change",
1373 )?;
1374 }
1375 }
1376
1377 projection_storage::insert_projection_import_log(tx, &log_row)?;
1378
1379 Ok(ProjectionImportResult {
1380 source_envelope_id,
1381 status: "complete".into(),
1382 record_count: total_count,
1383 was_duplicate: false,
1384 })
1385 })
1386 })
1387 .await;
1388
1389 if let Err(ref error) = import_result {
1390 self.persist_projection_import_failure_receipt(base_failure_log_row, error)
1391 .await;
1392 }
1393
1394 import_result
1395 }
1396
1397 pub async fn import_projection_batch_json_compat(
1403 &self,
1404 batch_json: &str,
1405 ) -> Result<ProjectionImportResult, MemoryError> {
1406 let batch = match json_compat_import::decode_projection_batch_json_compat(batch_json) {
1407 Ok(batch) => batch,
1408 Err(error) => {
1409 self.persist_projection_import_failure_receipt(
1410 json_compat_import::build_json_compat_failure_log_row(
1411 batch_json,
1412 chrono::Utc::now().to_rfc3339(),
1413 ),
1414 &error,
1415 )
1416 .await;
1417 return Err(error);
1418 }
1419 };
1420 self.import_projection_batch(&batch).await
1421 }
1422
1423 pub async fn query_projection_imports(
1425 &self,
1426 scope_namespace: Option<&str>,
1427 limit: usize,
1428 ) -> Result<Vec<ProjectionImportLogEntry>, MemoryError> {
1429 let ns = scope_namespace.map(|s| s.to_string());
1430 self.with_read_conn(move |conn| {
1431 let rows = projection_storage::query_projection_import_log(conn, ns.as_deref(), limit)?;
1432 rows.into_iter()
1433 .map(projection_import_log_entry_from_row)
1434 .collect::<Result<Vec<_>, MemoryError>>()
1435 })
1436 .await
1437 }
1438
1439 pub async fn latest_rebuildable_kernel_projection_import_for_scope(
1442 &self,
1443 scope_key: &ScopeKey,
1444 ) -> Result<Option<ProjectionImportLogEntry>, MemoryError> {
1445 let scope_key = scope_key.clone();
1446 self.with_read_conn(move |conn| {
1447 projection_storage::latest_rebuildable_kernel_projection_import(conn, &scope_key)?
1448 .map(projection_import_log_entry_from_row)
1449 .transpose()
1450 })
1451 .await
1452 }
1453
1454 pub async fn query_projection_import_failures(
1456 &self,
1457 scope_namespace: Option<&str>,
1458 limit: usize,
1459 ) -> Result<Vec<ProjectionImportFailureReceiptEntry>, MemoryError> {
1460 let ns = scope_namespace.map(|s| s.to_string());
1461 self.with_read_conn(move |conn| {
1462 let rows =
1463 projection_storage::query_projection_import_failures(conn, ns.as_deref(), limit)?;
1464 rows.into_iter()
1465 .map(projection_import_failure_entry_from_row)
1466 .collect::<Result<Vec<_>, MemoryError>>()
1467 })
1468 .await
1469 }
1470}