Skip to main content

hirn_engine/db/
cross_agent.rs

1use hirn_core::offline::{GeneratedCognitionReview, GeneratedCognitionRollbackReceipt};
2use hirn_core::types::Origin;
3
4use super::*;
5
6fn quarantine_filter(id: MemoryId) -> String {
7    format!("memory_id = '{}'", id.to_string().replace('\'', "''"))
8}
9
10impl HirnDB {
11    // ── Cross-Agent Consolidation ───────────────────────────────────────
12
13    /// Detect and merge or flag semantic records from different agents
14    /// that describe the same concept within a given namespace.
15    ///
16    /// Returns a summary of what was merged and what contradictions were found.
17    pub(crate) async fn cross_agent_consolidate(
18        &self,
19        target_namespace: &Namespace,
20        auto_merge_threshold: f32,
21    ) -> HirnResult<CrossAgentConsolidationResult> {
22        // 1. Collect all semantic records in the target namespace.
23        let filter = SemanticFilter {
24            namespace: Some(target_namespace.clone()),
25            ..Default::default()
26        };
27        let records = self.list_semantics(&filter).await?;
28
29        // 2. Group by concept name (exact match).
30        let mut by_concept: std::collections::HashMap<String, Vec<SemanticRecord>> =
31            std::collections::HashMap::new();
32        for rec in records {
33            by_concept.entry(rec.concept.clone()).or_default().push(rec);
34        }
35
36        let mut merged_count = 0usize;
37        let mut contradiction_count = 0usize;
38        let mut merged_ids: Vec<MemoryId> = Vec::new();
39        let mut contradiction_pairs: Vec<(MemoryId, MemoryId)> = Vec::new();
40
41        // 3. For each concept with multiple records from different agents, decide merge vs contradict.
42        for group in by_concept.values() {
43            if group.len() < 2 {
44                continue;
45            }
46
47            // Only consider groups with records from different agents.
48            let agents: std::collections::HashSet<&hirn_core::types::AgentId> =
49                group.iter().map(|r| &r.provenance.created_by).collect();
50            if agents.len() < 2 {
51                continue;
52            }
53
54            // Check if all records agree (high confidence on all).
55            let all_confident = group.iter().all(|r| r.confidence >= auto_merge_threshold);
56
57            if all_confident {
58                // Merge: absorb the group into the strongest current head.
59                let source_ids: Vec<MemoryId> = group.iter().map(|r| r.id).collect();
60                let source_agents: Vec<hirn_core::types::AgentId> =
61                    agents.iter().cloned().cloned().collect();
62                let merged = self.merge_semantic_group(group).await?;
63
64                self.append_audit(
65                    None,
66                    hirn_core::audit::AuditAction::CrossAgentMerge {
67                        source_ids,
68                        result_id: merged,
69                        source_agents,
70                    },
71                )
72                .await?;
73
74                merged_ids.push(merged);
75                merged_count += 1;
76            } else {
77                // Flag contradictions between records.
78                for i in 0..group.len() {
79                    for j in (i + 1)..group.len() {
80                        let a = &group[i];
81                        let b = &group[j];
82
83                        // Check if there's already a Contradicts edge.
84                        let has_contradiction = {
85                            let existing = self
86                                .cached_graph()
87                                .get_edges_between(a.id, b.id)
88                                .await
89                                .unwrap_or_default();
90                            existing
91                                .iter()
92                                .any(|e| e.relation == EdgeRelation::Contradicts)
93                        };
94
95                        if !has_contradiction {
96                            self.connect_with(
97                                a.id,
98                                b.id,
99                                EdgeRelation::Contradicts,
100                                1.0,
101                                Metadata::default(),
102                            )
103                            .await?;
104                            contradiction_pairs.push((a.id, b.id));
105                            contradiction_count += 1;
106                        }
107                    }
108                }
109            }
110        }
111
112        Ok(CrossAgentConsolidationResult {
113            merged_count,
114            contradiction_count,
115            merged_ids,
116            contradiction_pairs,
117        })
118    }
119
120    /// Merge a group of semantic records about the same concept into one.
121    async fn merge_semantic_group(&self, group: &[SemanticRecord]) -> HirnResult<MemoryId> {
122        // Pick the highest-confidence record as the active target chain.
123        let best = group
124            .iter()
125            .max_by(|a, b| a.confidence.total_cmp(&b.confidence))
126            .unwrap();
127
128        let merged = self
129            .merge_semantic(
130                best.id,
131                SemanticMerge {
132                    source_ids: group
133                        .iter()
134                        .filter(|record| record.logical_memory_id != best.logical_memory_id)
135                        .map(|record| record.id)
136                        .collect(),
137                    reason: Some("cross-agent consolidation".to_string()),
138                    ..SemanticMerge::with_metadata(
139                        AgentId::well_known("cross_agent_consolidation"),
140                        best.id,
141                    )
142                },
143            )
144            .await?;
145
146        Ok(merged.target.id)
147    }
148
149    /// Compute an anomaly score for a record before insertion.
150    /// Returns a score in [0.0, 1.0] where higher = more anomalous.
151    pub(crate) async fn compute_anomaly_score(&self, record: &EpisodicRecord) -> HirnResult<f32> {
152        let embedding = match &record.embedding {
153            Some(emb) => emb,
154            None => return Ok(0.0), // no embedding = can't measure anomaly
155        };
156
157        // F-51: During cold start (fewer than 10 records), anomaly detection
158        // is unreliable because the sparse index gives low similarity to
159        // legitimate but topically diverse records.
160        let ep_count = self
161            .storage_runtime
162            .count(hirn_storage::datasets::episodic::DATASET_NAME, None)
163            .await
164            .unwrap_or(0);
165        let sem_count = self
166            .storage_runtime
167            .count(hirn_storage::datasets::semantic::DATASET_NAME, None)
168            .await
169            .unwrap_or(0);
170        let total_records = ep_count + sem_count;
171        if total_records < 10 {
172            return Ok(0.0);
173        }
174
175        // Find the nearest neighbor via LanceDB vector search.
176        let metric = self.distance_metric();
177        let results = self.vector_search_all(embedding, 1, metric).await?;
178        if results.is_empty() {
179            return Ok(0.5); // can't find neighbors, moderately suspicious
180        }
181
182        let similarity = results[0].1;
183
184        // Low similarity to all existing memories = outlier.
185        // Score: 1.0 - similarity (so similarity=0.1 → anomaly=0.9).
186        let embedding_anomaly = 1.0 - similarity;
187
188        // Future timestamp check.
189        let now = hirn_core::Timestamp::now();
190        let temporal_anomaly = if record.timestamp > now { 0.5 } else { 0.0 };
191
192        // Combined score (weighted average).
193        let score = (embedding_anomaly * 0.7 + temporal_anomaly * 0.3).min(1.0);
194        Ok(score)
195    }
196
197    /// Quarantine a record: store it in quarantine dataset instead of the main store.
198    /// Also records the event in the collective corruption defense tracker.
199    pub(crate) async fn quarantine_record(
200        &self,
201        record: &EpisodicRecord,
202        anomaly_score: f32,
203        agent_id: &hirn_core::types::AgentId,
204    ) -> HirnResult<MemoryId> {
205        // Collective corruption defense: check if this agent is already rate-limited.
206        if let Some(config) = self.admission_runtime().rate_limit_config(agent_id) {
207            return Err(HirnError::RateLimited(format!(
208                "agent '{}' exceeded {} quarantine events in {} seconds",
209                agent_id, config.max_quarantines_per_window, config.window_seconds,
210            )));
211        }
212
213        let id = record.id;
214        let record_bytes =
215            bincode::serialize(record).map_err(|e| StoreError::Serialization(e.to_string()))?;
216
217        let row = hirn_storage::datasets::quarantine::QuarantineRow {
218            memory_id: id,
219            record_kind: hirn_core::QuarantinedRecordKind::Episodic,
220            record_bytes,
221            anomaly_score,
222            reason: format!("anomaly score {anomaly_score:.2} exceeds threshold"),
223            status: hirn_storage::datasets::quarantine::QuarantineStatus::Pending,
224            created_at: Timestamp::now(),
225            reviewed_by: None,
226            reviewed_at: None,
227            generated_review: None,
228        };
229
230        let batch = hirn_storage::datasets::quarantine::to_batch(std::slice::from_ref(&row))
231            .map_err(|e| HirnError::storage(e))?;
232        self.storage_runtime
233            .append(hirn_storage::datasets::quarantine::DATASET_NAME, batch)
234            .await
235            .map_err(|e| HirnError::storage(e))?;
236
237        self.append_audit(
238            Some(agent_id.clone()),
239            hirn_core::audit::AuditAction::Quarantine {
240                memory_id: id,
241                anomaly_score,
242                reason: row.reason,
243            },
244        )
245        .await?;
246
247        // Track quarantine event for collective corruption defense.
248        let rate_limit_info = self.admission_runtime().record_quarantine(agent_id);
249        if let Some(config) = rate_limit_info {
250            let _ = self
251                .append_audit(
252                    Some(agent_id.clone()),
253                    hirn_core::audit::AuditAction::AgentRateLimited {
254                        agent_id: agent_id.clone(),
255                        quarantined_count: config.max_quarantines_per_window + 1,
256                        window_seconds: config.window_seconds,
257                    },
258                )
259                .await;
260        }
261
262        Err(HirnError::Quarantined(format!(
263            "memory {id} quarantined (anomaly score: {anomaly_score:.2})"
264        )))
265    }
266
267    /// List all quarantined records.
268    pub(crate) async fn review_quarantine(
269        &self,
270    ) -> HirnResult<Vec<crate::security::QuarantineEntry>> {
271        let filter = "status = 'Pending'".to_string();
272        let opts = hirn_storage::store::ScanOptions {
273            filter: Some(filter),
274            ..Default::default()
275        };
276        let batches = self
277            .storage_runtime
278            .scan(hirn_storage::datasets::quarantine::DATASET_NAME, opts)
279            .await
280            .map_err(|e| HirnError::storage(e))?;
281
282        let mut result = Vec::new();
283        for batch in &batches {
284            let rows = hirn_storage::datasets::quarantine::from_batch(batch)
285                .map_err(|e| HirnError::storage(e))?;
286            for row in rows {
287                result.push(crate::security::QuarantineEntry {
288                    memory_id: row.memory_id,
289                    record_kind: row.record_kind,
290                    record: row.record_bytes,
291                    anomaly_score: row.anomaly_score,
292                    reason: row.reason,
293                    status: match row.status {
294                        hirn_storage::datasets::quarantine::QuarantineStatus::Pending => {
295                            crate::security::QuarantineStatus::Pending
296                        }
297                        hirn_storage::datasets::quarantine::QuarantineStatus::Approved => {
298                            crate::security::QuarantineStatus::Approved
299                        }
300                        hirn_storage::datasets::quarantine::QuarantineStatus::Rejected => {
301                            crate::security::QuarantineStatus::Rejected
302                        }
303                        hirn_storage::datasets::quarantine::QuarantineStatus::RolledBack => {
304                            crate::security::QuarantineStatus::RolledBack
305                        }
306                    },
307                    created_at: row.created_at,
308                    reviewed_by: row.reviewed_by,
309                    reviewed_at: row.reviewed_at,
310                    generated_review: row.generated_review,
311                });
312            }
313        }
314        Ok(result)
315    }
316
317    async fn load_quarantine_row(
318        &self,
319        id: MemoryId,
320    ) -> HirnResult<hirn_storage::datasets::quarantine::QuarantineRow> {
321        let filter = quarantine_filter(id);
322        let opts = hirn_storage::store::ScanOptions {
323            filter: Some(filter),
324            ..Default::default()
325        };
326        let batches = self
327            .storage_runtime
328            .scan(hirn_storage::datasets::quarantine::DATASET_NAME, opts)
329            .await
330            .map_err(HirnError::storage)?;
331
332        for batch in &batches {
333            let rows = hirn_storage::datasets::quarantine::from_batch(batch)
334                .map_err(HirnError::storage)?;
335            if let Some(row) = rows.into_iter().next() {
336                return Ok(row);
337            }
338        }
339
340        Err(HirnError::NotFound(format!("quarantine entry {id}")))
341    }
342
343    async fn replace_quarantine_row(
344        &self,
345        row: &hirn_storage::datasets::quarantine::QuarantineRow,
346    ) -> HirnResult<()> {
347        let filter = quarantine_filter(row.memory_id);
348        self.storage_runtime
349            .delete(hirn_storage::datasets::quarantine::DATASET_NAME, &filter)
350            .await
351            .map_err(HirnError::storage)?;
352
353        let batch = hirn_storage::datasets::quarantine::to_batch(std::slice::from_ref(row))
354            .map_err(HirnError::storage)?;
355        self.storage_runtime
356            .append(hirn_storage::datasets::quarantine::DATASET_NAME, batch)
357            .await
358            .map_err(HirnError::storage)?;
359        Ok(())
360    }
361
362    /// Approve a quarantined memory: move it from quarantine to the main store.
363    pub(crate) async fn approve_quarantine(
364        &self,
365        id: MemoryId,
366        approved_by: AgentId,
367    ) -> HirnResult<crate::security::QuarantineApprovalOutcome> {
368        let mut row = self.load_quarantine_row(id).await?;
369        if row.status != hirn_storage::datasets::quarantine::QuarantineStatus::Pending {
370            return Err(HirnError::InvalidInput(format!(
371                "quarantine entry {id} is not pending review"
372            )));
373        }
374        if let Some(review) = row.generated_review.as_ref() {
375            if !review.allows_promotion() {
376                return Err(HirnError::InvalidInput(format!(
377                    "quarantine entry {id} failed the generated cognition quality gate"
378                )));
379            }
380        }
381
382        let outcome = match row.record_kind {
383            hirn_core::QuarantinedRecordKind::Episodic => {
384                let record: EpisodicRecord = bincode::deserialize(&row.record_bytes)
385                    .map_err(|e| StoreError::Serialization(e.to_string()))?;
386                let applied_id = self.remember(record).await?;
387                crate::security::QuarantineApprovalOutcome {
388                    approved_entry_id: id,
389                    applied_memory_ids: vec![applied_id],
390                    change_summary: "promoted quarantined episodic record".to_string(),
391                    generated_review: None,
392                }
393            }
394            hirn_core::QuarantinedRecordKind::Semantic => {
395                let record: SemanticRecord = bincode::deserialize(&row.record_bytes)
396                    .map_err(|e| StoreError::Serialization(e.to_string()))?;
397                self.approve_quarantined_semantic(
398                    id,
399                    record,
400                    approved_by,
401                    row.generated_review.clone(),
402                )
403                .await?
404            }
405        };
406
407        row.status = hirn_storage::datasets::quarantine::QuarantineStatus::Approved;
408        row.reviewed_by = Some(approved_by);
409        row.reviewed_at = Some(Timestamp::now());
410        row.generated_review.clone_from(&outcome.generated_review);
411        self.replace_quarantine_row(&row).await?;
412
413        self.append_audit(
414            Some(approved_by),
415            hirn_core::audit::AuditAction::QuarantineApproved { memory_id: id },
416        )
417        .await?;
418
419        Ok(outcome)
420    }
421
422    async fn approve_quarantined_semantic(
423        &self,
424        entry_id: MemoryId,
425        record: SemanticRecord,
426        approved_by: AgentId,
427        generated_review: Option<GeneratedCognitionReview>,
428    ) -> HirnResult<crate::security::QuarantineApprovalOutcome> {
429        let extraction_model = record
430            .provenance
431            .extraction_model
432            .as_deref()
433            .unwrap_or_default();
434        if extraction_model.starts_with("offline-reconcile:") {
435            let proposal = hirn_core::ReconcileProposal::from_json(&record.description)?;
436            return self
437                .approve_reconcile_proposal(
438                    entry_id,
439                    record.namespace,
440                    proposal,
441                    approved_by,
442                    generated_review,
443                )
444                .await;
445        }
446
447        let applied_id = self.store_semantic(record).await?;
448        let mut generated_review = generated_review;
449        if let Some(review) = generated_review.as_mut() {
450            review.attach_rollback_receipt(GeneratedCognitionRollbackReceipt {
451                applied_memory_ids: vec![applied_id],
452                previous_active_memory_ids: Vec::new(),
453            });
454            review.mark_approved();
455        }
456        Ok(crate::security::QuarantineApprovalOutcome {
457            approved_entry_id: entry_id,
458            applied_memory_ids: vec![applied_id],
459            change_summary: "promoted quarantined semantic record".to_string(),
460            generated_review,
461        })
462    }
463
464    async fn approve_reconcile_proposal(
465        &self,
466        entry_id: MemoryId,
467        namespace: Namespace,
468        proposal: hirn_core::ReconcileProposal,
469        approved_by: AgentId,
470        generated_review: Option<GeneratedCognitionReview>,
471    ) -> HirnResult<crate::security::QuarantineApprovalOutcome> {
472        let approved_at = Timestamp::now();
473        let mut resolved_heads = Vec::with_capacity(proposal.members.len());
474        for member in &proposal.members {
475            let head = self
476                .semantic_head_for_logical_id(member.logical_memory_id)
477                .await?;
478            if head.id != member.memory_id {
479                return Err(HirnError::InvalidInput(format!(
480                    "reconcile proposal {} is stale for logical memory {}: expected head {}, found {}",
481                    proposal.conflict_id, member.logical_memory_id, member.memory_id, head.id
482                )));
483            }
484            if !head.is_live() {
485                return Err(HirnError::InvalidInput(format!(
486                    "reconcile proposal {} targets non-live logical memory {}",
487                    proposal.conflict_id, member.logical_memory_id
488                )));
489            }
490            resolved_heads.push(head);
491        }
492
493        let winner_id = proposal
494            .preferred_memory_id
495            .or(proposal.authoritative_memory_id);
496        let winner_logical_id = winner_id.and_then(|memory_id| {
497            proposal
498                .members
499                .iter()
500                .find(|member| member.memory_id == memory_id)
501                .map(|member| member.logical_memory_id)
502        });
503        let rationale = format!(
504            "approved offline reconcile proposal {} with action {}: {}",
505            proposal.conflict_id,
506            proposal.action.as_str(),
507            proposal.rationale
508        );
509        let previous_active_memory_ids = resolved_heads.iter().map(|head| head.id).collect();
510        let mut applied_memory_ids = Vec::new();
511
512        match proposal.action {
513            hirn_core::ReconcileProposalAction::RetainBoth
514            | hirn_core::ReconcileProposalAction::EscalateForReview => {}
515            hirn_core::ReconcileProposalAction::Supersede => {
516                let winner_id = winner_id.ok_or_else(|| {
517                    HirnError::InvalidInput(format!(
518                        "reconcile proposal {} cannot supersede without a preferred memory",
519                        proposal.conflict_id
520                    ))
521                })?;
522                let superseding = self
523                    .supersede_semantic(
524                        winner_id,
525                        SemanticSupersession {
526                            reason: Some(rationale.clone()),
527                            actor_id: approved_by,
528                            observed_at: Some(approved_at),
529                            causation_id: entry_id,
530                            description: None,
531                            confidence: None,
532                            evidence_count: None,
533                        },
534                    )
535                    .await?;
536                applied_memory_ids.push(superseding.id);
537
538                for loser in resolved_heads
539                    .iter()
540                    .filter(|record| Some(record.logical_memory_id) != winner_logical_id)
541                {
542                    let tombstone = self
543                        .retract_semantic(
544                            loser.id,
545                            SemanticRetraction {
546                                reason: Some(rationale.clone()),
547                                actor_id: approved_by,
548                                observed_at: Some(approved_at),
549                                causation_id: entry_id,
550                            },
551                        )
552                        .await?;
553                    applied_memory_ids.push(tombstone.id);
554                }
555            }
556            hirn_core::ReconcileProposalAction::Retract => {
557                let winner_logical_id = winner_logical_id.ok_or_else(|| {
558                    HirnError::InvalidInput(format!(
559                        "reconcile proposal {} cannot retract losers without a preferred memory",
560                        proposal.conflict_id
561                    ))
562                })?;
563                for loser in resolved_heads
564                    .iter()
565                    .filter(|record| record.logical_memory_id != winner_logical_id)
566                {
567                    let tombstone = self
568                        .retract_semantic(
569                            loser.id,
570                            SemanticRetraction {
571                                reason: Some(rationale.clone()),
572                                actor_id: approved_by,
573                                observed_at: Some(approved_at),
574                                causation_id: entry_id,
575                            },
576                        )
577                        .await?;
578                    applied_memory_ids.push(tombstone.id);
579                }
580            }
581            hirn_core::ReconcileProposalAction::Quarantine => {
582                let winner_logical_id = winner_logical_id.ok_or_else(|| {
583                    HirnError::InvalidInput(format!(
584                        "reconcile proposal {} cannot quarantine generated losers without a preferred memory",
585                        proposal.conflict_id
586                    ))
587                })?;
588                let mut generated_losers = 0usize;
589                for loser in resolved_heads.iter().filter(|record| {
590                    record.logical_memory_id != winner_logical_id
591                        && matches!(
592                            *record.provenance.origin(),
593                            Origin::DreamReplay | Origin::LlmExtraction | Origin::Consolidation
594                        )
595                }) {
596                    let tombstone = self
597                        .retract_semantic(
598                            loser.id,
599                            SemanticRetraction {
600                                reason: Some(rationale.clone()),
601                                actor_id: approved_by,
602                                observed_at: Some(approved_at),
603                                causation_id: entry_id,
604                            },
605                        )
606                        .await?;
607                    applied_memory_ids.push(tombstone.id);
608                    generated_losers += 1;
609                }
610                if generated_losers == 0 {
611                    return Err(HirnError::InvalidInput(format!(
612                        "reconcile proposal {} selected quarantine but no generated losing heads remain",
613                        proposal.conflict_id
614                    )));
615                }
616            }
617        }
618
619        self.append_audit(
620            Some(approved_by),
621            hirn_core::audit::AuditAction::BeliefReconcileApproved {
622                conflict_id: proposal.conflict_id.clone(),
623                action: proposal.action.as_str().to_string(),
624                namespace: namespace.as_str().to_string(),
625                logical_memory_ids: proposal
626                    .members
627                    .iter()
628                    .map(|member| member.logical_memory_id)
629                    .collect(),
630                applied_memory_ids: applied_memory_ids.clone(),
631                rationale: proposal.rationale.clone(),
632            },
633        )
634        .await?;
635
636        let mut generated_review = generated_review;
637        if let Some(review) = generated_review.as_mut() {
638            review.attach_rollback_receipt(GeneratedCognitionRollbackReceipt {
639                applied_memory_ids: applied_memory_ids.clone(),
640                previous_active_memory_ids,
641            });
642            review.mark_approved();
643        }
644
645        Ok(crate::security::QuarantineApprovalOutcome {
646            approved_entry_id: entry_id,
647            applied_memory_ids,
648            change_summary: format!(
649                "approved reconcile action {} for conflict {}",
650                proposal.action.as_str(),
651                proposal.conflict_id
652            ),
653            generated_review,
654        })
655    }
656
657    /// Reject a quarantined memory and retain the review artifact for inspection.
658    pub(crate) async fn reject_quarantine(&self, id: MemoryId) -> HirnResult<()> {
659        let mut row = self.load_quarantine_row(id).await?;
660        row.status = hirn_storage::datasets::quarantine::QuarantineStatus::Rejected;
661        row.reviewed_at = Some(Timestamp::now());
662        if let Some(review) = row.generated_review.as_mut() {
663            review.mark_rejected("rejected during quarantine review");
664        }
665        self.replace_quarantine_row(&row).await?;
666
667        self.append_audit(
668            None,
669            hirn_core::audit::AuditAction::QuarantineRejected { memory_id: id },
670        )
671        .await?;
672
673        Ok(())
674    }
675
676    pub(crate) async fn rollback_quarantine_approval(
677        &self,
678        id: MemoryId,
679        rolled_back_by: AgentId,
680        reason: String,
681    ) -> HirnResult<crate::security::QuarantineRollbackOutcome> {
682        let mut row = self.load_quarantine_row(id).await?;
683        if row.status != hirn_storage::datasets::quarantine::QuarantineStatus::Approved {
684            return Err(HirnError::InvalidInput(format!(
685                "quarantine entry {id} is not approved"
686            )));
687        }
688
689        let mut generated_review = row.generated_review.clone().ok_or_else(|| {
690            HirnError::InvalidInput(format!(
691                "quarantine entry {id} does not carry generated cognition rollback metadata"
692            ))
693        })?;
694        let receipt = generated_review.rollback_receipt.clone().ok_or_else(|| {
695            HirnError::InvalidInput(format!(
696                "quarantine entry {id} cannot be rolled back because no rollback receipt was recorded"
697            ))
698        })?;
699
700        self.validate_generated_rollback_receipt(&receipt).await?;
701        let restore_logical_ids = self
702            .generated_semantic_logical_ids(&receipt.applied_memory_ids)
703            .await?;
704        let removed_memory_ids = self
705            .delete_generated_semantic_revisions(&receipt.applied_memory_ids)
706            .await?;
707        let restored_memory_ids = self
708            .restore_generated_semantic_heads(&restore_logical_ids)
709            .await?;
710
711        let rolled_back_at = Timestamp::now();
712        generated_review.mark_rolled_back(rolled_back_by.clone(), rolled_back_at, reason.clone());
713        row.status = hirn_storage::datasets::quarantine::QuarantineStatus::RolledBack;
714        row.reviewed_by = Some(rolled_back_by.clone());
715        row.reviewed_at = Some(rolled_back_at);
716        row.generated_review = Some(generated_review.clone());
717        self.replace_quarantine_row(&row).await?;
718
719        self.append_audit(
720            Some(rolled_back_by),
721            hirn_core::audit::AuditAction::QuarantineRolledBack {
722                memory_id: id,
723                removed_memory_ids: removed_memory_ids.clone(),
724                restored_memory_ids: restored_memory_ids.clone(),
725                reason: reason.clone(),
726            },
727        )
728        .await?;
729
730        Ok(crate::security::QuarantineRollbackOutcome {
731            rolled_back_entry_id: id,
732            removed_memory_ids,
733            restored_memory_ids,
734            reason,
735            generated_review: Some(generated_review),
736        })
737    }
738
739    async fn validate_generated_rollback_receipt(
740        &self,
741        receipt: &GeneratedCognitionRollbackReceipt,
742    ) -> HirnResult<()> {
743        for applied_id in &receipt.applied_memory_ids {
744            let record = self.read_semantic_record(*applied_id).await?;
745            let head = self
746                .semantic_head_for_logical_id(record.logical_memory_id)
747                .await?;
748            if head.id != record.id {
749                return Err(HirnError::InvalidInput(format!(
750                    "rollback cannot proceed because logical memory {} advanced beyond generated revision {}",
751                    record.logical_memory_id, applied_id
752                )));
753            }
754        }
755        Ok(())
756    }
757
758    async fn delete_generated_semantic_revisions(
759        &self,
760        applied_memory_ids: &[MemoryId],
761    ) -> HirnResult<Vec<MemoryId>> {
762        let mut removed = Vec::new();
763        for applied_id in applied_memory_ids {
764            let filter = format!("id = '{}'", applied_id.to_string().replace('\'', "''"));
765            self.storage_runtime
766                .delete(hirn_storage::datasets::semantic::DATASET_NAME, &filter)
767                .await
768                .map_err(HirnError::storage)?;
769            if let Err(error) = self.cached_graph().remove_node(*applied_id).await {
770                tracing::debug!(id = %applied_id, error = %error, "generated rollback graph cleanup skipped");
771            }
772            removed.push(*applied_id);
773        }
774        Ok(removed)
775    }
776
777    async fn generated_semantic_logical_ids(
778        &self,
779        applied_memory_ids: &[MemoryId],
780    ) -> HirnResult<std::collections::BTreeSet<hirn_core::revision::LogicalMemoryId>> {
781        let mut logical_ids = std::collections::BTreeSet::new();
782        for applied_id in applied_memory_ids {
783            let record = self.read_semantic_record(*applied_id).await?;
784            logical_ids.insert(record.logical_memory_id);
785        }
786        Ok(logical_ids)
787    }
788
789    async fn restore_generated_semantic_heads(
790        &self,
791        logical_memory_ids: &std::collections::BTreeSet<hirn_core::revision::LogicalMemoryId>,
792    ) -> HirnResult<Vec<MemoryId>> {
793        let mut restored = Vec::new();
794
795        for logical_memory_id in logical_memory_ids {
796            self.evict_semantic_head(*logical_memory_id);
797            match self.semantic_head_for_logical_id(*logical_memory_id).await {
798                Ok(head) => {
799                    self.ensure_semantic_graph_node(&head).await?;
800                    restored.push(head.id);
801                }
802                Err(HirnError::NotFound(_)) => {
803                    self.evict_semantic_head(*logical_memory_id);
804                }
805                Err(error) => return Err(error),
806            }
807        }
808
809        Ok(restored)
810    }
811
812    async fn ensure_semantic_graph_node(&self, record: &SemanticRecord) -> HirnResult<()> {
813        if !self
814            .cached_graph()
815            .has_node(record.id)
816            .await
817            .unwrap_or(false)
818        {
819            self.cached_graph()
820                .add_node(
821                    record.id,
822                    Layer::Semantic,
823                    record.confidence,
824                    record.created_at,
825                    record.namespace,
826                )
827                .await?;
828            if let Some(ref embedding) = record.embedding {
829                let candidates = self.find_similarity_candidates(embedding).await;
830                self.apply_similarity_edges(record.id, &candidates).await?;
831            }
832        }
833        self.cache_semantic_head(record);
834        Ok(())
835    }
836
    /// Prepare a parameterized HirnQL query for later execution.
    ///
    /// Parameters use `$1`, `$2` (positional) or `$name` (named) syntax.
    /// The returned `PreparedStatement` is meant to be passed to
    /// `execute_prepared`, possibly several times.
    ///
    /// NOTE(review): earlier documentation described the statement as holding a
    /// pre-compiled plan that is reused across executions; that cannot be
    /// confirmed from this file — verify against `crate::ql`.
    ///
    /// # Errors
    ///
    /// Any error returned by `crate::ql::prepare` is converted with
    /// `HirnError::from`.
    pub(crate) fn prepare(&self, query: &str) -> HirnResult<crate::ql::PreparedStatement> {
        // The second argument is always `None` here; its meaning is defined by
        // `crate::ql::prepare` (TODO confirm).
        crate::ql::prepare(query, None).map_err(HirnError::from)
    }
845
846    /// Execute a prepared statement with bound parameter values.
847    pub(crate) async fn execute_prepared(
848        &self,
849        prepared: &crate::ql::PreparedStatement,
850        params: &std::collections::HashMap<String, String>,
851    ) -> HirnResult<crate::ql::results::QueryResult> {
852        let compiled = crate::ql::bind(prepared, params).map_err(HirnError::from)?;
853        self.execute_ql(&compiled.source).await
854    }
855
    /// Start building a HirnQL query via the programmatic API.
    ///
    /// Returns a `QueryBuilder` created from a reference to this database; the
    /// builder's lifetime is therefore tied to `&self`.
    pub(crate) fn query(&self) -> crate::ql::builder::QueryBuilder<'_> {
        crate::ql::builder::QueryBuilder::new(self)
    }
860
861    // ── GDPR / Privacy: Right to Erasure ────────────────────────────────
862
863    /// Purge all data associated with an agent: episodic, semantic, procedural
864    /// records in the agent's private namespace, plus graph edges and
865    /// quarantine entries. Also clears corruption defense state.
866    ///
867    /// This implements GDPR Article 17 "Right to Erasure".
868    pub(crate) async fn purge_agent(&self, agent_id: &AgentId) -> HirnResult<PurgeReport> {
869        let private_ns = Namespace::private_for(agent_id);
870
871        // 1. Collect IDs of all records in the agent's private namespace.
872        let episodic_ids = self.list_episodic_ids_in_namespace(&private_ns).await?;
873        let semantic_ids = self.list_semantic_ids_in_namespace(&private_ns).await?;
874        let procedural_ids = self.list_procedural_ids_in_namespace(&private_ns).await?;
875
876        // 2. Delete episodic records (also removes graph nodes).
877        for id in &episodic_ids {
878            let _ = self.delete_episode(*id).await; // ignore NotFound if already cleaned
879        }
880
881        // 3. Delete semantic records.
882        for id in &semantic_ids {
883            let _ = self.purge_semantic(*id).await;
884        }
885
886        // 4. Delete procedural records.
887        for id in &procedural_ids {
888            let _ = self.delete_procedural(*id).await;
889        }
890
891        // 5. Remove any quarantined entries from this agent.
892        let quarantine_removed = self.purge_quarantine_for_agent(agent_id).await?;
893
894        // 6. Clear corruption defense state.
895        self.admission_runtime().clear_agent(agent_id);
896
897        // 7. Count graph edges removed (they were removed by delete_episode/delete_semantic).
898        let edges_removed = 0usize; // edges removed as side-effect of node deletion
899
900        let report = PurgeReport {
901            agent_id: agent_id.clone(),
902            episodic_deleted: episodic_ids.len(),
903            semantic_deleted: semantic_ids.len(),
904            procedural_deleted: procedural_ids.len(),
905            quarantine_removed,
906            edges_removed,
907        };
908
909        self.append_audit(
910            None,
911            hirn_core::audit::AuditAction::AgentPurged {
912                agent_id: agent_id.clone(),
913                episodic_deleted: report.episodic_deleted,
914                semantic_deleted: report.semantic_deleted,
915                procedural_deleted: report.procedural_deleted,
916                edges_removed: report.edges_removed,
917            },
918        )
919        .await?;
920
921        Ok(report)
922    }
923
924    /// Remove all quarantine entries belonging to a specific agent.
925    async fn purge_quarantine_for_agent(&self, agent_id: &AgentId) -> HirnResult<usize> {
926        let opts = hirn_storage::store::ScanOptions::default();
927        let batches = self
928            .storage_runtime
929            .scan(hirn_storage::datasets::quarantine::DATASET_NAME, opts)
930            .await
931            .map_err(|e| HirnError::storage(e))?;
932
933        let mut to_remove: Vec<MemoryId> = Vec::new();
934        for batch in &batches {
935            let rows = hirn_storage::datasets::quarantine::from_batch(batch)
936                .map_err(|e| HirnError::storage(e))?;
937            for row in rows {
938                // Try to deserialize the embedded record to check the agent.
939                if let Ok(rec) = bincode::deserialize::<EpisodicRecord>(&row.record_bytes) {
940                    if rec.provenance.created_by == *agent_id {
941                        to_remove.push(row.memory_id);
942                    }
943                }
944            }
945        }
946
947        let count = to_remove.len();
948        for mid in to_remove {
949            let filter = format!("memory_id = '{mid}'");
950            let _ = self
951                .storage_runtime
952                .delete(hirn_storage::datasets::quarantine::DATASET_NAME, &filter)
953                .await;
954        }
955
956        Ok(count)
957    }
958}
959
/// Result of a GDPR agent data purge.
///
/// Produced by `HirnDB::purge_agent`; the counts describe records from the
/// agent's private namespace.
#[derive(Debug, Clone)]
pub struct PurgeReport {
    /// Agent whose data was purged.
    pub agent_id: AgentId,
    /// Number of episodic records reported as deleted.
    pub episodic_deleted: usize,
    /// Number of semantic records reported as deleted.
    pub semantic_deleted: usize,
    /// Number of procedural records reported as deleted.
    pub procedural_deleted: usize,
    /// Number of quarantine entries reported as removed for the agent.
    pub quarantine_removed: usize,
    /// Number of graph edges removed. `purge_agent` currently always sets this
    /// to `0` because edges are removed as a side effect of node deletion and
    /// are not counted.
    pub edges_removed: usize,
}