Skip to main content

hirn_engine/db/
namespace.rs

1use arrow_array::Array;
2
3use super::*;
4
/// Mutation-envelope kind tag written for namespace deletions; also used to
/// select pending envelopes of this kind during reconciliation.
pub(super) const NAMESPACE_DELETE_MUTATION_KIND: &str = "namespace_delete";
/// Mutation-envelope kind tag written for agent registrations; also used to
/// select pending envelopes of this kind during reconciliation.
pub(super) const AGENT_REGISTER_MUTATION_KIND: &str = "agent_register";
/// Mutation-envelope kind tag written for agent deregistrations; also used to
/// select pending envelopes of this kind during reconciliation.
pub(super) const AGENT_DEREGISTER_MUTATION_KIND: &str = "agent_deregister";
8
/// JSON payload of a `namespace_delete` mutation envelope.
///
/// It captures everything needed to apply (or re-apply) the deletion: the
/// namespace, the memory ids to remove, and the identity of the audit entry.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct NamespaceDeleteEnvelope {
    /// Namespace whose record is removed.
    namespace: Namespace,
    /// Episodic memory ids to delete (sorted and deduplicated by the builder).
    episodic_ids: Vec<MemoryId>,
    /// Semantic memory ids to purge (sorted and deduplicated by the builder).
    semantic_ids: Vec<MemoryId>,
    /// Procedural memory ids to delete (sorted and deduplicated by the builder).
    procedural_ids: Vec<MemoryId>,
    /// Id of the audit entry; checked before appending so a replay does not
    /// write the entry twice.
    audit_entry_id: MemoryId,
    /// Timestamp stored on the audit entry (taken when the envelope is built).
    audit_timestamp: Timestamp,
}
18
/// JSON payload of an `agent_register` mutation envelope.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct AgentRegisterEnvelope {
    /// Agent record merged into the agent dataset (keyed on `id`).
    agent: hirn_core::agent::AgentRecord,
    /// Private namespace record merged into the namespace dataset (keyed on `id`).
    private_namespace: hirn_core::namespace::NamespaceRecord,
    /// Id of the audit entry; checked before appending so a replay does not
    /// write the entry twice.
    audit_entry_id: MemoryId,
    /// Timestamp stored on the audit entry (taken when the envelope is built).
    audit_timestamp: Timestamp,
}
26
/// JSON payload of an `agent_deregister` mutation envelope.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct AgentDeregisterEnvelope {
    /// Agent whose record is removed from the agent dataset.
    agent_id: hirn_core::types::AgentId,
    /// The agent's private namespace (`Namespace::private_for`), deleted as
    /// part of applying the envelope.
    private_namespace: Namespace,
    /// Id of the audit entry; checked before appending so a replay does not
    /// write the entry twice.
    audit_entry_id: MemoryId,
    /// Timestamp stored on the audit entry (taken when the envelope is built).
    audit_timestamp: Timestamp,
}
34
35fn namespace_sql_value(namespace: &Namespace) -> String {
36    namespace.as_str().replace('\'', "''")
37}
38
39fn namespace_id_filter(namespace: &Namespace) -> String {
40    format!("id = '{}'", namespace_sql_value(namespace))
41}
42
43fn namespace_column_filter(namespace: &Namespace) -> String {
44    format!("namespace = '{}'", namespace_sql_value(namespace))
45}
46
/// Build the filter predicate `id = '<id>'`, doubling any single quote in `id`
/// so the value stays inside the string literal.
fn escaped_id_filter(id: &str) -> String {
    let escaped = id.replace('\'', "''");
    format!("id = '{}'", escaped)
}
50
51fn encode_namespace_delete_envelope(payload: &NamespaceDeleteEnvelope) -> HirnResult<Vec<u8>> {
52    serde_json::to_vec(payload).map_err(|error| {
53        HirnError::storage(format!("namespace delete envelope serialize: {error}"))
54    })
55}
56
57fn encode_agent_register_envelope(payload: &AgentRegisterEnvelope) -> HirnResult<Vec<u8>> {
58    serde_json::to_vec(payload)
59        .map_err(|error| HirnError::storage(format!("agent register envelope serialize: {error}")))
60}
61
62fn encode_agent_deregister_envelope(payload: &AgentDeregisterEnvelope) -> HirnResult<Vec<u8>> {
63    serde_json::to_vec(payload).map_err(|error| {
64        HirnError::storage(format!("agent deregister envelope serialize: {error}"))
65    })
66}
67
68fn decode_namespace_delete_envelope(
69    envelope: &hirn_storage::MutationEnvelopeRecord,
70) -> HirnResult<NamespaceDeleteEnvelope> {
71    serde_json::from_slice(&envelope.payload).map_err(|error| {
72        HirnError::storage(format!("namespace delete envelope deserialize: {error}"))
73    })
74}
75
76fn decode_agent_register_envelope(
77    envelope: &hirn_storage::MutationEnvelopeRecord,
78) -> HirnResult<AgentRegisterEnvelope> {
79    serde_json::from_slice(&envelope.payload).map_err(|error| {
80        HirnError::storage(format!("agent register envelope deserialize: {error}"))
81    })
82}
83
84fn decode_agent_deregister_envelope(
85    envelope: &hirn_storage::MutationEnvelopeRecord,
86) -> HirnResult<AgentDeregisterEnvelope> {
87    serde_json::from_slice(&envelope.payload).map_err(|error| {
88        HirnError::storage(format!("agent deregister envelope deserialize: {error}"))
89    })
90}
91
92fn build_namespace_delete_envelope(
93    namespace: Namespace,
94    mut episodic_ids: Vec<MemoryId>,
95    mut semantic_ids: Vec<MemoryId>,
96    mut procedural_ids: Vec<MemoryId>,
97) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
98    episodic_ids.sort_unstable();
99    episodic_ids.dedup();
100    semantic_ids.sort_unstable();
101    semantic_ids.dedup();
102    procedural_ids.sort_unstable();
103    procedural_ids.dedup();
104
105    let audit_entry_id = MemoryId::new();
106    let payload = NamespaceDeleteEnvelope {
107        namespace,
108        episodic_ids,
109        semantic_ids,
110        procedural_ids,
111        audit_entry_id,
112        audit_timestamp: Timestamp::now(),
113    };
114    let payload = encode_namespace_delete_envelope(&payload)?;
115
116    Ok(hirn_storage::MutationEnvelopeRecord::pending(
117        format!("namespace-delete:{namespace}:{audit_entry_id}"),
118        NAMESPACE_DELETE_MUTATION_KIND,
119        payload,
120    ))
121}
122
123fn build_agent_register_envelope(
124    agent: hirn_core::agent::AgentRecord,
125    private_namespace: hirn_core::namespace::NamespaceRecord,
126) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
127    let agent_id = agent.id;
128    let audit_entry_id = MemoryId::new();
129    let payload = AgentRegisterEnvelope {
130        agent,
131        private_namespace,
132        audit_entry_id,
133        audit_timestamp: Timestamp::now(),
134    };
135    let payload = encode_agent_register_envelope(&payload)?;
136
137    Ok(hirn_storage::MutationEnvelopeRecord::pending(
138        format!("agent-register:{agent_id}:{audit_entry_id}"),
139        AGENT_REGISTER_MUTATION_KIND,
140        payload,
141    ))
142}
143
144fn build_agent_deregister_envelope(
145    agent_id: hirn_core::types::AgentId,
146) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
147    let audit_entry_id = MemoryId::new();
148    let payload = AgentDeregisterEnvelope {
149        private_namespace: Namespace::private_for(&agent_id),
150        agent_id,
151        audit_entry_id,
152        audit_timestamp: Timestamp::now(),
153    };
154    let payload = encode_agent_deregister_envelope(&payload)?;
155
156    Ok(hirn_storage::MutationEnvelopeRecord::pending(
157        format!("agent-deregister:{agent_id}:{audit_entry_id}"),
158        AGENT_DEREGISTER_MUTATION_KIND,
159        payload,
160    ))
161}
162
163impl HirnDB {
164    // ── Event Subscription ──────────────────────────────────────────────
165
166    /// Subscribe to real-time memory events.
167    ///
168    /// Returns a [`tokio::sync::broadcast::Receiver<MemoryEvent>`] that yields
169    /// events whenever the database state changes (create, archive,
170    /// consolidate, etc.).  The broadcast ring buffer is lock-free; lagging
171    /// subscribers receive a
172    /// [`tokio::sync::broadcast::error::RecvError::Lagged`] error and skip
173    /// missed events rather than blocking the write path.
174    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<crate::event::MemoryEvent> {
175        self.event_runtime().subscribe()
176    }
177
178    /// Broadcast an event with explicit realm/namespace/agent context.
179    pub(crate) async fn emit_in_realm(
180        &self,
181        realm: &str,
182        namespace: &str,
183        agent_id: &str,
184        event: MemoryEvent,
185    ) {
186        self.event_runtime()
187            .emit(realm, namespace, agent_id, event)
188            .await;
189    }
190
191    pub(crate) async fn emit_in_realm_checked(
192        &self,
193        realm: &str,
194        namespace: &str,
195        agent_id: &str,
196        event: MemoryEvent,
197    ) -> HirnResult<()> {
198        self.event_runtime()
199            .emit_checked(realm, namespace, agent_id, event)
200            .await
201    }
202
203    /// Broadcast an event using the default realm with explicit namespace/agent context.
204    pub(crate) async fn emit_scoped(&self, namespace: &str, agent_id: &str, event: MemoryEvent) {
205        self.emit_in_realm(&self.config.default_realm, namespace, agent_id, event)
206            .await;
207    }
208
209    pub(crate) async fn emit_scoped_checked(
210        &self,
211        namespace: &str,
212        agent_id: &str,
213        event: MemoryEvent,
214    ) -> HirnResult<()> {
215        self.emit_in_realm_checked(&self.config.default_realm, namespace, agent_id, event)
216            .await
217    }
218
219    /// Broadcast an event to all live subscribers and, when event sourcing
220    /// is enabled, append it to the durable event log.
221    pub(crate) async fn emit(&self, event: MemoryEvent) {
222        self.emit_in_realm(&self.config.default_realm, "shared", "", event)
223            .await;
224    }
225
226    // ── Namespace Management ────────────────────────────────────────────
227
228    /// Create a new namespace.
229    pub(crate) async fn create_namespace(
230        &self,
231        name: &str,
232        kind: hirn_core::types::NamespaceKind,
233        members: Vec<hirn_core::types::AgentId>,
234    ) -> HirnResult<()> {
235        let ns = Namespace::new(name)?;
236
237        // Check for existing namespace.
238        let filter = namespace_id_filter(&ns);
239        let count = self
240            .storage_runtime
241            .count(
242                hirn_storage::datasets::namespace::DATASET_NAME,
243                Some(&filter),
244            )
245            .await
246            .map_err(|e| HirnError::storage(e))?;
247        if count > 0 {
248            return Err(HirnError::AlreadyExists(format!(
249                "namespace '{name}' already exists"
250            )));
251        }
252
253        let rec = hirn_core::namespace::NamespaceRecord {
254            namespace: ns,
255            kind,
256            created_at: Timestamp::now(),
257            member_agents: members,
258        };
259
260        let batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(&rec))
261            .map_err(|e| HirnError::storage(e))?;
262        self.storage_runtime
263            .append(hirn_storage::datasets::namespace::DATASET_NAME, batch)
264            .await
265            .map_err(|e| HirnError::storage(e))?;
266
267        self.namespace_runtime.invalidate_namespaces();
268
269        self.append_audit(
270            None,
271            hirn_core::audit::AuditAction::NamespaceCreated {
272                namespace: name.to_string(),
273            },
274        )
275        .await?;
276
277        Ok(())
278    }
279
280    /// List all namespaces.
281    pub(crate) async fn list_namespaces(
282        &self,
283    ) -> HirnResult<Vec<hirn_core::namespace::NamespaceRecord>> {
284        if let Some(cached) = self.namespace_runtime.cached_namespaces() {
285            return Ok(cached.as_ref().clone());
286        }
287
288        let opts = hirn_storage::store::ScanOptions::default();
289        let batches = self
290            .storage_runtime
291            .scan(hirn_storage::datasets::namespace::DATASET_NAME, opts)
292            .await
293            .map_err(|e| HirnError::storage(e))?;
294
295        let mut result = Vec::new();
296        for batch in &batches {
297            let recs = hirn_storage::datasets::namespace::from_batch(batch)
298                .map_err(|e| HirnError::storage(e))?;
299            result.extend(recs);
300        }
301
302        self.namespace_runtime.cache_namespaces(result.clone());
303
304        Ok(result)
305    }
306
307    /// Get a namespace record by name.
308    pub(crate) async fn get_namespace(
309        &self,
310        name: &str,
311    ) -> HirnResult<hirn_core::namespace::NamespaceRecord> {
312        if let Some(cached) = self.namespace_runtime.cached_namespaces() {
313            if let Some(rec) = cached.iter().find(|rec| rec.namespace.as_str() == name) {
314                return Ok(rec.clone());
315            }
316        }
317
318        let ns = Namespace::new(name)?;
319        let filter = namespace_id_filter(&ns);
320        let opts = hirn_storage::store::ScanOptions {
321            filter: Some(filter),
322            ..Default::default()
323        };
324        let batches = self
325            .storage_runtime
326            .scan(hirn_storage::datasets::namespace::DATASET_NAME, opts)
327            .await
328            .map_err(|e| HirnError::storage(e))?;
329
330        for batch in &batches {
331            let recs = hirn_storage::datasets::namespace::from_batch(batch)
332                .map_err(|e| HirnError::storage(e))?;
333            if let Some(rec) = recs.into_iter().next() {
334                return Ok(rec);
335            }
336        }
337        Err(HirnError::NotFound(format!("namespace '{name}'")))
338    }
339
340    /// Delete a namespace and all its memories.
341    pub(crate) async fn delete_namespace(&self, name: &str) -> HirnResult<()> {
342        let ns = Namespace::new(name)?;
343        if ns == Namespace::shared() {
344            return Err(HirnError::InvalidInput(
345                "cannot delete the shared namespace".into(),
346            ));
347        }
348
349        // Verify namespace exists.
350        self.get_namespace(name).await?;
351
352        let ep_ids = self.list_episodic_ids_in_namespace(&ns).await?;
353        let sem_ids = self.list_semantic_ids_in_namespace(&ns).await?;
354        let proc_ids = self.list_procedural_ids_in_namespace(&ns).await?;
355        let envelope = build_namespace_delete_envelope(ns, ep_ids, sem_ids, proc_ids)?;
356        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
357            .await
358            .map_err(HirnError::storage)?;
359
360        let payload = decode_namespace_delete_envelope(&envelope)?;
361        self.apply_namespace_delete_plan(&payload).await?;
362
363        if let Err(error) = hirn_storage::update_mutation_envelope_state(
364            self.storage_backend(),
365            &envelope.id,
366            hirn_storage::MutationEnvelopeState::Applied,
367            None,
368        )
369        .await
370        {
371            tracing::warn!(
372                namespace = %ns,
373                envelope_id = %envelope.id,
374                error = %error,
375                "namespace delete mutation envelope finalize failed; recovery will retry"
376            );
377        }
378
379        Ok(())
380    }
381
382    pub(crate) async fn reconcile_pending_namespace_delete_mutations(&self) -> HirnResult<usize> {
383        let envelopes = hirn_storage::list_pending_mutation_envelopes(
384            self.storage_backend(),
385            Some(NAMESPACE_DELETE_MUTATION_KIND),
386        )
387        .await
388        .map_err(HirnError::storage)?;
389        let mut reconciled = 0usize;
390
391        for envelope in envelopes {
392            match self
393                .reconcile_single_pending_namespace_delete_mutation(&envelope)
394                .await
395            {
396                Ok(true) => reconciled += 1,
397                Ok(false) => {}
398                Err(error) => {
399                    hirn_storage::update_mutation_envelope_state(
400                        self.storage_backend(),
401                        &envelope.id,
402                        hirn_storage::MutationEnvelopeState::Failed,
403                        Some(error.to_string()),
404                    )
405                    .await
406                    .map_err(HirnError::storage)?;
407                }
408            }
409        }
410
411        Ok(reconciled)
412    }
413
414    pub(crate) async fn reconcile_pending_agent_register_mutations(&self) -> HirnResult<usize> {
415        let envelopes = hirn_storage::list_pending_mutation_envelopes(
416            self.storage_backend(),
417            Some(AGENT_REGISTER_MUTATION_KIND),
418        )
419        .await
420        .map_err(HirnError::storage)?;
421        let mut reconciled = 0usize;
422
423        for envelope in envelopes {
424            match self
425                .reconcile_single_pending_agent_register_mutation(&envelope)
426                .await
427            {
428                Ok(true) => reconciled += 1,
429                Ok(false) => {}
430                Err(error) => {
431                    hirn_storage::update_mutation_envelope_state(
432                        self.storage_backend(),
433                        &envelope.id,
434                        hirn_storage::MutationEnvelopeState::Failed,
435                        Some(error.to_string()),
436                    )
437                    .await
438                    .map_err(HirnError::storage)?;
439                }
440            }
441        }
442
443        Ok(reconciled)
444    }
445
446    pub(crate) async fn reconcile_pending_agent_deregister_mutations(&self) -> HirnResult<usize> {
447        let envelopes = hirn_storage::list_pending_mutation_envelopes(
448            self.storage_backend(),
449            Some(AGENT_DEREGISTER_MUTATION_KIND),
450        )
451        .await
452        .map_err(HirnError::storage)?;
453        let mut reconciled = 0usize;
454
455        for envelope in envelopes {
456            match self
457                .reconcile_single_pending_agent_deregister_mutation(&envelope)
458                .await
459            {
460                Ok(true) => reconciled += 1,
461                Ok(false) => {}
462                Err(error) => {
463                    hirn_storage::update_mutation_envelope_state(
464                        self.storage_backend(),
465                        &envelope.id,
466                        hirn_storage::MutationEnvelopeState::Failed,
467                        Some(error.to_string()),
468                    )
469                    .await
470                    .map_err(HirnError::storage)?;
471                }
472            }
473        }
474
475        Ok(reconciled)
476    }
477
478    async fn reconcile_single_pending_agent_register_mutation(
479        &self,
480        envelope: &hirn_storage::MutationEnvelopeRecord,
481    ) -> HirnResult<bool> {
482        let payload = decode_agent_register_envelope(envelope)?;
483        self.apply_agent_register_plan(&payload).await?;
484        hirn_storage::update_mutation_envelope_state(
485            self.storage_backend(),
486            &envelope.id,
487            hirn_storage::MutationEnvelopeState::Applied,
488            None,
489        )
490        .await
491        .map_err(HirnError::storage)?;
492        Ok(true)
493    }
494
495    async fn reconcile_single_pending_agent_deregister_mutation(
496        &self,
497        envelope: &hirn_storage::MutationEnvelopeRecord,
498    ) -> HirnResult<bool> {
499        let payload = decode_agent_deregister_envelope(envelope)?;
500        self.apply_agent_deregister_plan(&payload).await?;
501        hirn_storage::update_mutation_envelope_state(
502            self.storage_backend(),
503            &envelope.id,
504            hirn_storage::MutationEnvelopeState::Applied,
505            None,
506        )
507        .await
508        .map_err(HirnError::storage)?;
509        Ok(true)
510    }
511
512    async fn reconcile_single_pending_namespace_delete_mutation(
513        &self,
514        envelope: &hirn_storage::MutationEnvelopeRecord,
515    ) -> HirnResult<bool> {
516        let payload = decode_namespace_delete_envelope(envelope)?;
517        self.apply_namespace_delete_plan(&payload).await?;
518        hirn_storage::update_mutation_envelope_state(
519            self.storage_backend(),
520            &envelope.id,
521            hirn_storage::MutationEnvelopeState::Applied,
522            None,
523        )
524        .await
525        .map_err(HirnError::storage)?;
526        Ok(true)
527    }
528
529    async fn apply_namespace_delete_plan(
530        &self,
531        payload: &NamespaceDeleteEnvelope,
532    ) -> HirnResult<()> {
533        for id in &payload.episodic_ids {
534            self.delete_episode_if_present(*id).await?;
535        }
536        for id in &payload.semantic_ids {
537            self.purge_semantic_if_present(*id).await?;
538        }
539        for id in &payload.procedural_ids {
540            self.delete_procedural_if_present(*id).await?;
541        }
542
543        // Remove the namespace record.
544        let predicate = namespace_id_filter(&payload.namespace);
545        self.storage_runtime
546            .delete(hirn_storage::datasets::namespace::DATASET_NAME, &predicate)
547            .await
548            .map_err(|e| HirnError::storage(e))?;
549
550        self.namespace_runtime.invalidate_namespaces();
551
552        self.append_namespace_delete_audit_once(payload).await?;
553
554        Ok(())
555    }
556
557    async fn append_namespace_delete_audit_once(
558        &self,
559        payload: &NamespaceDeleteEnvelope,
560    ) -> HirnResult<()> {
561        let audit_filter = escaped_id_filter(&payload.audit_entry_id.to_string());
562        let existing = self
563            .storage_runtime
564            .count(
565                hirn_storage::datasets::audit::DATASET_NAME,
566                Some(&audit_filter),
567            )
568            .await
569            .map_err(HirnError::storage)?;
570        if existing > 0 {
571            return Ok(());
572        }
573
574        let entry = hirn_core::audit::AuditEntry {
575            id: payload.audit_entry_id,
576            timestamp: payload.audit_timestamp,
577            actor: None,
578            action: hirn_core::audit::AuditAction::NamespaceDeleted {
579                namespace: payload.namespace.to_string(),
580            },
581        };
582        let batch = hirn_storage::datasets::audit::to_batch(std::slice::from_ref(&entry))
583            .map_err(HirnError::storage)?;
584        self.storage_runtime
585            .append(hirn_storage::datasets::audit::DATASET_NAME, batch)
586            .await
587            .map_err(HirnError::storage)?;
588        Ok(())
589    }
590
591    async fn apply_agent_register_plan(&self, payload: &AgentRegisterEnvelope) -> HirnResult<()> {
592        let agent_batch =
593            hirn_storage::datasets::agent::to_batch(std::slice::from_ref(&payload.agent))
594                .map_err(HirnError::storage)?;
595        self.storage_backend()
596            .merge_insert(
597                hirn_storage::datasets::agent::DATASET_NAME,
598                &["id"],
599                agent_batch,
600            )
601            .await
602            .map_err(HirnError::storage)?;
603
604        let namespace_batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(
605            &payload.private_namespace,
606        ))
607        .map_err(HirnError::storage)?;
608        self.storage_backend()
609            .merge_insert(
610                hirn_storage::datasets::namespace::DATASET_NAME,
611                &["id"],
612                namespace_batch,
613            )
614            .await
615            .map_err(HirnError::storage)?;
616
617        self.namespace_runtime.cache_agent(payload.agent.clone());
618        self.namespace_runtime.invalidate_namespaces();
619        self.append_agent_register_audit_once(payload).await?;
620        Ok(())
621    }
622
623    async fn apply_agent_deregister_plan(
624        &self,
625        payload: &AgentDeregisterEnvelope,
626    ) -> HirnResult<()> {
627        match self
628            .delete_namespace(payload.private_namespace.as_str())
629            .await
630        {
631            Ok(()) | Err(HirnError::NotFound(_)) => {}
632            Err(error) => return Err(error),
633        }
634
635        let predicate = escaped_id_filter(payload.agent_id.as_str());
636        self.storage_runtime
637            .delete(hirn_storage::datasets::agent::DATASET_NAME, &predicate)
638            .await
639            .map_err(HirnError::storage)?;
640
641        self.namespace_runtime.evict_agent(&payload.agent_id);
642        self.namespace_runtime.invalidate_namespaces();
643        self.append_agent_deregister_audit_once(payload).await?;
644        Ok(())
645    }
646
647    async fn append_agent_register_audit_once(
648        &self,
649        payload: &AgentRegisterEnvelope,
650    ) -> HirnResult<()> {
651        let audit_filter = escaped_id_filter(&payload.audit_entry_id.to_string());
652        let existing = self
653            .storage_runtime
654            .count(
655                hirn_storage::datasets::audit::DATASET_NAME,
656                Some(&audit_filter),
657            )
658            .await
659            .map_err(HirnError::storage)?;
660        if existing > 0 {
661            return Ok(());
662        }
663
664        let entry = hirn_core::audit::AuditEntry {
665            id: payload.audit_entry_id,
666            timestamp: payload.audit_timestamp,
667            actor: None,
668            action: hirn_core::audit::AuditAction::AgentRegistered {
669                agent_id: payload.agent.id,
670            },
671        };
672        let batch = hirn_storage::datasets::audit::to_batch(std::slice::from_ref(&entry))
673            .map_err(HirnError::storage)?;
674        self.storage_runtime
675            .append(hirn_storage::datasets::audit::DATASET_NAME, batch)
676            .await
677            .map_err(HirnError::storage)?;
678        Ok(())
679    }
680
681    async fn append_agent_deregister_audit_once(
682        &self,
683        payload: &AgentDeregisterEnvelope,
684    ) -> HirnResult<()> {
685        let audit_filter = escaped_id_filter(&payload.audit_entry_id.to_string());
686        let existing = self
687            .storage_runtime
688            .count(
689                hirn_storage::datasets::audit::DATASET_NAME,
690                Some(&audit_filter),
691            )
692            .await
693            .map_err(HirnError::storage)?;
694        if existing > 0 {
695            return Ok(());
696        }
697
698        let entry = hirn_core::audit::AuditEntry {
699            id: payload.audit_entry_id,
700            timestamp: payload.audit_timestamp,
701            actor: None,
702            action: hirn_core::audit::AuditAction::AgentDeregistered {
703                agent_id: payload.agent_id,
704            },
705        };
706        let batch = hirn_storage::datasets::audit::to_batch(std::slice::from_ref(&entry))
707            .map_err(HirnError::storage)?;
708        self.storage_runtime
709            .append(hirn_storage::datasets::audit::DATASET_NAME, batch)
710            .await
711            .map_err(HirnError::storage)?;
712        Ok(())
713    }
714
715    async fn delete_episode_if_present(&self, id: MemoryId) -> HirnResult<()> {
716        match self.read_episodic_record(id).await {
717            Ok(_) => {}
718            Err(HirnError::NotFound(_)) => return Ok(()),
719            Err(error) => return Err(error),
720        }
721        match self.delete_episode(id).await {
722            Ok(()) | Err(HirnError::NotFound(_)) => Ok(()),
723            Err(error) => Err(error),
724        }
725    }
726
727    async fn purge_semantic_if_present(&self, id: MemoryId) -> HirnResult<()> {
728        match self.read_semantic_record(id).await {
729            Ok(_) => {}
730            Err(HirnError::NotFound(_)) => return Ok(()),
731            Err(error) => return Err(error),
732        }
733        match self.purge_semantic(id).await {
734            Ok(()) | Err(HirnError::NotFound(_)) => Ok(()),
735            Err(error) => Err(error),
736        }
737    }
738
739    async fn delete_procedural_if_present(&self, id: MemoryId) -> HirnResult<()> {
740        match self.get_procedural(id).await {
741            Ok(_) => {}
742            Err(HirnError::NotFound(_)) => return Ok(()),
743            Err(error) => return Err(error),
744        }
745        match self.delete_procedural(id).await {
746            Ok(()) | Err(HirnError::NotFound(_)) => Ok(()),
747            Err(error) => Err(error),
748        }
749    }
750
751    /// List episodic record IDs in a namespace.
752    pub(crate) async fn list_episodic_ids_in_namespace(
753        &self,
754        ns: &Namespace,
755    ) -> HirnResult<Vec<MemoryId>> {
756        let filter = namespace_column_filter(ns);
757        let opts = hirn_storage::store::ScanOptions {
758            filter: Some(filter),
759            columns: Some(vec!["id".to_string()]),
760            ..Default::default()
761        };
762        let batches = self
763            .storage_runtime
764            .scan(hirn_storage::datasets::episodic::DATASET_NAME, opts)
765            .await
766            .map_err(|e| HirnError::storage(e))?;
767
768        let mut ids = Vec::new();
769        for batch in &batches {
770            let id_col = batch
771                .column_by_name("id")
772                .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
773            if let Some(col) = id_col {
774                for i in 0..col.len() {
775                    let id = MemoryId::parse(col.value(i))
776                        .map_err(|e| HirnError::InvalidInput(e.to_string()))?;
777                    ids.push(id);
778                }
779            }
780        }
781        Ok(ids)
782    }
783
784    /// List semantic record IDs in a namespace.
785    pub(crate) async fn list_semantic_ids_in_namespace(
786        &self,
787        ns: &Namespace,
788    ) -> HirnResult<Vec<MemoryId>> {
789        let filter = namespace_column_filter(ns);
790        let opts = hirn_storage::store::ScanOptions {
791            filter: Some(filter),
792            columns: Some(vec!["id".to_string()]),
793            ..Default::default()
794        };
795        let batches = self
796            .storage_runtime
797            .scan(hirn_storage::datasets::semantic::DATASET_NAME, opts)
798            .await
799            .map_err(|e| HirnError::storage(e))?;
800
801        let mut ids = Vec::new();
802        for batch in &batches {
803            let id_col = batch
804                .column_by_name("id")
805                .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
806            if let Some(col) = id_col {
807                for i in 0..col.len() {
808                    let id = MemoryId::parse(col.value(i))
809                        .map_err(|e| HirnError::InvalidInput(e.to_string()))?;
810                    ids.push(id);
811                }
812            }
813        }
814        Ok(ids)
815    }
816
817    /// List procedural record IDs in a namespace.
818    pub(crate) async fn list_procedural_ids_in_namespace(
819        &self,
820        ns: &Namespace,
821    ) -> HirnResult<Vec<MemoryId>> {
822        let filter = namespace_column_filter(ns);
823        let opts = hirn_storage::store::ScanOptions {
824            filter: Some(filter),
825            columns: Some(vec!["id".to_string()]),
826            ..Default::default()
827        };
828        let batches = self
829            .storage_runtime
830            .scan(hirn_storage::datasets::procedural::DATASET_NAME, opts)
831            .await
832            .map_err(|e| HirnError::storage(e))?;
833
834        let mut ids = Vec::new();
835        for batch in &batches {
836            let id_col = batch
837                .column_by_name("id")
838                .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
839            if let Some(col) = id_col {
840                for i in 0..col.len() {
841                    let id = MemoryId::parse(col.value(i))
842                        .map_err(|e| HirnError::InvalidInput(e.to_string()))?;
843                    ids.push(id);
844                }
845            }
846        }
847        Ok(ids)
848    }
849
850    // ── Agent Registration ──────────────────────────────────────────────
851
852    /// Register a new agent. Creates private namespace `private:{agent_id}`.
853    pub async fn register_agent(
854        &self,
855        agent_id: &hirn_core::types::AgentId,
856        display_name: impl Into<String>,
857    ) -> HirnResult<()> {
858        if self.namespace_runtime.cached_agent(agent_id).is_some() {
859            return Err(HirnError::AlreadyExists(format!(
860                "agent '{}' already registered",
861                agent_id
862            )));
863        }
864
865        // Check for existing agent.
866        let filter = escaped_id_filter(agent_id.as_str());
867        let count = self
868            .storage_runtime
869            .count(hirn_storage::datasets::agent::DATASET_NAME, Some(&filter))
870            .await
871            .map_err(|e| HirnError::storage(e))?;
872        if count > 0 {
873            return Err(HirnError::AlreadyExists(format!(
874                "agent '{}' already registered",
875                agent_id
876            )));
877        }
878
879        let rec = hirn_core::agent::AgentRecord::new(agent_id.clone(), display_name);
880        let ns_rec = hirn_core::namespace::NamespaceRecord::private_for(agent_id);
881        let envelope = build_agent_register_envelope(rec, ns_rec)?;
882        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
883            .await
884            .map_err(HirnError::storage)?;
885
886        let payload = decode_agent_register_envelope(&envelope)?;
887        self.apply_agent_register_plan(&payload).await?;
888
889        if let Err(error) = hirn_storage::update_mutation_envelope_state(
890            self.storage_backend(),
891            &envelope.id,
892            hirn_storage::MutationEnvelopeState::Applied,
893            None,
894        )
895        .await
896        {
897            tracing::warn!(
898                agent_id = %agent_id,
899                envelope_id = %envelope.id,
900                error = %error,
901                "agent register mutation envelope finalize failed; recovery will retry"
902            );
903        }
904
905        Ok(())
906    }
907
908    /// List all registered agents.
909    pub async fn list_agents(&self) -> HirnResult<Vec<hirn_core::agent::AgentRecord>> {
910        let opts = hirn_storage::store::ScanOptions::default();
911        let batches = self
912            .storage_runtime
913            .scan(hirn_storage::datasets::agent::DATASET_NAME, opts)
914            .await
915            .map_err(|e| HirnError::storage(e))?;
916
917        let mut result = Vec::new();
918        for batch in &batches {
919            let recs = hirn_storage::datasets::agent::from_batch(batch)
920                .map_err(|e| HirnError::storage(e))?;
921            result.extend(recs);
922        }
923        Ok(result)
924    }
925
926    /// Get a registered agent.
927    pub async fn get_agent(
928        &self,
929        agent_id: &hirn_core::types::AgentId,
930    ) -> HirnResult<hirn_core::agent::AgentRecord> {
931        if let Some(agent) = self.namespace_runtime.cached_agent(agent_id) {
932            return Ok(agent);
933        }
934
935        let filter = escaped_id_filter(agent_id.as_str());
936        let opts = hirn_storage::store::ScanOptions {
937            filter: Some(filter),
938            ..Default::default()
939        };
940        let batches = self
941            .storage_runtime
942            .scan(hirn_storage::datasets::agent::DATASET_NAME, opts)
943            .await
944            .map_err(|e| HirnError::storage(e))?;
945
946        for batch in &batches {
947            let recs = hirn_storage::datasets::agent::from_batch(batch)
948                .map_err(|e| HirnError::storage(e))?;
949            if let Some(rec) = recs.into_iter().next() {
950                self.namespace_runtime.cache_agent(rec.clone());
951                return Ok(rec);
952            }
953        }
954        Err(HirnError::NotFound(format!("agent '{agent_id}'")))
955    }
956
957    /// Update a registered agent record.
958    pub async fn update_agent(&self, agent: &hirn_core::agent::AgentRecord) -> HirnResult<()> {
959        let filter = escaped_id_filter(agent.id.as_str());
960        let count = self
961            .storage_runtime
962            .count(hirn_storage::datasets::agent::DATASET_NAME, Some(&filter))
963            .await
964            .map_err(|e| HirnError::storage(e))?;
965        if count == 0 {
966            return Err(HirnError::NotFound(format!("agent '{}'", agent.id)));
967        }
968        let batch = hirn_storage::datasets::agent::to_batch(std::slice::from_ref(agent))
969            .map_err(|e| HirnError::storage(e))?;
970        self.storage_backend()
971            .merge_insert(hirn_storage::datasets::agent::DATASET_NAME, &["id"], batch)
972            .await
973            .map_err(|e| HirnError::storage(e))?;
974
975        self.namespace_runtime.cache_agent(agent.clone());
976        Ok(())
977    }
978
979    /// Deregister an agent and delete its private namespace.
980    pub async fn deregister_agent(&self, agent_id: &hirn_core::types::AgentId) -> HirnResult<()> {
981        // Verify agent exists.
982        self.get_agent(agent_id).await?;
983
984        let envelope = build_agent_deregister_envelope(*agent_id)?;
985        hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
986            .await
987            .map_err(HirnError::storage)?;
988
989        let payload = decode_agent_deregister_envelope(&envelope)?;
990        self.apply_agent_deregister_plan(&payload).await?;
991
992        if let Err(error) = hirn_storage::update_mutation_envelope_state(
993            self.storage_backend(),
994            &envelope.id,
995            hirn_storage::MutationEnvelopeState::Applied,
996            None,
997        )
998        .await
999        {
1000            tracing::warn!(
1001                agent_id = %agent_id,
1002                envelope_id = %envelope.id,
1003                error = %error,
1004                "agent deregister mutation envelope finalize failed; recovery will retry"
1005            );
1006        }
1007
1008        Ok(())
1009    }
1010
1011    // ── Team Namespace Management ───────────────────────────────────────
1012
1013    /// Create a team namespace with the given agent members.
1014    pub async fn create_team_namespace(
1015        &self,
1016        name: &str,
1017        agent_ids: Vec<hirn_core::types::AgentId>,
1018    ) -> HirnResult<()> {
1019        self.create_namespace(name, hirn_core::types::NamespaceKind::Team, agent_ids)
1020            .await
1021    }
1022
1023    /// Add an agent to a team namespace.
1024    pub async fn add_agent_to_team(
1025        &self,
1026        agent_id: &hirn_core::types::AgentId,
1027        team_name: &str,
1028    ) -> HirnResult<()> {
1029        let mut ns_rec = self.get_namespace(team_name).await?;
1030        if ns_rec.kind != hirn_core::types::NamespaceKind::Team {
1031            return Err(HirnError::InvalidInput(format!(
1032                "'{team_name}' is not a team namespace"
1033            )));
1034        }
1035        if ns_rec.member_agents.contains(agent_id) {
1036            return Ok(()); // already a member
1037        }
1038        ns_rec.member_agents.push(agent_id.clone());
1039        self.update_namespace_record(&ns_rec).await?;
1040
1041        self.append_audit(
1042            None,
1043            hirn_core::audit::AuditAction::AgentAddedToTeam {
1044                agent_id: agent_id.clone(),
1045                team: team_name.to_string(),
1046            },
1047        )
1048        .await?;
1049        Ok(())
1050    }
1051
1052    /// Remove an agent from a team namespace.
1053    pub async fn remove_agent_from_team(
1054        &self,
1055        agent_id: &hirn_core::types::AgentId,
1056        team_name: &str,
1057    ) -> HirnResult<()> {
1058        let mut ns_rec = self.get_namespace(team_name).await?;
1059        if ns_rec.kind != hirn_core::types::NamespaceKind::Team {
1060            return Err(HirnError::InvalidInput(format!(
1061                "'{team_name}' is not a team namespace"
1062            )));
1063        }
1064        ns_rec.member_agents.retain(|a| a != agent_id);
1065        self.update_namespace_record(&ns_rec).await?;
1066
1067        self.append_audit(
1068            None,
1069            hirn_core::audit::AuditAction::AgentRemovedFromTeam {
1070                agent_id: agent_id.clone(),
1071                team: team_name.to_string(),
1072            },
1073        )
1074        .await?;
1075        Ok(())
1076    }
1077
1078    /// Update a namespace record in storage.
1079    async fn update_namespace_record(
1080        &self,
1081        rec: &hirn_core::namespace::NamespaceRecord,
1082    ) -> HirnResult<()> {
1083        let batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(rec))
1084            .map_err(|e| HirnError::storage(e))?;
1085        self.storage_backend()
1086            .merge_insert(
1087                hirn_storage::datasets::namespace::DATASET_NAME,
1088                &["id"],
1089                batch,
1090            )
1091            .await
1092            .map_err(|e| HirnError::storage(e))?;
1093
1094        self.namespace_runtime.invalidate_namespaces();
1095        Ok(())
1096    }
1097
1098    // ── Audit Trail ─────────────────────────────────────────────────────
1099
1100    /// Append an entry to the audit log.
1101    pub(crate) async fn append_audit(
1102        &self,
1103        actor: Option<hirn_core::types::AgentId>,
1104        action: hirn_core::audit::AuditAction,
1105    ) -> HirnResult<()> {
1106        self.policy_runtime().append_audit(actor, action).await
1107    }
1108
1109    /// Query the audit log, optionally filtering by time range.
1110    pub(crate) async fn audit_log(
1111        &self,
1112        after: Option<&Timestamp>,
1113        before: Option<&Timestamp>,
1114    ) -> HirnResult<Vec<hirn_core::audit::AuditEntry>> {
1115        self.policy_runtime().audit_log(after, before).await
1116    }
1117
1118    // ── Agent Context ───────────────────────────────────────────────────
1119
1120    /// Register an agent if not already registered. Returns `Ok(())` in either case.
1121    pub async fn ensure_agent(&self, agent_id: &hirn_core::types::AgentId) -> HirnResult<()> {
1122        if self.namespace_runtime.cached_agent(agent_id).is_some() {
1123            return Ok(());
1124        }
1125
1126        if self.get_agent(agent_id).await.is_ok() {
1127            return Ok(());
1128        }
1129
1130        match self.register_agent(agent_id, agent_id.as_str()).await {
1131            Ok(()) | Err(HirnError::AlreadyExists(_)) => Ok(()),
1132            Err(e) => Err(e),
1133        }
1134    }
1135
1136    /// Create an agent-scoped context for namespace-isolated operations.
1137    pub async fn as_agent(
1138        &self,
1139        agent_id: &hirn_core::types::AgentId,
1140    ) -> HirnResult<crate::agent_context::AgentContext<'_>> {
1141        let private_namespace = Namespace::private_for(agent_id);
1142
1143        if let Some(accessible) = self
1144            .namespace_runtime
1145            .cached_accessible_namespaces(agent_id)
1146        {
1147            let mut accessible = accessible;
1148            if !accessible.contains(&private_namespace) {
1149                accessible.push(private_namespace);
1150            }
1151            return Ok(crate::agent_context::AgentContext::new(
1152                self,
1153                agent_id.clone(),
1154                accessible,
1155            ));
1156        }
1157
1158        // Verify the agent is registered.
1159        self.get_agent(agent_id).await?;
1160
1161        // Collect namespaces accessible to this agent.
1162        let namespaces = self.list_namespaces().await?;
1163        let mut accessible: Vec<Namespace> = namespaces
1164            .iter()
1165            .filter(|ns| ns.agent_has_access(agent_id))
1166            .map(|ns| ns.namespace.clone())
1167            .collect();
1168        if !accessible.contains(&private_namespace) {
1169            accessible.push(private_namespace);
1170        }
1171
1172        self.namespace_runtime
1173            .cache_accessible_namespaces(agent_id.clone(), accessible.clone());
1174
1175        Ok(crate::agent_context::AgentContext::new(
1176            self,
1177            agent_id.clone(),
1178            accessible,
1179        ))
1180    }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185    use std::sync::Arc;
1186    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
1187
1188    use arrow_array::RecordBatch;
1189    use async_trait::async_trait;
1190    use datafusion::catalog::TableProvider;
1191    use hirn_core::types::{AgentId, EventType, NamespaceKind};
1192    use hirn_storage::memory_store::MemoryStore;
1193    use hirn_storage::store::{
1194        ColumnTransform, CompactOptions, CompactResult, DatasetInfo, FtsSearchOptions,
1195        HybridSearchOptions, IndexConfig, MultivectorSearchOptions, RecordBatchStream, ScanOptions,
1196        VectorSearchOptions, VersionTag,
1197    };
1198    use hirn_storage::{HirnDb, HirnDbConfig, HirnDbError, PhysicalStore};
1199
1200    use super::*;
1201
    /// Test-only store that wraps a `MemoryStore` and can be told to fail
    /// selected operations on the agent and namespace datasets.
    struct FaultInjectingNamespaceStore {
        /// Backing store that every non-failing call is forwarded to.
        inner: MemoryStore,
        /// When set, `append` and `merge_insert` on the agent dataset fail.
        fail_agent_writes: AtomicBool,
        /// When set, `delete` on the agent dataset fails.
        fail_agent_deletes: AtomicBool,
        /// When set, `append` and `merge_insert` on the namespace dataset fail.
        fail_namespace_writes: AtomicBool,
    }
1208
1209    impl FaultInjectingNamespaceStore {
1210        fn new() -> Self {
1211            Self {
1212                inner: MemoryStore::new(),
1213                fail_agent_writes: AtomicBool::new(false),
1214                fail_agent_deletes: AtomicBool::new(false),
1215                fail_namespace_writes: AtomicBool::new(false),
1216            }
1217        }
1218
1219        fn fail_agent_writes(&self) {
1220            self.fail_agent_writes.store(true, AtomicOrdering::Release);
1221        }
1222
1223        fn allow_agent_writes(&self) {
1224            self.fail_agent_writes.store(false, AtomicOrdering::Release);
1225        }
1226
1227        fn fail_agent_deletes(&self) {
1228            self.fail_agent_deletes.store(true, AtomicOrdering::Release);
1229        }
1230
1231        fn allow_agent_deletes(&self) {
1232            self.fail_agent_deletes
1233                .store(false, AtomicOrdering::Release);
1234        }
1235
1236        fn fail_namespace_writes(&self) {
1237            self.fail_namespace_writes
1238                .store(true, AtomicOrdering::Release);
1239        }
1240
1241        fn allow_namespace_writes(&self) {
1242            self.fail_namespace_writes
1243                .store(false, AtomicOrdering::Release);
1244        }
1245    }
1246
1247    #[async_trait]
1248    impl PhysicalStore for FaultInjectingNamespaceStore {
1249        async fn append(&self, dataset: &str, batch: RecordBatch) -> Result<(), HirnDbError> {
1250            if dataset == hirn_storage::datasets::agent::DATASET_NAME
1251                && self.fail_agent_writes.load(AtomicOrdering::Acquire)
1252            {
1253                return Err(HirnDbError::Unsupported(
1254                    "simulated agent append failure".to_string(),
1255                ));
1256            }
1257            if dataset == hirn_storage::datasets::namespace::DATASET_NAME
1258                && self.fail_namespace_writes.load(AtomicOrdering::Acquire)
1259            {
1260                return Err(HirnDbError::Unsupported(
1261                    "simulated namespace append failure".to_string(),
1262                ));
1263            }
1264            self.inner.append(dataset, batch).await
1265        }
1266
1267        async fn append_batches(
1268            &self,
1269            dataset: &str,
1270            batches: Vec<RecordBatch>,
1271        ) -> Result<(), HirnDbError> {
1272            self.inner.append_batches(dataset, batches).await
1273        }
1274
1275        async fn scan(
1276            &self,
1277            dataset: &str,
1278            opts: ScanOptions,
1279        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1280            self.inner.scan(dataset, opts).await
1281        }
1282
1283        async fn scan_stream(
1284            &self,
1285            dataset: &str,
1286            opts: ScanOptions,
1287        ) -> Result<RecordBatchStream, HirnDbError> {
1288            self.inner.scan_stream(dataset, opts).await
1289        }
1290
1291        async fn delete(&self, dataset: &str, predicate: &str) -> Result<u64, HirnDbError> {
1292            if dataset == hirn_storage::datasets::agent::DATASET_NAME
1293                && self.fail_agent_deletes.load(AtomicOrdering::Acquire)
1294            {
1295                return Err(HirnDbError::Unsupported(
1296                    "simulated agent delete failure".to_string(),
1297                ));
1298            }
1299            self.inner.delete(dataset, predicate).await
1300        }
1301
1302        async fn update_where(
1303            &self,
1304            dataset: &str,
1305            filter: &str,
1306            updates: &[(&str, &str)],
1307        ) -> Result<u64, HirnDbError> {
1308            self.inner.update_where(dataset, filter, updates).await
1309        }
1310
1311        async fn merge_insert(
1312            &self,
1313            dataset: &str,
1314            on: &[&str],
1315            batch: RecordBatch,
1316        ) -> Result<(), HirnDbError> {
1317            if dataset == hirn_storage::datasets::agent::DATASET_NAME
1318                && self.fail_agent_writes.load(AtomicOrdering::Acquire)
1319            {
1320                return Err(HirnDbError::Unsupported(
1321                    "simulated agent merge_insert failure".to_string(),
1322                ));
1323            }
1324            if dataset == hirn_storage::datasets::namespace::DATASET_NAME
1325                && self.fail_namespace_writes.load(AtomicOrdering::Acquire)
1326            {
1327                return Err(HirnDbError::Unsupported(
1328                    "simulated namespace merge_insert failure".to_string(),
1329                ));
1330            }
1331            self.inner.merge_insert(dataset, on, batch).await
1332        }
1333
1334        async fn count(&self, dataset: &str, filter: Option<&str>) -> Result<u64, HirnDbError> {
1335            self.inner.count(dataset, filter).await
1336        }
1337
1338        async fn vector_search(
1339            &self,
1340            dataset: &str,
1341            opts: VectorSearchOptions,
1342        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1343            self.inner.vector_search(dataset, opts).await
1344        }
1345
1346        async fn vector_search_many(
1347            &self,
1348            dataset: &str,
1349            queries: Vec<VectorSearchOptions>,
1350        ) -> Result<Vec<Vec<RecordBatch>>, HirnDbError> {
1351            self.inner.vector_search_many(dataset, queries).await
1352        }
1353
1354        async fn fts_search(
1355            &self,
1356            dataset: &str,
1357            opts: FtsSearchOptions,
1358        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1359            self.inner.fts_search(dataset, opts).await
1360        }
1361
1362        async fn hybrid_search(
1363            &self,
1364            dataset: &str,
1365            opts: HybridSearchOptions,
1366        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1367            self.inner.hybrid_search(dataset, opts).await
1368        }
1369
1370        async fn multivector_search(
1371            &self,
1372            dataset: &str,
1373            opts: MultivectorSearchOptions,
1374        ) -> Result<Vec<RecordBatch>, HirnDbError> {
1375            self.inner.multivector_search(dataset, opts).await
1376        }
1377
1378        async fn create_index(
1379            &self,
1380            dataset: &str,
1381            config: IndexConfig,
1382        ) -> Result<(), HirnDbError> {
1383            self.inner.create_index(dataset, config).await
1384        }
1385
1386        async fn optimize_indices(&self, dataset: &str) -> Result<(), HirnDbError> {
1387            self.inner.optimize_indices(dataset).await
1388        }
1389
1390        async fn compact(
1391            &self,
1392            dataset: &str,
1393            opts: CompactOptions,
1394        ) -> Result<CompactResult, HirnDbError> {
1395            self.inner.compact(dataset, opts).await
1396        }
1397
1398        async fn version(&self, dataset: &str) -> Result<u64, HirnDbError> {
1399            self.inner.version(dataset).await
1400        }
1401
1402        async fn tag(&self, dataset: &str, tag: &str) -> Result<(), HirnDbError> {
1403            self.inner.tag(dataset, tag).await
1404        }
1405
1406        async fn checkout(&self, dataset: &str, version: u64) -> Result<(), HirnDbError> {
1407            self.inner.checkout(dataset, version).await
1408        }
1409
1410        async fn list_tags(&self, dataset: &str) -> Result<Vec<VersionTag>, HirnDbError> {
1411            self.inner.list_tags(dataset).await
1412        }
1413
1414        async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, HirnDbError> {
1415            self.inner.list_datasets().await
1416        }
1417
1418        async fn exists(&self, dataset: &str) -> Result<bool, HirnDbError> {
1419            self.inner.exists(dataset).await
1420        }
1421
1422        async fn list_namespaces(&self) -> Result<Vec<String>, HirnDbError> {
1423            self.inner.list_namespaces().await
1424        }
1425
1426        async fn create_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1427            self.inner.create_namespace(name).await
1428        }
1429
1430        async fn drop_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1431            self.inner.drop_namespace(name).await
1432        }
1433
1434        async fn add_columns(
1435            &self,
1436            dataset: &str,
1437            transforms: Vec<ColumnTransform>,
1438        ) -> Result<(), HirnDbError> {
1439            self.inner.add_columns(dataset, transforms).await
1440        }
1441
1442        async fn drop_columns(&self, dataset: &str, columns: &[&str]) -> Result<(), HirnDbError> {
1443            self.inner.drop_columns(dataset, columns).await
1444        }
1445
1446        async fn table_provider(&self, dataset: &str) -> Option<Arc<dyn TableProvider>> {
1447            self.inner.table_provider(dataset).await
1448        }
1449    }
1450
1451    fn test_agent() -> AgentId {
1452        AgentId::new("namespace_delete_agent").unwrap()
1453    }
1454
1455    async fn temp_db() -> (
1456        HirnDB,
1457        HirnConfig,
1458        Arc<dyn PhysicalStore>,
1459        tempfile::TempDir,
1460    ) {
1461        let dir = tempfile::tempdir().unwrap();
1462        let lance_path = dir.path().join("lance");
1463        let storage: Arc<dyn PhysicalStore> = HirnDb::open(HirnDbConfig::local(
1464            lance_path.to_str().expect("temp path should be utf8"),
1465        ))
1466        .await
1467        .unwrap()
1468        .store_arc();
1469        let config = HirnConfig::builder()
1470            .db_path(dir.path().join("db"))
1471            .working_memory_token_limit(100_000)
1472            .build()
1473            .unwrap();
1474        let db = HirnDB::open_with_config(config.clone(), Arc::clone(&storage))
1475            .await
1476            .unwrap();
1477        (db, config, storage, dir)
1478    }
1479
1480    async fn temp_db_with_storage(
1481        storage: Arc<dyn PhysicalStore>,
1482    ) -> (
1483        HirnDB,
1484        HirnConfig,
1485        Arc<dyn PhysicalStore>,
1486        tempfile::TempDir,
1487    ) {
1488        let dir = tempfile::tempdir().unwrap();
1489        let config = HirnConfig::builder()
1490            .db_path(dir.path().join("db"))
1491            .working_memory_token_limit(100_000)
1492            .build()
1493            .unwrap();
1494        let db = HirnDB::open_with_config(config.clone(), Arc::clone(&storage))
1495            .await
1496            .unwrap();
1497        (db, config, storage, dir)
1498    }
1499
1500    fn episode(namespace: Namespace, content: &str) -> EpisodicRecord {
1501        EpisodicRecord::builder()
1502            .content(content)
1503            .summary(content)
1504            .event_type(EventType::Observation)
1505            .importance(0.5)
1506            .namespace(namespace)
1507            .agent_id(test_agent())
1508            .build()
1509            .unwrap()
1510    }
1511
1512    #[tokio::test(flavor = "multi_thread")]
1513    async fn delete_namespace_records_applied_mutation_envelope() {
1514        let (db, _config, _storage, _dir) = temp_db().await;
1515        let namespace = Namespace::new("recoverable_ns_delete").unwrap();
1516        db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1517            .await
1518            .unwrap();
1519        let survivor_id = db
1520            .episodic()
1521            .remember(episode(Namespace::shared(), "survivor"))
1522            .await
1523            .unwrap();
1524        let episode_id = db
1525            .episodic()
1526            .remember(episode(namespace, "delete me"))
1527            .await
1528            .unwrap();
1529
1530        db.delete_namespace(namespace.as_str()).await.unwrap();
1531
1532        assert!(matches!(
1533            db.get_namespace(namespace.as_str()).await,
1534            Err(HirnError::NotFound(_))
1535        ));
1536        assert!(matches!(
1537            db.read_episodic_record(episode_id).await,
1538            Err(HirnError::NotFound(_))
1539        ));
1540        assert!(db.read_episodic_record(survivor_id).await.is_ok());
1541
1542        let envelopes = hirn_storage::list_mutation_envelopes(
1543            db.storage_backend(),
1544            Some(NAMESPACE_DELETE_MUTATION_KIND),
1545            Some(hirn_storage::MutationEnvelopeState::Applied),
1546        )
1547        .await
1548        .unwrap();
1549        let envelope = envelopes
1550            .iter()
1551            .find(|envelope| {
1552                envelope
1553                    .id
1554                    .starts_with(&format!("namespace-delete:{namespace}:"))
1555            })
1556            .expect("namespace delete envelope should exist");
1557        assert_eq!(envelope.kind, NAMESPACE_DELETE_MUTATION_KIND);
1558        assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
1559    }
1560
1561    #[tokio::test(flavor = "multi_thread")]
1562    async fn namespace_name_can_be_deleted_recreated_and_deleted_again() {
1563        let (db, _config, _storage, _dir) = temp_db().await;
1564        let namespace = Namespace::new("reusable_delete_name").unwrap();
1565
1566        db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1567            .await
1568            .unwrap();
1569        db.delete_namespace(namespace.as_str()).await.unwrap();
1570        db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1571            .await
1572            .unwrap();
1573        db.delete_namespace(namespace.as_str()).await.unwrap();
1574
1575        let envelopes = hirn_storage::list_mutation_envelopes(
1576            db.storage_backend(),
1577            Some(NAMESPACE_DELETE_MUTATION_KIND),
1578            Some(hirn_storage::MutationEnvelopeState::Applied),
1579        )
1580        .await
1581        .unwrap();
1582        let namespace_envelopes = envelopes
1583            .iter()
1584            .filter(|envelope| {
1585                envelope
1586                    .id
1587                    .starts_with(&format!("namespace-delete:{namespace}:"))
1588            })
1589            .count();
1590        assert_eq!(namespace_envelopes, 2);
1591    }
1592
1593    #[tokio::test(flavor = "multi_thread")]
1594    async fn open_reconciles_pending_namespace_delete_mutation_after_partial_cleanup() {
1595        let (db, config, storage, _dir) = temp_db().await;
1596        let namespace = Namespace::new("replay_ns_delete").unwrap();
1597        db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1598            .await
1599            .unwrap();
1600        let survivor_id = db
1601            .episodic()
1602            .remember(episode(Namespace::shared(), "survivor before replay"))
1603            .await
1604            .unwrap();
1605        let episode_id = db
1606            .episodic()
1607            .remember(episode(namespace, "already gone before replay"))
1608            .await
1609            .unwrap();
1610        let envelope =
1611            build_namespace_delete_envelope(namespace, vec![episode_id], vec![], vec![]).unwrap();
1612        let payload = decode_namespace_delete_envelope(&envelope).unwrap();
1613        hirn_storage::append_mutation_envelope(db.storage_backend(), &envelope)
1614            .await
1615            .unwrap();
1616        db.delete_episode(episode_id).await.unwrap();
1617        db.append_namespace_delete_audit_once(&payload)
1618            .await
1619            .unwrap();
1620        drop(db);
1621
1622        let reopened = HirnDB::open_with_config(config, Arc::clone(&storage))
1623            .await
1624            .unwrap();
1625
1626        assert!(matches!(
1627            reopened.get_namespace(namespace.as_str()).await,
1628            Err(HirnError::NotFound(_))
1629        ));
1630        assert!(reopened.read_episodic_record(survivor_id).await.is_ok());
1631        let stored_envelope = hirn_storage::get_mutation_envelope(storage.as_ref(), &envelope.id)
1632            .await
1633            .unwrap()
1634            .expect("namespace delete envelope should remain queryable");
1635        assert_eq!(
1636            stored_envelope.state,
1637            hirn_storage::MutationEnvelopeState::Applied
1638        );
1639        let audit_log = reopened.audit_log(None, None).await.unwrap();
1640        let matching = audit_log
1641            .iter()
1642            .filter(|entry| {
1643                entry.id == payload.audit_entry_id
1644                    && matches!(
1645                        &entry.action,
1646                        hirn_core::audit::AuditAction::NamespaceDeleted { namespace: deleted }
1647                            if deleted == namespace.as_str()
1648                    )
1649            })
1650            .count();
1651        assert_eq!(
1652            matching, 1,
1653            "namespace delete audit replay must be idempotent"
1654        );
1655    }
1656
1657    #[tokio::test(flavor = "multi_thread")]
1658    async fn failed_agent_update_preserves_existing_row() {
1659        let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1660        let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1661        let (db, _config, _storage, _dir) = temp_db_with_storage(storage).await;
1662
1663        let agent_id = AgentId::new("upserted_agent").unwrap();
1664        db.register_agent(&agent_id, "Original Agent")
1665            .await
1666            .unwrap();
1667
1668        let mut updated = db.get_agent(&agent_id).await.unwrap();
1669        updated.display_name = "Updated Agent".to_string();
1670
1671        fault_store.fail_agent_writes();
1672
1673        let error = db.update_agent(&updated).await.unwrap_err();
1674        assert!(
1675            error
1676                .to_string()
1677                .contains("simulated agent merge_insert failure"),
1678            "expected keyed-upsert failure, got: {error}"
1679        );
1680
1681        let agents = db.list_agents().await.unwrap();
1682        let stored = agents
1683            .iter()
1684            .find(|agent| agent.id == agent_id)
1685            .expect("original agent row should still exist");
1686        assert_eq!(stored.display_name, "Original Agent");
1687    }
1688
1689    #[tokio::test(flavor = "multi_thread")]
1690    async fn failed_team_membership_update_preserves_existing_namespace_row() {
1691        let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1692        let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1693        let (db, _config, _storage, _dir) = temp_db_with_storage(storage).await;
1694
1695        let team = "team_upsert_guard";
1696        let member = AgentId::new("team_member").unwrap();
1697        db.create_namespace(team, NamespaceKind::Team, vec![])
1698            .await
1699            .unwrap();
1700
1701        fault_store.fail_namespace_writes();
1702
1703        let error = db.add_agent_to_team(&member, team).await.unwrap_err();
1704        assert!(
1705            error
1706                .to_string()
1707                .contains("simulated namespace merge_insert failure"),
1708            "expected keyed-upsert failure, got: {error}"
1709        );
1710
1711        let stored = db.get_namespace(team).await.unwrap();
1712        assert!(stored.member_agents.is_empty());
1713    }
1714
1715    #[tokio::test(flavor = "multi_thread")]
1716    async fn open_reconciles_pending_agent_register_mutation_after_partial_namespace_write() {
1717        let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1718        let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1719        let (db, config, _storage, _dir) = temp_db_with_storage(storage.clone()).await;
1720
1721        let agent_id = AgentId::new("recoverable_agent_register").unwrap();
1722        let private_ns = Namespace::private_for(&agent_id);
1723
1724        fault_store.fail_namespace_writes();
1725        let error = db
1726            .register_agent(&agent_id, "Recoverable Agent")
1727            .await
1728            .unwrap_err();
1729        assert!(
1730            error
1731                .to_string()
1732                .contains("simulated namespace merge_insert failure"),
1733            "expected namespace upsert failure, got: {error}"
1734        );
1735
1736        let agents = db.list_agents().await.unwrap();
1737        assert!(agents.iter().any(|agent| agent.id == agent_id));
1738        assert!(matches!(
1739            db.get_namespace(private_ns.as_str()).await,
1740            Err(HirnError::NotFound(_))
1741        ));
1742        drop(db);
1743
1744        fault_store.allow_namespace_writes();
1745        let reopened = HirnDB::open_with_config(config, storage).await.unwrap();
1746
1747        assert_eq!(
1748            reopened.get_agent(&agent_id).await.unwrap().display_name,
1749            "Recoverable Agent"
1750        );
1751        assert_eq!(
1752            reopened
1753                .get_namespace(private_ns.as_str())
1754                .await
1755                .unwrap()
1756                .namespace,
1757            private_ns
1758        );
1759
1760        let envelopes = hirn_storage::list_mutation_envelopes(
1761            reopened.storage_backend(),
1762            Some(AGENT_REGISTER_MUTATION_KIND),
1763            Some(hirn_storage::MutationEnvelopeState::Applied),
1764        )
1765        .await
1766        .unwrap();
1767        assert!(envelopes.iter().any(|envelope| {
1768            envelope
1769                .id
1770                .starts_with(&format!("agent-register:{agent_id}:"))
1771        }));
1772
1773        let audit_entries = reopened.audit_log(None, None).await.unwrap();
1774        let matching = audit_entries
1775            .iter()
1776            .filter(|entry| {
1777                matches!(
1778                    &entry.action,
1779                    hirn_core::audit::AuditAction::AgentRegistered { agent_id: recorded }
1780                        if recorded == &agent_id
1781                )
1782            })
1783            .count();
1784        assert_eq!(matching, 1);
1785    }
1786
1787    #[tokio::test(flavor = "multi_thread")]
1788    async fn open_reconciles_pending_agent_deregister_mutation_after_partial_agent_delete() {
1789        let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1790        let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1791        let (db, config, _storage, _dir) = temp_db_with_storage(storage.clone()).await;
1792
1793        let agent_id = AgentId::new("recoverable_agent_deregister").unwrap();
1794        let private_ns = Namespace::private_for(&agent_id);
1795        db.register_agent(&agent_id, "Recoverable Delete")
1796            .await
1797            .unwrap();
1798
1799        fault_store.fail_agent_deletes();
1800        let error = db.deregister_agent(&agent_id).await.unwrap_err();
1801        assert!(
1802            error.to_string().contains("simulated agent delete failure"),
1803            "expected agent delete failure, got: {error}"
1804        );
1805
1806        assert!(matches!(
1807            db.get_namespace(private_ns.as_str()).await,
1808            Err(HirnError::NotFound(_))
1809        ));
1810        let agents = db.list_agents().await.unwrap();
1811        assert!(agents.iter().any(|agent| agent.id == agent_id));
1812        drop(db);
1813
1814        fault_store.allow_agent_deletes();
1815        fault_store.allow_agent_writes();
1816        let reopened = HirnDB::open_with_config(config, storage).await.unwrap();
1817
1818        assert!(matches!(
1819            reopened.get_agent(&agent_id).await,
1820            Err(HirnError::NotFound(_))
1821        ));
1822        assert!(matches!(
1823            reopened.get_namespace(private_ns.as_str()).await,
1824            Err(HirnError::NotFound(_))
1825        ));
1826
1827        let envelopes = hirn_storage::list_mutation_envelopes(
1828            reopened.storage_backend(),
1829            Some(AGENT_DEREGISTER_MUTATION_KIND),
1830            Some(hirn_storage::MutationEnvelopeState::Applied),
1831        )
1832        .await
1833        .unwrap();
1834        assert!(envelopes.iter().any(|envelope| {
1835            envelope
1836                .id
1837                .starts_with(&format!("agent-deregister:{agent_id}:"))
1838        }));
1839
1840        let audit_entries = reopened.audit_log(None, None).await.unwrap();
1841        let matching = audit_entries
1842            .iter()
1843            .filter(|entry| {
1844                matches!(
1845                    &entry.action,
1846                    hirn_core::audit::AuditAction::AgentDeregistered { agent_id: recorded }
1847                        if recorded == &agent_id
1848                )
1849            })
1850            .count();
1851        assert_eq!(matching, 1);
1852    }
1853}