Skip to main content

hirn_engine/db/
mod.rs

1mod admission_runtime;
2mod cross_agent;
3mod episodic;
4mod event_runtime;
5mod graph_ops;
6mod graph_runtime;
7mod mutation_contract;
8mod namespace;
9mod namespace_runtime;
10mod offline_scheduler_runtime;
11mod persistence;
12mod policy_runtime;
13mod procedural;
14mod provider_runtime;
15mod query_exec;
16mod query_runtime;
17mod recall_exec;
18mod semantic;
19mod services;
20mod storage_runtime;
21mod working;
22pub mod write_path;
23mod write_runtime;
24
25pub use cross_agent::PurgeReport;
26pub use graph_runtime::PrefetchStats;
27pub use mutation_contract::{
28    MutationWriteContract, MutationWriteGuarantee, mutation_write_contracts,
29};
30pub use services::{
31    AdminView, CausalView, EpisodicView, GraphView, NamespaceView, PolicyView, ProceduralView,
32    QueryView, RecallView, SemanticView, WorkingView,
33};
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38
39use hirn_core::embed::{Embedder, Reranker};
40use hirn_core::episodic::EpisodicRecord;
41use hirn_core::id::MemoryId;
42use hirn_core::metadata::Metadata;
43use hirn_core::procedural::ProceduralRecord;
44use hirn_core::provenance::Mutation;
45use hirn_core::record::MemoryRecord;
46use hirn_core::resource::{
47    DerivedArtifactId, DerivedArtifactKind, EvidenceLink, HydrationMode, ModalityProfile,
48    ResourceGovernanceState, ResourceId, ResourceLocation,
49};
50use hirn_core::revision::LogicalMemoryId;
51use hirn_core::semantic::SemanticRecord;
52use hirn_core::timestamp::Timestamp;
53use hirn_core::tokenizer::Tokenizer;
54use hirn_core::types::{AgentId, EdgeRelation, EventType, Layer, MutationTrigger, Namespace};
55use hirn_core::working::WorkingMemoryEntry;
56use hirn_core::{HirnConfig, HirnError, HirnResult};
57
58use hirn_storage::PhysicalStore;
59
60use crate::activation::{ActivationConfig, ActivationMode};
61use crate::error::StoreError;
62use crate::event_log::EventLog;
63use crate::graph_store::GraphStore;
64use crate::hebbian::HebbianConfig;
65use crate::persistent_graph::PersistentGraph;
66use crate::recall::{LayerFilter, RecallBuilder, RecallResult, ResourceEvidenceSummary};
67use crate::scoring::{self, ScoringWeights};
68
69use crate::event::MemoryEvent;
70use crate::policy::{Action, PolicyEngine};
71use crate::ql::results::ScoredMemory;
72use admission_runtime::AdmissionRuntime;
73use event_runtime::EventRuntime;
74use graph_runtime::GraphRuntime;
75use namespace_runtime::NamespaceRuntime;
76use offline_scheduler_runtime::OfflineSchedulerRuntime;
77use policy_runtime::PolicyRuntime;
78use provider_runtime::ProviderRuntime;
79use query_runtime::QueryRuntime;
80use storage_runtime::StorageRuntime;
81use write_runtime::WriteRuntime;
82
83/// Database statistics.
84#[derive(Debug, Clone, PartialEq, Eq)]
85pub struct DbStats {
86    pub working_count: u64,
87    pub episodic_count: u64,
88    pub semantic_count: u64,
89    pub procedural_count: u64,
90    pub total_count: u64,
91    pub edge_count: u64,
92    pub file_size_bytes: u64,
93}
94
95/// Layer counts.
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct LayerCounts {
98    pub working: u64,
99    pub episodic: u64,
100    pub semantic: u64,
101    pub procedural: u64,
102    pub total: u64,
103}
104
105#[derive(Debug, Clone, Default)]
106struct CachedResourceEvidence {
107    lifecycle_state: ResourceGovernanceState,
108    modality: Option<ModalityProfile>,
109    mime_type: Option<String>,
110    display_name: Option<String>,
111    available_artifacts: Vec<DerivedArtifactKind>,
112    artifact_kinds_by_id: HashMap<DerivedArtifactId, DerivedArtifactKind>,
113    has_preview: bool,
114    has_full_payload: bool,
115}
116
117/// Filter for listing episodic records.
118#[derive(Debug, Default)]
119pub struct EpisodicFilter {
120    pub event_type: Option<EventType>,
121    pub after: Option<Timestamp>,
122    pub before: Option<Timestamp>,
123    pub min_importance: Option<f32>,
124    pub entity_name: Option<String>,
125    pub namespace: Option<Namespace>,
126    pub include_archived: bool,
127    pub limit: Option<usize>,
128    pub offset: Option<usize>,
129    /// Bi-temporal valid-time filter. When set, only records whose validity
130    /// period covers this timestamp are returned:
131    ///   `timestamp <= valid_at AND (valid_until IS NULL OR valid_until > valid_at)`
132    ///
133    /// Distinct from `after`/`before` which filter on `timestamp` (event occurrence
134    /// time) without regard to validity period.
135    pub valid_at: Option<Timestamp>,
136}
137
138/// Filter for listing semantic records.
139#[derive(Debug, Default)]
140pub struct SemanticFilter {
141    pub knowledge_type: Option<hirn_core::types::KnowledgeType>,
142    pub min_confidence: Option<f32>,
143    pub namespace: Option<Namespace>,
144    pub limit: Option<usize>,
145}
146
147/// Result of cross-agent consolidation.
148#[derive(Debug)]
149pub struct CrossAgentConsolidationResult {
150    /// Number of concept groups that were merged.
151    pub merged_count: usize,
152    /// Number of contradiction edges created.
153    pub contradiction_count: usize,
154    /// IDs of the active merged revisions.
155    pub merged_ids: Vec<MemoryId>,
156    /// Pairs of records connected with Contradicts edges.
157    pub contradiction_pairs: Vec<(MemoryId, MemoryId)>,
158}
159
160/// Describes updates to apply to a semantic record.
161#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
162pub struct SemanticUpdate {
163    pub description: Option<String>,
164    pub confidence: Option<f32>,
165    pub evidence_count: Option<u32>,
166    pub reason: Option<String>,
167    pub actor_id: AgentId,
168    pub observed_at: Option<Timestamp>,
169    pub causation_id: MemoryId,
170}
171
172impl SemanticUpdate {
173    #[must_use]
174    pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
175        Self {
176            description: None,
177            confidence: None,
178            evidence_count: None,
179            reason: None,
180            actor_id,
181            observed_at: None,
182            causation_id,
183        }
184    }
185}
186
187/// Describes replacement metadata for superseding a semantic record.
188#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
189pub struct SemanticSupersession {
190    pub description: Option<String>,
191    pub confidence: Option<f32>,
192    pub evidence_count: Option<u32>,
193    pub reason: Option<String>,
194    pub actor_id: AgentId,
195    pub observed_at: Option<Timestamp>,
196    pub causation_id: MemoryId,
197}
198
199impl SemanticSupersession {
200    #[must_use]
201    pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
202        Self {
203            description: None,
204            confidence: None,
205            evidence_count: None,
206            reason: None,
207            actor_id,
208            observed_at: None,
209            causation_id,
210        }
211    }
212}
213
214/// Describes a durable human/admin override that selects a semantic revision head.
215#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
216pub struct SemanticOverride {
217    pub description: Option<String>,
218    pub confidence: Option<f32>,
219    pub evidence_count: Option<u32>,
220    pub reason: Option<String>,
221    pub actor_id: AgentId,
222    pub observed_at: Option<Timestamp>,
223    pub causation_id: MemoryId,
224}
225
226impl SemanticOverride {
227    #[must_use]
228    pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
229        Self {
230            description: None,
231            confidence: None,
232            evidence_count: None,
233            reason: None,
234            actor_id,
235            observed_at: None,
236            causation_id,
237        }
238    }
239}
240
241/// Describes how one active semantic memory should absorb other logical memories.
242#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
243pub struct SemanticMerge {
244    pub source_ids: Vec<MemoryId>,
245    pub description: Option<String>,
246    pub confidence: Option<f32>,
247    pub evidence_count: Option<u32>,
248    pub reason: Option<String>,
249    pub actor_id: AgentId,
250    pub observed_at: Option<Timestamp>,
251    pub causation_id: MemoryId,
252}
253
254impl SemanticMerge {
255    #[must_use]
256    pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
257        Self {
258            source_ids: Vec::new(),
259            description: None,
260            confidence: None,
261            evidence_count: None,
262            reason: None,
263            actor_id,
264            observed_at: None,
265            causation_id,
266        }
267    }
268}
269
270/// Result of merging one or more semantic logical memories into a target chain.
271#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
272pub struct SemanticMergeOutcome {
273    pub target: SemanticRecord,
274    pub merged_sources: Vec<SemanticRecord>,
275}
276
277impl From<SemanticSupersession> for SemanticUpdate {
278    fn from(value: SemanticSupersession) -> Self {
279        Self {
280            description: value.description,
281            confidence: value.confidence,
282            evidence_count: value.evidence_count,
283            reason: value.reason,
284            actor_id: value.actor_id,
285            observed_at: value.observed_at,
286            causation_id: value.causation_id,
287        }
288    }
289}
290
291impl From<SemanticUpdate> for SemanticSupersession {
292    fn from(value: SemanticUpdate) -> Self {
293        Self {
294            description: value.description,
295            confidence: value.confidence,
296            evidence_count: value.evidence_count,
297            reason: value.reason,
298            actor_id: value.actor_id,
299            observed_at: value.observed_at,
300            causation_id: value.causation_id,
301        }
302    }
303}
304
305/// Describes metadata for retracting a semantic record.
306#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
307pub struct SemanticRetraction {
308    pub reason: Option<String>,
309    pub actor_id: AgentId,
310    pub observed_at: Option<Timestamp>,
311    pub causation_id: MemoryId,
312}
313
314impl SemanticRetraction {
315    #[must_use]
316    pub fn with_metadata(actor_id: AgentId, causation_id: MemoryId) -> Self {
317        Self {
318            reason: None,
319            actor_id,
320            observed_at: None,
321            causation_id,
322        }
323    }
324}
325
326/// The main database handle.
327pub struct HirnDB {
328    /// Immutable configuration snapshot captured at `open()` time.
329    /// Drives all defaults: embedding dimensions, scoring weights, tier thresholds,
330    /// RPE fast-path, prospective indexing templates, and more.
331    config: HirnConfig,
332    /// Storage runtime: backend handle, db path, FTS/index admin, and blob IO.
333    storage_runtime: StorageRuntime,
334    /// Admission control and corruption-defense runtime.
335    admission_runtime: AdmissionRuntime,
336    /// Event subscription and durable event-log runtime.
337    event_runtime: EventRuntime,
338    /// Active provider handles for embedding, reranking, and tokenization.
339    provider_runtime: ProviderRuntime,
340    /// Graph runtime: hot/cold graph store plus graph-adjacent mutable state
341    /// used by recall assembly, consolidation, and semantic buffering.
342    graph_runtime: GraphRuntime,
343    /// Policy runtime: Cedar engine handle plus authorization and audit helpers.
344    policy_runtime: PolicyRuntime,
345    /// Query execution runtime: DataFusion session, HirnQL pipeline, and plan cache.
346    query_runtime: QueryRuntime,
347    /// Write-path runtime: TemporalNext sequencing, interference tracking,
348    /// partitioned RPE state, and pending embed retries.
349    write_runtime: WriteRuntime,
350    /// Offline scheduler runtime: budgeted queued cognition jobs.
351    offline_scheduler_runtime: OfflineSchedulerRuntime,
352    /// Namespace runtime: cached agent records and namespace access scopes.
353    namespace_runtime: NamespaceRuntime,
354    /// Runtime-mutable tier transition policy.
355    /// Initialized from `HirnConfig` at startup, updated via `SET TIER_POLICY`.
356    tier_policy: parking_lot::RwLock<hirn_core::TierPolicy>,
357}
358
359impl HirnDB {
360    // ── Lifecycle ────────────────────────────────────────────────────────
361
362    /// Open or create a database at the given path with the given storage backend.
363    pub async fn open(path: impl AsRef<Path>, storage: Arc<dyn PhysicalStore>) -> HirnResult<Self> {
364        let config = HirnConfig::builder().db_path(path.as_ref()).build()?;
365        Self::open_with_config(config, storage).await
366    }
367
368    /// Open or create a database with the given configuration and storage backend.
369    ///
370    /// All data is stored exclusively in LanceDB via the `PhysicalStore`.
371    /// On startup, the in-memory namespace index is rebuilt from stored records.
372    pub async fn open_with_config(
373        config: HirnConfig,
374        storage: Arc<dyn PhysicalStore>,
375    ) -> HirnResult<Self> {
376        config.validate()?;
377
378        let path = config.db_path.clone();
379
380        // Ensure the db_path directory exists on disk.
381        std::fs::create_dir_all(&path).map_err(|e| HirnError::StorageError(Box::new(e)))?;
382
383        hirn_storage::HirnDb::from_store(storage.clone())
384            .ensure_datasets_with_config(config.embedding_dimensions.as_usize(), Some(&config))
385            .await
386            .map_err(HirnError::storage)?;
387
388        // Ensure the `shared` default namespace exists in LanceDB.
389        {
390            let ns_name = hirn_core::types::Namespace::shared();
391            let filter = format!("id = '{}'", ns_name.as_str());
392            let count = storage
393                .count(
394                    hirn_storage::datasets::namespace::DATASET_NAME,
395                    Some(&filter),
396                )
397                .await
398                .unwrap_or(0);
399            if count == 0 {
400                let rec = hirn_core::namespace::NamespaceRecord::shared();
401                let batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(&rec))
402                    .map_err(|e| HirnError::storage(e))?;
403                storage
404                    .append(hirn_storage::datasets::namespace::DATASET_NAME, batch)
405                    .await
406                    .map_err(|e| HirnError::storage(e))?;
407            }
408        }
409        let admission_runtime = AdmissionRuntime::new();
410        let graph_runtime = GraphRuntime::new(storage.clone());
411        let policy_runtime = PolicyRuntime::new(storage.clone());
412        let provider_runtime = ProviderRuntime::new(config.embedding_dimensions.as_usize());
413        let query_runtime = QueryRuntime::new(
414            graph_runtime.cached_graph(),
415            &config,
416            storage.clone(),
417            provider_runtime.tokenizer(),
418        )?;
419        let storage_runtime =
420            StorageRuntime::new(path, storage, config.resource_quota_policy.clone());
421        let event_runtime = EventRuntime::new();
422        let event_log = Arc::new(EventLog::open(storage_runtime.storage_arc()).await?);
423        event_runtime.set_event_log(event_log);
424        let write_runtime = WriteRuntime::new(config.default_realm.clone());
425        // Restore RPE population stats from the previous session so novelty
426        // calibration is not reset on every restart.
427        write_runtime.load_rpe_stats(storage_runtime.path());
428        let offline_scheduler_runtime = OfflineSchedulerRuntime::new(
429            config.offline_scheduler.clone(),
430            config.default_realm.clone(),
431            storage_runtime.storage_arc(),
432            config.conflict_resolution_policy,
433            config.conflict_resolution_overrides.clone(),
434            config.offline_dream_quality_threshold,
435            config.offline_reconcile_quality_threshold,
436            config.offline_plan_quality_threshold,
437            f64::from(config.memory_decay_factor),
438            config.decay_sweep_window_secs,
439        )
440        .await?;
441        let namespace_runtime = NamespaceRuntime::new();
442        let tier_policy = parking_lot::RwLock::new(hirn_core::TierPolicy::from_config(&config));
443
444        let db = Self {
445            config,
446            storage_runtime,
447            admission_runtime,
448            event_runtime,
449            provider_runtime,
450            graph_runtime,
451            policy_runtime,
452            query_runtime,
453            write_runtime,
454            offline_scheduler_runtime,
455            namespace_runtime,
456            tier_policy,
457        };
458
459        // Spawn resource-reconcile tasks as background work so they do not block
460        // open() on large stores.  Errors are logged; a subsequent open() will
461        // re-attempt reconciliation.
462        {
463            let storage = db.storage_runtime.storage_arc();
464            tokio::spawn(async move {
465                match hirn_storage::reconcile_resource_head_mutations(storage.as_ref()).await {
466                    Ok(n) if n > 0 => tracing::info!(
467                        reconciled = n,
468                        "background: reconciled resource-head mutations"
469                    ),
470                    Ok(_) => {}
471                    Err(error) => {
472                        tracing::warn!(%error, "background: resource-head reconcile failed");
473                    }
474                }
475            });
476        }
477        {
478            let storage = db.storage_runtime.storage_arc();
479            tokio::spawn(async move {
480                match hirn_storage::reconcile_pending_resource_blob_staging(storage.as_ref()).await
481                {
482                    Ok(n) if n > 0 => tracing::info!(
483                        reconciled = n,
484                        "background: reconciled pending resource blob staging records"
485                    ),
486                    Ok(_) => {}
487                    Err(error) => {
488                        tracing::warn!(
489                            %error,
490                            "background: resource blob staging reconcile failed"
491                        );
492                    }
493                }
494            });
495        }
496        db.cached_graph().load_from_cold().await?;
497        db.reconcile_pending_episode_mutations().await?;
498        db.reconcile_pending_semantic_create_mutations().await?;
499        db.reconcile_pending_semantic_successor_mutations().await?;
500        db.reconcile_pending_semantic_merge_mutations().await?;
501        db.reconcile_pending_semantic_contradiction_sync_mutations()
502            .await?;
503        db.reconcile_pending_semantic_retract_mutations().await?;
504        db.reconcile_pending_semantic_purge_mutations().await?;
505        db.reconcile_pending_procedural_create_mutations().await?;
506        db.reconcile_pending_procedural_successor_mutations()
507            .await?;
508        db.reconcile_pending_agent_register_mutations().await?;
509        db.reconcile_pending_namespace_delete_mutations().await?;
510        db.reconcile_pending_agent_deregister_mutations().await?;
511        db.hydrate_temporal_arrival_cursors().await?;
512        db.hydrate_working_l0_cache().await?;
513
514        Ok(db)
515    }
516
517    /// Get the config.
518    #[must_use]
519    pub const fn config(&self) -> &HirnConfig {
520        &self.config
521    }
522
523    fn rpe_model_id(&self) -> String {
524        self.provider_runtime.rpe_model_id()
525    }
526
527    /// Get a snapshot of the current tier policy.
528    #[must_use]
529    pub fn tier_policy(&self) -> hirn_core::TierPolicy {
530        self.tier_policy.read().clone()
531    }
532
533    /// Update the tier policy at runtime (used by `SET TIER_POLICY`).
534    pub fn set_tier_policy(&self, policy: hirn_core::TierPolicy) {
535        *self.tier_policy.write() = policy;
536    }
537
538    /// Get the database file path.
539    #[must_use]
540    pub fn path(&self) -> &Path {
541        self.storage_runtime.path()
542    }
543
544    /// Get the DataFusion `SessionContext` with scoring UDFs and
545    /// `HirnSessionExt` pre-registered.
546    #[must_use]
547    pub fn session(&self) -> &datafusion::prelude::SessionContext {
548        self.query_runtime.session()
549    }
550
551    /// Get the 7-stage query pipeline (stages 1–4 in `hirn-query`).
552    #[must_use]
553    pub fn query_pipeline(&self) -> &hirn_query::QueryPipeline {
554        self.query_runtime.query_pipeline()
555    }
556
557    /// Get the shared plan cache.
558    #[must_use]
559    pub fn plan_cache(&self) -> &Arc<hirn_query::PlanCache> {
560        self.query_runtime.plan_cache()
561    }
562
563    #[must_use]
564    pub(crate) fn write_runtime(&self) -> &WriteRuntime {
565        &self.write_runtime
566    }
567
568    /// Record that `ids` were retrieved in a recall and accumulate importance-
569    /// boost credits.  Returns IDs to flush via a batched `update_where` when
570    /// the accumulated count crosses the flush threshold; returns `None` when
571    /// below threshold (PERF-2: replaces one-`update_where`-per-recall).
572    pub(crate) fn record_importance_accesses(
573        &self,
574        ids: &[hirn_core::id::MemoryId],
575    ) -> Option<Vec<hirn_core::id::MemoryId>> {
576        self.write_runtime.record_importance_accesses(ids)
577    }
578
579    /// Drain all accumulated importance-boost credits unconditionally.
580    /// Called at consolidation and DB close.
581    pub(crate) fn drain_importance_accumulator(&self) -> Vec<hirn_core::id::MemoryId> {
582        self.write_runtime.drain_importance_accumulator()
583    }
584
585    #[must_use]
586    pub(crate) fn graph_runtime(&self) -> &GraphRuntime {
587        &self.graph_runtime
588    }
589
590    #[must_use]
591    pub(crate) fn policy_runtime(&self) -> &PolicyRuntime {
592        &self.policy_runtime
593    }
594
595    #[must_use]
596    pub(crate) fn admission_runtime(&self) -> &AdmissionRuntime {
597        &self.admission_runtime
598    }
599
600    #[must_use]
601    pub(crate) fn event_runtime(&self) -> &EventRuntime {
602        &self.event_runtime
603    }
604
605    #[must_use]
606    pub(crate) fn provider_runtime(&self) -> &ProviderRuntime {
607        &self.provider_runtime
608    }
609
610    #[must_use]
611    pub(crate) fn offline_scheduler_runtime(&self) -> &OfflineSchedulerRuntime {
612        &self.offline_scheduler_runtime
613    }
614
615    /// Get the reconsolidation tracker.
616    #[must_use]
617    pub fn reconsolidation_tracker(&self) -> &crate::consolidation::ReconsolidationTracker {
618        self.graph_runtime.reconsolidation_tracker()
619    }
620
621    /// F-058 FIX: Take the cached community result (leaving `None` in its place).
622    pub(crate) fn take_cached_community_result(
623        &self,
624    ) -> Option<crate::consolidation::CommunityResult> {
625        self.graph_runtime.take_cached_community_result()
626    }
627
628    /// F-058 FIX: Store a community result for incremental use next run.
629    pub(crate) fn set_cached_community_result(
630        &self,
631        result: crate::consolidation::CommunityResult,
632    ) {
633        self.graph_runtime.set_cached_community_result(result);
634    }
635
636    /// Set a custom embedding provider (F-39).
637    ///
638    /// When set, `RECALL`, `REMEMBER`, and `UPSERT SEMANTIC` will use this
639    /// embedder instead of the built-in pseudo-embedding hash. The provider is
640    /// wrapped in the default multimodal router so `multi_content` and
641    /// composite auto-embedding use the same configured runtime wrappers.
642    /// Also updates the `HirnSessionExt` in the DataFusion `SessionContext`
643    /// so that operators (e.g. `RpeScoreExec`, `ProspectiveIndexingExec`)
644    /// can access the embedder at execution time.
645    pub fn set_embedder(&self, embedder: Arc<dyn Embedder>) {
646        let embedder = provider_runtime::compose_embedder(
647            embedder,
648            self.storage_runtime.storage_arc(),
649            &self.config,
650        );
651        let embedder = self
652            .provider_runtime
653            .set_multimodal_embedder(Arc::new(hirn_provider::MultiModalEmbedder::new(embedder)));
654
655        // Re-register HirnSessionExt with the new embedder so DataFusion
656        // operators pick it up when executing HirnQL REMEMBER plans.
657        if let Err(e) = self.query_runtime.register_runtime_state(
658            self.graph_runtime.cached_graph(),
659            &self.config,
660            self.storage_runtime.storage_arc(),
661            Some(embedder),
662            self.provider_runtime.tokenizer(),
663        ) {
664            tracing::warn!(error = %e, "Failed to update HirnSessionExt with new embedder");
665        }
666    }
667
668    /// Set a modality-aware embedding provider chain.
669    ///
670    /// Each configured underlying embedder is wrapped through the standard
671    /// retry/cache/circuit-breaker/batching pipeline before being installed.
672    pub fn set_multimodal_embedder(&self, embedder: Arc<hirn_provider::MultiModalEmbedder>) {
673        let embedder = provider_runtime::compose_multimodal_embedder(
674            embedder,
675            self.storage_runtime.storage_arc(),
676            &self.config,
677        );
678        let embedder = self.provider_runtime.set_multimodal_embedder(embedder);
679
680        if let Err(e) = self.query_runtime.register_runtime_state(
681            self.graph_runtime.cached_graph(),
682            &self.config,
683            self.storage_runtime.storage_arc(),
684            Some(embedder),
685            self.provider_runtime.tokenizer(),
686        ) {
687            tracing::warn!(error = %e, "Failed to update HirnSessionExt with new multimodal embedder");
688        }
689    }
690
691    /// Set a multivector (ColBERT-style) embedder for late interaction search.
692    ///
693    /// When set and `config.multivector_enabled` is true, recall queries will
694    /// additionally compute MaxSim scores from token-level embeddings.
695    pub fn set_multivec_embedder(&self, embedder: Arc<dyn Embedder>) {
696        self.provider_runtime.set_multivec_embedder(embedder);
697    }
698
699    /// Set the tokenizer used for token-aware budgeting paths.
700    pub fn set_tokenizer(&self, tokenizer: Arc<dyn Tokenizer>) {
701        self.provider_runtime.set_tokenizer(tokenizer);
702        if let Err(e) = self.query_runtime.register_runtime_state(
703            self.graph_runtime.cached_graph(),
704            &self.config,
705            self.storage_runtime.storage_arc(),
706            self.provider_runtime.embedder_arc(),
707            self.provider_runtime.tokenizer(),
708        ) {
709            tracing::warn!(error = %e, "Failed to update HirnSessionExt with new tokenizer");
710        }
711    }
712
713    /// Get the tokenizer used by this database instance.
714    #[must_use]
715    pub fn tokenizer(&self) -> Arc<dyn Tokenizer> {
716        self.provider_runtime.tokenizer()
717    }
718
719    /// Number of memory IDs awaiting background embed retry.
720    pub fn pending_embed_count(&self) -> usize {
721        self.write_runtime.pending_embed_count()
722    }
723
724    /// Retry embedding for records that were stored without embeddings due to
725    /// provider failure. Call this after the embed provider recovers.
726    ///
727    /// Returns `(succeeded, failed)` counts. Failed items are requeued up to
728    /// `max_attempts` (default 3) with exponential backoff.
729    pub async fn retry_pending_embeds(&self) -> (usize, usize) {
730        let embedder = match self.provider_runtime.embedder_arc() {
731            Some(embedder) => embedder,
732            None => return (0, 0),
733        };
734
735        let pending = self.write_runtime.drain_pending_embeds();
736        if pending.is_empty() {
737            return (0, 0);
738        }
739
740        tracing::info!(count = pending.len(), "Retrying pending embeds");
741
742        let mut succeeded = 0usize;
743        let mut failed = Vec::new();
744
745        for item in pending {
746            match self.retry_single_embed(item.id, &*embedder).await {
747                Ok(()) => {
748                    succeeded += 1;
749                    tracing::debug!(id = %item.id, "Pending embed retry succeeded");
750                }
751                Err(e) => {
752                    tracing::warn!(id = %item.id, attempts = item.attempts + 1, error = %e, "Pending embed retry failed");
753                    failed.push(item);
754                }
755            }
756        }
757
758        let fail_count = failed.len();
759        self.write_runtime.requeue_failed_embeds(failed, 3);
760
761        (succeeded, fail_count)
762    }
763
764    /// Retry embedding for a single record: read → embed → write back.
765    async fn retry_single_embed(&self, id: MemoryId, embedder: &dyn Embedder) -> HirnResult<()> {
766        let mut record = self.read_episodic_record(id).await?;
767        if record.embedding.is_some() {
768            // Already has an embedding (e.g., concurrent retry succeeded).
769            return Ok(());
770        }
771
772        let text = if let Some(ref mc) = record.multi_content {
773            mc.text_for_embedding().to_string()
774        } else if !record.content.is_empty() {
775            record.content.clone()
776        } else {
777            return Err(HirnError::InvalidInput(
778                "no content available for embedding".into(),
779            ));
780        };
781
782        let embeddings = embedder.embed(&[text.as_str()]).await?;
783        if let Some(emb) = embeddings.into_iter().next() {
784            record.embedding = Some(emb.vector);
785            self.write_episodic_record(&record).await?;
786        }
787
788        Ok(())
789    }
790
791    /// Set a reranker for post-retrieval relevance reordering.
792    ///
793    /// When set and `query_text` is provided, recall results are reranked after
794    /// composite scoring. Use with `CohereReranker`, `LlmReranker`, or any
795    /// custom `Reranker` implementation.
796    pub fn set_reranker(&self, reranker: Arc<dyn Reranker>) {
797        self.provider_runtime.set_reranker(reranker);
798    }
799
800    /// Ensure FTS indexes exist on all LanceDB datasets.
801    ///
802    /// Creates full-text search indexes on the text columns of each dataset
803    /// (episodic → `content`, semantic → `description`, procedural → `description`).
804    /// Idempotent: only runs once per `HirnDB` instance. Subsequent calls are no-ops.
805    pub async fn ensure_fts_indexes(&self) -> HirnResult<()> {
806        self.storage_runtime.ensure_fts_indexes().await
807    }
808
809    /// Check whether FTS indexes have been created.
810    #[must_use]
811    pub fn fts_initialized(&self) -> bool {
812        self.storage_runtime.fts_initialized()
813    }
814
815    /// Create vector indexes on all embedding columns (episodic, semantic, procedural).
816    ///
817    /// Skips datasets that don't exist or have no rows. Uses `replace: false`
818    /// so existing indexes are kept.
819    pub async fn create_vector_indexes(
820        &self,
821        index_type: hirn_storage::store::IndexType,
822        params: Option<hirn_storage::store::IndexParams>,
823    ) -> HirnResult<()> {
824        self.storage_runtime
825            .create_vector_indexes(index_type, params)
826            .await
827    }
828
829    /// Rebuild vector indexes on all embedding columns (episodic, semantic, procedural).
830    ///
831    /// Same as [`create_vector_indexes`](Self::create_vector_indexes) but with
832    /// `replace: true`, so any existing vector index is dropped and recreated.
833    pub async fn rebuild_vector_indexes(
834        &self,
835        index_type: hirn_storage::store::IndexType,
836        params: Option<hirn_storage::store::IndexParams>,
837    ) -> HirnResult<()> {
838        self.storage_runtime
839            .rebuild_vector_indexes(index_type, params)
840            .await
841    }
842
843    /// Get a snapshot of prefetch statistics.
844    #[must_use]
845    pub fn prefetch_stats(&self) -> PrefetchStats {
846        self.graph_runtime.prefetch_stats()
847    }
848
849    /// Get a reference to the index advisor for query pattern analysis.
850    #[must_use]
851    pub fn index_advisor(&self) -> &crate::index_advisor::IndexAdvisor {
852        self.graph_runtime.index_advisor()
853    }
854
855    /// Enable event sourcing by attaching an [`EventLog`].
856    ///
857    /// Once set, every mutation (`remember`, `archive`, `store_semantic`, etc.)
858    /// will be appended to the durable event log in addition to the in-memory
859    /// broadcast channel.
860    pub fn set_event_log(&self, log: Arc<EventLog>) {
861        self.event_runtime.set_event_log(log);
862    }
863
864    /// Get a reference to the event log, if event sourcing is enabled.
865    #[must_use]
866    pub fn event_log(&self) -> Option<Arc<EventLog>> {
867        self.event_runtime.event_log()
868    }
869
870    /// Get a reference to the persistent graph (cold tier).
871    #[must_use]
872    pub fn persistent_graph(&self) -> &PersistentGraph {
873        self.graph_runtime.persistent_graph()
874    }
875
876    /// Get a unified graph store reference.
877    ///
878    /// Returns the `CachedGraphStore` as `&dyn GraphStore` — reads use the
879    /// hot tier (sub-ms), writes are write-through to both tiers.
880    #[must_use]
881    pub fn graph_store(&self) -> &dyn crate::graph_store::GraphStore {
882        self.graph_runtime.graph_store()
883    }
884
885    /// Get a reference to the two-tier cached graph store.
886    #[must_use]
887    pub fn cached_graph(&self) -> &crate::cached_graph_store::CachedGraphStore {
888        self.graph_runtime.cached_graph()
889    }
890
891    /// Set the admission control pipeline.
892    ///
893    /// When configured, `remember()` runs candidates through the pipeline
894    /// before materializing them. Rejected candidates return an error.
895    pub fn set_admission_pipeline(&mut self, pipeline: crate::admission::AdmissionPipeline) {
896        self.admission_runtime.set_pipeline(pipeline);
897    }
898
899    /// Build and set the default admission pipeline from config.
900    ///
901    /// Default order: [SurpriseGate, DuplicateDetector, TokenBudgetGate, RateLimiter].
902    /// Only sets the pipeline if `config.admission_enabled` is true.
903    pub fn setup_default_admission_pipeline(&mut self) {
904        self.admission_runtime.setup_default_pipeline(
905            &self.config,
906            self.storage_runtime.storage_arc(),
907            self.provider_runtime.tokenizer(),
908        );
909    }
910
911    /// Get a reference to the admission pipeline, if configured.
912    #[must_use]
913    pub fn admission_pipeline(&self) -> Option<&crate::admission::AdmissionPipeline> {
914        self.admission_runtime.admission_pipeline()
915    }
916
917    /// Set the Cedar policy engine for fine-grained authorization.
918    ///
919    /// When set, `enforce()` evaluates every operation against loaded Cedar
920    /// policies. When unset, all operations are allowed (embedded mode).
921    pub fn set_policy_engine(&mut self, engine: PolicyEngine) {
922        self.policy_runtime.set_engine(engine);
923        // Policy pushdown rules embed namespace-allow-lists into compiled plans.
924        // A stale cached plan could bypass newly added or removed policies, so
925        // we must flush the plan cache whenever the policy engine is swapped.
926        self.invalidate_plan_cache();
927    }
928
929    /// Get a reference to the policy engine, if configured.
930    #[must_use]
931    pub fn policy_engine(&self) -> Option<&PolicyEngine> {
932        self.policy_runtime.engine()
933    }
934
935    /// Authorize an operation against the Cedar policy engine.
936    ///
937    /// When no policy engine is configured, all operations are allowed.
938    /// Returns `Ok(())` if allowed, or `HirnError::AccessDenied` if denied.
939    ///
940    /// Every authorization decision is logged as an audit event through the
941    /// event log (when configured), enabling tamper-evident audit trails
942    ///.
943    pub(crate) async fn enforce(
944        &self,
945        agent_id: &str,
946        action: Action,
947        realm: &str,
948        namespace: &str,
949    ) -> HirnResult<()> {
950        let Some(decision) = self
951            .policy_runtime
952            .authorize(agent_id, action, realm, namespace)
953        else {
954            return Ok(());
955        };
956
957        self.emit_in_realm(realm, namespace, agent_id, decision.audit_event)
958            .await;
959
960        if let Some(err) = decision.denial_error {
961            Err(err)
962        } else {
963            Ok(())
964        }
965    }
966
967    /// Soft authorization check — returns `true` if the action is allowed (or no policy engine).
968    /// Does NOT emit audit events or return errors on deny.
969    pub(crate) fn is_action_allowed(
970        &self,
971        agent_id: &str,
972        action: Action,
973        realm: &str,
974        namespace: &str,
975    ) -> bool {
976        self.policy_runtime
977            .is_action_allowed(agent_id, action, realm, namespace)
978    }
979
980    pub(crate) fn can_read_raw_content(&self, agent_id: &str, record: &MemoryRecord) -> bool {
981        self.is_action_allowed(
982            agent_id,
983            Action::RecallRawText,
984            &self.config.default_realm,
985            record.effective_namespace().as_str(),
986        )
987    }
988
989    async fn collect_resource_evidence_summaries(
990        &self,
991        record: &MemoryRecord,
992        agent_id: &str,
993        cache: &mut HashMap<ResourceId, CachedResourceEvidence>,
994    ) -> HirnResult<Vec<ResourceEvidenceSummary>> {
995        let evidence_links = Self::record_evidence_links(record);
996        if evidence_links.is_empty() {
997            return Ok(Vec::new());
998        }
999
1000        let can_hydrate_full = self.can_read_raw_content(agent_id, record);
1001        let can_hydrate_preview = self.is_action_allowed(
1002            agent_id,
1003            Action::Recall,
1004            &self.config.default_realm,
1005            record.effective_namespace().as_str(),
1006        );
1007        let mut summaries = Vec::with_capacity(evidence_links.len());
1008
1009        for link in evidence_links {
1010            let cached = if let Some(existing) = cache.get(&link.resource_id) {
1011                existing.clone()
1012            } else {
1013                let resource = hirn_storage::get_resource(self.storage_backend(), link.resource_id)
1014                    .await
1015                    .map_err(HirnError::storage)?;
1016                let artifacts =
1017                    hirn_storage::list_derived_artifacts(self.storage_backend(), link.resource_id)
1018                        .await
1019                        .map_err(HirnError::storage)?;
1020
1021                let mut available_artifacts: Vec<DerivedArtifactKind> = artifacts
1022                    .iter()
1023                    .filter(|artifact| artifact.kind != DerivedArtifactKind::GenerationFailure)
1024                    .map(|artifact| artifact.kind)
1025                    .collect();
1026                available_artifacts.sort_by_key(|kind| kind.as_str());
1027                available_artifacts.dedup_by_key(|kind| kind.as_str());
1028
1029                let cached = CachedResourceEvidence {
1030                    lifecycle_state: resource
1031                        .as_ref()
1032                        .map_or(ResourceGovernanceState::Active, |resource| {
1033                            resource.governance_state
1034                        }),
1035                    modality: resource.as_ref().map(|resource| resource.modality),
1036                    mime_type: resource
1037                        .as_ref()
1038                        .and_then(|resource| resource.mime_type.clone()),
1039                    display_name: resource
1040                        .as_ref()
1041                        .and_then(|resource| resource.display_name.clone()),
1042                    artifact_kinds_by_id: artifacts
1043                        .iter()
1044                        .map(|artifact| (artifact.id, artifact.kind))
1045                        .collect(),
1046                    has_preview: available_artifacts.iter().any(|kind| kind.is_previewable()),
1047                    has_full_payload: resource.as_ref().is_some_and(|resource| {
1048                        matches!(resource.location, ResourceLocation::Blob { .. })
1049                            && !resource.governance_state.hides_payload()
1050                    }),
1051                    available_artifacts,
1052                };
1053                cache.insert(link.resource_id, cached.clone());
1054                cached
1055            };
1056
1057            let artifact_kind = link
1058                .artifact_id
1059                .and_then(|artifact_id| cached.artifact_kinds_by_id.get(&artifact_id).copied());
1060            let has_preview =
1061                artifact_kind.map_or(cached.has_preview, DerivedArtifactKind::is_previewable);
1062            let available_artifacts = artifact_kind
1063                .map(|kind| vec![kind])
1064                .unwrap_or_else(|| cached.available_artifacts.clone());
1065            let can_hydrate_preview =
1066                link.artifact_id.is_none() && has_preview && can_hydrate_preview;
1067            let can_hydrate_full =
1068                link.artifact_id.is_none() && cached.has_full_payload && can_hydrate_full;
1069
1070            summaries.push(ResourceEvidenceSummary {
1071                resource_id: link.resource_id,
1072                role: link.role,
1073                provenance: link.provenance,
1074                artifact_id: link.artifact_id,
1075                artifact_kind,
1076                lifecycle_state: cached.lifecycle_state,
1077                modality: cached.modality,
1078                mime_type: cached.mime_type.clone(),
1079                display_name: cached.display_name.clone(),
1080                available_artifacts,
1081                has_preview,
1082                can_hydrate_preview,
1083                can_hydrate_full,
1084            });
1085        }
1086
1087        Ok(summaries)
1088    }
1089
1090    pub(crate) async fn resource_evidence_summaries_for_record(
1091        &self,
1092        record: &MemoryRecord,
1093        agent_id: &str,
1094    ) -> HirnResult<Vec<ResourceEvidenceSummary>> {
1095        let mut cache = HashMap::new();
1096        self.collect_resource_evidence_summaries(record, agent_id, &mut cache)
1097            .await
1098    }
1099
1100    pub(crate) async fn attach_resource_evidence_summaries(
1101        &self,
1102        results: &mut [RecallResult],
1103        agent_id: &str,
1104    ) -> HirnResult<()> {
1105        let mut cache: HashMap<ResourceId, CachedResourceEvidence> = HashMap::new();
1106
1107        for result in results {
1108            result.resource_evidence = self
1109                .collect_resource_evidence_summaries(&result.record, agent_id, &mut cache)
1110                .await?;
1111        }
1112
1113        Ok(())
1114    }
1115
1116    pub(crate) async fn attach_resource_evidence_summaries_to_scored_memories(
1117        &self,
1118        scored: &mut [ScoredMemory],
1119        agent_id: &str,
1120    ) -> HirnResult<()> {
1121        let mut cache: HashMap<ResourceId, CachedResourceEvidence> = HashMap::new();
1122
1123        for scored_memory in scored {
1124            scored_memory.resource_evidence = self
1125                .collect_resource_evidence_summaries(&scored_memory.record, agent_id, &mut cache)
1126                .await?;
1127        }
1128
1129        Ok(())
1130    }
1131
1132    fn record_evidence_links(record: &MemoryRecord) -> &[EvidenceLink] {
1133        match record {
1134            MemoryRecord::Working(_) => &[],
1135            MemoryRecord::Episodic(record) => &record.provenance.evidence_links,
1136            MemoryRecord::Semantic(record) => &record.provenance.evidence_links,
1137            MemoryRecord::Procedural(record) => &record.provenance.evidence_links,
1138        }
1139    }
1140
1141    /// Get the storage backend.
1142    #[must_use]
1143    pub fn storage_backend(&self) -> &dyn PhysicalStore {
1144        self.storage_runtime.storage_backend()
1145    }
1146
1147    /// Get a cloned `Arc` to the underlying storage backend.
1148    ///
1149    /// Use this when a long-lived (possibly `'static`) reference to the
1150    /// storage is needed (e.g. fire-and-forget background tasks).
1151    #[must_use]
1152    pub fn storage_arc(&self) -> Arc<dyn PhysicalStore> {
1153        self.storage_runtime.storage_arc()
1154    }
1155
1156    /// Apply a specific resource retention policy to active resource heads.
1157    pub async fn apply_resource_retention_policy(
1158        &self,
1159        policy: &hirn_core::ResourceRetentionPolicy,
1160    ) -> HirnResult<hirn_storage::ResourceRetentionApplyResult> {
1161        hirn_storage::apply_resource_retention_policy(self.storage_backend(), policy)
1162            .await
1163            .map_err(HirnError::storage)
1164    }
1165
1166    /// Apply the configured resource retention policy from [`HirnConfig`].
1167    pub async fn apply_configured_resource_retention(
1168        &self,
1169    ) -> HirnResult<hirn_storage::ResourceRetentionApplyResult> {
1170        self.apply_resource_retention_policy(&self.config.resource_retention_policy)
1171            .await
1172    }
1173
1174    #[must_use]
1175    pub(crate) fn semantic_head_cache_get(
1176        &self,
1177        logical_memory_id: LogicalMemoryId,
1178    ) -> Option<SemanticRecord> {
1179        self.storage_runtime.cached_semantic_head(logical_memory_id)
1180    }
1181
1182    #[must_use]
1183    pub(crate) fn episodic_head_cache_get(
1184        &self,
1185        logical_memory_id: LogicalMemoryId,
1186    ) -> Option<EpisodicRecord> {
1187        self.storage_runtime.cached_episodic_head(logical_memory_id)
1188    }
1189
1190    pub(crate) async fn resolve_active_episodic_head(
1191        &self,
1192        id: MemoryId,
1193    ) -> HirnResult<EpisodicRecord> {
1194        let record = self.get_episode(id).await?;
1195        self.episodic_head_for_logical_id(record.logical_memory_id)
1196            .await
1197    }
1198
1199    pub(crate) fn semantic_head_cache_put(&self, record: SemanticRecord) {
1200        self.storage_runtime.cache_semantic_head(record);
1201    }
1202
1203    pub(crate) fn episodic_head_cache_put(&self, record: EpisodicRecord) {
1204        self.storage_runtime.cache_episodic_head(record);
1205    }
1206
1207    pub(crate) fn semantic_head_cache_evict(&self, logical_memory_id: LogicalMemoryId) {
1208        self.storage_runtime.evict_semantic_head(logical_memory_id);
1209    }
1210
1211    pub(crate) fn episodic_head_cache_evict(&self, logical_memory_id: LogicalMemoryId) {
1212        self.storage_runtime.evict_episodic_head(logical_memory_id);
1213    }
1214
1215    #[must_use]
1216    pub(crate) fn semantic_head_cache_snapshot(
1217        &self,
1218    ) -> std::collections::HashMap<LogicalMemoryId, SemanticRecord> {
1219        self.storage_runtime.cached_semantic_heads_snapshot()
1220    }
1221
1222    pub(crate) fn semantic_head_cache_replace(
1223        &self,
1224        records: impl IntoIterator<Item = SemanticRecord>,
1225    ) {
1226        self.storage_runtime.replace_semantic_heads(records);
1227    }
1228
1229    #[must_use]
1230    pub(crate) fn file_size_bytes(&self) -> u64 {
1231        self.storage_runtime.file_size_bytes()
1232    }
1233
1234    /// Get the configured embedder, if any.
1235    #[must_use]
1236    pub fn embedder(&self) -> Option<Arc<dyn Embedder>> {
1237        self.provider_runtime.embedder()
1238    }
1239
1240    /// Embed a single text using the configured embedder, falling back to
1241    /// pseudo-embedding when no real model is available.
1242    pub async fn embed_text(&self, text: &str) -> HirnResult<Vec<f32>> {
1243        self.provider_runtime.embed_text(text).await
1244    }
1245
1246    /// Embed a `MemoryContent` value using the text representation for each
1247    /// modality. Images use their description, code uses source, audio uses
1248    /// transcript, structured uses JSON serialization.
1249    pub async fn embed_content(
1250        &self,
1251        content: &hirn_core::content::MemoryContent,
1252    ) -> HirnResult<Vec<f32>> {
1253        self.provider_runtime.embed_content(content).await
1254    }
1255
1256    /// Extract large binary payloads from `MemoryContent` into first-class resources.
1257    /// Returns modified content plus evidence links that map placeholders to resources.
1258    pub(crate) async fn extract_and_store_resources(
1259        &self,
1260        namespace: hirn_core::types::Namespace,
1261        owner_agent_id: AgentId,
1262        content: &hirn_core::content::MemoryContent,
1263    ) -> HirnResult<crate::db::storage_runtime::ExtractedResources> {
1264        self.storage_runtime
1265            .extract_and_store_resources(namespace, owner_agent_id, content)
1266            .await
1267    }
1268
1269    async fn enforce_resource_fetch(
1270        &self,
1271        actor_id: &AgentId,
1272        namespace: Namespace,
1273        hydration_mode: HydrationMode,
1274    ) -> HirnResult<()> {
1275        self.enforce(
1276            actor_id.as_str(),
1277            Action::Recall,
1278            &self.config.default_realm,
1279            namespace.as_str(),
1280        )
1281        .await?;
1282
1283        if matches!(hydration_mode, HydrationMode::Full) {
1284            self.enforce(
1285                actor_id.as_str(),
1286                Action::RecallRawText,
1287                &self.config.default_realm,
1288                namespace.as_str(),
1289            )
1290            .await
1291        } else {
1292            Ok(())
1293        }
1294    }
1295
1296    /// Fetch a resource with actor-scoped metadata/preview/full hydration semantics.
1297    /// `MetadataOnly` and `Preview` require `Recall`; `Full` additionally requires
1298    /// `RecallRawText` for the resource namespace.
1299    pub async fn fetch_resource(
1300        &self,
1301        actor_id: &AgentId,
1302        resource_id: ResourceId,
1303        hydration_mode: HydrationMode,
1304    ) -> HirnResult<Option<hirn_storage::HydratedResource>> {
1305        let Some(resource) = hirn_storage::get_resource(self.storage_backend(), resource_id)
1306            .await
1307            .map_err(HirnError::storage)?
1308        else {
1309            return Ok(None);
1310        };
1311
1312        self.enforce_resource_fetch(actor_id, resource.namespace, hydration_mode)
1313            .await?;
1314
1315        hirn_storage::fetch_resource(self.storage_backend(), resource_id, hydration_mode)
1316            .await
1317            .map_err(HirnError::storage)
1318    }
1319
1320    async fn enforce_raw_resource_read(
1321        &self,
1322        actor_id: &AgentId,
1323        namespace: Namespace,
1324    ) -> HirnResult<()> {
1325        self.enforce_resource_fetch(actor_id, namespace, HydrationMode::Full)
1326            .await
1327    }
1328
1329    fn content_requires_resource_hydration(content: &hirn_core::content::MemoryContent) -> bool {
1330        match content {
1331            hirn_core::content::MemoryContent::Image { data, .. }
1332            | hirn_core::content::MemoryContent::Audio { data, .. }
1333            | hirn_core::content::MemoryContent::Video { data, .. }
1334            | hirn_core::content::MemoryContent::Document { data, .. } => data.is_empty(),
1335            hirn_core::content::MemoryContent::Code { source, .. } => source.is_empty(),
1336            hirn_core::content::MemoryContent::ToolOutput { output, .. } => output.is_empty(),
1337            hirn_core::content::MemoryContent::Structured { data, .. } => data.is_null(),
1338            hirn_core::content::MemoryContent::Composite(parts) => {
1339                parts.iter().any(Self::content_requires_resource_hydration)
1340            }
1341            _ => false,
1342        }
1343    }
1344
1345    /// Load resource-backed blob data for an episodic memory record slot.
1346    pub async fn load_resource_blob(
1347        &self,
1348        actor_id: &AgentId,
1349        id: hirn_core::id::MemoryId,
1350        blob_index: u32,
1351    ) -> HirnResult<Vec<u8>> {
1352        let record = self.get_episode(id).await?;
1353        self.enforce_raw_resource_read(actor_id, record.namespace)
1354            .await?;
1355        self.storage_runtime
1356            .load_resource_blob(&record.provenance.evidence_links, blob_index)
1357            .await
1358    }
1359
1360    /// Hydrate a `MemoryContent` by restoring binary payloads referenced through evidence links.
1361    /// Raw hydration is explicit and requires `RecallRawText` permission for the namespace.
1362    pub async fn hydrate_content_resources(
1363        &self,
1364        actor_id: &AgentId,
1365        namespace: Namespace,
1366        content: &hirn_core::content::MemoryContent,
1367        evidence_links: &[hirn_core::resource::EvidenceLink],
1368    ) -> HirnResult<hirn_core::content::MemoryContent> {
1369        if evidence_links.is_empty() || !Self::content_requires_resource_hydration(content) {
1370            return Ok(content.clone());
1371        }
1372
1373        self.enforce_raw_resource_read(actor_id, namespace).await?;
1374        self.storage_runtime
1375            .hydrate_content_resources(content, evidence_links)
1376            .await
1377    }
1378
1379    /// Retrieve an episodic record with all resource-backed payloads hydrated.
1380    /// Full hydration is explicit and requires `RecallRawText` permission.
1381    pub async fn get_episode_with_resources(
1382        &self,
1383        actor_id: &AgentId,
1384        id: hirn_core::id::MemoryId,
1385    ) -> HirnResult<hirn_core::episodic::EpisodicRecord> {
1386        let mut record = self.get_episode(id).await?;
1387        if let Some(ref mc) = record.multi_content {
1388            record.multi_content = Some(
1389                self.hydrate_content_resources(
1390                    actor_id,
1391                    record.namespace,
1392                    mc,
1393                    &record.provenance.evidence_links,
1394                )
1395                .await?,
1396            );
1397        }
1398        Ok(record)
1399    }
1400
1401    /// Explicitly flush all pending buffers (Hebbian, episodic access, semantic access) and
1402    /// prepare the database for shutdown.
1403    ///
1404    /// This drains the Hebbian weight buffer, flushes access-count deltas, completes
1405    /// pending offline scheduler jobs, and returns cleanly. Prefer calling this before
1406    /// dropping the last `Arc<HirnDB>` reference; if omitted, `Drop` runs a best-effort
1407    /// synchronous flush on a helper thread (F-125 fix).
1408    pub async fn close(&self) -> HirnResult<()> {
1409        self.offline_scheduler_runtime.shutdown().await;
1410        self.flush_hebbian().await?;
1411        self.flush_episodic_access().await?;
1412        self.flush_semantic_access().await?;
1413        self.flush_importance_accumulator().await?;
1414        Ok(())
1415    }
1416
1417    /// Drain and persist any remaining importance-boost credits accumulated since
1418    /// the last threshold flush (PERF-2). Called at close and in Drop.
1419    pub(crate) async fn flush_importance_accumulator(&self) -> HirnResult<()> {
1420        let ids = self.drain_importance_accumulator();
1421        if ids.is_empty() {
1422            return Ok(());
1423        }
1424        crate::consolidation::apply_retrieval_effects(self.storage_arc(), ids).await
1425    }
1426}
1427
1428/// F-S1: Flush remaining Hebbian buffer on drop to ensure weight updates
1429/// are persisted even if the caller doesn't explicitly flush.
1430impl Drop for HirnDB {
1431    fn drop(&mut self) {
1432        let flush = async {
1433            let _ = self.flush_hebbian().await;
1434            let _ = self.flush_episodic_access().await;
1435            let _ = self.flush_semantic_access().await;
1436            let _ = self.flush_importance_accumulator().await;
1437        };
1438
1439        // Dropping inside a current-thread Tokio runtime cannot safely re-enter
1440        // that runtime to drive async flush work; doing so deadlocks the single
1441        // runtime thread during test teardown. Use a lightweight standalone
1442        // current-thread runtime on a helper OS thread instead.
1443        if tokio::runtime::Handle::try_current().is_ok() {
1444            std::thread::scope(|s| {
1445                let _ = s
1446                    .spawn(|| {
1447                        if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
1448                            .enable_all()
1449                            .build()
1450                        {
1451                            rt.block_on(flush);
1452                        }
1453                    })
1454                    .join();
1455            });
1456        } else if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
1457            .enable_all()
1458            .build()
1459        {
1460            rt.block_on(flush);
1461        }
1462    }
1463}
1464
1465#[cfg(test)]
1466mod tests {
1467    use super::*;
1468
1469    use hirn_core::resource::{
1470        DerivedArtifactIndexPolicy, DerivedArtifactIndexRule, DerivedArtifactKind, ModalityProfile,
1471        ResourceIndexPolicy, ResourceIndexRule, SecondaryIndexType,
1472    };
1473    use hirn_storage::memory_store::MemoryStore;
1474    use hirn_storage::store::IndexType;
1475
1476    #[tokio::test(flavor = "multi_thread")]
1477    async fn open_with_config_bootstraps_storage_datasets_and_indices() {
1478        let dir = tempfile::tempdir().unwrap();
1479        let store = Arc::new(MemoryStore::new());
1480        let storage: Arc<dyn PhysicalStore> = store.clone();
1481        let config = HirnConfig::builder()
1482            .db_path(dir.path())
1483            .embedding_dimensions(32)
1484            .resource_index_policy(
1485                ResourceIndexPolicy::default().with_rule(
1486                    ResourceIndexRule::new(ModalityProfile::Document, SecondaryIndexType::Bitmap)
1487                        .with_column("mime_type"),
1488                ),
1489            )
1490            .derived_artifact_index_policy(
1491                DerivedArtifactIndexPolicy::default().with_rule(
1492                    DerivedArtifactIndexRule::new(
1493                        DerivedArtifactKind::Transcript,
1494                        SecondaryIndexType::Bitmap,
1495                    )
1496                    .with_column("modality"),
1497                ),
1498            )
1499            .build()
1500            .unwrap();
1501
1502        let _db = HirnDB::open_with_config(config, storage).await.unwrap();
1503
1504        assert!(
1505            store
1506                .exists(hirn_storage::datasets::resource_object::DATASET_NAME)
1507                .await
1508                .unwrap()
1509        );
1510        assert!(
1511            store
1512                .exists(hirn_storage::datasets::derived_artifact::DATASET_NAME)
1513                .await
1514                .unwrap()
1515        );
1516        assert!(store.index_configs("resources").iter().any(|config| {
1517            config.columns == vec!["modality".to_string(), "mime_type".to_string()]
1518                && config.index_type == IndexType::Bitmap
1519        }));
1520        assert!(
1521            store
1522                .index_configs("derived_artifacts")
1523                .iter()
1524                .any(|config| {
1525                    config.columns == vec!["kind".to_string(), "modality".to_string()]
1526                        && config.index_type == IndexType::Bitmap
1527                })
1528        );
1529    }
1530}