1mod admission_runtime;
2mod cross_agent;
3mod episodic;
4mod event_runtime;
5mod graph_ops;
6mod graph_runtime;
7mod mutation_contract;
8mod namespace;
9mod namespace_runtime;
10mod offline_scheduler_runtime;
11mod persistence;
12mod policy_runtime;
13mod procedural;
14mod provider_runtime;
15mod query_exec;
16mod query_runtime;
17mod recall_exec;
18mod semantic;
19mod services;
20mod storage_runtime;
21mod working;
22pub mod write_path;
23mod write_runtime;
24
25pub use cross_agent::PurgeReport;
26pub use graph_runtime::PrefetchStats;
27pub use mutation_contract::{
28 MutationWriteContract, MutationWriteGuarantee, mutation_write_contracts,
29};
30pub use services::{
31 AdminView, CausalView, EpisodicView, GraphView, NamespaceView, PolicyView, ProceduralView,
32 QueryView, RecallView, SemanticView, WorkingView,
33};
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38
39use hirn_core::embed::{Embedder, Reranker};
40use hirn_core::episodic::EpisodicRecord;
41use hirn_core::id::MemoryId;
42use hirn_core::metadata::Metadata;
43use hirn_core::procedural::ProceduralRecord;
44use hirn_core::provenance::Mutation;
45use hirn_core::record::MemoryRecord;
46use hirn_core::resource::{
47 DerivedArtifactId, DerivedArtifactKind, EvidenceLink, HydrationMode, ModalityProfile,
48 ResourceGovernanceState, ResourceId, ResourceLocation,
49};
50use hirn_core::revision::LogicalMemoryId;
51use hirn_core::semantic::SemanticRecord;
52use hirn_core::timestamp::Timestamp;
53use hirn_core::tokenizer::Tokenizer;
54use hirn_core::types::{AgentId, EdgeRelation, EventType, Layer, MutationTrigger, Namespace};
55use hirn_core::working::WorkingMemoryEntry;
56use hirn_core::{HirnConfig, HirnError, HirnResult};
57
58use hirn_storage::PhysicalStore;
59
60use crate::activation::{ActivationConfig, ActivationMode};
61use crate::error::StoreError;
62use crate::event_log::EventLog;
63use crate::graph_store::GraphStore;
64use crate::hebbian::HebbianConfig;
65use crate::persistent_graph::PersistentGraph;
66use crate::recall::{LayerFilter, RecallBuilder, RecallResult, ResourceEvidenceSummary};
67use crate::scoring::{self, ScoringWeights};
68
69use crate::event::MemoryEvent;
70use crate::policy::{Action, PolicyEngine};
71use crate::ql::results::ScoredMemory;
72use admission_runtime::AdmissionRuntime;
73use event_runtime::EventRuntime;
74use graph_runtime::GraphRuntime;
75use namespace_runtime::NamespaceRuntime;
76use offline_scheduler_runtime::OfflineSchedulerRuntime;
77use policy_runtime::PolicyRuntime;
78use provider_runtime::ProviderRuntime;
79use query_runtime::QueryRuntime;
80use storage_runtime::StorageRuntime;
81use write_runtime::WriteRuntime;
82
/// Snapshot of database-wide statistics.
///
/// Carries one counter per memory layer (working, episodic, semantic,
/// procedural) plus totals, an edge count and a storage size figure. How each
/// value is gathered is decided by the code that builds this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbStats {
    /// Count for the working layer.
    pub working_count: u64,
    /// Count for the episodic layer.
    pub episodic_count: u64,
    /// Count for the semantic layer.
    pub semantic_count: u64,
    /// Count for the procedural layer.
    pub procedural_count: u64,
    /// Overall count. NOTE(review): presumably the sum of the four layer counts — confirm.
    pub total_count: u64,
    /// Number of graph edges.
    pub edge_count: u64,
    /// Storage footprint, in bytes.
    pub file_size_bytes: u64,
}
94
/// Per-layer record counts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LayerCounts {
    /// Count for the working layer.
    pub working: u64,
    /// Count for the episodic layer.
    pub episodic: u64,
    /// Count for the semantic layer.
    pub semantic: u64,
    /// Count for the procedural layer.
    pub procedural: u64,
    /// Overall count. NOTE(review): presumably the sum of the layer counts — confirm.
    pub total: u64,
}
104
/// Per-resource data cached while building resource evidence summaries.
///
/// One entry is built per resource and reused for every evidence link that
/// points at that resource during a single summarization pass.
#[derive(Debug, Clone, Default)]
struct CachedResourceEvidence {
    /// Governance state of the resource (`Active` when the resource was not found).
    lifecycle_state: ResourceGovernanceState,
    /// Modality of the resource, if it exists.
    modality: Option<ModalityProfile>,
    /// MIME type recorded on the resource, if any.
    mime_type: Option<String>,
    /// Human-readable name recorded on the resource, if any.
    display_name: Option<String>,
    /// Distinct derived-artifact kinds, excluding generation failures, sorted by name.
    available_artifacts: Vec<DerivedArtifactKind>,
    /// Kind of every derived artifact (including failures), keyed by artifact id.
    artifact_kinds_by_id: HashMap<DerivedArtifactId, DerivedArtifactKind>,
    /// Whether any available artifact kind is previewable.
    has_preview: bool,
    /// Whether the resource is blob-backed and its governance state does not hide the payload.
    has_full_payload: bool,
}
116
/// Optional criteria for listing episodic memories.
///
/// Every field defaults to "no constraint" (`None` / `false`) via `Default`.
#[derive(Debug, Default)]
pub struct EpisodicFilter {
    /// Restrict to a single event type.
    pub event_type: Option<EventType>,
    /// Lower time bound. NOTE(review): inclusive vs exclusive is decided by the consumer — confirm.
    pub after: Option<Timestamp>,
    /// Upper time bound. NOTE(review): inclusive vs exclusive is decided by the consumer — confirm.
    pub before: Option<Timestamp>,
    /// Minimum importance score.
    pub min_importance: Option<f32>,
    /// Restrict to records mentioning this entity name.
    pub entity_name: Option<String>,
    /// Restrict to a namespace.
    pub namespace: Option<Namespace>,
    /// Whether archived records are included.
    pub include_archived: bool,
    /// Maximum number of results.
    pub limit: Option<usize>,
    /// Number of results to skip.
    pub offset: Option<usize>,
    /// Point in time at which records must be valid.
    pub valid_at: Option<Timestamp>,
}
137
/// Optional criteria for listing semantic memories; `Default` applies no constraints.
#[derive(Debug, Default)]
pub struct SemanticFilter {
    /// Restrict to one knowledge type.
    pub knowledge_type: Option<hirn_core::types::KnowledgeType>,
    /// Minimum confidence score.
    pub min_confidence: Option<f32>,
    /// Restrict to a namespace.
    pub namespace: Option<Namespace>,
    /// Maximum number of results.
    pub limit: Option<usize>,
}
146
/// Outcome of a cross-agent consolidation pass.
#[derive(Debug)]
pub struct CrossAgentConsolidationResult {
    /// Number of merges performed. NOTE(review): presumably equals `merged_ids.len()` — confirm.
    pub merged_count: usize,
    /// Number of contradictions detected. NOTE(review): presumably equals `contradiction_pairs.len()`.
    pub contradiction_count: usize,
    /// Ids of memories involved in merges.
    pub merged_ids: Vec<MemoryId>,
    /// Pairs of memories found to contradict each other.
    pub contradiction_pairs: Vec<(MemoryId, MemoryId)>,
}
159
/// Request to update a semantic memory.
///
/// Optional fields left as `None` are not part of the request; `actor_id` and
/// `causation_id` are always required.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SemanticUpdate {
    /// Description to apply.
    pub description: Option<String>,
    /// Confidence to apply.
    pub confidence: Option<f32>,
    /// Evidence count to apply.
    pub evidence_count: Option<u32>,
    /// Free-form reason for the change.
    pub reason: Option<String>,
    /// Agent performing the change.
    pub actor_id: AgentId,
    /// When the change was observed, if known.
    pub observed_at: Option<Timestamp>,
    /// Memory that caused this change.
    pub causation_id: MemoryId,
}
171
172impl SemanticUpdate {
173 #[must_use]
174 pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
175 Self {
176 description: None,
177 confidence: None,
178 evidence_count: None,
179 reason: None,
180 actor_id,
181 observed_at: None,
182 causation_id,
183 }
184 }
185}
186
/// Request to supersede a semantic memory with a successor revision.
///
/// Carries the same fields as [`SemanticUpdate`]; `From` conversions exist in
/// both directions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SemanticSupersession {
    /// Description for the successor.
    pub description: Option<String>,
    /// Confidence for the successor.
    pub confidence: Option<f32>,
    /// Evidence count for the successor.
    pub evidence_count: Option<u32>,
    /// Free-form reason for the supersession.
    pub reason: Option<String>,
    /// Agent performing the supersession.
    pub actor_id: AgentId,
    /// When the change was observed, if known.
    pub observed_at: Option<Timestamp>,
    /// Memory that caused this change.
    pub causation_id: MemoryId,
}
198
199impl SemanticSupersession {
200 #[must_use]
201 pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
202 Self {
203 description: None,
204 confidence: None,
205 evidence_count: None,
206 reason: None,
207 actor_id,
208 observed_at: None,
209 causation_id,
210 }
211 }
212}
213
/// Request to override a semantic memory.
///
/// Has the same shape as [`SemanticUpdate`]; `actor_id` and `causation_id`
/// are mandatory, everything else optional.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SemanticOverride {
    /// Description to apply.
    pub description: Option<String>,
    /// Confidence to apply.
    pub confidence: Option<f32>,
    /// Evidence count to apply.
    pub evidence_count: Option<u32>,
    /// Free-form reason for the override.
    pub reason: Option<String>,
    /// Agent performing the override.
    pub actor_id: AgentId,
    /// When the change was observed, if known.
    pub observed_at: Option<Timestamp>,
    /// Memory that caused this change.
    pub causation_id: MemoryId,
}
225
226impl SemanticOverride {
227 #[must_use]
228 pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
229 Self {
230 description: None,
231 confidence: None,
232 evidence_count: None,
233 reason: None,
234 actor_id,
235 observed_at: None,
236 causation_id,
237 }
238 }
239}
240
/// Request to merge one or more semantic memories into a target.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SemanticMerge {
    /// Memories to merge.
    pub source_ids: Vec<MemoryId>,
    /// Description for the merged result.
    pub description: Option<String>,
    /// Confidence for the merged result.
    pub confidence: Option<f32>,
    /// Evidence count for the merged result.
    pub evidence_count: Option<u32>,
    /// Free-form reason for the merge.
    pub reason: Option<String>,
    /// Agent performing the merge.
    pub actor_id: AgentId,
    /// When the change was observed, if known.
    pub observed_at: Option<Timestamp>,
    /// Memory that caused this change.
    pub causation_id: MemoryId,
}
253
254impl SemanticMerge {
255 #[must_use]
256 pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
257 Self {
258 source_ids: Vec::new(),
259 description: None,
260 confidence: None,
261 evidence_count: None,
262 reason: None,
263 actor_id,
264 observed_at: None,
265 causation_id,
266 }
267 }
268}
269
/// Result of a semantic merge.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SemanticMergeOutcome {
    /// The record produced as the merge target.
    pub target: SemanticRecord,
    /// The source records folded into the target.
    pub merged_sources: Vec<SemanticRecord>,
}
276
277impl From<SemanticSupersession> for SemanticUpdate {
278 fn from(value: SemanticSupersession) -> Self {
279 Self {
280 description: value.description,
281 confidence: value.confidence,
282 evidence_count: value.evidence_count,
283 reason: value.reason,
284 actor_id: value.actor_id,
285 observed_at: value.observed_at,
286 causation_id: value.causation_id,
287 }
288 }
289}
290
291impl From<SemanticUpdate> for SemanticSupersession {
292 fn from(value: SemanticUpdate) -> Self {
293 Self {
294 description: value.description,
295 confidence: value.confidence,
296 evidence_count: value.evidence_count,
297 reason: value.reason,
298 actor_id: value.actor_id,
299 observed_at: value.observed_at,
300 causation_id: value.causation_id,
301 }
302 }
303}
304
/// Request to retract a semantic memory.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SemanticRetraction {
    /// Free-form reason for the retraction.
    pub reason: Option<String>,
    /// Agent performing the retraction.
    pub actor_id: AgentId,
    /// When the change was observed, if known.
    pub observed_at: Option<Timestamp>,
    /// Memory that caused this change.
    pub causation_id: MemoryId,
}
313
314impl SemanticRetraction {
315 #[must_use]
316 pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
317 Self {
318 reason: None,
319 actor_id,
320 observed_at: None,
321 causation_id,
322 }
323 }
324}
325
/// Top-level database handle.
///
/// `HirnDB` composes several runtime components, each owning one concern.
/// Most of its methods are thin delegations to one of these components.
pub struct HirnDB {
    /// Configuration validated in `open_with_config`.
    config: HirnConfig,
    /// Physical store handle, database path, head caches and index management.
    storage_runtime: StorageRuntime,
    /// Optional admission pipeline.
    admission_runtime: AdmissionRuntime,
    /// Event log handle.
    event_runtime: EventRuntime,
    /// Embedders, tokenizer and reranker.
    provider_runtime: ProviderRuntime,
    /// Cached/persistent graph, reconsolidation tracker, community cache and index advisor.
    graph_runtime: GraphRuntime,
    /// Policy engine and authorization decisions.
    policy_runtime: PolicyRuntime,
    /// DataFusion session, query pipeline and plan cache.
    query_runtime: QueryRuntime,
    /// Write-side state: pending embeds, importance accumulator, RPE stats.
    write_runtime: WriteRuntime,
    /// Offline scheduler.
    offline_scheduler_runtime: OfflineSchedulerRuntime,
    /// Namespace runtime state.
    namespace_runtime: NamespaceRuntime,
    /// Current tier policy; read by cloning (`tier_policy`) and replaced wholesale (`set_tier_policy`).
    tier_policy: parking_lot::RwLock<hirn_core::TierPolicy>,
}
358
359impl HirnDB {
360 pub async fn open(path: impl AsRef<Path>, storage: Arc<dyn PhysicalStore>) -> HirnResult<Self> {
364 let config = HirnConfig::builder().db_path(path.as_ref()).build()?;
365 Self::open_with_config(config, storage).await
366 }
367
368 pub async fn open_with_config(
373 config: HirnConfig,
374 storage: Arc<dyn PhysicalStore>,
375 ) -> HirnResult<Self> {
376 config.validate()?;
377
378 let path = config.db_path.clone();
379
380 std::fs::create_dir_all(&path).map_err(|e| HirnError::StorageError(Box::new(e)))?;
382
383 hirn_storage::HirnDb::from_store(storage.clone())
384 .ensure_datasets_with_config(config.embedding_dimensions.as_usize(), Some(&config))
385 .await
386 .map_err(HirnError::storage)?;
387
388 {
390 let ns_name = hirn_core::types::Namespace::shared();
391 let filter = format!("id = '{}'", ns_name.as_str());
392 let count = storage
393 .count(
394 hirn_storage::datasets::namespace::DATASET_NAME,
395 Some(&filter),
396 )
397 .await
398 .unwrap_or(0);
399 if count == 0 {
400 let rec = hirn_core::namespace::NamespaceRecord::shared();
401 let batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(&rec))
402 .map_err(|e| HirnError::storage(e))?;
403 storage
404 .append(hirn_storage::datasets::namespace::DATASET_NAME, batch)
405 .await
406 .map_err(|e| HirnError::storage(e))?;
407 }
408 }
409 let admission_runtime = AdmissionRuntime::new();
410 let graph_runtime = GraphRuntime::new(storage.clone());
411 let policy_runtime = PolicyRuntime::new(storage.clone());
412 let provider_runtime = ProviderRuntime::new(config.embedding_dimensions.as_usize());
413 let query_runtime = QueryRuntime::new(
414 graph_runtime.cached_graph(),
415 &config,
416 storage.clone(),
417 provider_runtime.tokenizer(),
418 )?;
419 let storage_runtime =
420 StorageRuntime::new(path, storage, config.resource_quota_policy.clone());
421 let event_runtime = EventRuntime::new();
422 let event_log = Arc::new(EventLog::open(storage_runtime.storage_arc()).await?);
423 event_runtime.set_event_log(event_log);
424 let write_runtime = WriteRuntime::new(config.default_realm.clone());
425 write_runtime.load_rpe_stats(storage_runtime.path());
428 let offline_scheduler_runtime = OfflineSchedulerRuntime::new(
429 config.offline_scheduler.clone(),
430 config.default_realm.clone(),
431 storage_runtime.storage_arc(),
432 config.conflict_resolution_policy,
433 config.conflict_resolution_overrides.clone(),
434 config.offline_dream_quality_threshold,
435 config.offline_reconcile_quality_threshold,
436 config.offline_plan_quality_threshold,
437 f64::from(config.memory_decay_factor),
438 config.decay_sweep_window_secs,
439 )
440 .await?;
441 let namespace_runtime = NamespaceRuntime::new();
442 let tier_policy = parking_lot::RwLock::new(hirn_core::TierPolicy::from_config(&config));
443
444 let db = Self {
445 config,
446 storage_runtime,
447 admission_runtime,
448 event_runtime,
449 provider_runtime,
450 graph_runtime,
451 policy_runtime,
452 query_runtime,
453 write_runtime,
454 offline_scheduler_runtime,
455 namespace_runtime,
456 tier_policy,
457 };
458
459 {
463 let storage = db.storage_runtime.storage_arc();
464 tokio::spawn(async move {
465 match hirn_storage::reconcile_resource_head_mutations(storage.as_ref()).await {
466 Ok(n) if n > 0 => tracing::info!(
467 reconciled = n,
468 "background: reconciled resource-head mutations"
469 ),
470 Ok(_) => {}
471 Err(error) => {
472 tracing::warn!(%error, "background: resource-head reconcile failed");
473 }
474 }
475 });
476 }
477 {
478 let storage = db.storage_runtime.storage_arc();
479 tokio::spawn(async move {
480 match hirn_storage::reconcile_pending_resource_blob_staging(storage.as_ref()).await
481 {
482 Ok(n) if n > 0 => tracing::info!(
483 reconciled = n,
484 "background: reconciled pending resource blob staging records"
485 ),
486 Ok(_) => {}
487 Err(error) => {
488 tracing::warn!(
489 %error,
490 "background: resource blob staging reconcile failed"
491 );
492 }
493 }
494 });
495 }
496 db.cached_graph().load_from_cold().await?;
497 db.reconcile_pending_episode_mutations().await?;
498 db.reconcile_pending_semantic_create_mutations().await?;
499 db.reconcile_pending_semantic_successor_mutations().await?;
500 db.reconcile_pending_semantic_merge_mutations().await?;
501 db.reconcile_pending_semantic_contradiction_sync_mutations()
502 .await?;
503 db.reconcile_pending_semantic_retract_mutations().await?;
504 db.reconcile_pending_semantic_purge_mutations().await?;
505 db.reconcile_pending_procedural_create_mutations().await?;
506 db.reconcile_pending_procedural_successor_mutations()
507 .await?;
508 db.reconcile_pending_agent_register_mutations().await?;
509 db.reconcile_pending_namespace_delete_mutations().await?;
510 db.reconcile_pending_agent_deregister_mutations().await?;
511 db.hydrate_temporal_arrival_cursors().await?;
512 db.hydrate_working_l0_cache().await?;
513
514 Ok(db)
515 }
516
517 #[must_use]
519 pub const fn config(&self) -> &HirnConfig {
520 &self.config
521 }
522
523 fn rpe_model_id(&self) -> String {
524 self.provider_runtime.rpe_model_id()
525 }
526
527 #[must_use]
529 pub fn tier_policy(&self) -> hirn_core::TierPolicy {
530 self.tier_policy.read().clone()
531 }
532
533 pub fn set_tier_policy(&self, policy: hirn_core::TierPolicy) {
535 *self.tier_policy.write() = policy;
536 }
537
538 #[must_use]
540 pub fn path(&self) -> &Path {
541 self.storage_runtime.path()
542 }
543
544 #[must_use]
547 pub fn session(&self) -> &datafusion::prelude::SessionContext {
548 self.query_runtime.session()
549 }
550
551 #[must_use]
553 pub fn query_pipeline(&self) -> &hirn_query::QueryPipeline {
554 self.query_runtime.query_pipeline()
555 }
556
557 #[must_use]
559 pub fn plan_cache(&self) -> &Arc<hirn_query::PlanCache> {
560 self.query_runtime.plan_cache()
561 }
562
563 #[must_use]
564 pub(crate) fn write_runtime(&self) -> &WriteRuntime {
565 &self.write_runtime
566 }
567
568 pub(crate) fn record_importance_accesses(
573 &self,
574 ids: &[hirn_core::id::MemoryId],
575 ) -> Option<Vec<hirn_core::id::MemoryId>> {
576 self.write_runtime.record_importance_accesses(ids)
577 }
578
579 pub(crate) fn drain_importance_accumulator(&self) -> Vec<hirn_core::id::MemoryId> {
582 self.write_runtime.drain_importance_accumulator()
583 }
584
585 #[must_use]
586 pub(crate) fn graph_runtime(&self) -> &GraphRuntime {
587 &self.graph_runtime
588 }
589
590 #[must_use]
591 pub(crate) fn policy_runtime(&self) -> &PolicyRuntime {
592 &self.policy_runtime
593 }
594
595 #[must_use]
596 pub(crate) fn admission_runtime(&self) -> &AdmissionRuntime {
597 &self.admission_runtime
598 }
599
600 #[must_use]
601 pub(crate) fn event_runtime(&self) -> &EventRuntime {
602 &self.event_runtime
603 }
604
605 #[must_use]
606 pub(crate) fn provider_runtime(&self) -> &ProviderRuntime {
607 &self.provider_runtime
608 }
609
610 #[must_use]
611 pub(crate) fn offline_scheduler_runtime(&self) -> &OfflineSchedulerRuntime {
612 &self.offline_scheduler_runtime
613 }
614
615 #[must_use]
617 pub fn reconsolidation_tracker(&self) -> &crate::consolidation::ReconsolidationTracker {
618 self.graph_runtime.reconsolidation_tracker()
619 }
620
621 pub(crate) fn take_cached_community_result(
623 &self,
624 ) -> Option<crate::consolidation::CommunityResult> {
625 self.graph_runtime.take_cached_community_result()
626 }
627
628 pub(crate) fn set_cached_community_result(
630 &self,
631 result: crate::consolidation::CommunityResult,
632 ) {
633 self.graph_runtime.set_cached_community_result(result);
634 }
635
636 pub fn set_embedder(&self, embedder: Arc<dyn Embedder>) {
646 let embedder = provider_runtime::compose_embedder(
647 embedder,
648 self.storage_runtime.storage_arc(),
649 &self.config,
650 );
651 let embedder = self
652 .provider_runtime
653 .set_multimodal_embedder(Arc::new(hirn_provider::MultiModalEmbedder::new(embedder)));
654
655 if let Err(e) = self.query_runtime.register_runtime_state(
658 self.graph_runtime.cached_graph(),
659 &self.config,
660 self.storage_runtime.storage_arc(),
661 Some(embedder),
662 self.provider_runtime.tokenizer(),
663 ) {
664 tracing::warn!(error = %e, "Failed to update HirnSessionExt with new embedder");
665 }
666 }
667
668 pub fn set_multimodal_embedder(&self, embedder: Arc<hirn_provider::MultiModalEmbedder>) {
673 let embedder = provider_runtime::compose_multimodal_embedder(
674 embedder,
675 self.storage_runtime.storage_arc(),
676 &self.config,
677 );
678 let embedder = self.provider_runtime.set_multimodal_embedder(embedder);
679
680 if let Err(e) = self.query_runtime.register_runtime_state(
681 self.graph_runtime.cached_graph(),
682 &self.config,
683 self.storage_runtime.storage_arc(),
684 Some(embedder),
685 self.provider_runtime.tokenizer(),
686 ) {
687 tracing::warn!(error = %e, "Failed to update HirnSessionExt with new multimodal embedder");
688 }
689 }
690
691 pub fn set_multivec_embedder(&self, embedder: Arc<dyn Embedder>) {
696 self.provider_runtime.set_multivec_embedder(embedder);
697 }
698
699 pub fn set_tokenizer(&self, tokenizer: Arc<dyn Tokenizer>) {
701 self.provider_runtime.set_tokenizer(tokenizer);
702 if let Err(e) = self.query_runtime.register_runtime_state(
703 self.graph_runtime.cached_graph(),
704 &self.config,
705 self.storage_runtime.storage_arc(),
706 self.provider_runtime.embedder_arc(),
707 self.provider_runtime.tokenizer(),
708 ) {
709 tracing::warn!(error = %e, "Failed to update HirnSessionExt with new tokenizer");
710 }
711 }
712
713 #[must_use]
715 pub fn tokenizer(&self) -> Arc<dyn Tokenizer> {
716 self.provider_runtime.tokenizer()
717 }
718
719 pub fn pending_embed_count(&self) -> usize {
721 self.write_runtime.pending_embed_count()
722 }
723
724 pub async fn retry_pending_embeds(&self) -> (usize, usize) {
730 let embedder = match self.provider_runtime.embedder_arc() {
731 Some(embedder) => embedder,
732 None => return (0, 0),
733 };
734
735 let pending = self.write_runtime.drain_pending_embeds();
736 if pending.is_empty() {
737 return (0, 0);
738 }
739
740 tracing::info!(count = pending.len(), "Retrying pending embeds");
741
742 let mut succeeded = 0usize;
743 let mut failed = Vec::new();
744
745 for item in pending {
746 match self.retry_single_embed(item.id, &*embedder).await {
747 Ok(()) => {
748 succeeded += 1;
749 tracing::debug!(id = %item.id, "Pending embed retry succeeded");
750 }
751 Err(e) => {
752 tracing::warn!(id = %item.id, attempts = item.attempts + 1, error = %e, "Pending embed retry failed");
753 failed.push(item);
754 }
755 }
756 }
757
758 let fail_count = failed.len();
759 self.write_runtime.requeue_failed_embeds(failed, 3);
760
761 (succeeded, fail_count)
762 }
763
764 async fn retry_single_embed(&self, id: MemoryId, embedder: &dyn Embedder) -> HirnResult<()> {
766 let mut record = self.read_episodic_record(id).await?;
767 if record.embedding.is_some() {
768 return Ok(());
770 }
771
772 let text = if let Some(ref mc) = record.multi_content {
773 mc.text_for_embedding().to_string()
774 } else if !record.content.is_empty() {
775 record.content.clone()
776 } else {
777 return Err(HirnError::InvalidInput(
778 "no content available for embedding".into(),
779 ));
780 };
781
782 let embeddings = embedder.embed(&[text.as_str()]).await?;
783 if let Some(emb) = embeddings.into_iter().next() {
784 record.embedding = Some(emb.vector);
785 self.write_episodic_record(&record).await?;
786 }
787
788 Ok(())
789 }
790
791 pub fn set_reranker(&self, reranker: Arc<dyn Reranker>) {
797 self.provider_runtime.set_reranker(reranker);
798 }
799
800 pub async fn ensure_fts_indexes(&self) -> HirnResult<()> {
806 self.storage_runtime.ensure_fts_indexes().await
807 }
808
809 #[must_use]
811 pub fn fts_initialized(&self) -> bool {
812 self.storage_runtime.fts_initialized()
813 }
814
815 pub async fn create_vector_indexes(
820 &self,
821 index_type: hirn_storage::store::IndexType,
822 params: Option<hirn_storage::store::IndexParams>,
823 ) -> HirnResult<()> {
824 self.storage_runtime
825 .create_vector_indexes(index_type, params)
826 .await
827 }
828
829 pub async fn rebuild_vector_indexes(
834 &self,
835 index_type: hirn_storage::store::IndexType,
836 params: Option<hirn_storage::store::IndexParams>,
837 ) -> HirnResult<()> {
838 self.storage_runtime
839 .rebuild_vector_indexes(index_type, params)
840 .await
841 }
842
843 #[must_use]
845 pub fn prefetch_stats(&self) -> PrefetchStats {
846 self.graph_runtime.prefetch_stats()
847 }
848
849 #[must_use]
851 pub fn index_advisor(&self) -> &crate::index_advisor::IndexAdvisor {
852 self.graph_runtime.index_advisor()
853 }
854
855 pub fn set_event_log(&self, log: Arc<EventLog>) {
861 self.event_runtime.set_event_log(log);
862 }
863
864 #[must_use]
866 pub fn event_log(&self) -> Option<Arc<EventLog>> {
867 self.event_runtime.event_log()
868 }
869
870 #[must_use]
872 pub fn persistent_graph(&self) -> &PersistentGraph {
873 self.graph_runtime.persistent_graph()
874 }
875
876 #[must_use]
881 pub fn graph_store(&self) -> &dyn crate::graph_store::GraphStore {
882 self.graph_runtime.graph_store()
883 }
884
885 #[must_use]
887 pub fn cached_graph(&self) -> &crate::cached_graph_store::CachedGraphStore {
888 self.graph_runtime.cached_graph()
889 }
890
891 pub fn set_admission_pipeline(&mut self, pipeline: crate::admission::AdmissionPipeline) {
896 self.admission_runtime.set_pipeline(pipeline);
897 }
898
899 pub fn setup_default_admission_pipeline(&mut self) {
904 self.admission_runtime.setup_default_pipeline(
905 &self.config,
906 self.storage_runtime.storage_arc(),
907 self.provider_runtime.tokenizer(),
908 );
909 }
910
911 #[must_use]
913 pub fn admission_pipeline(&self) -> Option<&crate::admission::AdmissionPipeline> {
914 self.admission_runtime.admission_pipeline()
915 }
916
917 pub fn set_policy_engine(&mut self, engine: PolicyEngine) {
922 self.policy_runtime.set_engine(engine);
923 self.invalidate_plan_cache();
927 }
928
929 #[must_use]
931 pub fn policy_engine(&self) -> Option<&PolicyEngine> {
932 self.policy_runtime.engine()
933 }
934
935 pub(crate) async fn enforce(
944 &self,
945 agent_id: &str,
946 action: Action,
947 realm: &str,
948 namespace: &str,
949 ) -> HirnResult<()> {
950 let Some(decision) = self
951 .policy_runtime
952 .authorize(agent_id, action, realm, namespace)
953 else {
954 return Ok(());
955 };
956
957 self.emit_in_realm(realm, namespace, agent_id, decision.audit_event)
958 .await;
959
960 if let Some(err) = decision.denial_error {
961 Err(err)
962 } else {
963 Ok(())
964 }
965 }
966
967 pub(crate) fn is_action_allowed(
970 &self,
971 agent_id: &str,
972 action: Action,
973 realm: &str,
974 namespace: &str,
975 ) -> bool {
976 self.policy_runtime
977 .is_action_allowed(agent_id, action, realm, namespace)
978 }
979
980 pub(crate) fn can_read_raw_content(&self, agent_id: &str, record: &MemoryRecord) -> bool {
981 self.is_action_allowed(
982 agent_id,
983 Action::RecallRawText,
984 &self.config.default_realm,
985 record.effective_namespace().as_str(),
986 )
987 }
988
989 async fn collect_resource_evidence_summaries(
990 &self,
991 record: &MemoryRecord,
992 agent_id: &str,
993 cache: &mut HashMap<ResourceId, CachedResourceEvidence>,
994 ) -> HirnResult<Vec<ResourceEvidenceSummary>> {
995 let evidence_links = Self::record_evidence_links(record);
996 if evidence_links.is_empty() {
997 return Ok(Vec::new());
998 }
999
1000 let can_hydrate_full = self.can_read_raw_content(agent_id, record);
1001 let can_hydrate_preview = self.is_action_allowed(
1002 agent_id,
1003 Action::Recall,
1004 &self.config.default_realm,
1005 record.effective_namespace().as_str(),
1006 );
1007 let mut summaries = Vec::with_capacity(evidence_links.len());
1008
1009 for link in evidence_links {
1010 let cached = if let Some(existing) = cache.get(&link.resource_id) {
1011 existing.clone()
1012 } else {
1013 let resource = hirn_storage::get_resource(self.storage_backend(), link.resource_id)
1014 .await
1015 .map_err(HirnError::storage)?;
1016 let artifacts =
1017 hirn_storage::list_derived_artifacts(self.storage_backend(), link.resource_id)
1018 .await
1019 .map_err(HirnError::storage)?;
1020
1021 let mut available_artifacts: Vec<DerivedArtifactKind> = artifacts
1022 .iter()
1023 .filter(|artifact| artifact.kind != DerivedArtifactKind::GenerationFailure)
1024 .map(|artifact| artifact.kind)
1025 .collect();
1026 available_artifacts.sort_by_key(|kind| kind.as_str());
1027 available_artifacts.dedup_by_key(|kind| kind.as_str());
1028
1029 let cached = CachedResourceEvidence {
1030 lifecycle_state: resource
1031 .as_ref()
1032 .map_or(ResourceGovernanceState::Active, |resource| {
1033 resource.governance_state
1034 }),
1035 modality: resource.as_ref().map(|resource| resource.modality),
1036 mime_type: resource
1037 .as_ref()
1038 .and_then(|resource| resource.mime_type.clone()),
1039 display_name: resource
1040 .as_ref()
1041 .and_then(|resource| resource.display_name.clone()),
1042 artifact_kinds_by_id: artifacts
1043 .iter()
1044 .map(|artifact| (artifact.id, artifact.kind))
1045 .collect(),
1046 has_preview: available_artifacts.iter().any(|kind| kind.is_previewable()),
1047 has_full_payload: resource.as_ref().is_some_and(|resource| {
1048 matches!(resource.location, ResourceLocation::Blob { .. })
1049 && !resource.governance_state.hides_payload()
1050 }),
1051 available_artifacts,
1052 };
1053 cache.insert(link.resource_id, cached.clone());
1054 cached
1055 };
1056
1057 let artifact_kind = link
1058 .artifact_id
1059 .and_then(|artifact_id| cached.artifact_kinds_by_id.get(&artifact_id).copied());
1060 let has_preview =
1061 artifact_kind.map_or(cached.has_preview, DerivedArtifactKind::is_previewable);
1062 let available_artifacts = artifact_kind
1063 .map(|kind| vec![kind])
1064 .unwrap_or_else(|| cached.available_artifacts.clone());
1065 let can_hydrate_preview =
1066 link.artifact_id.is_none() && has_preview && can_hydrate_preview;
1067 let can_hydrate_full =
1068 link.artifact_id.is_none() && cached.has_full_payload && can_hydrate_full;
1069
1070 summaries.push(ResourceEvidenceSummary {
1071 resource_id: link.resource_id,
1072 role: link.role,
1073 provenance: link.provenance,
1074 artifact_id: link.artifact_id,
1075 artifact_kind,
1076 lifecycle_state: cached.lifecycle_state,
1077 modality: cached.modality,
1078 mime_type: cached.mime_type.clone(),
1079 display_name: cached.display_name.clone(),
1080 available_artifacts,
1081 has_preview,
1082 can_hydrate_preview,
1083 can_hydrate_full,
1084 });
1085 }
1086
1087 Ok(summaries)
1088 }
1089
1090 pub(crate) async fn resource_evidence_summaries_for_record(
1091 &self,
1092 record: &MemoryRecord,
1093 agent_id: &str,
1094 ) -> HirnResult<Vec<ResourceEvidenceSummary>> {
1095 let mut cache = HashMap::new();
1096 self.collect_resource_evidence_summaries(record, agent_id, &mut cache)
1097 .await
1098 }
1099
1100 pub(crate) async fn attach_resource_evidence_summaries(
1101 &self,
1102 results: &mut [RecallResult],
1103 agent_id: &str,
1104 ) -> HirnResult<()> {
1105 let mut cache: HashMap<ResourceId, CachedResourceEvidence> = HashMap::new();
1106
1107 for result in results {
1108 result.resource_evidence = self
1109 .collect_resource_evidence_summaries(&result.record, agent_id, &mut cache)
1110 .await?;
1111 }
1112
1113 Ok(())
1114 }
1115
1116 pub(crate) async fn attach_resource_evidence_summaries_to_scored_memories(
1117 &self,
1118 scored: &mut [ScoredMemory],
1119 agent_id: &str,
1120 ) -> HirnResult<()> {
1121 let mut cache: HashMap<ResourceId, CachedResourceEvidence> = HashMap::new();
1122
1123 for scored_memory in scored {
1124 scored_memory.resource_evidence = self
1125 .collect_resource_evidence_summaries(&scored_memory.record, agent_id, &mut cache)
1126 .await?;
1127 }
1128
1129 Ok(())
1130 }
1131
1132 fn record_evidence_links(record: &MemoryRecord) -> &[EvidenceLink] {
1133 match record {
1134 MemoryRecord::Working(_) => &[],
1135 MemoryRecord::Episodic(record) => &record.provenance.evidence_links,
1136 MemoryRecord::Semantic(record) => &record.provenance.evidence_links,
1137 MemoryRecord::Procedural(record) => &record.provenance.evidence_links,
1138 }
1139 }
1140
1141 #[must_use]
1143 pub fn storage_backend(&self) -> &dyn PhysicalStore {
1144 self.storage_runtime.storage_backend()
1145 }
1146
1147 #[must_use]
1152 pub fn storage_arc(&self) -> Arc<dyn PhysicalStore> {
1153 self.storage_runtime.storage_arc()
1154 }
1155
1156 pub async fn apply_resource_retention_policy(
1158 &self,
1159 policy: &hirn_core::ResourceRetentionPolicy,
1160 ) -> HirnResult<hirn_storage::ResourceRetentionApplyResult> {
1161 hirn_storage::apply_resource_retention_policy(self.storage_backend(), policy)
1162 .await
1163 .map_err(HirnError::storage)
1164 }
1165
1166 pub async fn apply_configured_resource_retention(
1168 &self,
1169 ) -> HirnResult<hirn_storage::ResourceRetentionApplyResult> {
1170 self.apply_resource_retention_policy(&self.config.resource_retention_policy)
1171 .await
1172 }
1173
1174 #[must_use]
1175 pub(crate) fn semantic_head_cache_get(
1176 &self,
1177 logical_memory_id: LogicalMemoryId,
1178 ) -> Option<SemanticRecord> {
1179 self.storage_runtime.cached_semantic_head(logical_memory_id)
1180 }
1181
1182 #[must_use]
1183 pub(crate) fn episodic_head_cache_get(
1184 &self,
1185 logical_memory_id: LogicalMemoryId,
1186 ) -> Option<EpisodicRecord> {
1187 self.storage_runtime.cached_episodic_head(logical_memory_id)
1188 }
1189
1190 pub(crate) async fn resolve_active_episodic_head(
1191 &self,
1192 id: MemoryId,
1193 ) -> HirnResult<EpisodicRecord> {
1194 let record = self.get_episode(id).await?;
1195 self.episodic_head_for_logical_id(record.logical_memory_id)
1196 .await
1197 }
1198
1199 pub(crate) fn semantic_head_cache_put(&self, record: SemanticRecord) {
1200 self.storage_runtime.cache_semantic_head(record);
1201 }
1202
1203 pub(crate) fn episodic_head_cache_put(&self, record: EpisodicRecord) {
1204 self.storage_runtime.cache_episodic_head(record);
1205 }
1206
1207 pub(crate) fn semantic_head_cache_evict(&self, logical_memory_id: LogicalMemoryId) {
1208 self.storage_runtime.evict_semantic_head(logical_memory_id);
1209 }
1210
1211 pub(crate) fn episodic_head_cache_evict(&self, logical_memory_id: LogicalMemoryId) {
1212 self.storage_runtime.evict_episodic_head(logical_memory_id);
1213 }
1214
1215 #[must_use]
1216 pub(crate) fn semantic_head_cache_snapshot(
1217 &self,
1218 ) -> std::collections::HashMap<LogicalMemoryId, SemanticRecord> {
1219 self.storage_runtime.cached_semantic_heads_snapshot()
1220 }
1221
1222 pub(crate) fn semantic_head_cache_replace(
1223 &self,
1224 records: impl IntoIterator<Item = SemanticRecord>,
1225 ) {
1226 self.storage_runtime.replace_semantic_heads(records);
1227 }
1228
1229 #[must_use]
1230 pub(crate) fn file_size_bytes(&self) -> u64 {
1231 self.storage_runtime.file_size_bytes()
1232 }
1233
1234 #[must_use]
1236 pub fn embedder(&self) -> Option<Arc<dyn Embedder>> {
1237 self.provider_runtime.embedder()
1238 }
1239
1240 pub async fn embed_text(&self, text: &str) -> HirnResult<Vec<f32>> {
1243 self.provider_runtime.embed_text(text).await
1244 }
1245
1246 pub async fn embed_content(
1250 &self,
1251 content: &hirn_core::content::MemoryContent,
1252 ) -> HirnResult<Vec<f32>> {
1253 self.provider_runtime.embed_content(content).await
1254 }
1255
1256 pub(crate) async fn extract_and_store_resources(
1259 &self,
1260 namespace: hirn_core::types::Namespace,
1261 owner_agent_id: AgentId,
1262 content: &hirn_core::content::MemoryContent,
1263 ) -> HirnResult<crate::db::storage_runtime::ExtractedResources> {
1264 self.storage_runtime
1265 .extract_and_store_resources(namespace, owner_agent_id, content)
1266 .await
1267 }
1268
1269 async fn enforce_resource_fetch(
1270 &self,
1271 actor_id: &AgentId,
1272 namespace: Namespace,
1273 hydration_mode: HydrationMode,
1274 ) -> HirnResult<()> {
1275 self.enforce(
1276 actor_id.as_str(),
1277 Action::Recall,
1278 &self.config.default_realm,
1279 namespace.as_str(),
1280 )
1281 .await?;
1282
1283 if matches!(hydration_mode, HydrationMode::Full) {
1284 self.enforce(
1285 actor_id.as_str(),
1286 Action::RecallRawText,
1287 &self.config.default_realm,
1288 namespace.as_str(),
1289 )
1290 .await
1291 } else {
1292 Ok(())
1293 }
1294 }
1295
1296 pub async fn fetch_resource(
1300 &self,
1301 actor_id: &AgentId,
1302 resource_id: ResourceId,
1303 hydration_mode: HydrationMode,
1304 ) -> HirnResult<Option<hirn_storage::HydratedResource>> {
1305 let Some(resource) = hirn_storage::get_resource(self.storage_backend(), resource_id)
1306 .await
1307 .map_err(HirnError::storage)?
1308 else {
1309 return Ok(None);
1310 };
1311
1312 self.enforce_resource_fetch(actor_id, resource.namespace, hydration_mode)
1313 .await?;
1314
1315 hirn_storage::fetch_resource(self.storage_backend(), resource_id, hydration_mode)
1316 .await
1317 .map_err(HirnError::storage)
1318 }
1319
1320 async fn enforce_raw_resource_read(
1321 &self,
1322 actor_id: &AgentId,
1323 namespace: Namespace,
1324 ) -> HirnResult<()> {
1325 self.enforce_resource_fetch(actor_id, namespace, HydrationMode::Full)
1326 .await
1327 }
1328
1329 fn content_requires_resource_hydration(content: &hirn_core::content::MemoryContent) -> bool {
1330 match content {
1331 hirn_core::content::MemoryContent::Image { data, .. }
1332 | hirn_core::content::MemoryContent::Audio { data, .. }
1333 | hirn_core::content::MemoryContent::Video { data, .. }
1334 | hirn_core::content::MemoryContent::Document { data, .. } => data.is_empty(),
1335 hirn_core::content::MemoryContent::Code { source, .. } => source.is_empty(),
1336 hirn_core::content::MemoryContent::ToolOutput { output, .. } => output.is_empty(),
1337 hirn_core::content::MemoryContent::Structured { data, .. } => data.is_null(),
1338 hirn_core::content::MemoryContent::Composite(parts) => {
1339 parts.iter().any(Self::content_requires_resource_hydration)
1340 }
1341 _ => false,
1342 }
1343 }
1344
1345 pub async fn load_resource_blob(
1347 &self,
1348 actor_id: &AgentId,
1349 id: hirn_core::id::MemoryId,
1350 blob_index: u32,
1351 ) -> HirnResult<Vec<u8>> {
1352 let record = self.get_episode(id).await?;
1353 self.enforce_raw_resource_read(actor_id, record.namespace)
1354 .await?;
1355 self.storage_runtime
1356 .load_resource_blob(&record.provenance.evidence_links, blob_index)
1357 .await
1358 }
1359
1360 pub async fn hydrate_content_resources(
1363 &self,
1364 actor_id: &AgentId,
1365 namespace: Namespace,
1366 content: &hirn_core::content::MemoryContent,
1367 evidence_links: &[hirn_core::resource::EvidenceLink],
1368 ) -> HirnResult<hirn_core::content::MemoryContent> {
1369 if evidence_links.is_empty() || !Self::content_requires_resource_hydration(content) {
1370 return Ok(content.clone());
1371 }
1372
1373 self.enforce_raw_resource_read(actor_id, namespace).await?;
1374 self.storage_runtime
1375 .hydrate_content_resources(content, evidence_links)
1376 .await
1377 }
1378
1379 pub async fn get_episode_with_resources(
1382 &self,
1383 actor_id: &AgentId,
1384 id: hirn_core::id::MemoryId,
1385 ) -> HirnResult<hirn_core::episodic::EpisodicRecord> {
1386 let mut record = self.get_episode(id).await?;
1387 if let Some(ref mc) = record.multi_content {
1388 record.multi_content = Some(
1389 self.hydrate_content_resources(
1390 actor_id,
1391 record.namespace,
1392 mc,
1393 &record.provenance.evidence_links,
1394 )
1395 .await?,
1396 );
1397 }
1398 Ok(record)
1399 }
1400
1401 pub async fn close(&self) -> HirnResult<()> {
1409 self.offline_scheduler_runtime.shutdown().await;
1410 self.flush_hebbian().await?;
1411 self.flush_episodic_access().await?;
1412 self.flush_semantic_access().await?;
1413 self.flush_importance_accumulator().await?;
1414 Ok(())
1415 }
1416
1417 pub(crate) async fn flush_importance_accumulator(&self) -> HirnResult<()> {
1420 let ids = self.drain_importance_accumulator();
1421 if ids.is_empty() {
1422 return Ok(());
1423 }
1424 crate::consolidation::apply_retrieval_effects(self.storage_arc(), ids).await
1425 }
1426}
1427
1428impl Drop for HirnDB {
1431 fn drop(&mut self) {
1432 let flush = async {
1433 let _ = self.flush_hebbian().await;
1434 let _ = self.flush_episodic_access().await;
1435 let _ = self.flush_semantic_access().await;
1436 let _ = self.flush_importance_accumulator().await;
1437 };
1438
1439 if tokio::runtime::Handle::try_current().is_ok() {
1444 std::thread::scope(|s| {
1445 let _ = s
1446 .spawn(|| {
1447 if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
1448 .enable_all()
1449 .build()
1450 {
1451 rt.block_on(flush);
1452 }
1453 })
1454 .join();
1455 });
1456 } else if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
1457 .enable_all()
1458 .build()
1459 {
1460 rt.block_on(flush);
1461 }
1462 }
1463}
1464
#[cfg(test)]
mod tests {
    use super::*;

    use hirn_core::resource::{
        DerivedArtifactIndexPolicy, DerivedArtifactIndexRule, DerivedArtifactKind, ModalityProfile,
        ResourceIndexPolicy, ResourceIndexRule, SecondaryIndexType,
    };
    use hirn_storage::memory_store::MemoryStore;
    use hirn_storage::store::IndexType;

    /// Opening a database with resource and derived-artifact index policies
    /// should create both datasets plus one bitmap index per configured rule.
    ///
    /// The expected column lists have the `modality` column (resources) or the
    /// `kind` column (derived artifacts) ahead of the rule's extra column.
    #[tokio::test(flavor = "multi_thread")]
    async fn open_with_config_bootstraps_storage_datasets_and_indices() {
        // Keep the temp dir bound for the whole test; dropping it deletes the path.
        let db_dir = tempfile::tempdir().unwrap();
        let memory_store = Arc::new(MemoryStore::new());
        let physical: Arc<dyn PhysicalStore> = memory_store.clone();

        let resource_rule =
            ResourceIndexRule::new(ModalityProfile::Document, SecondaryIndexType::Bitmap)
                .with_column("mime_type");
        let artifact_rule = DerivedArtifactIndexRule::new(
            DerivedArtifactKind::Transcript,
            SecondaryIndexType::Bitmap,
        )
        .with_column("modality");

        let config = HirnConfig::builder()
            .db_path(db_dir.path())
            .embedding_dimensions(32)
            .resource_index_policy(ResourceIndexPolicy::default().with_rule(resource_rule))
            .derived_artifact_index_policy(
                DerivedArtifactIndexPolicy::default().with_rule(artifact_rule),
            )
            .build()
            .unwrap();

        // Bind to `_db` (not `_`) so the database stays open until the test ends.
        let _db = HirnDB::open_with_config(config, physical).await.unwrap();

        let resources_exist = memory_store
            .exists(hirn_storage::datasets::resource_object::DATASET_NAME)
            .await
            .unwrap();
        assert!(resources_exist);

        let artifacts_exist = memory_store
            .exists(hirn_storage::datasets::derived_artifact::DATASET_NAME)
            .await
            .unwrap();
        assert!(artifacts_exist);

        let has_bitmap_index = |dataset: &str, expected_columns: Vec<String>| {
            memory_store.index_configs(dataset).iter().any(|index| {
                index.columns == expected_columns && index.index_type == IndexType::Bitmap
            })
        };
        assert!(has_bitmap_index(
            "resources",
            vec!["modality".to_string(), "mime_type".to_string()],
        ));
        assert!(has_bitmap_index(
            "derived_artifacts",
            vec!["kind".to_string(), "modality".to_string()],
        ));
    }
}