1pub(crate) mod concepts;
5pub(crate) mod knowledge;
6pub(crate) mod multi_hop;
7pub(crate) mod query;
8pub(crate) mod router;
9pub(crate) mod shard;
10pub(crate) mod structural;
11
12use anyhow::{Context, Result};
13use parking_lot::RwLock;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
17use super::admission::AdmissionControl;
18use super::atom_memory::AtomMemory;
19use super::audit::{AuditLog, AuditOp};
20use super::cognition::governor::{GovernanceReport, GovernorConfig, MemoryGovernor};
21use super::cognition::r#loop::{CognitionConfig as CognitionLoopConfig, CognitionLoop, Insight};
22use super::composite_memory::CompositeMemory;
23use super::config::HmsConfig;
24use super::decompose::Decomposer;
25use super::diffusion::DiffusionFactorizer;
26use super::encoding::encode_text_internal;
27use super::entangled::EntangledHVec;
28use super::graph::RelationStore;
29use super::ivf::IVFIndex;
30use super::role::RoleRegistry;
31use super::rules::RuleStore;
32use super::storage::PersistentArena;
33use super::text::TextProcessor;
34use super::triple_store::TripleStore;
35use super::types::{GraphPath, Relation, RelationType, TextMetrics};
36
37use shard::{ShardManager, ShardSet};
38
/// Boxed signing callback: maps a byte slice to a signature.
///
/// Produced by `HmsCore::sign_fn` and handed (via `as_deref`) to `AuditLog::record`.
type SignFn<'a> = Box<dyn Fn(&[u8]) -> super::audit::SignatureBytes + 'a>;
40
/// Central handle tying together the append-only arena log, the sharded vector
/// store, the relation graph and the optional "meaning", cognition, audit and
/// security subsystems.
pub struct HmsCore {
    /// Effective configuration (the caller's or `HmsConfig::default()`).
    config: HmsConfig,
    /// Append-only log backing every persisted record.
    pub(crate) arena: Arc<PersistentArena>,
    /// Dimensionality shared by every vector in this core.
    pub(crate) dimensions: usize,
    /// Directory holding the log and the serialized index files.
    pub(crate) storage_path: PathBuf,
    /// Single shard or multi-shard manager holding the live vectors.
    shards: RwLock<ShardSet>,
    /// In-memory relation graph.
    graph: RelationStore,
    // The following seven fields are `Some` only when `config.meaning.enabled`
    // was set at construction time.
    atom_memory: Option<Arc<AtomMemory>>,
    composite_memory: Option<Arc<CompositeMemory>>,
    triple_store: Option<Arc<TripleStore>>,
    role_registry: Option<RoleRegistry>,
    rule_store: Option<RuleStore>,
    decomposer: Option<Decomposer>,
    admission: Option<AdmissionControl>,
    /// Background cognition loop handle; `None` until `start_cognition` is called.
    cognition_loop: parking_lot::Mutex<Option<CognitionLoop>>,
    /// Audit log; `Some` only when `config.security.audit_enabled`.
    audit: Option<AuditLog>,
    /// Signing manager; `Some` only when `config.security.signing_enabled`.
    #[cfg(feature = "security")]
    signing: Option<super::security::SigningManager>,
    /// Encryption manager; `Some` only when `config.security.encryption_enabled`.
    #[cfg(feature = "security")]
    #[allow(dead_code)]
    encryption: Option<super::security::EncryptionManager>,
}
65
66impl HmsCore {
67 pub fn new(
69 dimensions: u32,
70 storage_path: Option<String>,
71 config: Option<HmsConfig>,
72 ) -> Result<Self> {
73 const MAX_DIMENSIONS: u32 = 1_000_000;
74 if dimensions == 0 || dimensions > MAX_DIMENSIONS {
75 return Err(anyhow::anyhow!(
76 "dimensions must be between 1 and {} (got {})",
77 MAX_DIMENSIONS,
78 dimensions
79 ));
80 }
81 let dim = dimensions as usize;
82 let config = config.unwrap_or_default();
83
84 let base_path = storage_path
85 .map(PathBuf::from)
86 .unwrap_or_else(|| Path::new(".").to_path_buf());
87 if !base_path.exists() {
88 std::fs::create_dir_all(&base_path)?;
89 }
90
91 let arena = Arc::new(PersistentArena::new(base_path.join("vectors_data.bin"))?);
92
93 let audit = if config.security.audit_enabled {
94 Some(AuditLog::new(&base_path)?)
95 } else {
96 None
97 };
98
99 #[cfg(feature = "security")]
100 let signing = if config.security.signing_enabled {
101 let key_path = config
102 .security
103 .key_path
104 .as_ref()
105 .map(PathBuf::from)
106 .unwrap_or_else(|| base_path.join("hms_signing.key"));
107 Some(super::security::SigningManager::new(&key_path)?)
108 } else {
109 None
110 };
111
112 #[cfg(feature = "security")]
113 let encryption = if config.security.encryption_enabled {
114 let passphrase = config
115 .security
116 .encryption_passphrase
117 .as_deref()
118 .ok_or_else(|| {
119 anyhow::anyhow!("encryption_passphrase required when encryption is enabled")
120 })?;
121 Some(super::security::EncryptionManager::new(
122 passphrase, &base_path,
123 )?)
124 } else {
125 None
126 };
127
128 let shard_set = if config.shard.enabled && config.shard.shard_count > 1 {
129 ShardSet::Multi(ShardManager::new(config.shard.shard_count, dim))
130 } else {
131 ShardSet::Single(Box::new(shard::Shard::new(dim)))
132 };
133
134 let (atom_mem, comp_mem, tri_store, role_reg, rule_st, decomp, adm) =
135 if config.meaning.enabled {
136 let mc = &config.meaning;
137 (
138 Some(Arc::new(AtomMemory::new(dim, mc.idf_clip_factor))),
139 Some(Arc::new(CompositeMemory::new(dim, mc.idf_clip_factor))),
140 Some(Arc::new(TripleStore::new())),
141 Some(RoleRegistry::new(dim)),
142 Some(RuleStore::new()),
143 Some(Decomposer::new()),
144 Some(AdmissionControl::new(mc.algebraic_max_fanout)),
145 )
146 } else {
147 (None, None, None, None, None, None, None)
148 };
149
150 let core = Self {
151 config: config.clone(),
152 arena,
153 dimensions: dim,
154 storage_path: base_path,
155 shards: RwLock::new(shard_set),
156 graph: RelationStore::new(),
157 atom_memory: atom_mem,
158 composite_memory: comp_mem,
159 triple_store: tri_store,
160 role_registry: role_reg,
161 rule_store: rule_st,
162 decomposer: decomp,
163 admission: adm,
164 cognition_loop: parking_lot::Mutex::new(None),
165 audit,
166 #[cfg(feature = "security")]
167 signing,
168 #[cfg(feature = "security")]
169 encryption,
170 };
171
172 core.load_from_log()?;
173 core.load_indices()?;
174 {
175 let shards = core.shards.read();
176 shards.try_for_each_shard(|s| s.rebuild_inverted_index(dim))?;
177 }
178
179 Ok(core)
180 }
181
182 fn load_indices(&self) -> Result<()> {
183 let nsg_path = self.storage_path.join("nsg_index.bin");
184 if nsg_path.exists() {
185 let raw = std::fs::read(&nsg_path)?;
186 let data = self.maybe_decrypt(&raw)?;
187 let nsg: super::nsg::NSGIndex = bincode::deserialize(&data)?;
188 let shards = self.shards.read();
190 if let ShardSet::Single(ref shard) = *shards {
191 *shard.nsg.write() = Some(nsg);
192 }
193 }
194
195 let ivf_path = self.storage_path.join("ivf_index.bin");
196 if ivf_path.exists() {
197 let raw = std::fs::read(&ivf_path)?;
198 let data = self.maybe_decrypt(&raw)?;
199 let mut ivf: IVFIndex = bincode::deserialize(&data)?;
200 ivf.lists = Some(super::ivf::inverted_list::InvertedLists::new());
201
202 let shards = self.shards.read();
203 if let ShardSet::Single(ref shard) = *shards {
204 let vectors = shard.vectors.read();
205 let registry = shard.registry.read();
206 for id in registry.iter() {
207 if let Some(vec) = vectors.get(id) {
208 ivf.insert(id, vec)?;
209 }
210 }
211 *shard.ivf.write() = Some(ivf);
212 }
213 }
214
215 Ok(())
216 }
217
218 fn save_nsg(&self, nsg: &super::nsg::NSGIndex) -> Result<()> {
219 let data = bincode::serialize(nsg)?;
220 std::fs::write(
221 self.storage_path.join("nsg_index.bin"),
222 self.maybe_encrypt(&data)?,
223 )?;
224 Ok(())
225 }
226
227 fn save_ivf(&self, ivf: &IVFIndex) -> Result<()> {
228 let data = bincode::serialize(ivf)?;
229 std::fs::write(
230 self.storage_path.join("ivf_index.bin"),
231 self.maybe_encrypt(&data)?,
232 )?;
233 Ok(())
234 }
235
236 pub fn bundle<V: std::borrow::Borrow<EntangledHVec>>(&self, vectors: &[V]) -> EntangledHVec {
239 if self.config.privacy.dp_enabled {
240 EntangledHVec::bundle_dp(vectors, self.config.privacy.epsilon)
241 } else {
242 EntangledHVec::bundle(vectors)
243 }
244 }
245
246 fn maybe_encrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
247 #[cfg(feature = "security")]
248 if let Some(ref enc) = self.encryption {
249 return enc.encrypt(data);
250 }
251 Ok(data.to_vec())
252 }
253
254 fn maybe_decrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
255 #[cfg(feature = "security")]
256 if let Some(ref enc) = self.encryption {
257 return enc.decrypt(data);
258 }
259 Ok(data.to_vec())
260 }
261
    /// Replays the arena log from offset 0 and rebuilds in-memory state.
    ///
    /// Each frame is tried against the record decoders in a fixed order: atom,
    /// composite, triple, rule, relation, and finally the plain vector /
    /// tombstone format. The first decoder that accepts the payload wins, so
    /// the order of the `else if` chain is significant. Records belonging to a
    /// subsystem that is not enabled (its `Option` is `None`) are dropped.
    ///
    /// Afterwards every shard's registry is rebuilt as the sorted list of live
    /// vector ids, its `vector_count` is reset to match, and the atom and
    /// composite memories rebuild their indices.
    fn load_from_log(&self) -> Result<()> {
        let shards = self.shards.read();

        let mut offset = 0;
        // Replay stops silently at the first frame that cannot be read or decrypted.
        while let Ok((payload, _version)) = self.arena_read_frame(offset) {
            if let Some((id, vec)) =
                super::atom_memory::AtomMemory::deserialize_atom(&payload, self.dimensions)
            {
                if let Some(ref atom_mem) = self.atom_memory {
                    atom_mem.load_atom(id, vec);
                }
            } else if let Some((id, vec)) =
                super::composite_memory::CompositeMemory::deserialize_composite(
                    &payload,
                    self.dimensions,
                )
            {
                if let Some(ref comp_mem) = self.composite_memory {
                    comp_mem.load_composite(id, vec);
                }
            } else if let Some(record) =
                super::triple_store::TripleStore::deserialize_triple(&payload)
            {
                if let Some(ref tri_store) = self.triple_store {
                    tri_store.load_triple(record);
                }
            } else if let Some(rule) = super::rules::RuleStore::deserialize_rule(&payload) {
                if let Some(ref rule_store) = self.rule_store {
                    rule_store.load_rule(rule);
                }
            } else if let Some(rel) = RelationStore::deserialize_relation(&payload) {
                self.graph.load_relation(&rel);
            } else {
                // Fallback: plain vector entry. `parse_log_payload` reports both
                // tombstones and malformed payloads as a vector with `dim == 0`,
                // which is applied as a removal here.
                let (id, vector) = Self::parse_log_payload(self.dimensions, &payload);
                if vector.dim == 0 {
                    shards.remove(&id, self.dimensions)?;
                } else {
                    shards.insert(id, vector, self.dimensions)?;
                }
            }
            offset = match self.arena.next_offset(offset) {
                Ok(next) => next,
                Err(_) => break,
            };
        }

        // Rebuild each shard's registry from the ids that survived the replay.
        shards.for_each_shard(|shard| {
            let vectors = shard.vectors.read();
            let mut reg = shard.registry.write();
            let mut live_ids: Vec<String> = vectors.keys().cloned().collect();
            live_ids.sort();
            *reg = live_ids;
            shard
                .vector_count
                .store(reg.len() as u64, std::sync::atomic::Ordering::SeqCst);
        });

        if let Some(ref atom_mem) = self.atom_memory {
            atom_mem.rebuild_indices();
        }
        if let Some(ref comp_mem) = self.composite_memory {
            comp_mem.rebuild_indices();
        }

        Ok(())
    }
329
330 fn parse_log_payload(dimensions: usize, payload: &[u8]) -> (String, EntangledHVec) {
331 if payload.len() < 6 {
332 return (String::new(), EntangledHVec::from_indices(vec![], 0));
333 }
334 let id_len = u16::from_le_bytes([payload[0], payload[1]]) as usize;
335 let id_end = 2 + id_len;
336 if payload.len() < id_end + 4 {
337 return (String::new(), EntangledHVec::from_indices(vec![], 0));
338 }
339 let id = match std::str::from_utf8(&payload[2..id_end]) {
340 Ok(s) => s.to_owned(),
341 Err(_) => String::from_utf8_lossy(&payload[2..id_end]).into_owned(),
342 };
343 let delta_count_raw = u32::from_le_bytes(match payload[id_end..id_end + 4].try_into() {
344 Ok(b) => b,
345 Err(_) => return (id, EntangledHVec::from_indices(vec![], 0)),
346 });
347
348 if delta_count_raw == Self::TOMBSTONE_MARKER {
349 return (id, EntangledHVec::from_indices(vec![], 0));
350 }
351
352 let delta_count = delta_count_raw as usize;
353 if delta_count == 0 {
354 return (id, EntangledHVec::from_indices(vec![], dimensions));
355 }
356
357 let deltas_start = id_end + 4;
358 let deltas_end = deltas_start + delta_count * 4;
359 if payload.len() < deltas_end {
360 return (id, EntangledHVec::from_indices(vec![], 0));
361 }
362 let deltas: Vec<u32> = payload[deltas_start..deltas_end]
363 .chunks_exact(4)
364 .map(|c| u32::from_le_bytes(c.try_into().unwrap()))
365 .collect();
366 (id, EntangledHVec::from_deltas(&deltas, dimensions))
367 }
368
    /// Returns the vector dimensionality this core was created with.
    pub fn dimensions(&self) -> usize {
        self.dimensions
    }
373
    /// Encodes `text` into a hypervector of this core's dimensionality by
    /// delegating to `encode_text_internal`. Nothing is stored.
    pub fn encode_text(&self, text: &str) -> EntangledHVec {
        encode_text_internal(text, self.dimensions)
    }
378
    /// Computes `TextMetrics` for `text` via `TextProcessor::analyze`.
    pub fn analyze_text(&self, text: &str) -> TextMetrics {
        TextProcessor::analyze(text)
    }
383
    /// Returns the readability score `TextProcessor::calculate_readability`
    /// derives from previously computed `metrics`.
    pub fn calculate_readability(&self, metrics: &TextMetrics) -> f64 {
        TextProcessor::calculate_readability(metrics)
    }
388
389 fn serialize_log_entry(id: &str, vector: &EntangledHVec) -> Result<Vec<u8>> {
390 let id_bytes = id.as_bytes();
391 if id_bytes.len() > u16::MAX as usize {
392 return Err(anyhow::anyhow!(
393 "ID too long: {} bytes (max {})",
394 id_bytes.len(),
395 u16::MAX
396 ));
397 }
398 let deltas = vector.to_deltas();
399 let mut entry = Vec::with_capacity(2 + id_bytes.len() + 4 + deltas.len() * 4);
400 entry.extend_from_slice(&(id_bytes.len() as u16).to_le_bytes());
401 entry.extend_from_slice(id_bytes);
402 entry.extend_from_slice(&(deltas.len() as u32).to_le_bytes());
403 for &d in &deltas {
404 entry.extend_from_slice(&d.to_le_bytes());
405 }
406 Ok(entry)
407 }
408
    /// Sentinel stored in the delta-count field of a log record to mark the
    /// id as deleted (written by `serialize_tombstone`, recognised by
    /// `parse_log_payload`).
    const TOMBSTONE_MARKER: u32 = u32::MAX;
410
411 fn serialize_tombstone(id: &str) -> Result<Vec<u8>> {
412 let id_bytes = id.as_bytes();
413 if id_bytes.len() > u16::MAX as usize {
414 return Err(anyhow::anyhow!(
415 "ID too long: {} bytes (max {})",
416 id_bytes.len(),
417 u16::MAX
418 ));
419 }
420 let mut entry = Vec::with_capacity(2 + id_bytes.len() + 4);
421 entry.extend_from_slice(&(id_bytes.len() as u16).to_le_bytes());
422 entry.extend_from_slice(id_bytes);
423 entry.extend_from_slice(&Self::TOMBSTONE_MARKER.to_le_bytes());
424 Ok(entry)
425 }
426
427 pub fn delete(&self, id: &str) -> Result<bool> {
429 self.arena_write(&Self::serialize_tombstone(id)?)?;
433
434 if let Some(ref audit) = self.audit {
435 audit.record(AuditOp::Delete, id, self.sign_fn().as_deref())?;
436 }
437
438 if let Some(ref atom_mem) = self.atom_memory {
439 atom_mem.delete(id);
440 }
441
442 let shards = self.shards.read();
443 if !shards.remove(id, self.dimensions)? {
444 return Ok(false);
445 }
446 Ok(true)
447 }
448
449 pub fn memorize_meaning(&self, id: &str, text: &str) -> Result<()> {
450 let vector = self.encode_text(text);
451 self.memorize(id.to_string(), vector)?;
452
453 if let (
454 Some(ref decomposer),
455 Some(ref atom_mem),
456 Some(ref comp_mem),
457 Some(ref tri_store),
458 Some(ref roles),
459 ) = (
460 &self.decomposer,
461 &self.atom_memory,
462 &self.composite_memory,
463 &self.triple_store,
464 &self.role_registry,
465 ) {
466 if self.config.meaning.auto_decompose {
467 let units = decomposer.decompose(text);
468 for unit in &units {
469 let (_, s_vec) = atom_mem.get_or_insert(&unit.subject);
470 let (_, r_vec) = atom_mem.get_or_insert(&unit.relation);
471 let (_, o_vec) = atom_mem.get_or_insert(&unit.object);
472
473 self.arena_write(&super::atom_memory::AtomMemory::serialize_atom(
474 &unit.subject,
475 &s_vec,
476 ))?;
477 self.arena_write(&super::atom_memory::AtomMemory::serialize_atom(
478 &unit.relation,
479 &r_vec,
480 ))?;
481 self.arena_write(&super::atom_memory::AtomMemory::serialize_atom(
482 &unit.object,
483 &o_vec,
484 ))?;
485
486 let composite = roles.compose_triple(&s_vec, &r_vec, &o_vec);
487 let comp_id =
488 format!("{}:{}:{}:{}", id, unit.subject, unit.relation, unit.object);
489 comp_mem.insert(comp_id.clone(), composite.clone());
490
491 self.arena_write(
492 &super::composite_memory::CompositeMemory::serialize_composite(
493 &comp_id, &composite,
494 ),
495 )?;
496
497 tri_store.add(&unit.subject, &unit.relation, &unit.object, &comp_id);
498 self.arena_write(&super::triple_store::TripleStore::serialize_triple(
499 &super::triple_store::TripleRecord {
500 subject_id: unit.subject.clone(),
501 relation_id: unit.relation.clone(),
502 object_id: unit.object.clone(),
503 composite_id: comp_id,
504 deleted: false,
505 },
506 ))?;
507 }
508 }
509 }
510 Ok(())
511 }
512
513 pub fn compact(&self) -> Result<()> {
515 let shards = self.shards.write();
519
520 let mut snapshot = Vec::new();
521 shards.for_each_shard(|shard| {
522 let vectors = shard.vectors.read();
523 let registry = shard.registry.read();
524 for id in registry.iter() {
525 if let Some(v) = vectors.get(id) {
526 snapshot.push((id.clone(), v.clone()));
527 }
528 }
529 });
530
531 let temp_dir = self.storage_path.join(format!(
532 ".compact_{}",
533 std::time::SystemTime::now()
534 .duration_since(std::time::UNIX_EPOCH)
535 .unwrap_or_default()
536 .as_micros()
537 ));
538
539 let relation_snapshot = self.graph.snapshot();
540
541 {
542 let temp_arena = PersistentArena::new(&temp_dir)?;
543 for (id, vector) in &snapshot {
544 let entry = Self::serialize_log_entry(id, vector)?;
545 let payload = self.maybe_encrypt(&entry)?;
546 temp_arena.write_slice(&payload)?;
547 }
548 for rel in &relation_snapshot {
549 let entry = RelationStore::serialize_relation(rel);
550 let payload = self.maybe_encrypt(&entry)?;
551 temp_arena.write_slice(&payload)?;
552 }
553 if let Some(ref atom_mem) = self.atom_memory {
554 for (_, id, vec) in atom_mem.inner().all_vectors() {
555 let entry = super::atom_memory::AtomMemory::serialize_atom(&id, &vec);
556 let payload = self.maybe_encrypt(&entry)?;
557 temp_arena.write_slice(&payload)?;
558 }
559 }
560 if let Some(ref comp_mem) = self.composite_memory {
561 for (_, id, vec) in comp_mem.inner().all_vectors() {
562 let entry =
563 super::composite_memory::CompositeMemory::serialize_composite(&id, &vec);
564 let payload = self.maybe_encrypt(&entry)?;
565 temp_arena.write_slice(&payload)?;
566 }
567 }
568 if let Some(ref tri_store) = self.triple_store {
569 for record in tri_store.snapshot() {
570 let entry = super::triple_store::TripleStore::serialize_triple(&record);
571 let payload = self.maybe_encrypt(&entry)?;
572 temp_arena.write_slice(&payload)?;
573 }
574 }
575 if let Some(ref rule_store) = self.rule_store {
576 for rule in rule_store.all_rules() {
577 let entry = super::rules::RuleStore::serialize_rule(&rule);
578 let payload = self.maybe_encrypt(&entry)?;
579 temp_arena.write_slice(&payload)?;
580 }
581 }
582 }
583
584 self.arena.replace_with_compacted(&temp_dir)?;
585
586 if let Some(ref audit) = self.audit {
587 audit.record(AuditOp::Compact, "", self.sign_fn().as_deref())?;
588 }
589
590 if let ShardSet::Single(ref shard) = *shards {
591 if let Some(ref nsg) = *shard.nsg.read() {
592 self.save_nsg(nsg)?;
593 }
594 if let Some(ref ivf) = *shard.ivf.read() {
595 self.save_ivf(ivf)?;
596 }
597 }
598
599 Ok(())
600 }
601
602 pub fn memorize(&self, id: String, vector: EntangledHVec) -> Result<()> {
604 let entry = Self::serialize_log_entry(&id, &vector)?;
605 self.arena_write(&entry)?;
606
607 if let Some(ref audit) = self.audit {
608 audit.record(AuditOp::Memorize, &id, self.sign_fn().as_deref())?;
609 }
610
611 if let Some(ref atom_mem) = self.atom_memory {
612 atom_mem.insert_with_vec(&id, &vector);
613 }
614
615 let count = {
616 let shards = self.shards.read();
617 shards.insert(id, vector, self.dimensions)?;
618 shards.count()
619 };
620
621 if self.config.ivf.enabled
622 && self.config.ivf.auto_threshold > 0
623 && count == self.config.ivf.auto_threshold as u64
624 {
625 self.train_ivf().context("Auto-train IVF failed")?;
626 } else if self.config.nsg.auto_threshold > 0
627 && count == self.config.nsg.auto_threshold as u64
628 {
629 self.train_nsg().context("Auto-train NSG failed")?;
630 } else {
631 self.maybe_auto_shard(count);
632 }
633
634 Ok(())
635 }
636
637 fn maybe_auto_shard(&self, count: u64) {
638 let cfg = &self.config.shard;
639 if !cfg.enabled || cfg.shard_count > 0 || cfg.auto_threshold == 0 {
640 return;
641 }
642 if count < cfg.auto_threshold as u64 {
643 return;
644 }
645
646 let mut shards = self.shards.write();
648 let snapshot: Vec<(String, EntangledHVec)> = {
649 match *shards {
650 ShardSet::Single(ref old_shard) => {
651 let vectors = old_shard.vectors.read();
652 vectors
653 .iter()
654 .map(|(k, v)| (k.clone(), v.clone()))
655 .collect()
656 }
657 ShardSet::Multi(_) => return,
658 }
659 };
660
661 let n_shards = (count as usize / cfg.target_shard_size).max(2);
662 let mgr = ShardManager::new(n_shards, self.dimensions);
663 for (id, vec) in snapshot {
664 let target = mgr.shard_for(&id);
665 let shard = &mgr.shards[target];
666 shard.vectors.write().insert(id.clone(), vec.clone());
667 shard.registry.write().push(id);
668 }
669 for shard in &mgr.shards {
670 let count = shard.vectors.read().len() as u64;
671 shard
672 .vector_count
673 .store(count, std::sync::atomic::Ordering::SeqCst);
674 let _ = shard.rebuild_inverted_index(self.dimensions);
675 }
676
677 *shards = ShardSet::Multi(mgr);
678 }
679
680 pub fn memorize_vector(&self, id: String, dense: &[f32]) -> Result<()> {
682 let vector = EntangledHVec::from_dense(dense, self.dimensions);
683 self.memorize(id, vector)
684 }
685
686 pub fn memorize_scalar(&self, id: String, value: f64, min: f64, max: f64) -> Result<()> {
688 let vector = EntangledHVec::from_scalar(value, min, max, self.dimensions);
689 self.memorize(id, vector)
690 }
691
    /// Returns the vector count reported by the shard set (takes the shard
    /// read lock for the duration of the call).
    pub fn vector_count(&self) -> u64 {
        self.shards.read().count()
    }
696
697 pub fn add_relation(&self, rel: &Relation) -> Result<()> {
700 let entry = RelationStore::serialize_relation(rel);
701 self.arena_write(&entry)?;
702 self.graph.add(rel);
703 if let Some(ref audit) = self.audit {
704 let label = format!("{}->{}:{}", rel.source_id, rel.target_id, rel.relation_type);
705 audit.record(AuditOp::Memorize, &label, self.sign_fn().as_deref())?;
706 }
707 Ok(())
708 }
709
    /// Removes the matching relation from the in-memory graph and returns the
    /// graph's result.
    ///
    /// NOTE(review): unlike `add_relation`, nothing is written to the arena or
    /// the audit log here, so the removal may not survive a restart or a log
    /// replay — confirm whether that is intended.
    pub fn remove_relation(&self, source_id: &str, relation_type: &str, target_id: &str) -> bool {
        self.graph.remove(source_id, relation_type, target_id)
    }
713
    /// Registers `rel_type` with the in-memory relation graph.
    pub fn declare_relation_type(&self, rel_type: RelationType) {
        self.graph.declare_type(rel_type);
    }
717
718 pub fn traverse(
719 &self,
720 start_id: &str,
721 relation_type: Option<&str>,
722 max_depth: u32,
723 at_time: f64,
724 ) -> Vec<GraphPath> {
725 let shards = self.shards.read();
726 self.graph
727 .traverse(start_id, relation_type, max_depth, at_time, &|a, b| {
728 let vec_a = shards.get_vector(a);
729 let vec_b = shards.get_vector(b);
730 match (vec_a, vec_b) {
731 (Some(va), Some(vb)) => va.similarity(&vb),
732 _ => 0.0,
733 }
734 })
735 }
736
    /// Returns the graph's outgoing relations of `source_id`, passing the
    /// optional `relation_type` filter and `at_time` straight through to
    /// `RelationStore::outgoing`.
    pub fn outgoing_relations(
        &self,
        source_id: &str,
        relation_type: Option<&str>,
        at_time: f64,
    ) -> Vec<Relation> {
        self.graph.outgoing(source_id, relation_type, at_time)
    }
745
    /// Returns the graph's incoming relations of `target_id`, passing the
    /// optional `relation_type` filter and `at_time` straight through to
    /// `RelationStore::incoming`.
    pub fn incoming_relations(
        &self,
        target_id: &str,
        relation_type: Option<&str>,
        at_time: f64,
    ) -> Vec<Relation> {
        self.graph.incoming(target_id, relation_type, at_time)
    }
754
    /// Returns the relation count reported by the in-memory graph.
    pub fn relation_count(&self) -> usize {
        self.graph.count()
    }
758
759 pub fn federated_query(
762 &self,
763 peer_paths: &[String],
764 query_vec: &EntangledHVec,
765 k: u32,
766 ) -> Result<Vec<super::types::RetrievalResult>> {
767 use rayon::prelude::*;
768
769 let mut all_results = self.query(query_vec, k);
771
772 let peer_results: Vec<Result<Vec<super::types::RetrievalResult>>> = peer_paths
774 .par_iter()
775 .map(|path| {
776 let peer = HmsCore::new(
777 self.dimensions as u32,
778 Some(path.clone()),
779 Some(self.config.clone()),
780 )?;
781 Ok(peer.query(query_vec, k))
782 })
783 .collect();
784
785 for result in peer_results {
786 all_results.extend(result?);
787 }
788
789 all_results.sort_by(|a, b| {
791 b.similarity
792 .partial_cmp(&a.similarity)
793 .unwrap_or(std::cmp::Ordering::Equal)
794 });
795 all_results.truncate(k as usize);
796 Ok(all_results)
797 }
798
799 pub fn structural_query(
802 &self,
803 known: &[(&str, &EntangledHVec)],
804 target_role: &str,
805 ) -> Vec<structural::StructuralResult> {
806 let (atom_mem, comp_mem, tri, roles, adm) = match (
807 &self.atom_memory,
808 &self.composite_memory,
809 &self.triple_store,
810 &self.role_registry,
811 &self.admission,
812 ) {
813 (Some(a), Some(c), Some(t), Some(r), Some(ad)) => (a, c, t, r, ad),
814 _ => return Vec::new(),
815 };
816 let mc = &self.config.meaning;
817 let ctx = structural::MeaningContext {
818 atom_memory: atom_mem,
819 composite_memory: comp_mem,
820 triple_store: tri,
821 roles,
822 admission: adm,
823 beta: mc.beta,
824 k: 64,
825 max_iter: 3,
826 };
827 structural::fuzzy_structural_query(&ctx, known, target_role)
828 }
829
830 pub fn multi_hop(&self, start: &str, relations: &[&str]) -> Vec<multi_hop::MultiHopResult> {
831 let (atom_mem, comp_mem, tri, roles, adm, rules) = match (
832 &self.atom_memory,
833 &self.composite_memory,
834 &self.triple_store,
835 &self.role_registry,
836 &self.admission,
837 &self.rule_store,
838 ) {
839 (Some(a), Some(c), Some(t), Some(r), Some(ad), Some(ru)) => (a, c, t, r, ad, ru),
840 _ => return Vec::new(),
841 };
842 let mc = &self.config.meaning;
843 let ctx = structural::MeaningContext {
844 atom_memory: atom_mem,
845 composite_memory: comp_mem,
846 triple_store: tri,
847 roles,
848 admission: adm,
849 beta: mc.beta,
850 k: 64,
851 max_iter: 3,
852 };
853 multi_hop::multi_hop_query(start, relations, &ctx, rules, mc.max_hop_depth)
854 }
855
856 pub fn meaning_cleanup(&self, noisy: &EntangledHVec) -> Option<(String, f64)> {
857 let atom_mem = self.atom_memory.as_ref()?;
858 let mc = &self.config.meaning;
859 let result = atom_mem.cleanup(noisy, mc.beta, 64, 3);
860 if result.found {
861 Some((result.id, result.confidence))
862 } else {
863 None
864 }
865 }
866
867 pub fn declare_rule(&self, name: &str, input_relations: Vec<String>, output_relation: String) {
868 if let Some(ref rules) = self.rule_store {
869 rules.add_rule(super::rules::CompositionRule {
870 name: name.to_string(),
871 input_relations,
872 output_relation,
873 });
874 }
875 }
876
    /// Returns the `config.meaning.enabled` flag.
    pub fn meaning_enabled(&self) -> bool {
        self.config.meaning.enabled
    }
880
    /// Returns the atom memory's count, or `0` when it is not enabled.
    pub fn meaning_atom_count(&self) -> usize {
        self.atom_memory.as_ref().map_or(0, |m| m.count())
    }
884
    /// Returns the composite memory's count, or `0` when it is not enabled.
    pub fn meaning_composite_count(&self) -> usize {
        self.composite_memory.as_ref().map_or(0, |m| m.count())
    }
888
    /// Returns the triple store's count, or `0` when it is not enabled.
    pub fn meaning_triple_count(&self) -> usize {
        self.triple_store.as_ref().map_or(0, |t| t.count())
    }
892
    /// Returns the rule store's count, or `0` when it is not enabled.
    pub fn meaning_rule_count(&self) -> usize {
        self.rule_store.as_ref().map_or(0, |r| r.count())
    }
896
897 pub fn register_role(&mut self, name: &str, shift: usize) -> anyhow::Result<()> {
898 if let Some(ref mut roles) = self.role_registry {
899 roles.register(name, shift)
900 } else {
901 Err(anyhow::anyhow!("meaning memory not enabled"))
902 }
903 }
904
905 pub fn start_cognition(&self) -> Result<()> {
908 let atom_mem = self
909 .atom_memory
910 .as_ref()
911 .ok_or_else(|| anyhow::anyhow!("meaning memory not enabled"))?;
912 let tri_store = self
913 .triple_store
914 .as_ref()
915 .ok_or_else(|| anyhow::anyhow!("meaning memory not enabled"))?;
916
917 let cc = &self.config.cognition;
918 let loop_config = CognitionLoopConfig {
919 interval: std::time::Duration::from_secs(cc.interval_secs),
920 min_pattern_freq: cc.min_pattern_freq,
921 min_abstraction_members: cc.min_abstraction_members,
922 min_shared_relations: cc.min_shared_relations,
923 min_peer_coverage: cc.min_peer_coverage,
924 hypothesis_beta: cc.hypothesis_beta,
925 min_hypothesis_confidence: cc.min_hypothesis_confidence,
926 min_analogy_relations: cc.min_analogy_relations,
927 };
928
929 let cl = CognitionLoop::start(Arc::clone(atom_mem), Arc::clone(tri_store), loop_config);
930
931 *self.cognition_loop.lock() = Some(cl);
932 Ok(())
933 }
934
935 pub fn stop_cognition(&self) {
936 if let Some(ref mut cl) = *self.cognition_loop.lock() {
937 cl.stop();
938 }
939 }
940
941 pub fn cognition_running(&self) -> bool {
942 self.cognition_loop
943 .lock()
944 .as_ref()
945 .is_some_and(|cl| cl.state().is_running())
946 }
947
948 pub fn cognition_cycle_count(&self) -> u64 {
949 self.cognition_loop
950 .lock()
951 .as_ref()
952 .map_or(0, |cl| cl.state().cycle_count())
953 }
954
955 pub fn take_insights(&self) -> Vec<Insight> {
956 self.cognition_loop
957 .lock()
958 .as_ref()
959 .map_or_else(Vec::new, |cl| cl.state().take_insights())
960 }
961
962 pub fn cognition_insight_count(&self) -> usize {
963 self.cognition_loop
964 .lock()
965 .as_ref()
966 .map_or(0, |cl| cl.state().insight_count())
967 }
968
969 pub fn run_cognition_once(&self) -> Vec<Insight> {
970 let (atom_mem, tri_store) = match (&self.atom_memory, &self.triple_store) {
971 (Some(a), Some(t)) => (a, t),
972 _ => return Vec::new(),
973 };
974 let cc = &self.config.cognition;
975 let loop_config = CognitionLoopConfig {
976 interval: std::time::Duration::from_secs(cc.interval_secs),
977 min_pattern_freq: cc.min_pattern_freq,
978 min_abstraction_members: cc.min_abstraction_members,
979 min_shared_relations: cc.min_shared_relations,
980 min_peer_coverage: cc.min_peer_coverage,
981 hypothesis_beta: cc.hypothesis_beta,
982 min_hypothesis_confidence: cc.min_hypothesis_confidence,
983 min_analogy_relations: cc.min_analogy_relations,
984 };
985 CognitionLoop::run_once(atom_mem, tri_store, &loop_config)
986 }
987
988 pub fn govern_memory(&self) -> GovernanceReport {
989 let (atom_mem, comp_mem, tri_store) = match (
990 &self.atom_memory,
991 &self.composite_memory,
992 &self.triple_store,
993 ) {
994 (Some(a), Some(c), Some(t)) => (a, c, t),
995 _ => return GovernanceReport::default(),
996 };
997 let cc = &self.config.cognition;
998 let gov_config = GovernorConfig {
999 duplicate_threshold: cc.governor_duplicate_threshold,
1000 max_scan_size: cc.governor_max_scan_size,
1001 forget_unreferenced_atoms: cc.governor_forget_unreferenced,
1002 refine_atoms: cc.refine_atoms,
1003 ..Default::default()
1004 };
1005 MemoryGovernor::govern(atom_mem, comp_mem, tri_store, &gov_config)
1006 }
1007
    /// Returns the `config.cognition.enabled` flag.
    pub fn cognition_enabled(&self) -> bool {
        self.config.cognition.enabled
    }
1011
    /// Returns whether the shard set reports a trained IVF index.
    pub fn ivf_trained(&self) -> bool {
        self.shards.read().ivf_trained()
    }
1016
1017 pub fn train_ivf(&self) -> Result<()> {
1019 let shards = self.shards.read();
1020 shards.try_for_each_shard(|shard| {
1021 let (ids, vectors) = shard.load_all_vectors();
1022 if ids.is_empty() {
1023 return Ok(());
1024 }
1025
1026 if let Some(ref mut existing) = *shard.ivf.write() {
1027 if let Some(ref lists) = existing.lists {
1028 lists.clear_all()?;
1029 }
1030 }
1031
1032 let index = IVFIndex::train(&vectors, &ids, self.dimensions, &self.config.ivf)?;
1033 *shard.ivf.write() = Some(index);
1034 Ok(())
1035 })?;
1036
1037 if let ShardSet::Single(ref shard) = *shards {
1038 if let Some(ref ivf) = *shard.ivf.read() {
1039 self.save_ivf(ivf)?;
1040 }
1041 }
1042 Ok(())
1043 }
1044
    /// Returns whether the shard set reports a trained NSG index.
    pub fn nsg_trained(&self) -> bool {
        self.shards.read().nsg_trained()
    }
1049
1050 pub fn train_nsg(&self) -> Result<()> {
1052 let shards = self.shards.read();
1053 shards.try_for_each_shard(|shard| {
1054 let (ids, vectors) = shard.load_all_vectors();
1055 if ids.is_empty() {
1056 return Ok(());
1057 }
1058
1059 let index = super::nsg::training::train(&vectors, &ids, &self.config.nsg)?;
1060 *shard.nsg.write() = Some(index);
1061 Ok(())
1062 })?;
1063
1064 if let ShardSet::Single(ref shard) = *shards {
1065 if let Some(ref nsg) = *shard.nsg.read() {
1066 self.save_nsg(nsg)?;
1067 }
1068 }
1069 Ok(())
1070 }
1071
1072 fn arena_write(&self, data: &[u8]) -> Result<usize> {
1073 let payload = self.maybe_encrypt(data)?;
1074 self.arena.write_slice(&payload)
1075 }
1076
1077 fn arena_read_frame(&self, offset: usize) -> Result<(Vec<u8>, u32)> {
1078 let (data, version) = self.arena.read_frame(offset)?;
1079 let payload = self.maybe_decrypt(&data)?;
1080 Ok((payload, version))
1081 }
1082
    /// Returns a signing callback for audit records.
    ///
    /// With the `security` feature, this is `Some` closure wrapping the
    /// signing manager's `sign` when a signing manager is configured, and
    /// `None` otherwise. Without the feature it is always `None`.
    fn sign_fn(&self) -> Option<SignFn<'_>> {
        #[cfg(feature = "security")]
        {
            self.signing
                .as_ref()
                .map(|s| Box::new(move |data: &[u8]| s.sign(data)) as SignFn<'_>)
        }
        #[cfg(not(feature = "security"))]
        {
            None
        }
    }
1095
1096 pub fn audit_since(&self, timestamp_ms: u64) -> Result<Vec<super::audit::AuditEntry>> {
1099 match self.audit {
1100 Some(ref audit) => audit.entries_since(timestamp_ms),
1101 None => Ok(Vec::new()),
1102 }
1103 }
1104
    /// Delegates to `DiffusionFactorizer::factorize` with this core's
    /// `config.diffusion`, passing `product`, `domain_codebooks` and
    /// `max_iter` through unchanged.
    ///
    /// NOTE(review): presumably the result holds one optional factor per
    /// entry of `domain_codebooks` — confirm against `DiffusionFactorizer`.
    pub fn factorize_diffusion(
        &self,
        product: &EntangledHVec,
        domain_codebooks: &[Vec<EntangledHVec>],
        max_iter: usize,
    ) -> Vec<Option<EntangledHVec>> {
        DiffusionFactorizer::factorize(&self.config.diffusion, product, domain_codebooks, max_iter)
    }
1114}