1use hirn_core::offline::{GeneratedCognitionReview, GeneratedCognitionRollbackReceipt};
2use hirn_core::types::Origin;
3
4use super::*;
5
6fn quarantine_filter(id: MemoryId) -> String {
7 format!("memory_id = '{}'", id.to_string().replace('\'', "''"))
8}
9
10impl HirnDB {
11 pub(crate) async fn cross_agent_consolidate(
18 &self,
19 target_namespace: &Namespace,
20 auto_merge_threshold: f32,
21 ) -> HirnResult<CrossAgentConsolidationResult> {
22 let filter = SemanticFilter {
24 namespace: Some(target_namespace.clone()),
25 ..Default::default()
26 };
27 let records = self.list_semantics(&filter).await?;
28
29 let mut by_concept: std::collections::HashMap<String, Vec<SemanticRecord>> =
31 std::collections::HashMap::new();
32 for rec in records {
33 by_concept.entry(rec.concept.clone()).or_default().push(rec);
34 }
35
36 let mut merged_count = 0usize;
37 let mut contradiction_count = 0usize;
38 let mut merged_ids: Vec<MemoryId> = Vec::new();
39 let mut contradiction_pairs: Vec<(MemoryId, MemoryId)> = Vec::new();
40
41 for group in by_concept.values() {
43 if group.len() < 2 {
44 continue;
45 }
46
47 let agents: std::collections::HashSet<&hirn_core::types::AgentId> =
49 group.iter().map(|r| &r.provenance.created_by).collect();
50 if agents.len() < 2 {
51 continue;
52 }
53
54 let all_confident = group.iter().all(|r| r.confidence >= auto_merge_threshold);
56
57 if all_confident {
58 let source_ids: Vec<MemoryId> = group.iter().map(|r| r.id).collect();
60 let source_agents: Vec<hirn_core::types::AgentId> =
61 agents.iter().cloned().cloned().collect();
62 let merged = self.merge_semantic_group(group).await?;
63
64 self.append_audit(
65 None,
66 hirn_core::audit::AuditAction::CrossAgentMerge {
67 source_ids,
68 result_id: merged,
69 source_agents,
70 },
71 )
72 .await?;
73
74 merged_ids.push(merged);
75 merged_count += 1;
76 } else {
77 for i in 0..group.len() {
79 for j in (i + 1)..group.len() {
80 let a = &group[i];
81 let b = &group[j];
82
83 let has_contradiction = {
85 let existing = self
86 .cached_graph()
87 .get_edges_between(a.id, b.id)
88 .await
89 .unwrap_or_default();
90 existing
91 .iter()
92 .any(|e| e.relation == EdgeRelation::Contradicts)
93 };
94
95 if !has_contradiction {
96 self.connect_with(
97 a.id,
98 b.id,
99 EdgeRelation::Contradicts,
100 1.0,
101 Metadata::default(),
102 )
103 .await?;
104 contradiction_pairs.push((a.id, b.id));
105 contradiction_count += 1;
106 }
107 }
108 }
109 }
110 }
111
112 Ok(CrossAgentConsolidationResult {
113 merged_count,
114 contradiction_count,
115 merged_ids,
116 contradiction_pairs,
117 })
118 }
119
120 async fn merge_semantic_group(&self, group: &[SemanticRecord]) -> HirnResult<MemoryId> {
122 let best = group
124 .iter()
125 .max_by(|a, b| a.confidence.total_cmp(&b.confidence))
126 .unwrap();
127
128 let merged = self
129 .merge_semantic(
130 best.id,
131 SemanticMerge {
132 source_ids: group
133 .iter()
134 .filter(|record| record.logical_memory_id != best.logical_memory_id)
135 .map(|record| record.id)
136 .collect(),
137 reason: Some("cross-agent consolidation".to_string()),
138 ..SemanticMerge::with_metadata(
139 AgentId::well_known("cross_agent_consolidation"),
140 best.id,
141 )
142 },
143 )
144 .await?;
145
146 Ok(merged.target.id)
147 }
148
149 pub(crate) async fn compute_anomaly_score(&self, record: &EpisodicRecord) -> HirnResult<f32> {
152 let embedding = match &record.embedding {
153 Some(emb) => emb,
154 None => return Ok(0.0), };
156
157 let ep_count = self
161 .storage_runtime
162 .count(hirn_storage::datasets::episodic::DATASET_NAME, None)
163 .await
164 .unwrap_or(0);
165 let sem_count = self
166 .storage_runtime
167 .count(hirn_storage::datasets::semantic::DATASET_NAME, None)
168 .await
169 .unwrap_or(0);
170 let total_records = ep_count + sem_count;
171 if total_records < 10 {
172 return Ok(0.0);
173 }
174
175 let metric = self.distance_metric();
177 let results = self.vector_search_all(embedding, 1, metric).await?;
178 if results.is_empty() {
179 return Ok(0.5); }
181
182 let similarity = results[0].1;
183
184 let embedding_anomaly = 1.0 - similarity;
187
188 let now = hirn_core::Timestamp::now();
190 let temporal_anomaly = if record.timestamp > now { 0.5 } else { 0.0 };
191
192 let score = (embedding_anomaly * 0.7 + temporal_anomaly * 0.3).min(1.0);
194 Ok(score)
195 }
196
197 pub(crate) async fn quarantine_record(
200 &self,
201 record: &EpisodicRecord,
202 anomaly_score: f32,
203 agent_id: &hirn_core::types::AgentId,
204 ) -> HirnResult<MemoryId> {
205 if let Some(config) = self.admission_runtime().rate_limit_config(agent_id) {
207 return Err(HirnError::RateLimited(format!(
208 "agent '{}' exceeded {} quarantine events in {} seconds",
209 agent_id, config.max_quarantines_per_window, config.window_seconds,
210 )));
211 }
212
213 let id = record.id;
214 let record_bytes =
215 bincode::serialize(record).map_err(|e| StoreError::Serialization(e.to_string()))?;
216
217 let row = hirn_storage::datasets::quarantine::QuarantineRow {
218 memory_id: id,
219 record_kind: hirn_core::QuarantinedRecordKind::Episodic,
220 record_bytes,
221 anomaly_score,
222 reason: format!("anomaly score {anomaly_score:.2} exceeds threshold"),
223 status: hirn_storage::datasets::quarantine::QuarantineStatus::Pending,
224 created_at: Timestamp::now(),
225 reviewed_by: None,
226 reviewed_at: None,
227 generated_review: None,
228 };
229
230 let batch = hirn_storage::datasets::quarantine::to_batch(std::slice::from_ref(&row))
231 .map_err(|e| HirnError::storage(e))?;
232 self.storage_runtime
233 .append(hirn_storage::datasets::quarantine::DATASET_NAME, batch)
234 .await
235 .map_err(|e| HirnError::storage(e))?;
236
237 self.append_audit(
238 Some(agent_id.clone()),
239 hirn_core::audit::AuditAction::Quarantine {
240 memory_id: id,
241 anomaly_score,
242 reason: row.reason,
243 },
244 )
245 .await?;
246
247 let rate_limit_info = self.admission_runtime().record_quarantine(agent_id);
249 if let Some(config) = rate_limit_info {
250 let _ = self
251 .append_audit(
252 Some(agent_id.clone()),
253 hirn_core::audit::AuditAction::AgentRateLimited {
254 agent_id: agent_id.clone(),
255 quarantined_count: config.max_quarantines_per_window + 1,
256 window_seconds: config.window_seconds,
257 },
258 )
259 .await;
260 }
261
262 Err(HirnError::Quarantined(format!(
263 "memory {id} quarantined (anomaly score: {anomaly_score:.2})"
264 )))
265 }
266
267 pub(crate) async fn review_quarantine(
269 &self,
270 ) -> HirnResult<Vec<crate::security::QuarantineEntry>> {
271 let filter = "status = 'Pending'".to_string();
272 let opts = hirn_storage::store::ScanOptions {
273 filter: Some(filter),
274 ..Default::default()
275 };
276 let batches = self
277 .storage_runtime
278 .scan(hirn_storage::datasets::quarantine::DATASET_NAME, opts)
279 .await
280 .map_err(|e| HirnError::storage(e))?;
281
282 let mut result = Vec::new();
283 for batch in &batches {
284 let rows = hirn_storage::datasets::quarantine::from_batch(batch)
285 .map_err(|e| HirnError::storage(e))?;
286 for row in rows {
287 result.push(crate::security::QuarantineEntry {
288 memory_id: row.memory_id,
289 record_kind: row.record_kind,
290 record: row.record_bytes,
291 anomaly_score: row.anomaly_score,
292 reason: row.reason,
293 status: match row.status {
294 hirn_storage::datasets::quarantine::QuarantineStatus::Pending => {
295 crate::security::QuarantineStatus::Pending
296 }
297 hirn_storage::datasets::quarantine::QuarantineStatus::Approved => {
298 crate::security::QuarantineStatus::Approved
299 }
300 hirn_storage::datasets::quarantine::QuarantineStatus::Rejected => {
301 crate::security::QuarantineStatus::Rejected
302 }
303 hirn_storage::datasets::quarantine::QuarantineStatus::RolledBack => {
304 crate::security::QuarantineStatus::RolledBack
305 }
306 },
307 created_at: row.created_at,
308 reviewed_by: row.reviewed_by,
309 reviewed_at: row.reviewed_at,
310 generated_review: row.generated_review,
311 });
312 }
313 }
314 Ok(result)
315 }
316
317 async fn load_quarantine_row(
318 &self,
319 id: MemoryId,
320 ) -> HirnResult<hirn_storage::datasets::quarantine::QuarantineRow> {
321 let filter = quarantine_filter(id);
322 let opts = hirn_storage::store::ScanOptions {
323 filter: Some(filter),
324 ..Default::default()
325 };
326 let batches = self
327 .storage_runtime
328 .scan(hirn_storage::datasets::quarantine::DATASET_NAME, opts)
329 .await
330 .map_err(HirnError::storage)?;
331
332 for batch in &batches {
333 let rows = hirn_storage::datasets::quarantine::from_batch(batch)
334 .map_err(HirnError::storage)?;
335 if let Some(row) = rows.into_iter().next() {
336 return Ok(row);
337 }
338 }
339
340 Err(HirnError::NotFound(format!("quarantine entry {id}")))
341 }
342
343 async fn replace_quarantine_row(
344 &self,
345 row: &hirn_storage::datasets::quarantine::QuarantineRow,
346 ) -> HirnResult<()> {
347 let filter = quarantine_filter(row.memory_id);
348 self.storage_runtime
349 .delete(hirn_storage::datasets::quarantine::DATASET_NAME, &filter)
350 .await
351 .map_err(HirnError::storage)?;
352
353 let batch = hirn_storage::datasets::quarantine::to_batch(std::slice::from_ref(row))
354 .map_err(HirnError::storage)?;
355 self.storage_runtime
356 .append(hirn_storage::datasets::quarantine::DATASET_NAME, batch)
357 .await
358 .map_err(HirnError::storage)?;
359 Ok(())
360 }
361
362 pub(crate) async fn approve_quarantine(
364 &self,
365 id: MemoryId,
366 approved_by: AgentId,
367 ) -> HirnResult<crate::security::QuarantineApprovalOutcome> {
368 let mut row = self.load_quarantine_row(id).await?;
369 if row.status != hirn_storage::datasets::quarantine::QuarantineStatus::Pending {
370 return Err(HirnError::InvalidInput(format!(
371 "quarantine entry {id} is not pending review"
372 )));
373 }
374 if let Some(review) = row.generated_review.as_ref() {
375 if !review.allows_promotion() {
376 return Err(HirnError::InvalidInput(format!(
377 "quarantine entry {id} failed the generated cognition quality gate"
378 )));
379 }
380 }
381
382 let outcome = match row.record_kind {
383 hirn_core::QuarantinedRecordKind::Episodic => {
384 let record: EpisodicRecord = bincode::deserialize(&row.record_bytes)
385 .map_err(|e| StoreError::Serialization(e.to_string()))?;
386 let applied_id = self.remember(record).await?;
387 crate::security::QuarantineApprovalOutcome {
388 approved_entry_id: id,
389 applied_memory_ids: vec![applied_id],
390 change_summary: "promoted quarantined episodic record".to_string(),
391 generated_review: None,
392 }
393 }
394 hirn_core::QuarantinedRecordKind::Semantic => {
395 let record: SemanticRecord = bincode::deserialize(&row.record_bytes)
396 .map_err(|e| StoreError::Serialization(e.to_string()))?;
397 self.approve_quarantined_semantic(
398 id,
399 record,
400 approved_by,
401 row.generated_review.clone(),
402 )
403 .await?
404 }
405 };
406
407 row.status = hirn_storage::datasets::quarantine::QuarantineStatus::Approved;
408 row.reviewed_by = Some(approved_by);
409 row.reviewed_at = Some(Timestamp::now());
410 row.generated_review.clone_from(&outcome.generated_review);
411 self.replace_quarantine_row(&row).await?;
412
413 self.append_audit(
414 Some(approved_by),
415 hirn_core::audit::AuditAction::QuarantineApproved { memory_id: id },
416 )
417 .await?;
418
419 Ok(outcome)
420 }
421
422 async fn approve_quarantined_semantic(
423 &self,
424 entry_id: MemoryId,
425 record: SemanticRecord,
426 approved_by: AgentId,
427 generated_review: Option<GeneratedCognitionReview>,
428 ) -> HirnResult<crate::security::QuarantineApprovalOutcome> {
429 let extraction_model = record
430 .provenance
431 .extraction_model
432 .as_deref()
433 .unwrap_or_default();
434 if extraction_model.starts_with("offline-reconcile:") {
435 let proposal = hirn_core::ReconcileProposal::from_json(&record.description)?;
436 return self
437 .approve_reconcile_proposal(
438 entry_id,
439 record.namespace,
440 proposal,
441 approved_by,
442 generated_review,
443 )
444 .await;
445 }
446
447 let applied_id = self.store_semantic(record).await?;
448 let mut generated_review = generated_review;
449 if let Some(review) = generated_review.as_mut() {
450 review.attach_rollback_receipt(GeneratedCognitionRollbackReceipt {
451 applied_memory_ids: vec![applied_id],
452 previous_active_memory_ids: Vec::new(),
453 });
454 review.mark_approved();
455 }
456 Ok(crate::security::QuarantineApprovalOutcome {
457 approved_entry_id: entry_id,
458 applied_memory_ids: vec![applied_id],
459 change_summary: "promoted quarantined semantic record".to_string(),
460 generated_review,
461 })
462 }
463
464 async fn approve_reconcile_proposal(
465 &self,
466 entry_id: MemoryId,
467 namespace: Namespace,
468 proposal: hirn_core::ReconcileProposal,
469 approved_by: AgentId,
470 generated_review: Option<GeneratedCognitionReview>,
471 ) -> HirnResult<crate::security::QuarantineApprovalOutcome> {
472 let approved_at = Timestamp::now();
473 let mut resolved_heads = Vec::with_capacity(proposal.members.len());
474 for member in &proposal.members {
475 let head = self
476 .semantic_head_for_logical_id(member.logical_memory_id)
477 .await?;
478 if head.id != member.memory_id {
479 return Err(HirnError::InvalidInput(format!(
480 "reconcile proposal {} is stale for logical memory {}: expected head {}, found {}",
481 proposal.conflict_id, member.logical_memory_id, member.memory_id, head.id
482 )));
483 }
484 if !head.is_live() {
485 return Err(HirnError::InvalidInput(format!(
486 "reconcile proposal {} targets non-live logical memory {}",
487 proposal.conflict_id, member.logical_memory_id
488 )));
489 }
490 resolved_heads.push(head);
491 }
492
493 let winner_id = proposal
494 .preferred_memory_id
495 .or(proposal.authoritative_memory_id);
496 let winner_logical_id = winner_id.and_then(|memory_id| {
497 proposal
498 .members
499 .iter()
500 .find(|member| member.memory_id == memory_id)
501 .map(|member| member.logical_memory_id)
502 });
503 let rationale = format!(
504 "approved offline reconcile proposal {} with action {}: {}",
505 proposal.conflict_id,
506 proposal.action.as_str(),
507 proposal.rationale
508 );
509 let previous_active_memory_ids = resolved_heads.iter().map(|head| head.id).collect();
510 let mut applied_memory_ids = Vec::new();
511
512 match proposal.action {
513 hirn_core::ReconcileProposalAction::RetainBoth
514 | hirn_core::ReconcileProposalAction::EscalateForReview => {}
515 hirn_core::ReconcileProposalAction::Supersede => {
516 let winner_id = winner_id.ok_or_else(|| {
517 HirnError::InvalidInput(format!(
518 "reconcile proposal {} cannot supersede without a preferred memory",
519 proposal.conflict_id
520 ))
521 })?;
522 let superseding = self
523 .supersede_semantic(
524 winner_id,
525 SemanticSupersession {
526 reason: Some(rationale.clone()),
527 actor_id: approved_by,
528 observed_at: Some(approved_at),
529 causation_id: entry_id,
530 description: None,
531 confidence: None,
532 evidence_count: None,
533 },
534 )
535 .await?;
536 applied_memory_ids.push(superseding.id);
537
538 for loser in resolved_heads
539 .iter()
540 .filter(|record| Some(record.logical_memory_id) != winner_logical_id)
541 {
542 let tombstone = self
543 .retract_semantic(
544 loser.id,
545 SemanticRetraction {
546 reason: Some(rationale.clone()),
547 actor_id: approved_by,
548 observed_at: Some(approved_at),
549 causation_id: entry_id,
550 },
551 )
552 .await?;
553 applied_memory_ids.push(tombstone.id);
554 }
555 }
556 hirn_core::ReconcileProposalAction::Retract => {
557 let winner_logical_id = winner_logical_id.ok_or_else(|| {
558 HirnError::InvalidInput(format!(
559 "reconcile proposal {} cannot retract losers without a preferred memory",
560 proposal.conflict_id
561 ))
562 })?;
563 for loser in resolved_heads
564 .iter()
565 .filter(|record| record.logical_memory_id != winner_logical_id)
566 {
567 let tombstone = self
568 .retract_semantic(
569 loser.id,
570 SemanticRetraction {
571 reason: Some(rationale.clone()),
572 actor_id: approved_by,
573 observed_at: Some(approved_at),
574 causation_id: entry_id,
575 },
576 )
577 .await?;
578 applied_memory_ids.push(tombstone.id);
579 }
580 }
581 hirn_core::ReconcileProposalAction::Quarantine => {
582 let winner_logical_id = winner_logical_id.ok_or_else(|| {
583 HirnError::InvalidInput(format!(
584 "reconcile proposal {} cannot quarantine generated losers without a preferred memory",
585 proposal.conflict_id
586 ))
587 })?;
588 let mut generated_losers = 0usize;
589 for loser in resolved_heads.iter().filter(|record| {
590 record.logical_memory_id != winner_logical_id
591 && matches!(
592 *record.provenance.origin(),
593 Origin::DreamReplay | Origin::LlmExtraction | Origin::Consolidation
594 )
595 }) {
596 let tombstone = self
597 .retract_semantic(
598 loser.id,
599 SemanticRetraction {
600 reason: Some(rationale.clone()),
601 actor_id: approved_by,
602 observed_at: Some(approved_at),
603 causation_id: entry_id,
604 },
605 )
606 .await?;
607 applied_memory_ids.push(tombstone.id);
608 generated_losers += 1;
609 }
610 if generated_losers == 0 {
611 return Err(HirnError::InvalidInput(format!(
612 "reconcile proposal {} selected quarantine but no generated losing heads remain",
613 proposal.conflict_id
614 )));
615 }
616 }
617 }
618
619 self.append_audit(
620 Some(approved_by),
621 hirn_core::audit::AuditAction::BeliefReconcileApproved {
622 conflict_id: proposal.conflict_id.clone(),
623 action: proposal.action.as_str().to_string(),
624 namespace: namespace.as_str().to_string(),
625 logical_memory_ids: proposal
626 .members
627 .iter()
628 .map(|member| member.logical_memory_id)
629 .collect(),
630 applied_memory_ids: applied_memory_ids.clone(),
631 rationale: proposal.rationale.clone(),
632 },
633 )
634 .await?;
635
636 let mut generated_review = generated_review;
637 if let Some(review) = generated_review.as_mut() {
638 review.attach_rollback_receipt(GeneratedCognitionRollbackReceipt {
639 applied_memory_ids: applied_memory_ids.clone(),
640 previous_active_memory_ids,
641 });
642 review.mark_approved();
643 }
644
645 Ok(crate::security::QuarantineApprovalOutcome {
646 approved_entry_id: entry_id,
647 applied_memory_ids,
648 change_summary: format!(
649 "approved reconcile action {} for conflict {}",
650 proposal.action.as_str(),
651 proposal.conflict_id
652 ),
653 generated_review,
654 })
655 }
656
657 pub(crate) async fn reject_quarantine(&self, id: MemoryId) -> HirnResult<()> {
659 let mut row = self.load_quarantine_row(id).await?;
660 row.status = hirn_storage::datasets::quarantine::QuarantineStatus::Rejected;
661 row.reviewed_at = Some(Timestamp::now());
662 if let Some(review) = row.generated_review.as_mut() {
663 review.mark_rejected("rejected during quarantine review");
664 }
665 self.replace_quarantine_row(&row).await?;
666
667 self.append_audit(
668 None,
669 hirn_core::audit::AuditAction::QuarantineRejected { memory_id: id },
670 )
671 .await?;
672
673 Ok(())
674 }
675
676 pub(crate) async fn rollback_quarantine_approval(
677 &self,
678 id: MemoryId,
679 rolled_back_by: AgentId,
680 reason: String,
681 ) -> HirnResult<crate::security::QuarantineRollbackOutcome> {
682 let mut row = self.load_quarantine_row(id).await?;
683 if row.status != hirn_storage::datasets::quarantine::QuarantineStatus::Approved {
684 return Err(HirnError::InvalidInput(format!(
685 "quarantine entry {id} is not approved"
686 )));
687 }
688
689 let mut generated_review = row.generated_review.clone().ok_or_else(|| {
690 HirnError::InvalidInput(format!(
691 "quarantine entry {id} does not carry generated cognition rollback metadata"
692 ))
693 })?;
694 let receipt = generated_review.rollback_receipt.clone().ok_or_else(|| {
695 HirnError::InvalidInput(format!(
696 "quarantine entry {id} cannot be rolled back because no rollback receipt was recorded"
697 ))
698 })?;
699
700 self.validate_generated_rollback_receipt(&receipt).await?;
701 let restore_logical_ids = self
702 .generated_semantic_logical_ids(&receipt.applied_memory_ids)
703 .await?;
704 let removed_memory_ids = self
705 .delete_generated_semantic_revisions(&receipt.applied_memory_ids)
706 .await?;
707 let restored_memory_ids = self
708 .restore_generated_semantic_heads(&restore_logical_ids)
709 .await?;
710
711 let rolled_back_at = Timestamp::now();
712 generated_review.mark_rolled_back(rolled_back_by.clone(), rolled_back_at, reason.clone());
713 row.status = hirn_storage::datasets::quarantine::QuarantineStatus::RolledBack;
714 row.reviewed_by = Some(rolled_back_by.clone());
715 row.reviewed_at = Some(rolled_back_at);
716 row.generated_review = Some(generated_review.clone());
717 self.replace_quarantine_row(&row).await?;
718
719 self.append_audit(
720 Some(rolled_back_by),
721 hirn_core::audit::AuditAction::QuarantineRolledBack {
722 memory_id: id,
723 removed_memory_ids: removed_memory_ids.clone(),
724 restored_memory_ids: restored_memory_ids.clone(),
725 reason: reason.clone(),
726 },
727 )
728 .await?;
729
730 Ok(crate::security::QuarantineRollbackOutcome {
731 rolled_back_entry_id: id,
732 removed_memory_ids,
733 restored_memory_ids,
734 reason,
735 generated_review: Some(generated_review),
736 })
737 }
738
739 async fn validate_generated_rollback_receipt(
740 &self,
741 receipt: &GeneratedCognitionRollbackReceipt,
742 ) -> HirnResult<()> {
743 for applied_id in &receipt.applied_memory_ids {
744 let record = self.read_semantic_record(*applied_id).await?;
745 let head = self
746 .semantic_head_for_logical_id(record.logical_memory_id)
747 .await?;
748 if head.id != record.id {
749 return Err(HirnError::InvalidInput(format!(
750 "rollback cannot proceed because logical memory {} advanced beyond generated revision {}",
751 record.logical_memory_id, applied_id
752 )));
753 }
754 }
755 Ok(())
756 }
757
758 async fn delete_generated_semantic_revisions(
759 &self,
760 applied_memory_ids: &[MemoryId],
761 ) -> HirnResult<Vec<MemoryId>> {
762 let mut removed = Vec::new();
763 for applied_id in applied_memory_ids {
764 let filter = format!("id = '{}'", applied_id.to_string().replace('\'', "''"));
765 self.storage_runtime
766 .delete(hirn_storage::datasets::semantic::DATASET_NAME, &filter)
767 .await
768 .map_err(HirnError::storage)?;
769 if let Err(error) = self.cached_graph().remove_node(*applied_id).await {
770 tracing::debug!(id = %applied_id, error = %error, "generated rollback graph cleanup skipped");
771 }
772 removed.push(*applied_id);
773 }
774 Ok(removed)
775 }
776
777 async fn generated_semantic_logical_ids(
778 &self,
779 applied_memory_ids: &[MemoryId],
780 ) -> HirnResult<std::collections::BTreeSet<hirn_core::revision::LogicalMemoryId>> {
781 let mut logical_ids = std::collections::BTreeSet::new();
782 for applied_id in applied_memory_ids {
783 let record = self.read_semantic_record(*applied_id).await?;
784 logical_ids.insert(record.logical_memory_id);
785 }
786 Ok(logical_ids)
787 }
788
789 async fn restore_generated_semantic_heads(
790 &self,
791 logical_memory_ids: &std::collections::BTreeSet<hirn_core::revision::LogicalMemoryId>,
792 ) -> HirnResult<Vec<MemoryId>> {
793 let mut restored = Vec::new();
794
795 for logical_memory_id in logical_memory_ids {
796 self.evict_semantic_head(*logical_memory_id);
797 match self.semantic_head_for_logical_id(*logical_memory_id).await {
798 Ok(head) => {
799 self.ensure_semantic_graph_node(&head).await?;
800 restored.push(head.id);
801 }
802 Err(HirnError::NotFound(_)) => {
803 self.evict_semantic_head(*logical_memory_id);
804 }
805 Err(error) => return Err(error),
806 }
807 }
808
809 Ok(restored)
810 }
811
812 async fn ensure_semantic_graph_node(&self, record: &SemanticRecord) -> HirnResult<()> {
813 if !self
814 .cached_graph()
815 .has_node(record.id)
816 .await
817 .unwrap_or(false)
818 {
819 self.cached_graph()
820 .add_node(
821 record.id,
822 Layer::Semantic,
823 record.confidence,
824 record.created_at,
825 record.namespace,
826 )
827 .await?;
828 if let Some(ref embedding) = record.embedding {
829 let candidates = self.find_similarity_candidates(embedding).await;
830 self.apply_similarity_edges(record.id, &candidates).await?;
831 }
832 }
833 self.cache_semantic_head(record);
834 Ok(())
835 }
836
837 pub(crate) fn prepare(&self, query: &str) -> HirnResult<crate::ql::PreparedStatement> {
843 crate::ql::prepare(query, None).map_err(HirnError::from)
844 }
845
846 pub(crate) async fn execute_prepared(
848 &self,
849 prepared: &crate::ql::PreparedStatement,
850 params: &std::collections::HashMap<String, String>,
851 ) -> HirnResult<crate::ql::results::QueryResult> {
852 let compiled = crate::ql::bind(prepared, params).map_err(HirnError::from)?;
853 self.execute_ql(&compiled.source).await
854 }
855
856 pub(crate) fn query(&self) -> crate::ql::builder::QueryBuilder<'_> {
858 crate::ql::builder::QueryBuilder::new(self)
859 }
860
861 pub(crate) async fn purge_agent(&self, agent_id: &AgentId) -> HirnResult<PurgeReport> {
869 let private_ns = Namespace::private_for(agent_id);
870
871 let episodic_ids = self.list_episodic_ids_in_namespace(&private_ns).await?;
873 let semantic_ids = self.list_semantic_ids_in_namespace(&private_ns).await?;
874 let procedural_ids = self.list_procedural_ids_in_namespace(&private_ns).await?;
875
876 for id in &episodic_ids {
878 let _ = self.delete_episode(*id).await; }
880
881 for id in &semantic_ids {
883 let _ = self.purge_semantic(*id).await;
884 }
885
886 for id in &procedural_ids {
888 let _ = self.delete_procedural(*id).await;
889 }
890
891 let quarantine_removed = self.purge_quarantine_for_agent(agent_id).await?;
893
894 self.admission_runtime().clear_agent(agent_id);
896
897 let edges_removed = 0usize; let report = PurgeReport {
901 agent_id: agent_id.clone(),
902 episodic_deleted: episodic_ids.len(),
903 semantic_deleted: semantic_ids.len(),
904 procedural_deleted: procedural_ids.len(),
905 quarantine_removed,
906 edges_removed,
907 };
908
909 self.append_audit(
910 None,
911 hirn_core::audit::AuditAction::AgentPurged {
912 agent_id: agent_id.clone(),
913 episodic_deleted: report.episodic_deleted,
914 semantic_deleted: report.semantic_deleted,
915 procedural_deleted: report.procedural_deleted,
916 edges_removed: report.edges_removed,
917 },
918 )
919 .await?;
920
921 Ok(report)
922 }
923
924 async fn purge_quarantine_for_agent(&self, agent_id: &AgentId) -> HirnResult<usize> {
926 let opts = hirn_storage::store::ScanOptions::default();
927 let batches = self
928 .storage_runtime
929 .scan(hirn_storage::datasets::quarantine::DATASET_NAME, opts)
930 .await
931 .map_err(|e| HirnError::storage(e))?;
932
933 let mut to_remove: Vec<MemoryId> = Vec::new();
934 for batch in &batches {
935 let rows = hirn_storage::datasets::quarantine::from_batch(batch)
936 .map_err(|e| HirnError::storage(e))?;
937 for row in rows {
938 if let Ok(rec) = bincode::deserialize::<EpisodicRecord>(&row.record_bytes) {
940 if rec.provenance.created_by == *agent_id {
941 to_remove.push(row.memory_id);
942 }
943 }
944 }
945 }
946
947 let count = to_remove.len();
948 for mid in to_remove {
949 let filter = format!("memory_id = '{mid}'");
950 let _ = self
951 .storage_runtime
952 .delete(hirn_storage::datasets::quarantine::DATASET_NAME, &filter)
953 .await;
954 }
955
956 Ok(count)
957 }
958}
959
960#[derive(Debug, Clone)]
962pub struct PurgeReport {
963 pub agent_id: AgentId,
964 pub episodic_deleted: usize,
965 pub semantic_deleted: usize,
966 pub procedural_deleted: usize,
967 pub quarantine_removed: usize,
968 pub edges_removed: usize,
969}