// holographic_memory/core/engine/mod.rs

1// Copyright 2024-2026 WritersLogic Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod concepts;
5pub(crate) mod knowledge;
6pub(crate) mod multi_hop;
7pub(crate) mod query;
8pub(crate) mod router;
9pub(crate) mod shard;
10pub(crate) mod structural;
11
12use anyhow::{Context, Result};
13use parking_lot::RwLock;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
17use super::admission::AdmissionControl;
18use super::atom_memory::AtomMemory;
19use super::audit::{AuditLog, AuditOp};
20use super::cognition::governor::{GovernanceReport, GovernorConfig, MemoryGovernor};
21use super::cognition::r#loop::{CognitionConfig as CognitionLoopConfig, CognitionLoop, Insight};
22use super::composite_memory::CompositeMemory;
23use super::config::HmsConfig;
24use super::decompose::Decomposer;
25use super::diffusion::DiffusionFactorizer;
26use super::encoding::encode_text_internal;
27use super::entangled::EntangledHVec;
28use super::graph::RelationStore;
29use super::ivf::IVFIndex;
30use super::role::RoleRegistry;
31use super::rules::RuleStore;
32use super::storage::PersistentArena;
33use super::text::TextProcessor;
34use super::triple_store::TripleStore;
35use super::types::{GraphPath, Relation, RelationType, TextMetrics};
36
37use shard::{ShardManager, ShardSet};
38
/// Boxed signing callback: maps a byte slice (presumably the message to sign) to
/// `super::audit::SignatureBytes`. The closure may borrow data for the lifetime `'a`.
type SignFn<'a> = Box<dyn Fn(&[u8]) -> super::audit::SignatureBytes + 'a>;
40
/// HMS core instance: an append-only arena log replayed into in-memory vector shards,
/// a relation graph, optional meaning-memory components, and optional
/// audit/signing/encryption support.
///
/// Lock ordering: ShardSet (read) -> Shard.vectors -> Shard.ivf -> Shard.nsg.
/// Where `Shard.registry` is held together with these, it is taken after
/// `Shard.vectors` and before `Shard.ivf`.
/// Arena lock is independent (managed internally by PersistentArena).
pub struct HmsCore {
    /// Configuration captured at construction; consulted for feature toggles.
    config: HmsConfig,
    /// Append-only log (`vectors_data.bin` under `storage_path`) that mutations are
    /// written to and that `load_from_log` replays on startup.
    pub(crate) arena: Arc<PersistentArena>,
    /// Hypervector dimensionality, validated to 1..=1_000_000 in `new`.
    pub(crate) dimensions: usize,
    /// Directory holding the arena log and the persisted NSG/IVF index files.
    pub(crate) storage_path: PathBuf,
    /// Vector storage: a single shard or a `ShardManager`. `maybe_auto_shard` may
    /// replace a single shard with a multi-shard set at runtime.
    shards: RwLock<ShardSet>,
    /// Relation graph; additions are logged to the arena and replayed on startup.
    graph: RelationStore,
    // Meaning-memory components below are all `Some` exactly when
    // `config.meaning.enabled` (see `new`).
    /// Atom memory.
    atom_memory: Option<Arc<AtomMemory>>,
    /// Composite (role-bound triple) memory.
    composite_memory: Option<Arc<CompositeMemory>>,
    /// Triple records linking atoms to composites.
    triple_store: Option<Arc<TripleStore>>,
    /// Role vectors used to compose triples.
    role_registry: Option<RoleRegistry>,
    /// Composition rules used by multi-hop queries.
    rule_store: Option<RuleStore>,
    /// Text-to-unit decomposer used by `memorize_meaning`.
    decomposer: Option<Decomposer>,
    /// Admission control built from `algebraic_max_fanout`.
    admission: Option<AdmissionControl>,
    /// Cognition loop state; initialized to `None` in `new`.
    cognition_loop: parking_lot::Mutex<Option<CognitionLoop>>,
    /// Audit log; present iff `config.security.audit_enabled`.
    audit: Option<AuditLog>,
    /// Signing manager; present iff `config.security.signing_enabled`.
    #[cfg(feature = "security")]
    signing: Option<super::security::SigningManager>,
    /// Encryption manager used by `maybe_encrypt`/`maybe_decrypt`; present iff
    /// `config.security.encryption_enabled`.
    /// NOTE(review): the field is read by those helpers, so this `dead_code` allow
    /// may be stale — confirm.
    #[cfg(feature = "security")]
    #[allow(dead_code)]
    encryption: Option<super::security::EncryptionManager>,
}
65
66impl HmsCore {
67    /// Create a new HMS instance. If `storage_path` is None, uses the current directory.
68    pub fn new(
69        dimensions: u32,
70        storage_path: Option<String>,
71        config: Option<HmsConfig>,
72    ) -> Result<Self> {
73        const MAX_DIMENSIONS: u32 = 1_000_000;
74        if dimensions == 0 || dimensions > MAX_DIMENSIONS {
75            return Err(anyhow::anyhow!(
76                "dimensions must be between 1 and {} (got {})",
77                MAX_DIMENSIONS,
78                dimensions
79            ));
80        }
81        let dim = dimensions as usize;
82        let config = config.unwrap_or_default();
83
84        let base_path = storage_path
85            .map(PathBuf::from)
86            .unwrap_or_else(|| Path::new(".").to_path_buf());
87        if !base_path.exists() {
88            std::fs::create_dir_all(&base_path)?;
89        }
90
91        let arena = Arc::new(PersistentArena::new(base_path.join("vectors_data.bin"))?);
92
93        let audit = if config.security.audit_enabled {
94            Some(AuditLog::new(&base_path)?)
95        } else {
96            None
97        };
98
99        #[cfg(feature = "security")]
100        let signing = if config.security.signing_enabled {
101            let key_path = config
102                .security
103                .key_path
104                .as_ref()
105                .map(PathBuf::from)
106                .unwrap_or_else(|| base_path.join("hms_signing.key"));
107            Some(super::security::SigningManager::new(&key_path)?)
108        } else {
109            None
110        };
111
112        #[cfg(feature = "security")]
113        let encryption = if config.security.encryption_enabled {
114            let passphrase = config
115                .security
116                .encryption_passphrase
117                .as_deref()
118                .ok_or_else(|| {
119                    anyhow::anyhow!("encryption_passphrase required when encryption is enabled")
120                })?;
121            Some(super::security::EncryptionManager::new(
122                passphrase, &base_path,
123            )?)
124        } else {
125            None
126        };
127
128        let shard_set = if config.shard.enabled && config.shard.shard_count > 1 {
129            ShardSet::Multi(ShardManager::new(config.shard.shard_count, dim))
130        } else {
131            ShardSet::Single(Box::new(shard::Shard::new(dim)))
132        };
133
134        let (atom_mem, comp_mem, tri_store, role_reg, rule_st, decomp, adm) =
135            if config.meaning.enabled {
136                let mc = &config.meaning;
137                (
138                    Some(Arc::new(AtomMemory::new(dim, mc.idf_clip_factor))),
139                    Some(Arc::new(CompositeMemory::new(dim, mc.idf_clip_factor))),
140                    Some(Arc::new(TripleStore::new())),
141                    Some(RoleRegistry::new(dim)),
142                    Some(RuleStore::new()),
143                    Some(Decomposer::new()),
144                    Some(AdmissionControl::new(mc.algebraic_max_fanout)),
145                )
146            } else {
147                (None, None, None, None, None, None, None)
148            };
149
150        let core = Self {
151            config: config.clone(),
152            arena,
153            dimensions: dim,
154            storage_path: base_path,
155            shards: RwLock::new(shard_set),
156            graph: RelationStore::new(),
157            atom_memory: atom_mem,
158            composite_memory: comp_mem,
159            triple_store: tri_store,
160            role_registry: role_reg,
161            rule_store: rule_st,
162            decomposer: decomp,
163            admission: adm,
164            cognition_loop: parking_lot::Mutex::new(None),
165            audit,
166            #[cfg(feature = "security")]
167            signing,
168            #[cfg(feature = "security")]
169            encryption,
170        };
171
172        core.load_from_log()?;
173        core.load_indices()?;
174        {
175            let shards = core.shards.read();
176            shards.try_for_each_shard(|s| s.rebuild_inverted_index(dim))?;
177        }
178
179        Ok(core)
180    }
181
182    fn load_indices(&self) -> Result<()> {
183        let nsg_path = self.storage_path.join("nsg_index.bin");
184        if nsg_path.exists() {
185            let raw = std::fs::read(&nsg_path)?;
186            let data = self.maybe_decrypt(&raw)?;
187            let nsg: super::nsg::NSGIndex = bincode::deserialize(&data)?;
188            // Load NSG into the first (or only) shard
189            let shards = self.shards.read();
190            if let ShardSet::Single(ref shard) = *shards {
191                *shard.nsg.write() = Some(nsg);
192            }
193        }
194
195        let ivf_path = self.storage_path.join("ivf_index.bin");
196        if ivf_path.exists() {
197            let raw = std::fs::read(&ivf_path)?;
198            let data = self.maybe_decrypt(&raw)?;
199            let mut ivf: IVFIndex = bincode::deserialize(&data)?;
200            ivf.lists = Some(super::ivf::inverted_list::InvertedLists::new());
201
202            let shards = self.shards.read();
203            if let ShardSet::Single(ref shard) = *shards {
204                let vectors = shard.vectors.read();
205                let registry = shard.registry.read();
206                for id in registry.iter() {
207                    if let Some(vec) = vectors.get(id) {
208                        ivf.insert(id, vec)?;
209                    }
210                }
211                *shard.ivf.write() = Some(ivf);
212            }
213        }
214
215        Ok(())
216    }
217
218    fn save_nsg(&self, nsg: &super::nsg::NSGIndex) -> Result<()> {
219        let data = bincode::serialize(nsg)?;
220        std::fs::write(
221            self.storage_path.join("nsg_index.bin"),
222            self.maybe_encrypt(&data)?,
223        )?;
224        Ok(())
225    }
226
227    fn save_ivf(&self, ivf: &IVFIndex) -> Result<()> {
228        let data = bincode::serialize(ivf)?;
229        std::fs::write(
230            self.storage_path.join("ivf_index.bin"),
231            self.maybe_encrypt(&data)?,
232        )?;
233        Ok(())
234    }
235
236    /// Bundle vectors respecting the PrivacyConfig.
237    /// When dp_enabled, uses Laplace noise with the configured epsilon.
238    pub fn bundle<V: std::borrow::Borrow<EntangledHVec>>(&self, vectors: &[V]) -> EntangledHVec {
239        if self.config.privacy.dp_enabled {
240            EntangledHVec::bundle_dp(vectors, self.config.privacy.epsilon)
241        } else {
242            EntangledHVec::bundle(vectors)
243        }
244    }
245
246    fn maybe_encrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
247        #[cfg(feature = "security")]
248        if let Some(ref enc) = self.encryption {
249            return enc.encrypt(data);
250        }
251        Ok(data.to_vec())
252    }
253
254    fn maybe_decrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
255        #[cfg(feature = "security")]
256        if let Some(ref enc) = self.encryption {
257            return enc.decrypt(data);
258        }
259        Ok(data.to_vec())
260    }
261
    /// Rebuild in-memory state by replaying every frame of the arena log, in order.
    ///
    /// Each payload is offered to decoders in a fixed order: atom, composite, triple,
    /// rule, relation. It is handled by the first decoder that returns `Some`. A payload
    /// that none of them accepts is decoded as a vector entry by `parse_log_payload`.
    /// Records for stores that are disabled (`None`) are decoded and then dropped.
    ///
    /// Replay ends silently at the first frame `arena_read_frame` fails to read, or
    /// when `arena.next_offset` fails.
    /// NOTE(review): a corrupt frame in the middle of the log therefore truncates
    /// replay without an error — confirm this is the intended torn-write handling.
    ///
    /// Afterwards every shard's registry is rebuilt from its live vectors (sorted by
    /// ID) and its `vector_count` reset, and atom/composite indices are rebuilt.
    ///
    /// # Errors
    /// Propagates failures from shard `insert`/`remove` during replay.
    fn load_from_log(&self) -> Result<()> {
        let shards = self.shards.read();

        let mut offset = 0;
        // Stop at the first unreadable frame (see NOTE above).
        while let Ok((payload, _version)) = self.arena_read_frame(offset) {
            // Decoder order matters: the first decoder returning `Some` claims the payload.
            if let Some((id, vec)) =
                super::atom_memory::AtomMemory::deserialize_atom(&payload, self.dimensions)
            {
                if let Some(ref atom_mem) = self.atom_memory {
                    atom_mem.load_atom(id, vec);
                }
            } else if let Some((id, vec)) =
                super::composite_memory::CompositeMemory::deserialize_composite(
                    &payload,
                    self.dimensions,
                )
            {
                if let Some(ref comp_mem) = self.composite_memory {
                    comp_mem.load_composite(id, vec);
                }
            } else if let Some(record) =
                super::triple_store::TripleStore::deserialize_triple(&payload)
            {
                if let Some(ref tri_store) = self.triple_store {
                    tri_store.load_triple(record);
                }
            } else if let Some(rule) = super::rules::RuleStore::deserialize_rule(&payload) {
                if let Some(ref rule_store) = self.rule_store {
                    rule_store.load_rule(rule);
                }
            } else if let Some(rel) = RelationStore::deserialize_relation(&payload) {
                self.graph.load_relation(&rel);
            } else {
                // Plain vector entry. A zero-dimension result signals a tombstone (or an
                // undecodable payload), so the ID is removed instead of inserted.
                let (id, vector) = Self::parse_log_payload(self.dimensions, &payload);
                if vector.dim == 0 {
                    shards.remove(&id, self.dimensions)?;
                } else {
                    shards.insert(id, vector, self.dimensions)?;
                }
            }
            offset = match self.arena.next_offset(offset) {
                Ok(next) => next,
                Err(_) => break,
            };
        }

        // Rebuild registries from live vectors (ensure deterministic ordering)
        shards.for_each_shard(|shard| {
            let vectors = shard.vectors.read();
            let mut reg = shard.registry.write();
            let mut live_ids: Vec<String> = vectors.keys().cloned().collect();
            live_ids.sort();
            *reg = live_ids;
            shard
                .vector_count
                .store(reg.len() as u64, std::sync::atomic::Ordering::SeqCst);
        });

        if let Some(ref atom_mem) = self.atom_memory {
            atom_mem.rebuild_indices();
        }
        if let Some(ref comp_mem) = self.composite_memory {
            comp_mem.rebuild_indices();
        }

        Ok(())
    }
329
330    fn parse_log_payload(dimensions: usize, payload: &[u8]) -> (String, EntangledHVec) {
331        if payload.len() < 6 {
332            return (String::new(), EntangledHVec::from_indices(vec![], 0));
333        }
334        let id_len = u16::from_le_bytes([payload[0], payload[1]]) as usize;
335        let id_end = 2 + id_len;
336        if payload.len() < id_end + 4 {
337            return (String::new(), EntangledHVec::from_indices(vec![], 0));
338        }
339        let id = match std::str::from_utf8(&payload[2..id_end]) {
340            Ok(s) => s.to_owned(),
341            Err(_) => String::from_utf8_lossy(&payload[2..id_end]).into_owned(),
342        };
343        let delta_count_raw = u32::from_le_bytes(match payload[id_end..id_end + 4].try_into() {
344            Ok(b) => b,
345            Err(_) => return (id, EntangledHVec::from_indices(vec![], 0)),
346        });
347
348        if delta_count_raw == Self::TOMBSTONE_MARKER {
349            return (id, EntangledHVec::from_indices(vec![], 0));
350        }
351
352        let delta_count = delta_count_raw as usize;
353        if delta_count == 0 {
354            return (id, EntangledHVec::from_indices(vec![], dimensions));
355        }
356
357        let deltas_start = id_end + 4;
358        let deltas_end = deltas_start + delta_count * 4;
359        if payload.len() < deltas_end {
360            return (id, EntangledHVec::from_indices(vec![], 0));
361        }
362        let deltas: Vec<u32> = payload[deltas_start..deltas_end]
363            .chunks_exact(4)
364            .map(|c| u32::from_le_bytes(c.try_into().unwrap()))
365            .collect();
366        (id, EntangledHVec::from_deltas(&deltas, dimensions))
367    }
368
369    /// Returns the dimensionality of the hypervector space.
370    pub fn dimensions(&self) -> usize {
371        self.dimensions
372    }
373
374    /// Encode text into a sparse hypervector using character trigrams.
375    pub fn encode_text(&self, text: &str) -> EntangledHVec {
376        encode_text_internal(text, self.dimensions)
377    }
378
379    /// Compute word, sentence, syllable, and character-class counts for text.
380    pub fn analyze_text(&self, text: &str) -> TextMetrics {
381        TextProcessor::analyze(text)
382    }
383
384    /// Compute Flesch Reading Ease score from text metrics.
385    pub fn calculate_readability(&self, metrics: &TextMetrics) -> f64 {
386        TextProcessor::calculate_readability(metrics)
387    }
388
389    fn serialize_log_entry(id: &str, vector: &EntangledHVec) -> Result<Vec<u8>> {
390        let id_bytes = id.as_bytes();
391        if id_bytes.len() > u16::MAX as usize {
392            return Err(anyhow::anyhow!(
393                "ID too long: {} bytes (max {})",
394                id_bytes.len(),
395                u16::MAX
396            ));
397        }
398        let deltas = vector.to_deltas();
399        let mut entry = Vec::with_capacity(2 + id_bytes.len() + 4 + deltas.len() * 4);
400        entry.extend_from_slice(&(id_bytes.len() as u16).to_le_bytes());
401        entry.extend_from_slice(id_bytes);
402        entry.extend_from_slice(&(deltas.len() as u32).to_le_bytes());
403        for &d in &deltas {
404            entry.extend_from_slice(&d.to_le_bytes());
405        }
406        Ok(entry)
407    }
408
409    const TOMBSTONE_MARKER: u32 = u32::MAX;
410
411    fn serialize_tombstone(id: &str) -> Result<Vec<u8>> {
412        let id_bytes = id.as_bytes();
413        if id_bytes.len() > u16::MAX as usize {
414            return Err(anyhow::anyhow!(
415                "ID too long: {} bytes (max {})",
416                id_bytes.len(),
417                u16::MAX
418            ));
419        }
420        let mut entry = Vec::with_capacity(2 + id_bytes.len() + 4);
421        entry.extend_from_slice(&(id_bytes.len() as u16).to_le_bytes());
422        entry.extend_from_slice(id_bytes);
423        entry.extend_from_slice(&Self::TOMBSTONE_MARKER.to_le_bytes());
424        Ok(entry)
425    }
426
427    /// Delete a vector by ID. Returns true if it existed. Crash-safe: tombstone is persisted first.
428    pub fn delete(&self, id: &str) -> Result<bool> {
429        // Persist tombstone first for crash-safety: if we crash after the
430        // arena write but before the memory remove, load_from_log replays
431        // the tombstone and correctly removes the vector.
432        self.arena_write(&Self::serialize_tombstone(id)?)?;
433
434        if let Some(ref audit) = self.audit {
435            audit.record(AuditOp::Delete, id, self.sign_fn().as_deref())?;
436        }
437
438        if let Some(ref atom_mem) = self.atom_memory {
439            atom_mem.delete(id);
440        }
441
442        let shards = self.shards.read();
443        if !shards.remove(id, self.dimensions)? {
444            return Ok(false);
445        }
446        Ok(true)
447    }
448
449    pub fn memorize_meaning(&self, id: &str, text: &str) -> Result<()> {
450        let vector = self.encode_text(text);
451        self.memorize(id.to_string(), vector)?;
452
453        if let (
454            Some(ref decomposer),
455            Some(ref atom_mem),
456            Some(ref comp_mem),
457            Some(ref tri_store),
458            Some(ref roles),
459        ) = (
460            &self.decomposer,
461            &self.atom_memory,
462            &self.composite_memory,
463            &self.triple_store,
464            &self.role_registry,
465        ) {
466            if self.config.meaning.auto_decompose {
467                let units = decomposer.decompose(text);
468                for unit in &units {
469                    let (_, s_vec) = atom_mem.get_or_insert(&unit.subject);
470                    let (_, r_vec) = atom_mem.get_or_insert(&unit.relation);
471                    let (_, o_vec) = atom_mem.get_or_insert(&unit.object);
472
473                    self.arena_write(&super::atom_memory::AtomMemory::serialize_atom(
474                        &unit.subject,
475                        &s_vec,
476                    ))?;
477                    self.arena_write(&super::atom_memory::AtomMemory::serialize_atom(
478                        &unit.relation,
479                        &r_vec,
480                    ))?;
481                    self.arena_write(&super::atom_memory::AtomMemory::serialize_atom(
482                        &unit.object,
483                        &o_vec,
484                    ))?;
485
486                    let composite = roles.compose_triple(&s_vec, &r_vec, &o_vec);
487                    let comp_id =
488                        format!("{}:{}:{}:{}", id, unit.subject, unit.relation, unit.object);
489                    comp_mem.insert(comp_id.clone(), composite.clone());
490
491                    self.arena_write(
492                        &super::composite_memory::CompositeMemory::serialize_composite(
493                            &comp_id, &composite,
494                        ),
495                    )?;
496
497                    tri_store.add(&unit.subject, &unit.relation, &unit.object, &comp_id);
498                    self.arena_write(&super::triple_store::TripleStore::serialize_triple(
499                        &super::triple_store::TripleRecord {
500                            subject_id: unit.subject.clone(),
501                            relation_id: unit.relation.clone(),
502                            object_id: unit.object.clone(),
503                            composite_id: comp_id,
504                            deleted: false,
505                        },
506                    ))?;
507                }
508            }
509        }
510        Ok(())
511    }
512
513    /// Compact the arena log by rewriting only live vectors. Blocks all writes during compaction.
514    pub fn compact(&self) -> Result<()> {
515        // Hold the shards write lock for the entire compaction to block
516        // concurrent memorize/delete. This guarantees the snapshot is
517        // consistent with what gets swapped in.
518        let shards = self.shards.write();
519
520        let mut snapshot = Vec::new();
521        shards.for_each_shard(|shard| {
522            let vectors = shard.vectors.read();
523            let registry = shard.registry.read();
524            for id in registry.iter() {
525                if let Some(v) = vectors.get(id) {
526                    snapshot.push((id.clone(), v.clone()));
527                }
528            }
529        });
530
531        let temp_dir = self.storage_path.join(format!(
532            ".compact_{}",
533            std::time::SystemTime::now()
534                .duration_since(std::time::UNIX_EPOCH)
535                .unwrap_or_default()
536                .as_micros()
537        ));
538
539        let relation_snapshot = self.graph.snapshot();
540
541        {
542            let temp_arena = PersistentArena::new(&temp_dir)?;
543            for (id, vector) in &snapshot {
544                let entry = Self::serialize_log_entry(id, vector)?;
545                let payload = self.maybe_encrypt(&entry)?;
546                temp_arena.write_slice(&payload)?;
547            }
548            for rel in &relation_snapshot {
549                let entry = RelationStore::serialize_relation(rel);
550                let payload = self.maybe_encrypt(&entry)?;
551                temp_arena.write_slice(&payload)?;
552            }
553            if let Some(ref atom_mem) = self.atom_memory {
554                for (_, id, vec) in atom_mem.inner().all_vectors() {
555                    let entry = super::atom_memory::AtomMemory::serialize_atom(&id, &vec);
556                    let payload = self.maybe_encrypt(&entry)?;
557                    temp_arena.write_slice(&payload)?;
558                }
559            }
560            if let Some(ref comp_mem) = self.composite_memory {
561                for (_, id, vec) in comp_mem.inner().all_vectors() {
562                    let entry =
563                        super::composite_memory::CompositeMemory::serialize_composite(&id, &vec);
564                    let payload = self.maybe_encrypt(&entry)?;
565                    temp_arena.write_slice(&payload)?;
566                }
567            }
568            if let Some(ref tri_store) = self.triple_store {
569                for record in tri_store.snapshot() {
570                    let entry = super::triple_store::TripleStore::serialize_triple(&record);
571                    let payload = self.maybe_encrypt(&entry)?;
572                    temp_arena.write_slice(&payload)?;
573                }
574            }
575            if let Some(ref rule_store) = self.rule_store {
576                for rule in rule_store.all_rules() {
577                    let entry = super::rules::RuleStore::serialize_rule(&rule);
578                    let payload = self.maybe_encrypt(&entry)?;
579                    temp_arena.write_slice(&payload)?;
580                }
581            }
582        }
583
584        self.arena.replace_with_compacted(&temp_dir)?;
585
586        if let Some(ref audit) = self.audit {
587            audit.record(AuditOp::Compact, "", self.sign_fn().as_deref())?;
588        }
589
590        if let ShardSet::Single(ref shard) = *shards {
591            if let Some(ref nsg) = *shard.nsg.read() {
592                self.save_nsg(nsg)?;
593            }
594            if let Some(ref ivf) = *shard.ivf.read() {
595                self.save_ivf(ivf)?;
596            }
597        }
598
599        Ok(())
600    }
601
602    /// Store a vector with the given ID. Persists to the arena log and updates all indices.
603    pub fn memorize(&self, id: String, vector: EntangledHVec) -> Result<()> {
604        let entry = Self::serialize_log_entry(&id, &vector)?;
605        self.arena_write(&entry)?;
606
607        if let Some(ref audit) = self.audit {
608            audit.record(AuditOp::Memorize, &id, self.sign_fn().as_deref())?;
609        }
610
611        if let Some(ref atom_mem) = self.atom_memory {
612            atom_mem.insert_with_vec(&id, &vector);
613        }
614
615        let count = {
616            let shards = self.shards.read();
617            shards.insert(id, vector, self.dimensions)?;
618            shards.count()
619        };
620
621        if self.config.ivf.enabled
622            && self.config.ivf.auto_threshold > 0
623            && count == self.config.ivf.auto_threshold as u64
624        {
625            self.train_ivf().context("Auto-train IVF failed")?;
626        } else if self.config.nsg.auto_threshold > 0
627            && count == self.config.nsg.auto_threshold as u64
628        {
629            self.train_nsg().context("Auto-train NSG failed")?;
630        } else {
631            self.maybe_auto_shard(count);
632        }
633
634        Ok(())
635    }
636
637    fn maybe_auto_shard(&self, count: u64) {
638        let cfg = &self.config.shard;
639        if !cfg.enabled || cfg.shard_count > 0 || cfg.auto_threshold == 0 {
640            return;
641        }
642        if count < cfg.auto_threshold as u64 {
643            return;
644        }
645
646        // Upgrade from Single to Multi: snapshot vectors, build new shards, swap.
647        let mut shards = self.shards.write();
648        let snapshot: Vec<(String, EntangledHVec)> = {
649            match *shards {
650                ShardSet::Single(ref old_shard) => {
651                    let vectors = old_shard.vectors.read();
652                    vectors
653                        .iter()
654                        .map(|(k, v)| (k.clone(), v.clone()))
655                        .collect()
656                }
657                ShardSet::Multi(_) => return,
658            }
659        };
660
661        let n_shards = (count as usize / cfg.target_shard_size).max(2);
662        let mgr = ShardManager::new(n_shards, self.dimensions);
663        for (id, vec) in snapshot {
664            let target = mgr.shard_for(&id);
665            let shard = &mgr.shards[target];
666            shard.vectors.write().insert(id.clone(), vec.clone());
667            shard.registry.write().push(id);
668        }
669        for shard in &mgr.shards {
670            let count = shard.vectors.read().len() as u64;
671            shard
672                .vector_count
673                .store(count, std::sync::atomic::Ordering::SeqCst);
674            let _ = shard.rebuild_inverted_index(self.dimensions);
675        }
676
677        *shards = ShardSet::Multi(mgr);
678    }
679
680    /// Convert a dense f32 vector to sparse and memorize it.
681    pub fn memorize_vector(&self, id: String, dense: &[f32]) -> Result<()> {
682        let vector = EntangledHVec::from_dense(dense, self.dimensions);
683        self.memorize(id, vector)
684    }
685
686    /// Encode a bounded scalar value as a hypervector and memorize it.
687    pub fn memorize_scalar(&self, id: String, value: f64, min: f64, max: f64) -> Result<()> {
688        let vector = EntangledHVec::from_scalar(value, min, max, self.dimensions);
689        self.memorize(id, vector)
690    }
691
692    /// Returns the total number of stored vectors across all shards.
693    pub fn vector_count(&self) -> u64 {
694        self.shards.read().count()
695    }
696
697    // === Graph API ===
698
699    pub fn add_relation(&self, rel: &Relation) -> Result<()> {
700        let entry = RelationStore::serialize_relation(rel);
701        self.arena_write(&entry)?;
702        self.graph.add(rel);
703        if let Some(ref audit) = self.audit {
704            let label = format!("{}->{}:{}", rel.source_id, rel.target_id, rel.relation_type);
705            audit.record(AuditOp::Memorize, &label, self.sign_fn().as_deref())?;
706        }
707        Ok(())
708    }
709
710    pub fn remove_relation(&self, source_id: &str, relation_type: &str, target_id: &str) -> bool {
711        self.graph.remove(source_id, relation_type, target_id)
712    }
713
714    pub fn declare_relation_type(&self, rel_type: RelationType) {
715        self.graph.declare_type(rel_type);
716    }
717
718    pub fn traverse(
719        &self,
720        start_id: &str,
721        relation_type: Option<&str>,
722        max_depth: u32,
723        at_time: f64,
724    ) -> Vec<GraphPath> {
725        let shards = self.shards.read();
726        self.graph
727            .traverse(start_id, relation_type, max_depth, at_time, &|a, b| {
728                let vec_a = shards.get_vector(a);
729                let vec_b = shards.get_vector(b);
730                match (vec_a, vec_b) {
731                    (Some(va), Some(vb)) => va.similarity(&vb),
732                    _ => 0.0,
733                }
734            })
735    }
736
737    pub fn outgoing_relations(
738        &self,
739        source_id: &str,
740        relation_type: Option<&str>,
741        at_time: f64,
742    ) -> Vec<Relation> {
743        self.graph.outgoing(source_id, relation_type, at_time)
744    }
745
746    pub fn incoming_relations(
747        &self,
748        target_id: &str,
749        relation_type: Option<&str>,
750        at_time: f64,
751    ) -> Vec<Relation> {
752        self.graph.incoming(target_id, relation_type, at_time)
753    }
754
755    pub fn relation_count(&self) -> usize {
756        self.graph.count()
757    }
758
759    // === Federated Query ===
760
761    pub fn federated_query(
762        &self,
763        peer_paths: &[String],
764        query_vec: &EntangledHVec,
765        k: u32,
766    ) -> Result<Vec<super::types::RetrievalResult>> {
767        use rayon::prelude::*;
768
769        // Query local instance
770        let mut all_results = self.query(query_vec, k);
771
772        // Query each peer in parallel
773        let peer_results: Vec<Result<Vec<super::types::RetrievalResult>>> = peer_paths
774            .par_iter()
775            .map(|path| {
776                let peer = HmsCore::new(
777                    self.dimensions as u32,
778                    Some(path.clone()),
779                    Some(self.config.clone()),
780                )?;
781                Ok(peer.query(query_vec, k))
782            })
783            .collect();
784
785        for result in peer_results {
786            all_results.extend(result?);
787        }
788
789        // Sort by similarity descending and take top-k
790        all_results.sort_by(|a, b| {
791            b.similarity
792                .partial_cmp(&a.similarity)
793                .unwrap_or(std::cmp::Ordering::Equal)
794        });
795        all_results.truncate(k as usize);
796        Ok(all_results)
797    }
798
799    // === Meaning Memory API ===
800
801    pub fn structural_query(
802        &self,
803        known: &[(&str, &EntangledHVec)],
804        target_role: &str,
805    ) -> Vec<structural::StructuralResult> {
806        let (atom_mem, comp_mem, tri, roles, adm) = match (
807            &self.atom_memory,
808            &self.composite_memory,
809            &self.triple_store,
810            &self.role_registry,
811            &self.admission,
812        ) {
813            (Some(a), Some(c), Some(t), Some(r), Some(ad)) => (a, c, t, r, ad),
814            _ => return Vec::new(),
815        };
816        let mc = &self.config.meaning;
817        let ctx = structural::MeaningContext {
818            atom_memory: atom_mem,
819            composite_memory: comp_mem,
820            triple_store: tri,
821            roles,
822            admission: adm,
823            beta: mc.beta,
824            k: 64,
825            max_iter: 3,
826        };
827        structural::fuzzy_structural_query(&ctx, known, target_role)
828    }
829
830    pub fn multi_hop(&self, start: &str, relations: &[&str]) -> Vec<multi_hop::MultiHopResult> {
831        let (atom_mem, comp_mem, tri, roles, adm, rules) = match (
832            &self.atom_memory,
833            &self.composite_memory,
834            &self.triple_store,
835            &self.role_registry,
836            &self.admission,
837            &self.rule_store,
838        ) {
839            (Some(a), Some(c), Some(t), Some(r), Some(ad), Some(ru)) => (a, c, t, r, ad, ru),
840            _ => return Vec::new(),
841        };
842        let mc = &self.config.meaning;
843        let ctx = structural::MeaningContext {
844            atom_memory: atom_mem,
845            composite_memory: comp_mem,
846            triple_store: tri,
847            roles,
848            admission: adm,
849            beta: mc.beta,
850            k: 64,
851            max_iter: 3,
852        };
853        multi_hop::multi_hop_query(start, relations, &ctx, rules, mc.max_hop_depth)
854    }
855
856    pub fn meaning_cleanup(&self, noisy: &EntangledHVec) -> Option<(String, f64)> {
857        let atom_mem = self.atom_memory.as_ref()?;
858        let mc = &self.config.meaning;
859        let result = atom_mem.cleanup(noisy, mc.beta, 64, 3);
860        if result.found {
861            Some((result.id, result.confidence))
862        } else {
863            None
864        }
865    }
866
867    pub fn declare_rule(&self, name: &str, input_relations: Vec<String>, output_relation: String) {
868        if let Some(ref rules) = self.rule_store {
869            rules.add_rule(super::rules::CompositionRule {
870                name: name.to_string(),
871                input_relations,
872                output_relation,
873            });
874        }
875    }
876
877    pub fn meaning_enabled(&self) -> bool {
878        self.config.meaning.enabled
879    }
880
881    pub fn meaning_atom_count(&self) -> usize {
882        self.atom_memory.as_ref().map_or(0, |m| m.count())
883    }
884
885    pub fn meaning_composite_count(&self) -> usize {
886        self.composite_memory.as_ref().map_or(0, |m| m.count())
887    }
888
889    pub fn meaning_triple_count(&self) -> usize {
890        self.triple_store.as_ref().map_or(0, |t| t.count())
891    }
892
893    pub fn meaning_rule_count(&self) -> usize {
894        self.rule_store.as_ref().map_or(0, |r| r.count())
895    }
896
897    pub fn register_role(&mut self, name: &str, shift: usize) -> anyhow::Result<()> {
898        if let Some(ref mut roles) = self.role_registry {
899            roles.register(name, shift)
900        } else {
901            Err(anyhow::anyhow!("meaning memory not enabled"))
902        }
903    }
904
905    // === Cognition API ===
906
907    pub fn start_cognition(&self) -> Result<()> {
908        let atom_mem = self
909            .atom_memory
910            .as_ref()
911            .ok_or_else(|| anyhow::anyhow!("meaning memory not enabled"))?;
912        let tri_store = self
913            .triple_store
914            .as_ref()
915            .ok_or_else(|| anyhow::anyhow!("meaning memory not enabled"))?;
916
917        let cc = &self.config.cognition;
918        let loop_config = CognitionLoopConfig {
919            interval: std::time::Duration::from_secs(cc.interval_secs),
920            min_pattern_freq: cc.min_pattern_freq,
921            min_abstraction_members: cc.min_abstraction_members,
922            min_shared_relations: cc.min_shared_relations,
923            min_peer_coverage: cc.min_peer_coverage,
924            hypothesis_beta: cc.hypothesis_beta,
925            min_hypothesis_confidence: cc.min_hypothesis_confidence,
926            min_analogy_relations: cc.min_analogy_relations,
927        };
928
929        let cl = CognitionLoop::start(Arc::clone(atom_mem), Arc::clone(tri_store), loop_config);
930
931        *self.cognition_loop.lock() = Some(cl);
932        Ok(())
933    }
934
935    pub fn stop_cognition(&self) {
936        if let Some(ref mut cl) = *self.cognition_loop.lock() {
937            cl.stop();
938        }
939    }
940
941    pub fn cognition_running(&self) -> bool {
942        self.cognition_loop
943            .lock()
944            .as_ref()
945            .is_some_and(|cl| cl.state().is_running())
946    }
947
948    pub fn cognition_cycle_count(&self) -> u64 {
949        self.cognition_loop
950            .lock()
951            .as_ref()
952            .map_or(0, |cl| cl.state().cycle_count())
953    }
954
955    pub fn take_insights(&self) -> Vec<Insight> {
956        self.cognition_loop
957            .lock()
958            .as_ref()
959            .map_or_else(Vec::new, |cl| cl.state().take_insights())
960    }
961
962    pub fn cognition_insight_count(&self) -> usize {
963        self.cognition_loop
964            .lock()
965            .as_ref()
966            .map_or(0, |cl| cl.state().insight_count())
967    }
968
969    pub fn run_cognition_once(&self) -> Vec<Insight> {
970        let (atom_mem, tri_store) = match (&self.atom_memory, &self.triple_store) {
971            (Some(a), Some(t)) => (a, t),
972            _ => return Vec::new(),
973        };
974        let cc = &self.config.cognition;
975        let loop_config = CognitionLoopConfig {
976            interval: std::time::Duration::from_secs(cc.interval_secs),
977            min_pattern_freq: cc.min_pattern_freq,
978            min_abstraction_members: cc.min_abstraction_members,
979            min_shared_relations: cc.min_shared_relations,
980            min_peer_coverage: cc.min_peer_coverage,
981            hypothesis_beta: cc.hypothesis_beta,
982            min_hypothesis_confidence: cc.min_hypothesis_confidence,
983            min_analogy_relations: cc.min_analogy_relations,
984        };
985        CognitionLoop::run_once(atom_mem, tri_store, &loop_config)
986    }
987
988    pub fn govern_memory(&self) -> GovernanceReport {
989        let (atom_mem, comp_mem, tri_store) = match (
990            &self.atom_memory,
991            &self.composite_memory,
992            &self.triple_store,
993        ) {
994            (Some(a), Some(c), Some(t)) => (a, c, t),
995            _ => return GovernanceReport::default(),
996        };
997        let cc = &self.config.cognition;
998        let gov_config = GovernorConfig {
999            duplicate_threshold: cc.governor_duplicate_threshold,
1000            max_scan_size: cc.governor_max_scan_size,
1001            forget_unreferenced_atoms: cc.governor_forget_unreferenced,
1002            refine_atoms: cc.refine_atoms,
1003            ..Default::default()
1004        };
1005        MemoryGovernor::govern(atom_mem, comp_mem, tri_store, &gov_config)
1006    }
1007
1008    pub fn cognition_enabled(&self) -> bool {
1009        self.config.cognition.enabled
1010    }
1011
1012    /// Returns true if the IVF index has been trained.
1013    pub fn ivf_trained(&self) -> bool {
1014        self.shards.read().ivf_trained()
1015    }
1016
1017    /// Train the IVF index on current vectors. Persists the index to disk.
1018    pub fn train_ivf(&self) -> Result<()> {
1019        let shards = self.shards.read();
1020        shards.try_for_each_shard(|shard| {
1021            let (ids, vectors) = shard.load_all_vectors();
1022            if ids.is_empty() {
1023                return Ok(());
1024            }
1025
1026            if let Some(ref mut existing) = *shard.ivf.write() {
1027                if let Some(ref lists) = existing.lists {
1028                    lists.clear_all()?;
1029                }
1030            }
1031
1032            let index = IVFIndex::train(&vectors, &ids, self.dimensions, &self.config.ivf)?;
1033            *shard.ivf.write() = Some(index);
1034            Ok(())
1035        })?;
1036
1037        if let ShardSet::Single(ref shard) = *shards {
1038            if let Some(ref ivf) = *shard.ivf.read() {
1039                self.save_ivf(ivf)?;
1040            }
1041        }
1042        Ok(())
1043    }
1044
1045    /// Returns true if the NSG graph index has been trained.
1046    pub fn nsg_trained(&self) -> bool {
1047        self.shards.read().nsg_trained()
1048    }
1049
1050    /// Train the NSG graph index on current vectors. Persists the index to disk.
1051    pub fn train_nsg(&self) -> Result<()> {
1052        let shards = self.shards.read();
1053        shards.try_for_each_shard(|shard| {
1054            let (ids, vectors) = shard.load_all_vectors();
1055            if ids.is_empty() {
1056                return Ok(());
1057            }
1058
1059            let index = super::nsg::training::train(&vectors, &ids, &self.config.nsg)?;
1060            *shard.nsg.write() = Some(index);
1061            Ok(())
1062        })?;
1063
1064        if let ShardSet::Single(ref shard) = *shards {
1065            if let Some(ref nsg) = *shard.nsg.read() {
1066                self.save_nsg(nsg)?;
1067            }
1068        }
1069        Ok(())
1070    }
1071
1072    fn arena_write(&self, data: &[u8]) -> Result<usize> {
1073        let payload = self.maybe_encrypt(data)?;
1074        self.arena.write_slice(&payload)
1075    }
1076
1077    fn arena_read_frame(&self, offset: usize) -> Result<(Vec<u8>, u32)> {
1078        let (data, version) = self.arena.read_frame(offset)?;
1079        let payload = self.maybe_decrypt(&data)?;
1080        Ok((payload, version))
1081    }
1082
    /// Build an optional closure that signs bytes, producing `audit::SignatureBytes`.
    ///
    /// - With the `security` feature, `self.signing` is wrapped in a boxed closure that
    ///   borrows `self`. It returns `None` when no signer is configured.
    /// - Without the feature, it always returns `None`.
    fn sign_fn(&self) -> Option<SignFn<'_>> {
        // Exactly one of the two cfg blocks below is compiled and becomes the tail expression.
        #[cfg(feature = "security")]
        {
            self.signing
                .as_ref()
                .map(|s| Box::new(move |data: &[u8]| s.sign(data)) as SignFn<'_>)
        }
        #[cfg(not(feature = "security"))]
        {
            None
        }
    }
1095
1096    /// Query the audit log for entries since `timestamp_ms`.
1097    /// Returns an empty vec if audit logging is disabled.
1098    pub fn audit_since(&self, timestamp_ms: u64) -> Result<Vec<super::audit::AuditEntry>> {
1099        match self.audit {
1100            Some(ref audit) => audit.entries_since(timestamp_ms),
1101            None => Ok(Vec::new()),
1102        }
1103    }
1104
1105    /// Decompose a product vector into factors from domain codebooks using diffusion.
1106    pub fn factorize_diffusion(
1107        &self,
1108        product: &EntangledHVec,
1109        domain_codebooks: &[Vec<EntangledHVec>],
1110        max_iter: usize,
1111    ) -> Vec<Option<EntangledHVec>> {
1112        DiffusionFactorizer::factorize(&self.config.diffusion, product, domain_codebooks, max_iter)
1113    }
1114}