1use arrow_array::Array;
2
3use super::*;
4
/// Mutation-envelope kind tag used for namespace deletions.
pub(super) const NAMESPACE_DELETE_MUTATION_KIND: &str = "namespace_delete";
/// Mutation-envelope kind tag used for agent registrations.
pub(super) const AGENT_REGISTER_MUTATION_KIND: &str = "agent_register";
/// Mutation-envelope kind tag used for agent deregistrations.
pub(super) const AGENT_DEREGISTER_MUTATION_KIND: &str = "agent_deregister";
8
9#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
10struct NamespaceDeleteEnvelope {
11 namespace: Namespace,
12 episodic_ids: Vec<MemoryId>,
13 semantic_ids: Vec<MemoryId>,
14 procedural_ids: Vec<MemoryId>,
15 audit_entry_id: MemoryId,
16 audit_timestamp: Timestamp,
17}
18
19#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
20struct AgentRegisterEnvelope {
21 agent: hirn_core::agent::AgentRecord,
22 private_namespace: hirn_core::namespace::NamespaceRecord,
23 audit_entry_id: MemoryId,
24 audit_timestamp: Timestamp,
25}
26
27#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
28struct AgentDeregisterEnvelope {
29 agent_id: hirn_core::types::AgentId,
30 private_namespace: Namespace,
31 audit_entry_id: MemoryId,
32 audit_timestamp: Timestamp,
33}
34
35fn namespace_sql_value(namespace: &Namespace) -> String {
36 namespace.as_str().replace('\'', "''")
37}
38
39fn namespace_id_filter(namespace: &Namespace) -> String {
40 format!("id = '{}'", namespace_sql_value(namespace))
41}
42
43fn namespace_column_filter(namespace: &Namespace) -> String {
44 format!("namespace = '{}'", namespace_sql_value(namespace))
45}
46
/// Builds the filter `id = '<id>'`, doubling every single quote in `id` so the
/// value cannot terminate the string literal early.
fn escaped_id_filter(id: &str) -> String {
    // "id = '" plus the closing quote account for the 7 extra bytes.
    let mut filter = String::with_capacity(id.len() + 7);
    filter.push_str("id = '");
    for ch in id.chars() {
        if ch == '\'' {
            filter.push('\'');
        }
        filter.push(ch);
    }
    filter.push('\'');
    filter
}
50
51fn encode_namespace_delete_envelope(payload: &NamespaceDeleteEnvelope) -> HirnResult<Vec<u8>> {
52 serde_json::to_vec(payload).map_err(|error| {
53 HirnError::storage(format!("namespace delete envelope serialize: {error}"))
54 })
55}
56
57fn encode_agent_register_envelope(payload: &AgentRegisterEnvelope) -> HirnResult<Vec<u8>> {
58 serde_json::to_vec(payload)
59 .map_err(|error| HirnError::storage(format!("agent register envelope serialize: {error}")))
60}
61
62fn encode_agent_deregister_envelope(payload: &AgentDeregisterEnvelope) -> HirnResult<Vec<u8>> {
63 serde_json::to_vec(payload).map_err(|error| {
64 HirnError::storage(format!("agent deregister envelope serialize: {error}"))
65 })
66}
67
68fn decode_namespace_delete_envelope(
69 envelope: &hirn_storage::MutationEnvelopeRecord,
70) -> HirnResult<NamespaceDeleteEnvelope> {
71 serde_json::from_slice(&envelope.payload).map_err(|error| {
72 HirnError::storage(format!("namespace delete envelope deserialize: {error}"))
73 })
74}
75
76fn decode_agent_register_envelope(
77 envelope: &hirn_storage::MutationEnvelopeRecord,
78) -> HirnResult<AgentRegisterEnvelope> {
79 serde_json::from_slice(&envelope.payload).map_err(|error| {
80 HirnError::storage(format!("agent register envelope deserialize: {error}"))
81 })
82}
83
84fn decode_agent_deregister_envelope(
85 envelope: &hirn_storage::MutationEnvelopeRecord,
86) -> HirnResult<AgentDeregisterEnvelope> {
87 serde_json::from_slice(&envelope.payload).map_err(|error| {
88 HirnError::storage(format!("agent deregister envelope deserialize: {error}"))
89 })
90}
91
92fn build_namespace_delete_envelope(
93 namespace: Namespace,
94 mut episodic_ids: Vec<MemoryId>,
95 mut semantic_ids: Vec<MemoryId>,
96 mut procedural_ids: Vec<MemoryId>,
97) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
98 episodic_ids.sort_unstable();
99 episodic_ids.dedup();
100 semantic_ids.sort_unstable();
101 semantic_ids.dedup();
102 procedural_ids.sort_unstable();
103 procedural_ids.dedup();
104
105 let audit_entry_id = MemoryId::new();
106 let payload = NamespaceDeleteEnvelope {
107 namespace,
108 episodic_ids,
109 semantic_ids,
110 procedural_ids,
111 audit_entry_id,
112 audit_timestamp: Timestamp::now(),
113 };
114 let payload = encode_namespace_delete_envelope(&payload)?;
115
116 Ok(hirn_storage::MutationEnvelopeRecord::pending(
117 format!("namespace-delete:{namespace}:{audit_entry_id}"),
118 NAMESPACE_DELETE_MUTATION_KIND,
119 payload,
120 ))
121}
122
123fn build_agent_register_envelope(
124 agent: hirn_core::agent::AgentRecord,
125 private_namespace: hirn_core::namespace::NamespaceRecord,
126) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
127 let agent_id = agent.id;
128 let audit_entry_id = MemoryId::new();
129 let payload = AgentRegisterEnvelope {
130 agent,
131 private_namespace,
132 audit_entry_id,
133 audit_timestamp: Timestamp::now(),
134 };
135 let payload = encode_agent_register_envelope(&payload)?;
136
137 Ok(hirn_storage::MutationEnvelopeRecord::pending(
138 format!("agent-register:{agent_id}:{audit_entry_id}"),
139 AGENT_REGISTER_MUTATION_KIND,
140 payload,
141 ))
142}
143
144fn build_agent_deregister_envelope(
145 agent_id: hirn_core::types::AgentId,
146) -> HirnResult<hirn_storage::MutationEnvelopeRecord> {
147 let audit_entry_id = MemoryId::new();
148 let payload = AgentDeregisterEnvelope {
149 private_namespace: Namespace::private_for(&agent_id),
150 agent_id,
151 audit_entry_id,
152 audit_timestamp: Timestamp::now(),
153 };
154 let payload = encode_agent_deregister_envelope(&payload)?;
155
156 Ok(hirn_storage::MutationEnvelopeRecord::pending(
157 format!("agent-deregister:{agent_id}:{audit_entry_id}"),
158 AGENT_DEREGISTER_MUTATION_KIND,
159 payload,
160 ))
161}
162
163impl HirnDB {
164 pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<crate::event::MemoryEvent> {
175 self.event_runtime().subscribe()
176 }
177
178 pub(crate) async fn emit_in_realm(
180 &self,
181 realm: &str,
182 namespace: &str,
183 agent_id: &str,
184 event: MemoryEvent,
185 ) {
186 self.event_runtime()
187 .emit(realm, namespace, agent_id, event)
188 .await;
189 }
190
191 pub(crate) async fn emit_in_realm_checked(
192 &self,
193 realm: &str,
194 namespace: &str,
195 agent_id: &str,
196 event: MemoryEvent,
197 ) -> HirnResult<()> {
198 self.event_runtime()
199 .emit_checked(realm, namespace, agent_id, event)
200 .await
201 }
202
203 pub(crate) async fn emit_scoped(&self, namespace: &str, agent_id: &str, event: MemoryEvent) {
205 self.emit_in_realm(&self.config.default_realm, namespace, agent_id, event)
206 .await;
207 }
208
209 pub(crate) async fn emit_scoped_checked(
210 &self,
211 namespace: &str,
212 agent_id: &str,
213 event: MemoryEvent,
214 ) -> HirnResult<()> {
215 self.emit_in_realm_checked(&self.config.default_realm, namespace, agent_id, event)
216 .await
217 }
218
219 pub(crate) async fn emit(&self, event: MemoryEvent) {
222 self.emit_in_realm(&self.config.default_realm, "shared", "", event)
223 .await;
224 }
225
226 pub(crate) async fn create_namespace(
230 &self,
231 name: &str,
232 kind: hirn_core::types::NamespaceKind,
233 members: Vec<hirn_core::types::AgentId>,
234 ) -> HirnResult<()> {
235 let ns = Namespace::new(name)?;
236
237 let filter = namespace_id_filter(&ns);
239 let count = self
240 .storage_runtime
241 .count(
242 hirn_storage::datasets::namespace::DATASET_NAME,
243 Some(&filter),
244 )
245 .await
246 .map_err(|e| HirnError::storage(e))?;
247 if count > 0 {
248 return Err(HirnError::AlreadyExists(format!(
249 "namespace '{name}' already exists"
250 )));
251 }
252
253 let rec = hirn_core::namespace::NamespaceRecord {
254 namespace: ns,
255 kind,
256 created_at: Timestamp::now(),
257 member_agents: members,
258 };
259
260 let batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(&rec))
261 .map_err(|e| HirnError::storage(e))?;
262 self.storage_runtime
263 .append(hirn_storage::datasets::namespace::DATASET_NAME, batch)
264 .await
265 .map_err(|e| HirnError::storage(e))?;
266
267 self.namespace_runtime.invalidate_namespaces();
268
269 self.append_audit(
270 None,
271 hirn_core::audit::AuditAction::NamespaceCreated {
272 namespace: name.to_string(),
273 },
274 )
275 .await?;
276
277 Ok(())
278 }
279
280 pub(crate) async fn list_namespaces(
282 &self,
283 ) -> HirnResult<Vec<hirn_core::namespace::NamespaceRecord>> {
284 if let Some(cached) = self.namespace_runtime.cached_namespaces() {
285 return Ok(cached.as_ref().clone());
286 }
287
288 let opts = hirn_storage::store::ScanOptions::default();
289 let batches = self
290 .storage_runtime
291 .scan(hirn_storage::datasets::namespace::DATASET_NAME, opts)
292 .await
293 .map_err(|e| HirnError::storage(e))?;
294
295 let mut result = Vec::new();
296 for batch in &batches {
297 let recs = hirn_storage::datasets::namespace::from_batch(batch)
298 .map_err(|e| HirnError::storage(e))?;
299 result.extend(recs);
300 }
301
302 self.namespace_runtime.cache_namespaces(result.clone());
303
304 Ok(result)
305 }
306
307 pub(crate) async fn get_namespace(
309 &self,
310 name: &str,
311 ) -> HirnResult<hirn_core::namespace::NamespaceRecord> {
312 if let Some(cached) = self.namespace_runtime.cached_namespaces() {
313 if let Some(rec) = cached.iter().find(|rec| rec.namespace.as_str() == name) {
314 return Ok(rec.clone());
315 }
316 }
317
318 let ns = Namespace::new(name)?;
319 let filter = namespace_id_filter(&ns);
320 let opts = hirn_storage::store::ScanOptions {
321 filter: Some(filter),
322 ..Default::default()
323 };
324 let batches = self
325 .storage_runtime
326 .scan(hirn_storage::datasets::namespace::DATASET_NAME, opts)
327 .await
328 .map_err(|e| HirnError::storage(e))?;
329
330 for batch in &batches {
331 let recs = hirn_storage::datasets::namespace::from_batch(batch)
332 .map_err(|e| HirnError::storage(e))?;
333 if let Some(rec) = recs.into_iter().next() {
334 return Ok(rec);
335 }
336 }
337 Err(HirnError::NotFound(format!("namespace '{name}'")))
338 }
339
340 pub(crate) async fn delete_namespace(&self, name: &str) -> HirnResult<()> {
342 let ns = Namespace::new(name)?;
343 if ns == Namespace::shared() {
344 return Err(HirnError::InvalidInput(
345 "cannot delete the shared namespace".into(),
346 ));
347 }
348
349 self.get_namespace(name).await?;
351
352 let ep_ids = self.list_episodic_ids_in_namespace(&ns).await?;
353 let sem_ids = self.list_semantic_ids_in_namespace(&ns).await?;
354 let proc_ids = self.list_procedural_ids_in_namespace(&ns).await?;
355 let envelope = build_namespace_delete_envelope(ns, ep_ids, sem_ids, proc_ids)?;
356 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
357 .await
358 .map_err(HirnError::storage)?;
359
360 let payload = decode_namespace_delete_envelope(&envelope)?;
361 self.apply_namespace_delete_plan(&payload).await?;
362
363 if let Err(error) = hirn_storage::update_mutation_envelope_state(
364 self.storage_backend(),
365 &envelope.id,
366 hirn_storage::MutationEnvelopeState::Applied,
367 None,
368 )
369 .await
370 {
371 tracing::warn!(
372 namespace = %ns,
373 envelope_id = %envelope.id,
374 error = %error,
375 "namespace delete mutation envelope finalize failed; recovery will retry"
376 );
377 }
378
379 Ok(())
380 }
381
382 pub(crate) async fn reconcile_pending_namespace_delete_mutations(&self) -> HirnResult<usize> {
383 let envelopes = hirn_storage::list_pending_mutation_envelopes(
384 self.storage_backend(),
385 Some(NAMESPACE_DELETE_MUTATION_KIND),
386 )
387 .await
388 .map_err(HirnError::storage)?;
389 let mut reconciled = 0usize;
390
391 for envelope in envelopes {
392 match self
393 .reconcile_single_pending_namespace_delete_mutation(&envelope)
394 .await
395 {
396 Ok(true) => reconciled += 1,
397 Ok(false) => {}
398 Err(error) => {
399 hirn_storage::update_mutation_envelope_state(
400 self.storage_backend(),
401 &envelope.id,
402 hirn_storage::MutationEnvelopeState::Failed,
403 Some(error.to_string()),
404 )
405 .await
406 .map_err(HirnError::storage)?;
407 }
408 }
409 }
410
411 Ok(reconciled)
412 }
413
414 pub(crate) async fn reconcile_pending_agent_register_mutations(&self) -> HirnResult<usize> {
415 let envelopes = hirn_storage::list_pending_mutation_envelopes(
416 self.storage_backend(),
417 Some(AGENT_REGISTER_MUTATION_KIND),
418 )
419 .await
420 .map_err(HirnError::storage)?;
421 let mut reconciled = 0usize;
422
423 for envelope in envelopes {
424 match self
425 .reconcile_single_pending_agent_register_mutation(&envelope)
426 .await
427 {
428 Ok(true) => reconciled += 1,
429 Ok(false) => {}
430 Err(error) => {
431 hirn_storage::update_mutation_envelope_state(
432 self.storage_backend(),
433 &envelope.id,
434 hirn_storage::MutationEnvelopeState::Failed,
435 Some(error.to_string()),
436 )
437 .await
438 .map_err(HirnError::storage)?;
439 }
440 }
441 }
442
443 Ok(reconciled)
444 }
445
446 pub(crate) async fn reconcile_pending_agent_deregister_mutations(&self) -> HirnResult<usize> {
447 let envelopes = hirn_storage::list_pending_mutation_envelopes(
448 self.storage_backend(),
449 Some(AGENT_DEREGISTER_MUTATION_KIND),
450 )
451 .await
452 .map_err(HirnError::storage)?;
453 let mut reconciled = 0usize;
454
455 for envelope in envelopes {
456 match self
457 .reconcile_single_pending_agent_deregister_mutation(&envelope)
458 .await
459 {
460 Ok(true) => reconciled += 1,
461 Ok(false) => {}
462 Err(error) => {
463 hirn_storage::update_mutation_envelope_state(
464 self.storage_backend(),
465 &envelope.id,
466 hirn_storage::MutationEnvelopeState::Failed,
467 Some(error.to_string()),
468 )
469 .await
470 .map_err(HirnError::storage)?;
471 }
472 }
473 }
474
475 Ok(reconciled)
476 }
477
478 async fn reconcile_single_pending_agent_register_mutation(
479 &self,
480 envelope: &hirn_storage::MutationEnvelopeRecord,
481 ) -> HirnResult<bool> {
482 let payload = decode_agent_register_envelope(envelope)?;
483 self.apply_agent_register_plan(&payload).await?;
484 hirn_storage::update_mutation_envelope_state(
485 self.storage_backend(),
486 &envelope.id,
487 hirn_storage::MutationEnvelopeState::Applied,
488 None,
489 )
490 .await
491 .map_err(HirnError::storage)?;
492 Ok(true)
493 }
494
495 async fn reconcile_single_pending_agent_deregister_mutation(
496 &self,
497 envelope: &hirn_storage::MutationEnvelopeRecord,
498 ) -> HirnResult<bool> {
499 let payload = decode_agent_deregister_envelope(envelope)?;
500 self.apply_agent_deregister_plan(&payload).await?;
501 hirn_storage::update_mutation_envelope_state(
502 self.storage_backend(),
503 &envelope.id,
504 hirn_storage::MutationEnvelopeState::Applied,
505 None,
506 )
507 .await
508 .map_err(HirnError::storage)?;
509 Ok(true)
510 }
511
512 async fn reconcile_single_pending_namespace_delete_mutation(
513 &self,
514 envelope: &hirn_storage::MutationEnvelopeRecord,
515 ) -> HirnResult<bool> {
516 let payload = decode_namespace_delete_envelope(envelope)?;
517 self.apply_namespace_delete_plan(&payload).await?;
518 hirn_storage::update_mutation_envelope_state(
519 self.storage_backend(),
520 &envelope.id,
521 hirn_storage::MutationEnvelopeState::Applied,
522 None,
523 )
524 .await
525 .map_err(HirnError::storage)?;
526 Ok(true)
527 }
528
529 async fn apply_namespace_delete_plan(
530 &self,
531 payload: &NamespaceDeleteEnvelope,
532 ) -> HirnResult<()> {
533 for id in &payload.episodic_ids {
534 self.delete_episode_if_present(*id).await?;
535 }
536 for id in &payload.semantic_ids {
537 self.purge_semantic_if_present(*id).await?;
538 }
539 for id in &payload.procedural_ids {
540 self.delete_procedural_if_present(*id).await?;
541 }
542
543 let predicate = namespace_id_filter(&payload.namespace);
545 self.storage_runtime
546 .delete(hirn_storage::datasets::namespace::DATASET_NAME, &predicate)
547 .await
548 .map_err(|e| HirnError::storage(e))?;
549
550 self.namespace_runtime.invalidate_namespaces();
551
552 self.append_namespace_delete_audit_once(payload).await?;
553
554 Ok(())
555 }
556
557 async fn append_namespace_delete_audit_once(
558 &self,
559 payload: &NamespaceDeleteEnvelope,
560 ) -> HirnResult<()> {
561 let audit_filter = escaped_id_filter(&payload.audit_entry_id.to_string());
562 let existing = self
563 .storage_runtime
564 .count(
565 hirn_storage::datasets::audit::DATASET_NAME,
566 Some(&audit_filter),
567 )
568 .await
569 .map_err(HirnError::storage)?;
570 if existing > 0 {
571 return Ok(());
572 }
573
574 let entry = hirn_core::audit::AuditEntry {
575 id: payload.audit_entry_id,
576 timestamp: payload.audit_timestamp,
577 actor: None,
578 action: hirn_core::audit::AuditAction::NamespaceDeleted {
579 namespace: payload.namespace.to_string(),
580 },
581 };
582 let batch = hirn_storage::datasets::audit::to_batch(std::slice::from_ref(&entry))
583 .map_err(HirnError::storage)?;
584 self.storage_runtime
585 .append(hirn_storage::datasets::audit::DATASET_NAME, batch)
586 .await
587 .map_err(HirnError::storage)?;
588 Ok(())
589 }
590
591 async fn apply_agent_register_plan(&self, payload: &AgentRegisterEnvelope) -> HirnResult<()> {
592 let agent_batch =
593 hirn_storage::datasets::agent::to_batch(std::slice::from_ref(&payload.agent))
594 .map_err(HirnError::storage)?;
595 self.storage_backend()
596 .merge_insert(
597 hirn_storage::datasets::agent::DATASET_NAME,
598 &["id"],
599 agent_batch,
600 )
601 .await
602 .map_err(HirnError::storage)?;
603
604 let namespace_batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(
605 &payload.private_namespace,
606 ))
607 .map_err(HirnError::storage)?;
608 self.storage_backend()
609 .merge_insert(
610 hirn_storage::datasets::namespace::DATASET_NAME,
611 &["id"],
612 namespace_batch,
613 )
614 .await
615 .map_err(HirnError::storage)?;
616
617 self.namespace_runtime.cache_agent(payload.agent.clone());
618 self.namespace_runtime.invalidate_namespaces();
619 self.append_agent_register_audit_once(payload).await?;
620 Ok(())
621 }
622
623 async fn apply_agent_deregister_plan(
624 &self,
625 payload: &AgentDeregisterEnvelope,
626 ) -> HirnResult<()> {
627 match self
628 .delete_namespace(payload.private_namespace.as_str())
629 .await
630 {
631 Ok(()) | Err(HirnError::NotFound(_)) => {}
632 Err(error) => return Err(error),
633 }
634
635 let predicate = escaped_id_filter(payload.agent_id.as_str());
636 self.storage_runtime
637 .delete(hirn_storage::datasets::agent::DATASET_NAME, &predicate)
638 .await
639 .map_err(HirnError::storage)?;
640
641 self.namespace_runtime.evict_agent(&payload.agent_id);
642 self.namespace_runtime.invalidate_namespaces();
643 self.append_agent_deregister_audit_once(payload).await?;
644 Ok(())
645 }
646
647 async fn append_agent_register_audit_once(
648 &self,
649 payload: &AgentRegisterEnvelope,
650 ) -> HirnResult<()> {
651 let audit_filter = escaped_id_filter(&payload.audit_entry_id.to_string());
652 let existing = self
653 .storage_runtime
654 .count(
655 hirn_storage::datasets::audit::DATASET_NAME,
656 Some(&audit_filter),
657 )
658 .await
659 .map_err(HirnError::storage)?;
660 if existing > 0 {
661 return Ok(());
662 }
663
664 let entry = hirn_core::audit::AuditEntry {
665 id: payload.audit_entry_id,
666 timestamp: payload.audit_timestamp,
667 actor: None,
668 action: hirn_core::audit::AuditAction::AgentRegistered {
669 agent_id: payload.agent.id,
670 },
671 };
672 let batch = hirn_storage::datasets::audit::to_batch(std::slice::from_ref(&entry))
673 .map_err(HirnError::storage)?;
674 self.storage_runtime
675 .append(hirn_storage::datasets::audit::DATASET_NAME, batch)
676 .await
677 .map_err(HirnError::storage)?;
678 Ok(())
679 }
680
681 async fn append_agent_deregister_audit_once(
682 &self,
683 payload: &AgentDeregisterEnvelope,
684 ) -> HirnResult<()> {
685 let audit_filter = escaped_id_filter(&payload.audit_entry_id.to_string());
686 let existing = self
687 .storage_runtime
688 .count(
689 hirn_storage::datasets::audit::DATASET_NAME,
690 Some(&audit_filter),
691 )
692 .await
693 .map_err(HirnError::storage)?;
694 if existing > 0 {
695 return Ok(());
696 }
697
698 let entry = hirn_core::audit::AuditEntry {
699 id: payload.audit_entry_id,
700 timestamp: payload.audit_timestamp,
701 actor: None,
702 action: hirn_core::audit::AuditAction::AgentDeregistered {
703 agent_id: payload.agent_id,
704 },
705 };
706 let batch = hirn_storage::datasets::audit::to_batch(std::slice::from_ref(&entry))
707 .map_err(HirnError::storage)?;
708 self.storage_runtime
709 .append(hirn_storage::datasets::audit::DATASET_NAME, batch)
710 .await
711 .map_err(HirnError::storage)?;
712 Ok(())
713 }
714
715 async fn delete_episode_if_present(&self, id: MemoryId) -> HirnResult<()> {
716 match self.read_episodic_record(id).await {
717 Ok(_) => {}
718 Err(HirnError::NotFound(_)) => return Ok(()),
719 Err(error) => return Err(error),
720 }
721 match self.delete_episode(id).await {
722 Ok(()) | Err(HirnError::NotFound(_)) => Ok(()),
723 Err(error) => Err(error),
724 }
725 }
726
727 async fn purge_semantic_if_present(&self, id: MemoryId) -> HirnResult<()> {
728 match self.read_semantic_record(id).await {
729 Ok(_) => {}
730 Err(HirnError::NotFound(_)) => return Ok(()),
731 Err(error) => return Err(error),
732 }
733 match self.purge_semantic(id).await {
734 Ok(()) | Err(HirnError::NotFound(_)) => Ok(()),
735 Err(error) => Err(error),
736 }
737 }
738
739 async fn delete_procedural_if_present(&self, id: MemoryId) -> HirnResult<()> {
740 match self.get_procedural(id).await {
741 Ok(_) => {}
742 Err(HirnError::NotFound(_)) => return Ok(()),
743 Err(error) => return Err(error),
744 }
745 match self.delete_procedural(id).await {
746 Ok(()) | Err(HirnError::NotFound(_)) => Ok(()),
747 Err(error) => Err(error),
748 }
749 }
750
751 pub(crate) async fn list_episodic_ids_in_namespace(
753 &self,
754 ns: &Namespace,
755 ) -> HirnResult<Vec<MemoryId>> {
756 let filter = namespace_column_filter(ns);
757 let opts = hirn_storage::store::ScanOptions {
758 filter: Some(filter),
759 columns: Some(vec!["id".to_string()]),
760 ..Default::default()
761 };
762 let batches = self
763 .storage_runtime
764 .scan(hirn_storage::datasets::episodic::DATASET_NAME, opts)
765 .await
766 .map_err(|e| HirnError::storage(e))?;
767
768 let mut ids = Vec::new();
769 for batch in &batches {
770 let id_col = batch
771 .column_by_name("id")
772 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
773 if let Some(col) = id_col {
774 for i in 0..col.len() {
775 let id = MemoryId::parse(col.value(i))
776 .map_err(|e| HirnError::InvalidInput(e.to_string()))?;
777 ids.push(id);
778 }
779 }
780 }
781 Ok(ids)
782 }
783
784 pub(crate) async fn list_semantic_ids_in_namespace(
786 &self,
787 ns: &Namespace,
788 ) -> HirnResult<Vec<MemoryId>> {
789 let filter = namespace_column_filter(ns);
790 let opts = hirn_storage::store::ScanOptions {
791 filter: Some(filter),
792 columns: Some(vec!["id".to_string()]),
793 ..Default::default()
794 };
795 let batches = self
796 .storage_runtime
797 .scan(hirn_storage::datasets::semantic::DATASET_NAME, opts)
798 .await
799 .map_err(|e| HirnError::storage(e))?;
800
801 let mut ids = Vec::new();
802 for batch in &batches {
803 let id_col = batch
804 .column_by_name("id")
805 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
806 if let Some(col) = id_col {
807 for i in 0..col.len() {
808 let id = MemoryId::parse(col.value(i))
809 .map_err(|e| HirnError::InvalidInput(e.to_string()))?;
810 ids.push(id);
811 }
812 }
813 }
814 Ok(ids)
815 }
816
817 pub(crate) async fn list_procedural_ids_in_namespace(
819 &self,
820 ns: &Namespace,
821 ) -> HirnResult<Vec<MemoryId>> {
822 let filter = namespace_column_filter(ns);
823 let opts = hirn_storage::store::ScanOptions {
824 filter: Some(filter),
825 columns: Some(vec!["id".to_string()]),
826 ..Default::default()
827 };
828 let batches = self
829 .storage_runtime
830 .scan(hirn_storage::datasets::procedural::DATASET_NAME, opts)
831 .await
832 .map_err(|e| HirnError::storage(e))?;
833
834 let mut ids = Vec::new();
835 for batch in &batches {
836 let id_col = batch
837 .column_by_name("id")
838 .and_then(|c| c.as_any().downcast_ref::<arrow_array::StringArray>());
839 if let Some(col) = id_col {
840 for i in 0..col.len() {
841 let id = MemoryId::parse(col.value(i))
842 .map_err(|e| HirnError::InvalidInput(e.to_string()))?;
843 ids.push(id);
844 }
845 }
846 }
847 Ok(ids)
848 }
849
850 pub async fn register_agent(
854 &self,
855 agent_id: &hirn_core::types::AgentId,
856 display_name: impl Into<String>,
857 ) -> HirnResult<()> {
858 if self.namespace_runtime.cached_agent(agent_id).is_some() {
859 return Err(HirnError::AlreadyExists(format!(
860 "agent '{}' already registered",
861 agent_id
862 )));
863 }
864
865 let filter = escaped_id_filter(agent_id.as_str());
867 let count = self
868 .storage_runtime
869 .count(hirn_storage::datasets::agent::DATASET_NAME, Some(&filter))
870 .await
871 .map_err(|e| HirnError::storage(e))?;
872 if count > 0 {
873 return Err(HirnError::AlreadyExists(format!(
874 "agent '{}' already registered",
875 agent_id
876 )));
877 }
878
879 let rec = hirn_core::agent::AgentRecord::new(agent_id.clone(), display_name);
880 let ns_rec = hirn_core::namespace::NamespaceRecord::private_for(agent_id);
881 let envelope = build_agent_register_envelope(rec, ns_rec)?;
882 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
883 .await
884 .map_err(HirnError::storage)?;
885
886 let payload = decode_agent_register_envelope(&envelope)?;
887 self.apply_agent_register_plan(&payload).await?;
888
889 if let Err(error) = hirn_storage::update_mutation_envelope_state(
890 self.storage_backend(),
891 &envelope.id,
892 hirn_storage::MutationEnvelopeState::Applied,
893 None,
894 )
895 .await
896 {
897 tracing::warn!(
898 agent_id = %agent_id,
899 envelope_id = %envelope.id,
900 error = %error,
901 "agent register mutation envelope finalize failed; recovery will retry"
902 );
903 }
904
905 Ok(())
906 }
907
908 pub async fn list_agents(&self) -> HirnResult<Vec<hirn_core::agent::AgentRecord>> {
910 let opts = hirn_storage::store::ScanOptions::default();
911 let batches = self
912 .storage_runtime
913 .scan(hirn_storage::datasets::agent::DATASET_NAME, opts)
914 .await
915 .map_err(|e| HirnError::storage(e))?;
916
917 let mut result = Vec::new();
918 for batch in &batches {
919 let recs = hirn_storage::datasets::agent::from_batch(batch)
920 .map_err(|e| HirnError::storage(e))?;
921 result.extend(recs);
922 }
923 Ok(result)
924 }
925
926 pub async fn get_agent(
928 &self,
929 agent_id: &hirn_core::types::AgentId,
930 ) -> HirnResult<hirn_core::agent::AgentRecord> {
931 if let Some(agent) = self.namespace_runtime.cached_agent(agent_id) {
932 return Ok(agent);
933 }
934
935 let filter = escaped_id_filter(agent_id.as_str());
936 let opts = hirn_storage::store::ScanOptions {
937 filter: Some(filter),
938 ..Default::default()
939 };
940 let batches = self
941 .storage_runtime
942 .scan(hirn_storage::datasets::agent::DATASET_NAME, opts)
943 .await
944 .map_err(|e| HirnError::storage(e))?;
945
946 for batch in &batches {
947 let recs = hirn_storage::datasets::agent::from_batch(batch)
948 .map_err(|e| HirnError::storage(e))?;
949 if let Some(rec) = recs.into_iter().next() {
950 self.namespace_runtime.cache_agent(rec.clone());
951 return Ok(rec);
952 }
953 }
954 Err(HirnError::NotFound(format!("agent '{agent_id}'")))
955 }
956
957 pub async fn update_agent(&self, agent: &hirn_core::agent::AgentRecord) -> HirnResult<()> {
959 let filter = escaped_id_filter(agent.id.as_str());
960 let count = self
961 .storage_runtime
962 .count(hirn_storage::datasets::agent::DATASET_NAME, Some(&filter))
963 .await
964 .map_err(|e| HirnError::storage(e))?;
965 if count == 0 {
966 return Err(HirnError::NotFound(format!("agent '{}'", agent.id)));
967 }
968 let batch = hirn_storage::datasets::agent::to_batch(std::slice::from_ref(agent))
969 .map_err(|e| HirnError::storage(e))?;
970 self.storage_backend()
971 .merge_insert(hirn_storage::datasets::agent::DATASET_NAME, &["id"], batch)
972 .await
973 .map_err(|e| HirnError::storage(e))?;
974
975 self.namespace_runtime.cache_agent(agent.clone());
976 Ok(())
977 }
978
979 pub async fn deregister_agent(&self, agent_id: &hirn_core::types::AgentId) -> HirnResult<()> {
981 self.get_agent(agent_id).await?;
983
984 let envelope = build_agent_deregister_envelope(*agent_id)?;
985 hirn_storage::append_mutation_envelope(self.storage_backend(), &envelope)
986 .await
987 .map_err(HirnError::storage)?;
988
989 let payload = decode_agent_deregister_envelope(&envelope)?;
990 self.apply_agent_deregister_plan(&payload).await?;
991
992 if let Err(error) = hirn_storage::update_mutation_envelope_state(
993 self.storage_backend(),
994 &envelope.id,
995 hirn_storage::MutationEnvelopeState::Applied,
996 None,
997 )
998 .await
999 {
1000 tracing::warn!(
1001 agent_id = %agent_id,
1002 envelope_id = %envelope.id,
1003 error = %error,
1004 "agent deregister mutation envelope finalize failed; recovery will retry"
1005 );
1006 }
1007
1008 Ok(())
1009 }
1010
1011 pub async fn create_team_namespace(
1015 &self,
1016 name: &str,
1017 agent_ids: Vec<hirn_core::types::AgentId>,
1018 ) -> HirnResult<()> {
1019 self.create_namespace(name, hirn_core::types::NamespaceKind::Team, agent_ids)
1020 .await
1021 }
1022
1023 pub async fn add_agent_to_team(
1025 &self,
1026 agent_id: &hirn_core::types::AgentId,
1027 team_name: &str,
1028 ) -> HirnResult<()> {
1029 let mut ns_rec = self.get_namespace(team_name).await?;
1030 if ns_rec.kind != hirn_core::types::NamespaceKind::Team {
1031 return Err(HirnError::InvalidInput(format!(
1032 "'{team_name}' is not a team namespace"
1033 )));
1034 }
1035 if ns_rec.member_agents.contains(agent_id) {
1036 return Ok(()); }
1038 ns_rec.member_agents.push(agent_id.clone());
1039 self.update_namespace_record(&ns_rec).await?;
1040
1041 self.append_audit(
1042 None,
1043 hirn_core::audit::AuditAction::AgentAddedToTeam {
1044 agent_id: agent_id.clone(),
1045 team: team_name.to_string(),
1046 },
1047 )
1048 .await?;
1049 Ok(())
1050 }
1051
1052 pub async fn remove_agent_from_team(
1054 &self,
1055 agent_id: &hirn_core::types::AgentId,
1056 team_name: &str,
1057 ) -> HirnResult<()> {
1058 let mut ns_rec = self.get_namespace(team_name).await?;
1059 if ns_rec.kind != hirn_core::types::NamespaceKind::Team {
1060 return Err(HirnError::InvalidInput(format!(
1061 "'{team_name}' is not a team namespace"
1062 )));
1063 }
1064 ns_rec.member_agents.retain(|a| a != agent_id);
1065 self.update_namespace_record(&ns_rec).await?;
1066
1067 self.append_audit(
1068 None,
1069 hirn_core::audit::AuditAction::AgentRemovedFromTeam {
1070 agent_id: agent_id.clone(),
1071 team: team_name.to_string(),
1072 },
1073 )
1074 .await?;
1075 Ok(())
1076 }
1077
1078 async fn update_namespace_record(
1080 &self,
1081 rec: &hirn_core::namespace::NamespaceRecord,
1082 ) -> HirnResult<()> {
1083 let batch = hirn_storage::datasets::namespace::to_batch(std::slice::from_ref(rec))
1084 .map_err(|e| HirnError::storage(e))?;
1085 self.storage_backend()
1086 .merge_insert(
1087 hirn_storage::datasets::namespace::DATASET_NAME,
1088 &["id"],
1089 batch,
1090 )
1091 .await
1092 .map_err(|e| HirnError::storage(e))?;
1093
1094 self.namespace_runtime.invalidate_namespaces();
1095 Ok(())
1096 }
1097
1098 pub(crate) async fn append_audit(
1102 &self,
1103 actor: Option<hirn_core::types::AgentId>,
1104 action: hirn_core::audit::AuditAction,
1105 ) -> HirnResult<()> {
1106 self.policy_runtime().append_audit(actor, action).await
1107 }
1108
1109 pub(crate) async fn audit_log(
1111 &self,
1112 after: Option<&Timestamp>,
1113 before: Option<&Timestamp>,
1114 ) -> HirnResult<Vec<hirn_core::audit::AuditEntry>> {
1115 self.policy_runtime().audit_log(after, before).await
1116 }
1117
1118 pub async fn ensure_agent(&self, agent_id: &hirn_core::types::AgentId) -> HirnResult<()> {
1122 if self.namespace_runtime.cached_agent(agent_id).is_some() {
1123 return Ok(());
1124 }
1125
1126 if self.get_agent(agent_id).await.is_ok() {
1127 return Ok(());
1128 }
1129
1130 match self.register_agent(agent_id, agent_id.as_str()).await {
1131 Ok(()) | Err(HirnError::AlreadyExists(_)) => Ok(()),
1132 Err(e) => Err(e),
1133 }
1134 }
1135
1136 pub async fn as_agent(
1138 &self,
1139 agent_id: &hirn_core::types::AgentId,
1140 ) -> HirnResult<crate::agent_context::AgentContext<'_>> {
1141 let private_namespace = Namespace::private_for(agent_id);
1142
1143 if let Some(accessible) = self
1144 .namespace_runtime
1145 .cached_accessible_namespaces(agent_id)
1146 {
1147 let mut accessible = accessible;
1148 if !accessible.contains(&private_namespace) {
1149 accessible.push(private_namespace);
1150 }
1151 return Ok(crate::agent_context::AgentContext::new(
1152 self,
1153 agent_id.clone(),
1154 accessible,
1155 ));
1156 }
1157
1158 self.get_agent(agent_id).await?;
1160
1161 let namespaces = self.list_namespaces().await?;
1163 let mut accessible: Vec<Namespace> = namespaces
1164 .iter()
1165 .filter(|ns| ns.agent_has_access(agent_id))
1166 .map(|ns| ns.namespace.clone())
1167 .collect();
1168 if !accessible.contains(&private_namespace) {
1169 accessible.push(private_namespace);
1170 }
1171
1172 self.namespace_runtime
1173 .cache_accessible_namespaces(agent_id.clone(), accessible.clone());
1174
1175 Ok(crate::agent_context::AgentContext::new(
1176 self,
1177 agent_id.clone(),
1178 accessible,
1179 ))
1180 }
1181}
1182
1183#[cfg(test)]
1184mod tests {
1185 use std::sync::Arc;
1186 use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
1187
1188 use arrow_array::RecordBatch;
1189 use async_trait::async_trait;
1190 use datafusion::catalog::TableProvider;
1191 use hirn_core::types::{AgentId, EventType, NamespaceKind};
1192 use hirn_storage::memory_store::MemoryStore;
1193 use hirn_storage::store::{
1194 ColumnTransform, CompactOptions, CompactResult, DatasetInfo, FtsSearchOptions,
1195 HybridSearchOptions, IndexConfig, MultivectorSearchOptions, RecordBatchStream, ScanOptions,
1196 VectorSearchOptions, VersionTag,
1197 };
1198 use hirn_storage::{HirnDb, HirnDbConfig, HirnDbError, PhysicalStore};
1199
1200 use super::*;
1201
1202 struct FaultInjectingNamespaceStore {
1203 inner: MemoryStore,
1204 fail_agent_writes: AtomicBool,
1205 fail_agent_deletes: AtomicBool,
1206 fail_namespace_writes: AtomicBool,
1207 }
1208
1209 impl FaultInjectingNamespaceStore {
1210 fn new() -> Self {
1211 Self {
1212 inner: MemoryStore::new(),
1213 fail_agent_writes: AtomicBool::new(false),
1214 fail_agent_deletes: AtomicBool::new(false),
1215 fail_namespace_writes: AtomicBool::new(false),
1216 }
1217 }
1218
1219 fn fail_agent_writes(&self) {
1220 self.fail_agent_writes.store(true, AtomicOrdering::Release);
1221 }
1222
1223 fn allow_agent_writes(&self) {
1224 self.fail_agent_writes.store(false, AtomicOrdering::Release);
1225 }
1226
1227 fn fail_agent_deletes(&self) {
1228 self.fail_agent_deletes.store(true, AtomicOrdering::Release);
1229 }
1230
1231 fn allow_agent_deletes(&self) {
1232 self.fail_agent_deletes
1233 .store(false, AtomicOrdering::Release);
1234 }
1235
1236 fn fail_namespace_writes(&self) {
1237 self.fail_namespace_writes
1238 .store(true, AtomicOrdering::Release);
1239 }
1240
1241 fn allow_namespace_writes(&self) {
1242 self.fail_namespace_writes
1243 .store(false, AtomicOrdering::Release);
1244 }
1245 }
1246
1247 #[async_trait]
1248 impl PhysicalStore for FaultInjectingNamespaceStore {
1249 async fn append(&self, dataset: &str, batch: RecordBatch) -> Result<(), HirnDbError> {
1250 if dataset == hirn_storage::datasets::agent::DATASET_NAME
1251 && self.fail_agent_writes.load(AtomicOrdering::Acquire)
1252 {
1253 return Err(HirnDbError::Unsupported(
1254 "simulated agent append failure".to_string(),
1255 ));
1256 }
1257 if dataset == hirn_storage::datasets::namespace::DATASET_NAME
1258 && self.fail_namespace_writes.load(AtomicOrdering::Acquire)
1259 {
1260 return Err(HirnDbError::Unsupported(
1261 "simulated namespace append failure".to_string(),
1262 ));
1263 }
1264 self.inner.append(dataset, batch).await
1265 }
1266
1267 async fn append_batches(
1268 &self,
1269 dataset: &str,
1270 batches: Vec<RecordBatch>,
1271 ) -> Result<(), HirnDbError> {
1272 self.inner.append_batches(dataset, batches).await
1273 }
1274
1275 async fn scan(
1276 &self,
1277 dataset: &str,
1278 opts: ScanOptions,
1279 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1280 self.inner.scan(dataset, opts).await
1281 }
1282
1283 async fn scan_stream(
1284 &self,
1285 dataset: &str,
1286 opts: ScanOptions,
1287 ) -> Result<RecordBatchStream, HirnDbError> {
1288 self.inner.scan_stream(dataset, opts).await
1289 }
1290
1291 async fn delete(&self, dataset: &str, predicate: &str) -> Result<u64, HirnDbError> {
1292 if dataset == hirn_storage::datasets::agent::DATASET_NAME
1293 && self.fail_agent_deletes.load(AtomicOrdering::Acquire)
1294 {
1295 return Err(HirnDbError::Unsupported(
1296 "simulated agent delete failure".to_string(),
1297 ));
1298 }
1299 self.inner.delete(dataset, predicate).await
1300 }
1301
1302 async fn update_where(
1303 &self,
1304 dataset: &str,
1305 filter: &str,
1306 updates: &[(&str, &str)],
1307 ) -> Result<u64, HirnDbError> {
1308 self.inner.update_where(dataset, filter, updates).await
1309 }
1310
1311 async fn merge_insert(
1312 &self,
1313 dataset: &str,
1314 on: &[&str],
1315 batch: RecordBatch,
1316 ) -> Result<(), HirnDbError> {
1317 if dataset == hirn_storage::datasets::agent::DATASET_NAME
1318 && self.fail_agent_writes.load(AtomicOrdering::Acquire)
1319 {
1320 return Err(HirnDbError::Unsupported(
1321 "simulated agent merge_insert failure".to_string(),
1322 ));
1323 }
1324 if dataset == hirn_storage::datasets::namespace::DATASET_NAME
1325 && self.fail_namespace_writes.load(AtomicOrdering::Acquire)
1326 {
1327 return Err(HirnDbError::Unsupported(
1328 "simulated namespace merge_insert failure".to_string(),
1329 ));
1330 }
1331 self.inner.merge_insert(dataset, on, batch).await
1332 }
1333
1334 async fn count(&self, dataset: &str, filter: Option<&str>) -> Result<u64, HirnDbError> {
1335 self.inner.count(dataset, filter).await
1336 }
1337
1338 async fn vector_search(
1339 &self,
1340 dataset: &str,
1341 opts: VectorSearchOptions,
1342 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1343 self.inner.vector_search(dataset, opts).await
1344 }
1345
1346 async fn vector_search_many(
1347 &self,
1348 dataset: &str,
1349 queries: Vec<VectorSearchOptions>,
1350 ) -> Result<Vec<Vec<RecordBatch>>, HirnDbError> {
1351 self.inner.vector_search_many(dataset, queries).await
1352 }
1353
1354 async fn fts_search(
1355 &self,
1356 dataset: &str,
1357 opts: FtsSearchOptions,
1358 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1359 self.inner.fts_search(dataset, opts).await
1360 }
1361
1362 async fn hybrid_search(
1363 &self,
1364 dataset: &str,
1365 opts: HybridSearchOptions,
1366 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1367 self.inner.hybrid_search(dataset, opts).await
1368 }
1369
1370 async fn multivector_search(
1371 &self,
1372 dataset: &str,
1373 opts: MultivectorSearchOptions,
1374 ) -> Result<Vec<RecordBatch>, HirnDbError> {
1375 self.inner.multivector_search(dataset, opts).await
1376 }
1377
1378 async fn create_index(
1379 &self,
1380 dataset: &str,
1381 config: IndexConfig,
1382 ) -> Result<(), HirnDbError> {
1383 self.inner.create_index(dataset, config).await
1384 }
1385
1386 async fn optimize_indices(&self, dataset: &str) -> Result<(), HirnDbError> {
1387 self.inner.optimize_indices(dataset).await
1388 }
1389
1390 async fn compact(
1391 &self,
1392 dataset: &str,
1393 opts: CompactOptions,
1394 ) -> Result<CompactResult, HirnDbError> {
1395 self.inner.compact(dataset, opts).await
1396 }
1397
1398 async fn version(&self, dataset: &str) -> Result<u64, HirnDbError> {
1399 self.inner.version(dataset).await
1400 }
1401
1402 async fn tag(&self, dataset: &str, tag: &str) -> Result<(), HirnDbError> {
1403 self.inner.tag(dataset, tag).await
1404 }
1405
1406 async fn checkout(&self, dataset: &str, version: u64) -> Result<(), HirnDbError> {
1407 self.inner.checkout(dataset, version).await
1408 }
1409
1410 async fn list_tags(&self, dataset: &str) -> Result<Vec<VersionTag>, HirnDbError> {
1411 self.inner.list_tags(dataset).await
1412 }
1413
1414 async fn list_datasets(&self) -> Result<Vec<DatasetInfo>, HirnDbError> {
1415 self.inner.list_datasets().await
1416 }
1417
1418 async fn exists(&self, dataset: &str) -> Result<bool, HirnDbError> {
1419 self.inner.exists(dataset).await
1420 }
1421
1422 async fn list_namespaces(&self) -> Result<Vec<String>, HirnDbError> {
1423 self.inner.list_namespaces().await
1424 }
1425
1426 async fn create_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1427 self.inner.create_namespace(name).await
1428 }
1429
1430 async fn drop_namespace(&self, name: &str) -> Result<(), HirnDbError> {
1431 self.inner.drop_namespace(name).await
1432 }
1433
1434 async fn add_columns(
1435 &self,
1436 dataset: &str,
1437 transforms: Vec<ColumnTransform>,
1438 ) -> Result<(), HirnDbError> {
1439 self.inner.add_columns(dataset, transforms).await
1440 }
1441
1442 async fn drop_columns(&self, dataset: &str, columns: &[&str]) -> Result<(), HirnDbError> {
1443 self.inner.drop_columns(dataset, columns).await
1444 }
1445
1446 async fn table_provider(&self, dataset: &str) -> Option<Arc<dyn TableProvider>> {
1447 self.inner.table_provider(dataset).await
1448 }
1449 }
1450
1451 fn test_agent() -> AgentId {
1452 AgentId::new("namespace_delete_agent").unwrap()
1453 }
1454
1455 async fn temp_db() -> (
1456 HirnDB,
1457 HirnConfig,
1458 Arc<dyn PhysicalStore>,
1459 tempfile::TempDir,
1460 ) {
1461 let dir = tempfile::tempdir().unwrap();
1462 let lance_path = dir.path().join("lance");
1463 let storage: Arc<dyn PhysicalStore> = HirnDb::open(HirnDbConfig::local(
1464 lance_path.to_str().expect("temp path should be utf8"),
1465 ))
1466 .await
1467 .unwrap()
1468 .store_arc();
1469 let config = HirnConfig::builder()
1470 .db_path(dir.path().join("db"))
1471 .working_memory_token_limit(100_000)
1472 .build()
1473 .unwrap();
1474 let db = HirnDB::open_with_config(config.clone(), Arc::clone(&storage))
1475 .await
1476 .unwrap();
1477 (db, config, storage, dir)
1478 }
1479
1480 async fn temp_db_with_storage(
1481 storage: Arc<dyn PhysicalStore>,
1482 ) -> (
1483 HirnDB,
1484 HirnConfig,
1485 Arc<dyn PhysicalStore>,
1486 tempfile::TempDir,
1487 ) {
1488 let dir = tempfile::tempdir().unwrap();
1489 let config = HirnConfig::builder()
1490 .db_path(dir.path().join("db"))
1491 .working_memory_token_limit(100_000)
1492 .build()
1493 .unwrap();
1494 let db = HirnDB::open_with_config(config.clone(), Arc::clone(&storage))
1495 .await
1496 .unwrap();
1497 (db, config, storage, dir)
1498 }
1499
1500 fn episode(namespace: Namespace, content: &str) -> EpisodicRecord {
1501 EpisodicRecord::builder()
1502 .content(content)
1503 .summary(content)
1504 .event_type(EventType::Observation)
1505 .importance(0.5)
1506 .namespace(namespace)
1507 .agent_id(test_agent())
1508 .build()
1509 .unwrap()
1510 }
1511
1512 #[tokio::test(flavor = "multi_thread")]
1513 async fn delete_namespace_records_applied_mutation_envelope() {
1514 let (db, _config, _storage, _dir) = temp_db().await;
1515 let namespace = Namespace::new("recoverable_ns_delete").unwrap();
1516 db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1517 .await
1518 .unwrap();
1519 let survivor_id = db
1520 .episodic()
1521 .remember(episode(Namespace::shared(), "survivor"))
1522 .await
1523 .unwrap();
1524 let episode_id = db
1525 .episodic()
1526 .remember(episode(namespace, "delete me"))
1527 .await
1528 .unwrap();
1529
1530 db.delete_namespace(namespace.as_str()).await.unwrap();
1531
1532 assert!(matches!(
1533 db.get_namespace(namespace.as_str()).await,
1534 Err(HirnError::NotFound(_))
1535 ));
1536 assert!(matches!(
1537 db.read_episodic_record(episode_id).await,
1538 Err(HirnError::NotFound(_))
1539 ));
1540 assert!(db.read_episodic_record(survivor_id).await.is_ok());
1541
1542 let envelopes = hirn_storage::list_mutation_envelopes(
1543 db.storage_backend(),
1544 Some(NAMESPACE_DELETE_MUTATION_KIND),
1545 Some(hirn_storage::MutationEnvelopeState::Applied),
1546 )
1547 .await
1548 .unwrap();
1549 let envelope = envelopes
1550 .iter()
1551 .find(|envelope| {
1552 envelope
1553 .id
1554 .starts_with(&format!("namespace-delete:{namespace}:"))
1555 })
1556 .expect("namespace delete envelope should exist");
1557 assert_eq!(envelope.kind, NAMESPACE_DELETE_MUTATION_KIND);
1558 assert_eq!(envelope.state, hirn_storage::MutationEnvelopeState::Applied);
1559 }
1560
1561 #[tokio::test(flavor = "multi_thread")]
1562 async fn namespace_name_can_be_deleted_recreated_and_deleted_again() {
1563 let (db, _config, _storage, _dir) = temp_db().await;
1564 let namespace = Namespace::new("reusable_delete_name").unwrap();
1565
1566 db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1567 .await
1568 .unwrap();
1569 db.delete_namespace(namespace.as_str()).await.unwrap();
1570 db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1571 .await
1572 .unwrap();
1573 db.delete_namespace(namespace.as_str()).await.unwrap();
1574
1575 let envelopes = hirn_storage::list_mutation_envelopes(
1576 db.storage_backend(),
1577 Some(NAMESPACE_DELETE_MUTATION_KIND),
1578 Some(hirn_storage::MutationEnvelopeState::Applied),
1579 )
1580 .await
1581 .unwrap();
1582 let namespace_envelopes = envelopes
1583 .iter()
1584 .filter(|envelope| {
1585 envelope
1586 .id
1587 .starts_with(&format!("namespace-delete:{namespace}:"))
1588 })
1589 .count();
1590 assert_eq!(namespace_envelopes, 2);
1591 }
1592
1593 #[tokio::test(flavor = "multi_thread")]
1594 async fn open_reconciles_pending_namespace_delete_mutation_after_partial_cleanup() {
1595 let (db, config, storage, _dir) = temp_db().await;
1596 let namespace = Namespace::new("replay_ns_delete").unwrap();
1597 db.create_namespace(namespace.as_str(), NamespaceKind::Team, vec![])
1598 .await
1599 .unwrap();
1600 let survivor_id = db
1601 .episodic()
1602 .remember(episode(Namespace::shared(), "survivor before replay"))
1603 .await
1604 .unwrap();
1605 let episode_id = db
1606 .episodic()
1607 .remember(episode(namespace, "already gone before replay"))
1608 .await
1609 .unwrap();
1610 let envelope =
1611 build_namespace_delete_envelope(namespace, vec![episode_id], vec![], vec![]).unwrap();
1612 let payload = decode_namespace_delete_envelope(&envelope).unwrap();
1613 hirn_storage::append_mutation_envelope(db.storage_backend(), &envelope)
1614 .await
1615 .unwrap();
1616 db.delete_episode(episode_id).await.unwrap();
1617 db.append_namespace_delete_audit_once(&payload)
1618 .await
1619 .unwrap();
1620 drop(db);
1621
1622 let reopened = HirnDB::open_with_config(config, Arc::clone(&storage))
1623 .await
1624 .unwrap();
1625
1626 assert!(matches!(
1627 reopened.get_namespace(namespace.as_str()).await,
1628 Err(HirnError::NotFound(_))
1629 ));
1630 assert!(reopened.read_episodic_record(survivor_id).await.is_ok());
1631 let stored_envelope = hirn_storage::get_mutation_envelope(storage.as_ref(), &envelope.id)
1632 .await
1633 .unwrap()
1634 .expect("namespace delete envelope should remain queryable");
1635 assert_eq!(
1636 stored_envelope.state,
1637 hirn_storage::MutationEnvelopeState::Applied
1638 );
1639 let audit_log = reopened.audit_log(None, None).await.unwrap();
1640 let matching = audit_log
1641 .iter()
1642 .filter(|entry| {
1643 entry.id == payload.audit_entry_id
1644 && matches!(
1645 &entry.action,
1646 hirn_core::audit::AuditAction::NamespaceDeleted { namespace: deleted }
1647 if deleted == namespace.as_str()
1648 )
1649 })
1650 .count();
1651 assert_eq!(
1652 matching, 1,
1653 "namespace delete audit replay must be idempotent"
1654 );
1655 }
1656
1657 #[tokio::test(flavor = "multi_thread")]
1658 async fn failed_agent_update_preserves_existing_row() {
1659 let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1660 let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1661 let (db, _config, _storage, _dir) = temp_db_with_storage(storage).await;
1662
1663 let agent_id = AgentId::new("upserted_agent").unwrap();
1664 db.register_agent(&agent_id, "Original Agent")
1665 .await
1666 .unwrap();
1667
1668 let mut updated = db.get_agent(&agent_id).await.unwrap();
1669 updated.display_name = "Updated Agent".to_string();
1670
1671 fault_store.fail_agent_writes();
1672
1673 let error = db.update_agent(&updated).await.unwrap_err();
1674 assert!(
1675 error
1676 .to_string()
1677 .contains("simulated agent merge_insert failure"),
1678 "expected keyed-upsert failure, got: {error}"
1679 );
1680
1681 let agents = db.list_agents().await.unwrap();
1682 let stored = agents
1683 .iter()
1684 .find(|agent| agent.id == agent_id)
1685 .expect("original agent row should still exist");
1686 assert_eq!(stored.display_name, "Original Agent");
1687 }
1688
1689 #[tokio::test(flavor = "multi_thread")]
1690 async fn failed_team_membership_update_preserves_existing_namespace_row() {
1691 let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1692 let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1693 let (db, _config, _storage, _dir) = temp_db_with_storage(storage).await;
1694
1695 let team = "team_upsert_guard";
1696 let member = AgentId::new("team_member").unwrap();
1697 db.create_namespace(team, NamespaceKind::Team, vec![])
1698 .await
1699 .unwrap();
1700
1701 fault_store.fail_namespace_writes();
1702
1703 let error = db.add_agent_to_team(&member, team).await.unwrap_err();
1704 assert!(
1705 error
1706 .to_string()
1707 .contains("simulated namespace merge_insert failure"),
1708 "expected keyed-upsert failure, got: {error}"
1709 );
1710
1711 let stored = db.get_namespace(team).await.unwrap();
1712 assert!(stored.member_agents.is_empty());
1713 }
1714
1715 #[tokio::test(flavor = "multi_thread")]
1716 async fn open_reconciles_pending_agent_register_mutation_after_partial_namespace_write() {
1717 let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1718 let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1719 let (db, config, _storage, _dir) = temp_db_with_storage(storage.clone()).await;
1720
1721 let agent_id = AgentId::new("recoverable_agent_register").unwrap();
1722 let private_ns = Namespace::private_for(&agent_id);
1723
1724 fault_store.fail_namespace_writes();
1725 let error = db
1726 .register_agent(&agent_id, "Recoverable Agent")
1727 .await
1728 .unwrap_err();
1729 assert!(
1730 error
1731 .to_string()
1732 .contains("simulated namespace merge_insert failure"),
1733 "expected namespace upsert failure, got: {error}"
1734 );
1735
1736 let agents = db.list_agents().await.unwrap();
1737 assert!(agents.iter().any(|agent| agent.id == agent_id));
1738 assert!(matches!(
1739 db.get_namespace(private_ns.as_str()).await,
1740 Err(HirnError::NotFound(_))
1741 ));
1742 drop(db);
1743
1744 fault_store.allow_namespace_writes();
1745 let reopened = HirnDB::open_with_config(config, storage).await.unwrap();
1746
1747 assert_eq!(
1748 reopened.get_agent(&agent_id).await.unwrap().display_name,
1749 "Recoverable Agent"
1750 );
1751 assert_eq!(
1752 reopened
1753 .get_namespace(private_ns.as_str())
1754 .await
1755 .unwrap()
1756 .namespace,
1757 private_ns
1758 );
1759
1760 let envelopes = hirn_storage::list_mutation_envelopes(
1761 reopened.storage_backend(),
1762 Some(AGENT_REGISTER_MUTATION_KIND),
1763 Some(hirn_storage::MutationEnvelopeState::Applied),
1764 )
1765 .await
1766 .unwrap();
1767 assert!(envelopes.iter().any(|envelope| {
1768 envelope
1769 .id
1770 .starts_with(&format!("agent-register:{agent_id}:"))
1771 }));
1772
1773 let audit_entries = reopened.audit_log(None, None).await.unwrap();
1774 let matching = audit_entries
1775 .iter()
1776 .filter(|entry| {
1777 matches!(
1778 &entry.action,
1779 hirn_core::audit::AuditAction::AgentRegistered { agent_id: recorded }
1780 if recorded == &agent_id
1781 )
1782 })
1783 .count();
1784 assert_eq!(matching, 1);
1785 }
1786
1787 #[tokio::test(flavor = "multi_thread")]
1788 async fn open_reconciles_pending_agent_deregister_mutation_after_partial_agent_delete() {
1789 let fault_store = Arc::new(FaultInjectingNamespaceStore::new());
1790 let storage: Arc<dyn PhysicalStore> = fault_store.clone();
1791 let (db, config, _storage, _dir) = temp_db_with_storage(storage.clone()).await;
1792
1793 let agent_id = AgentId::new("recoverable_agent_deregister").unwrap();
1794 let private_ns = Namespace::private_for(&agent_id);
1795 db.register_agent(&agent_id, "Recoverable Delete")
1796 .await
1797 .unwrap();
1798
1799 fault_store.fail_agent_deletes();
1800 let error = db.deregister_agent(&agent_id).await.unwrap_err();
1801 assert!(
1802 error.to_string().contains("simulated agent delete failure"),
1803 "expected agent delete failure, got: {error}"
1804 );
1805
1806 assert!(matches!(
1807 db.get_namespace(private_ns.as_str()).await,
1808 Err(HirnError::NotFound(_))
1809 ));
1810 let agents = db.list_agents().await.unwrap();
1811 assert!(agents.iter().any(|agent| agent.id == agent_id));
1812 drop(db);
1813
1814 fault_store.allow_agent_deletes();
1815 fault_store.allow_agent_writes();
1816 let reopened = HirnDB::open_with_config(config, storage).await.unwrap();
1817
1818 assert!(matches!(
1819 reopened.get_agent(&agent_id).await,
1820 Err(HirnError::NotFound(_))
1821 ));
1822 assert!(matches!(
1823 reopened.get_namespace(private_ns.as_str()).await,
1824 Err(HirnError::NotFound(_))
1825 ));
1826
1827 let envelopes = hirn_storage::list_mutation_envelopes(
1828 reopened.storage_backend(),
1829 Some(AGENT_DEREGISTER_MUTATION_KIND),
1830 Some(hirn_storage::MutationEnvelopeState::Applied),
1831 )
1832 .await
1833 .unwrap();
1834 assert!(envelopes.iter().any(|envelope| {
1835 envelope
1836 .id
1837 .starts_with(&format!("agent-deregister:{agent_id}:"))
1838 }));
1839
1840 let audit_entries = reopened.audit_log(None, None).await.unwrap();
1841 let matching = audit_entries
1842 .iter()
1843 .filter(|entry| {
1844 matches!(
1845 &entry.action,
1846 hirn_core::audit::AuditAction::AgentDeregistered { agent_id: recorded }
1847 if recorded == &agent_id
1848 )
1849 })
1850 .count();
1851 assert_eq!(matching, 1);
1852 }
1853}