1use anyhow::{Context, Result};
8use dashmap::DashMap;
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, OnceLock};
11use tracing::info;
12
13fn allcaps_regex() -> &'static regex::Regex {
15 static RE: OnceLock<regex::Regex> = OnceLock::new();
16 RE.get_or_init(|| regex::Regex::new(r"\b[A-Z]{2,}[A-Z0-9]*\b").unwrap())
17}
18
19fn issue_regex() -> &'static regex::Regex {
21 static RE: OnceLock<regex::Regex> = OnceLock::new();
22 RE.get_or_init(|| regex::Regex::new(r"\b([A-Z]{2,10}-\d+)\b").unwrap())
23}
24
25use crate::ab_testing;
26use crate::backup;
27use crate::config::ServerConfig;
28use crate::embeddings::{
29 are_ner_models_downloaded, download_ner_models, get_ner_models_dir, ner::NerEntityType,
30 KeywordExtractor, NerConfig, NeuralNer,
31};
32use crate::graph_memory::{
33 EdgeTier, EntityLabel, EntityNode, EpisodeSource, EpisodicNode, GraphMemory, GraphStats,
34 LtpStatus, RelationType, RelationshipEdge,
35};
36use crate::memory::{
37 query_parser, Experience, FeedbackStore, FileMemoryStore, MemoryConfig, MemoryId, MemoryStats,
38 MemorySystem, ProspectiveStore, SessionStore, TodoStore,
39};
40use crate::relevance::RelevanceEngine;
41use crate::streaming;
42
43use super::types::{AuditEvent, ContextStatus, MemoryEvent};
44
/// Concurrent map of session id -> latest [`ContextStatus`], shared across handlers.
pub type ContextSessions = DashMap<String, ContextStatus>;
47
/// Owned subset of `MultiUserMemoryManager` state needed to run audit-log
/// rotation on a background blocking task (see `log_event` and
/// `rotate_all_audit_logs`).
struct MultiUserMemoryManagerRotationHelper {
    // Shared RocksDB that holds the "audit" column family.
    shared_db: Arc<rocksdb::DB>,
    // Per-user in-memory audit caches, pruned in step with DB deletions.
    audit_logs: Arc<DashMap<String, Arc<parking_lot::RwLock<VecDeque<AuditEvent>>>>>,
    // Entries older than this many days are removed.
    audit_retention_days: i64,
    // Hard cap on retained entries per user; oldest beyond this are removed.
    audit_max_entries: usize,
}
55
/// Name of the RocksDB column family that stores audit events.
const CF_AUDIT: &str = "audit";
57
impl MultiUserMemoryManagerRotationHelper {
    /// Handle to the audit column family.
    ///
    /// Panics if the CF is missing; the shared DB is always opened with it
    /// (see the descriptor list in `MultiUserMemoryManager::new`).
    fn audit_cf(&self) -> &rocksdb::ColumnFamily {
        self.shared_db
            .cf_handle(CF_AUDIT)
            .expect("audit CF must exist")
    }

    /// Deletes a user's audit entries that are past the retention window or
    /// beyond the per-user entry cap, then prunes the in-memory cache to match.
    ///
    /// Keys are `"{user_id}:{nanos:020}"` (see `log_event`), so prefix
    /// iteration yields entries oldest-first; the first `excess_count`
    /// positions are therefore the oldest entries. Returns the number of
    /// entries removed from the DB.
    fn rotate_user_audit_logs(&self, user_id: &str) -> Result<usize> {
        let cutoff_time = chrono::Utc::now() - chrono::Duration::days(self.audit_retention_days);
        let cutoff_nanos = cutoff_time.timestamp_nanos_opt().unwrap_or_else(|| {
            tracing::warn!("audit cutoff timestamp outside i64 nanos range, using 0");
            0
        });
        let prefix = format!("{user_id}:");
        let audit = self.audit_cf();

        // Pass 1: count this user's entries so we know how many exceed the cap.
        let mut total_count = 0usize;
        let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
        for (key, _) in iter.flatten() {
            if let Ok(key_str) = std::str::from_utf8(&key) {
                // prefix_iterator can run past the logical prefix; stop at the boundary.
                if !key_str.starts_with(&prefix) {
                    break;
                }
                total_count += 1;
            }
        }

        if total_count == 0 {
            return Ok(0);
        }

        // How many of the oldest entries must go purely due to the size cap.
        let excess_count = total_count.saturating_sub(self.audit_max_entries);

        // Delete in bounded batches so a huge backlog never builds one giant WriteBatch.
        const BATCH_FLUSH_SIZE: usize = 10_000;
        let mut batch = rocksdb::WriteBatch::default();
        let mut removed_count = 0usize;
        let mut position = 0usize;

        // Pass 2: delete entries that are expired OR within the oldest excess.
        let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
        for (key, _) in iter.flatten() {
            let key_str = match std::str::from_utf8(&key) {
                Ok(s) => s,
                Err(_) => {
                    // Non-UTF8 key still occupies a position slot and is skipped.
                    // NOTE(review): pass 1 skips non-UTF8 keys when counting but
                    // pass 2 advances `position` for them — confirm intended.
                    position += 1;
                    continue;
                }
            };
            if !key_str.starts_with(&prefix) {
                break;
            }

            // Timestamp is the zero-padded nanos suffix after "user:".
            let ts = key_str
                .strip_prefix(&prefix)
                .and_then(|s| s.parse::<i64>().ok())
                .unwrap_or(0);
            if ts < cutoff_nanos || position < excess_count {
                batch.delete_cf(audit, &key);
                removed_count += 1;

                // Flush a full batch and start a fresh one.
                if removed_count % BATCH_FLUSH_SIZE == 0 {
                    self.shared_db
                        .write(std::mem::take(&mut batch))
                        .map_err(|e| anyhow::anyhow!("Failed to write rotation batch: {e}"))?;
                    batch = rocksdb::WriteBatch::default();
                }
            }

            position += 1;
        }

        // Write any final partial batch (when removed_count is an exact multiple
        // of BATCH_FLUSH_SIZE the batch was already flushed above and is empty).
        if removed_count % BATCH_FLUSH_SIZE != 0 {
            self.shared_db
                .write(batch)
                .map_err(|e| anyhow::anyhow!("Failed to write rotation batch: {e}"))?;
        }

        // Mirror the deletions into the in-memory cache, if this user has one.
        if removed_count > 0 {
            if let Some(log) = self.audit_logs.get(user_id) {
                let mut log_guard = log.write();

                // Drop expired events...
                log_guard.retain(|event| {
                    let event_nanos = event.timestamp.timestamp_nanos_opt().unwrap_or(0);
                    event_nanos >= cutoff_nanos
                });

                // ...then enforce the size cap, evicting oldest first.
                while log_guard.len() > self.audit_max_entries {
                    log_guard.pop_front();
                }
            }
        }

        Ok(removed_count)
    }
}
164
/// Central coordinator for all per-user memory state in the server.
///
/// Owns the shared RocksDB, LRU caches of per-user memory/graph systems, the
/// NER/keyword extractors, every auxiliary store, and the broadcast channels.
pub struct MultiUserMemoryManager {
    /// LRU + idle-timeout cache of per-user memory systems; evictions persist
    /// the vector index via the listener installed in `new`.
    pub user_memories: moka::sync::Cache<String, Arc<parking_lot::RwLock<MemorySystem>>>,

    /// Per-user in-memory audit event caches, capped per user.
    pub audit_logs: Arc<DashMap<String, Arc<parking_lot::RwLock<VecDeque<AuditEvent>>>>>,

    /// Shared RocksDB holding todo, prospective, file, feedback, and audit CFs.
    pub shared_db: Arc<rocksdb::DB>,

    /// Root directory; each user gets a subdirectory beneath it.
    pub base_path: std::path::PathBuf,

    /// Template memory config cloned for each newly created user.
    pub default_config: MemoryConfig,

    /// Running count of logged audit events; drives periodic rotation checks.
    pub audit_log_counter: Arc<std::sync::atomic::AtomicUsize>,

    /// LRU + idle-timeout cache of per-user graph memories.
    pub graph_memories: moka::sync::Cache<String, Arc<parking_lot::RwLock<GraphMemory>>>,

    /// Neural NER model (or rule-based fallback), shared across users.
    pub neural_ner: Arc<NeuralNer>,

    /// Keyword extractor (YAKE, per the init log in `new`).
    pub keyword_extractor: Arc<KeywordExtractor>,

    /// Count of users evicted from the memory cache (size or idle).
    pub user_evictions: Arc<std::sync::atomic::AtomicUsize>,

    /// Server-wide configuration (cache sizes, audit retention, backups, ...).
    pub server_config: ServerConfig,

    /// Broadcast channel for memory events (`emit_event` / `subscribe_events`).
    pub event_broadcaster: tokio::sync::broadcast::Sender<MemoryEvent>,

    /// Streaming memory extraction pipeline built around the NER model.
    pub streaming_extractor: Arc<streaming::StreamingMemoryExtractor>,

    /// Prospective memory store, backed by the shared DB.
    pub prospective_store: Arc<ProspectiveStore>,

    /// Todo store, backed by the shared DB.
    pub todo_store: Arc<TodoStore>,

    /// File memory store, backed by the shared DB.
    pub file_store: Arc<FileMemoryStore>,

    /// Feedback store; falls back to an in-memory instance if loading fails.
    pub feedback_store: Arc<parking_lot::RwLock<FeedbackStore>>,

    /// Backup engine rooted at `base_path/backups`.
    pub backup_engine: Arc<backup::ShodhBackupEngine>,

    /// Live context sessions keyed by session id.
    pub context_sessions: Arc<ContextSessions>,

    /// Broadcast channel for context status updates.
    pub context_broadcaster: tokio::sync::broadcast::Sender<ContextStatus>,

    /// A/B test manager.
    pub ab_test_manager: Arc<ab_testing::ABTestManager>,

    /// Session store.
    pub session_store: Arc<SessionStore>,

    /// Relevance engine (entity cache + learned weights, per the init log).
    pub relevance_engine: Arc<RelevanceEngine>,

    // Maintenance cycle counter; not read within this chunk — presumably
    // incremented by a maintenance loop elsewhere in the file.
    maintenance_cycle: std::sync::atomic::AtomicU64,

    // Per-user mutexes serializing memory-system init (double-checked locking
    // in `get_user_memory`).
    user_memory_init_locks: DashMap<String, Arc<parking_lot::Mutex<()>>>,

    // Per-user mutexes serializing graph init (`get_user_graph`).
    user_graph_init_locks: DashMap<String, Arc<parking_lot::Mutex<()>>>,

    // Shared RocksDB block cache handed to every per-user DB.
    shared_rocksdb_cache: rocksdb::Cache,
}
259
260impl MultiUserMemoryManager {
    /// Builds the manager: initializes the NER model (reuse, download, or
    /// rule-based fallback), sets up the eviction-aware user caches, opens the
    /// shared RocksDB with all column families, migrates legacy audit data,
    /// constructs every sub-store, and runs one audit-rotation pass.
    ///
    /// # Errors
    /// Fails if the base directory, shared DB, or any store cannot be created.
    pub fn new(base_path: std::path::PathBuf, server_config: ServerConfig) -> Result<Self> {
        std::fs::create_dir_all(&base_path)?;

        let (event_broadcaster, _) = tokio::sync::broadcast::channel(1024);

        // --- Neural NER: prefer already-downloaded models, else download,
        // else fall back to the rule-based extractor. Never fails construction.
        let ner_dir = get_ner_models_dir();
        tracing::debug!("Checking for NER models at {:?}", ner_dir);
        let neural_ner = if are_ner_models_downloaded() {
            tracing::debug!("NER models found, using existing files");
            let config = NerConfig {
                model_path: ner_dir.join("model.onnx"),
                tokenizer_path: ner_dir.join("tokenizer.json"),
                max_length: 128,
                confidence_threshold: 0.5,
            };
            match NeuralNer::new(config) {
                Ok(ner) => {
                    info!("Neural NER initialized (TinyBERT model at {:?})", ner_dir);
                    Arc::new(ner)
                }
                Err(e) => {
                    tracing::warn!("Failed to initialize neural NER: {}. Using fallback.", e);
                    Arc::new(NeuralNer::new_fallback(NerConfig::default()))
                }
            }
        } else {
            tracing::debug!("NER models not found at {:?}, will download", ner_dir);
            info!("Downloading NER models (TinyBERT-NER, ~15MB)...");
            // Progress callback logs at 20% steps.
            match download_ner_models(Some(std::sync::Arc::new(|downloaded, total| {
                if total > 0 {
                    let percent = (downloaded as f64 / total as f64 * 100.0) as u32;
                    if percent % 20 == 0 {
                        tracing::info!("NER model download: {}%", percent);
                    }
                }
            }))) {
                Ok(ner_dir) => {
                    info!("NER models downloaded to {:?}", ner_dir);
                    let config = NerConfig {
                        model_path: ner_dir.join("model.onnx"),
                        tokenizer_path: ner_dir.join("tokenizer.json"),
                        max_length: 128,
                        confidence_threshold: 0.5,
                    };
                    match NeuralNer::new(config) {
                        Ok(ner) => {
                            info!("Neural NER initialized after download");
                            Arc::new(ner)
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Failed to initialize downloaded NER: {}. Using fallback.",
                                e
                            );
                            Arc::new(NeuralNer::new_fallback(NerConfig::default()))
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to download NER models: {}. Using rule-based fallback.",
                        e
                    );
                    Arc::new(NeuralNer::new_fallback(NerConfig::default()))
                }
            }
        };

        // --- User memory cache: bounded LRU with 30-minute idle timeout. On
        // size/idle eviction, save the user's vector index on a detached thread.
        let user_evictions = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let evictions_clone = user_evictions.clone();
        let max_cache = server_config.max_users_in_memory;
        let eviction_base_path = base_path.clone();

        let user_memories = moka::sync::Cache::builder()
            .max_capacity(server_config.max_users_in_memory as u64)
            .time_to_idle(std::time::Duration::from_secs(1800))
            .eviction_listener(move |key: Arc<String>, value: Arc<parking_lot::RwLock<MemorySystem>>, cause| {
                // Only count real evictions (size pressure or idle expiry), not
                // explicit invalidations/replacements.
                if matches!(cause, moka::notification::RemovalCause::Size | moka::notification::RemovalCause::Expired) {
                    evictions_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                    let cause_label = if cause == moka::notification::RemovalCause::Expired { "idle-timeout" } else { "LRU" };

                    let index_path = eviction_base_path.join(key.as_str()).join("vector_index");
                    let user_key = key.clone();
                    // Persist off-thread; try_read avoids blocking the cache if
                    // the memory system is busy.
                    std::thread::spawn(move || {
                        if let Some(guard) = value.try_read() {
                            match guard.save_vector_index(&index_path) {
                                Ok(()) => {
                                    info!(
                                        "Evicted user '{}' from memory cache ({}, cache_size={}) - vector index saved",
                                        user_key, cause_label, max_cache
                                    );
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        "Evicted user '{}' from memory cache ({}) - failed to save vector index: {}",
                                        user_key, cause_label, e
                                    );
                                }
                            }
                        } else {
                            tracing::warn!(
                                "Evicted user '{}' from memory cache ({}) - could not acquire lock to save index",
                                user_key, cause_label
                            );
                        }
                    });
                }
            })
            .build();

        // --- Graph cache: same bounds; eviction only logs (no persistence here).
        let graph_memories = moka::sync::Cache::builder()
            .max_capacity(server_config.max_users_in_memory as u64)
            .time_to_idle(std::time::Duration::from_secs(1800))
            .eviction_listener(move |key: Arc<String>, _value, cause| {
                let cause_label = if cause == moka::notification::RemovalCause::Expired {
                    "idle-timeout"
                } else {
                    "LRU"
                };
                info!(
                    "Evicted graph for user '{}' from memory cache ({})",
                    key, cause_label
                );
            })
            .build();

        // --- Single block cache shared by the shared DB and all per-user DBs.
        let shared_rocksdb_cache =
            rocksdb::Cache::new_lru_cache(crate::constants::ROCKSDB_SHARED_CACHE_BYTES);
        info!(
            "Shared RocksDB block cache initialized ({}MB)",
            crate::constants::ROCKSDB_SHARED_CACHE_BYTES / (1024 * 1024)
        );

        // --- Shared DB with one column family per sub-store plus "audit".
        let shared_db = {
            use rocksdb::{BlockBasedOptions, ColumnFamilyDescriptor, Options as RocksOptions};
            let shared_db_path = base_path.join("shared");
            std::fs::create_dir_all(&shared_db_path)?;

            let mut db_opts = RocksOptions::default();
            db_opts.create_if_missing(true);
            db_opts.create_missing_column_families(true);
            db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
            db_opts.set_max_write_buffer_number(2);
            db_opts.set_write_buffer_size(8 * 1024 * 1024);

            let mut block_opts = BlockBasedOptions::default();
            block_opts.set_block_cache(&shared_rocksdb_cache);
            block_opts.set_cache_index_and_filter_blocks(true);
            db_opts.set_block_based_table_factory(&block_opts);

            let mut cfs = vec![ColumnFamilyDescriptor::new("default", {
                let mut o = RocksOptions::default();
                o.create_if_missing(true);
                o
            })];
            // Each store contributes its own CF descriptors.
            cfs.extend(TodoStore::cf_descriptors());
            cfs.extend(ProspectiveStore::column_family_descriptors());
            cfs.extend(FileMemoryStore::cf_descriptors());
            cfs.push(ColumnFamilyDescriptor::new(
                crate::memory::feedback::CF_FEEDBACK,
                {
                    let mut o = RocksOptions::default();
                    o.create_if_missing(true);
                    o.set_compression_type(rocksdb::DBCompressionType::Lz4);
                    o
                },
            ));
            cfs.push(ColumnFamilyDescriptor::new("audit", {
                let mut o = RocksOptions::default();
                o.create_if_missing(true);
                o.set_compression_type(rocksdb::DBCompressionType::Lz4);
                o
            }));

            Arc::new(
                rocksdb::DB::open_cf_descriptors(&db_opts, &shared_db_path, cfs)
                    .context("Failed to open shared DB with column families")?,
            )
        };

        // One-time migration of the legacy standalone audit DB into the CF.
        Self::migrate_audit_db(&base_path, &shared_db)?;

        let prospective_store = Arc::new(ProspectiveStore::new(shared_db.clone(), &base_path)?);
        info!("Prospective memory store initialized");

        let todo_store = Arc::new(TodoStore::new(shared_db.clone(), &base_path)?);
        // Index load failure is non-fatal; search rebuilds lazily.
        if let Err(e) = todo_store.load_vector_indices() {
            tracing::warn!("Failed to load todo vector indices: {}, semantic todo search will rebuild on first use", e);
        }
        info!("Todo store initialized");

        let file_store = Arc::new(FileMemoryStore::new(shared_db.clone(), &base_path)?);
        info!("File memory store initialized");

        // Feedback store degrades to in-memory on load failure.
        let feedback_store = Arc::new(parking_lot::RwLock::new(
            FeedbackStore::with_shared_db(shared_db.clone(), &base_path).unwrap_or_else(|e| {
                tracing::warn!("Failed to load feedback store: {}, using in-memory", e);
                FeedbackStore::new()
            }),
        ));
        info!("Feedback store initialized");

        let streaming_extractor =
            Arc::new(streaming::StreamingMemoryExtractor::new(neural_ner.clone()));
        info!("Streaming memory extractor initialized");

        let keyword_extractor = Arc::new(KeywordExtractor::new());
        info!("Keyword extractor initialized (YAKE)");

        let relevance_engine = Arc::new(RelevanceEngine::new(neural_ner.clone()));
        info!("Relevance engine initialized (entity cache + learned weights)");

        let backup_path = base_path.join("backups");
        let backup_engine = Arc::new(backup::ShodhBackupEngine::new(backup_path)?);
        if server_config.backup_enabled {
            info!(
                "Backup engine initialized (interval: {}h, keep: {})",
                server_config.backup_interval_secs / 3600,
                server_config.backup_max_count
            );
        } else {
            info!("Backup engine initialized (auto-backup disabled)");
        }

        // Context channel sized to the user cap (minimum 64) to limit lag drops.
        let broadcast_capacity = (server_config.max_users_in_memory * 4).max(64);

        let manager = Self {
            user_memories,
            audit_logs: Arc::new(DashMap::new()),
            shared_db,
            base_path,
            default_config: MemoryConfig::default(),
            audit_log_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
            graph_memories,
            neural_ner,
            keyword_extractor,
            user_evictions,
            server_config,
            event_broadcaster,
            streaming_extractor,
            prospective_store,
            todo_store,
            file_store,
            feedback_store,
            backup_engine,
            context_sessions: Arc::new(DashMap::new()),
            context_broadcaster: {
                let (tx, _) = tokio::sync::broadcast::channel(broadcast_capacity);
                tx
            },
            ab_test_manager: Arc::new(ab_testing::ABTestManager::new()),
            session_store: Arc::new(SessionStore::new()),
            relevance_engine,
            maintenance_cycle: std::sync::atomic::AtomicU64::new(0),
            user_memory_init_locks: DashMap::new(),
            user_graph_init_locks: DashMap::new(),
            shared_rocksdb_cache,
        };

        // Startup rotation is best-effort.
        info!("Running initial audit log rotation...");
        if let Err(e) = manager.rotate_all_audit_logs() {
            tracing::warn!("Failed to rotate audit logs on startup: {}", e);
        }

        Ok(manager)
    }
545
546 fn audit_cf(&self) -> &rocksdb::ColumnFamily {
548 self.shared_db
549 .cf_handle(CF_AUDIT)
550 .expect("audit CF must exist in shared DB")
551 }
552
553 fn migrate_audit_db(base_path: &std::path::Path, shared_db: &rocksdb::DB) -> Result<()> {
556 let old_dir = base_path.join("audit_logs");
557 if !old_dir.exists() {
558 return Ok(());
559 }
560
561 let audit_cf = shared_db
562 .cf_handle(CF_AUDIT)
563 .expect("audit CF must exist in shared DB");
564
565 let mut has_data = false;
567 let mut iter = shared_db.raw_iterator_cf(audit_cf);
568 iter.seek_to_first();
569 if iter.valid() {
570 has_data = true;
571 }
572 if has_data {
573 tracing::info!(
574 "Audit CF already has data, skipping migration from {:?}",
575 old_dir
576 );
577 return Ok(());
578 }
579
580 tracing::info!("Migrating audit_logs from standalone DB to shared DB audit CF...");
581
582 let old_opts = rocksdb::Options::default();
583 let old_db = rocksdb::DB::open_for_read_only(&old_opts, &old_dir, false)
584 .context("Failed to open old audit_logs DB for migration")?;
585
586 let mut batch = rocksdb::WriteBatch::default();
587 let mut count = 0usize;
588 const BATCH_SIZE: usize = 10_000;
589
590 let iter = old_db.iterator(rocksdb::IteratorMode::Start);
591 for item in iter {
592 let (key, value) =
593 item.map_err(|e| anyhow::anyhow!("audit migration iter error: {e}"))?;
594 batch.put_cf(audit_cf, &key, &value);
595 count += 1;
596
597 if count % BATCH_SIZE == 0 {
598 shared_db
599 .write(std::mem::take(&mut batch))
600 .map_err(|e| anyhow::anyhow!("audit migration batch write error: {e}"))?;
601 batch = rocksdb::WriteBatch::default();
602 }
603 }
604
605 if count % BATCH_SIZE != 0 {
606 shared_db
607 .write(batch)
608 .map_err(|e| anyhow::anyhow!("audit migration final batch error: {e}"))?;
609 }
610
611 drop(old_db);
612
613 let renamed = old_dir.with_file_name("audit_logs.pre_cf_migration");
614 if renamed.exists() {
615 let _ = std::fs::remove_dir_all(&renamed);
616 }
617 std::fs::rename(&old_dir, &renamed)
618 .context("Failed to rename old audit_logs dir after migration")?;
619
620 tracing::info!(
621 "Migrated {} audit entries from standalone DB to shared CF, old dir renamed to {:?}",
622 count,
623 renamed
624 );
625
626 Ok(())
627 }
628
    /// Records an audit event for `user_id`: persists it to the audit CF on a
    /// blocking task, mirrors it into the user's capped in-memory cache, and
    /// every `audit_rotation_check_interval` events kicks off background
    /// rotation for that user.
    pub fn log_event(&self, user_id: &str, event_type: &str, memory_id: &str, details: &str) {
        let event = AuditEvent {
            timestamp: chrono::Utc::now(),
            event_type: event_type.to_string(),
            memory_id: memory_id.to_string(),
            details: details.to_string(),
        };

        // Key = "user:nanos" with nanos zero-padded to 20 digits so that
        // lexicographic key order equals chronological order.
        let key = format!(
            "{}:{:020}",
            user_id,
            event.timestamp.timestamp_nanos_opt().unwrap_or_else(|| {
                tracing::warn!("audit event timestamp outside i64 nanos range, using 0");
                0
            })
        );
        // Persist asynchronously; a serialization failure silently skips
        // persistence (the in-memory cache below still records the event).
        if let Ok(serialized) = bincode::serde::encode_to_vec(&event, bincode::config::standard()) {
            let db = self.shared_db.clone();
            let key_bytes = key.into_bytes();

            tokio::task::spawn_blocking(move || {
                let audit = db.cf_handle(CF_AUDIT).expect("audit CF must exist");
                if let Err(e) = db.put_cf(&audit, &key_bytes, &serialized) {
                    tracing::error!("Failed to persist audit log: {}", e);
                }
            });
        }

        // Mirror into the in-memory cache, enforcing the per-user entry cap.
        let max_entries = self.server_config.audit_max_entries_per_user;
        let log = self
            .audit_logs
            .entry(user_id.to_string())
            .or_insert_with(|| Arc::new(parking_lot::RwLock::new(VecDeque::new())))
            .clone();
        {
            let mut entries = log.write();
            entries.push_back(event);
            while entries.len() > max_entries {
                entries.pop_front();
            }
        }

        // Global event counter; the pre-increment value decides whether to
        // trigger a rotation pass for this user.
        let count = self
            .audit_log_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        if count % self.server_config.audit_rotation_check_interval == 0 && count > 0 {
            // Clone everything the background task needs; the helper struct
            // carries just the rotation-relevant state.
            let shared_db = self.shared_db.clone();
            let audit_logs = self.audit_logs.clone();
            let user_id_clone = user_id.to_string();

            let audit_retention_days = self.server_config.audit_retention_days as i64;
            let audit_max_entries = self.server_config.audit_max_entries_per_user;

            tokio::task::spawn_blocking(move || {
                let manager = MultiUserMemoryManagerRotationHelper {
                    shared_db,
                    audit_logs,
                    audit_retention_days,
                    audit_max_entries,
                };
                if let Err(e) = manager.rotate_user_audit_logs(&user_id_clone) {
                    tracing::debug!("Audit log rotation check for user {}: {}", user_id_clone, e);
                }
            });
        }
    }
697
698 pub fn emit_event(&self, event: MemoryEvent) {
700 let _ = self.event_broadcaster.send(event);
701 }
702
703 pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<MemoryEvent> {
705 self.event_broadcaster.subscribe()
706 }
707
    /// Returns a user's audit history, optionally filtered to one memory id.
    ///
    /// Serves from the in-memory cache when it has entries; otherwise falls
    /// back to the audit CF and seeds the cache with whatever was read.
    pub fn get_history(&self, user_id: &str, memory_id: Option<&str>) -> Vec<AuditEvent> {
        // Fast path: non-empty in-memory cache.
        if let Some(log) = self.audit_logs.get(user_id) {
            let events = log.read();
            if !events.is_empty() {
                return if let Some(mid) = memory_id {
                    events
                        .iter()
                        .filter(|e| e.memory_id == mid)
                        .cloned()
                        .collect()
                } else {
                    events.iter().cloned().collect()
                };
            }
        }

        // Slow path: read this user's events from the audit CF.
        let mut events = Vec::new();
        let prefix = format!("{user_id}:");

        let audit = self.audit_cf();
        let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
        for (key, value) in iter.flatten() {
            if let Ok(key_str) = std::str::from_utf8(&key) {
                // prefix_iterator can overshoot the prefix; stop at the boundary.
                if !key_str.starts_with(&prefix) {
                    break;
                }

                // Entries that no longer decode are silently skipped.
                if let Ok((event, _)) = bincode::serde::decode_from_slice::<AuditEvent, _>(
                    &value,
                    bincode::config::standard(),
                ) {
                    events.push(event);
                }
            }
        }

        // Seed the cache for the next call. NOTE(review): or_insert_with leaves
        // an existing-but-empty cached deque untouched, so such a user re-reads
        // from the DB on every call — confirm this is intended.
        if !events.is_empty() {
            self.audit_logs
                .entry(user_id.to_string())
                .or_insert_with(|| {
                    Arc::new(parking_lot::RwLock::new(VecDeque::from(events.clone())))
                });
        }

        if let Some(mid) = memory_id {
            events.into_iter().filter(|e| e.memory_id == mid).collect()
        } else {
            events
        }
    }
759
    /// Gets (or lazily creates) the memory system for a user.
    ///
    /// Uses double-checked locking with a per-user init mutex so concurrent
    /// callers never build the same `MemorySystem` twice.
    ///
    /// # Errors
    /// Fails if the memory system or the user's graph cannot be initialized.
    pub fn get_user_memory(&self, user_id: &str) -> Result<Arc<parking_lot::RwLock<MemorySystem>>> {
        // Fast path: already cached.
        if let Some(memory) = self.user_memories.get(user_id) {
            return Ok(memory);
        }

        // Serialize initialization per user.
        let lock = self
            .user_memory_init_locks
            .entry(user_id.to_string())
            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
            .clone();
        let _guard = lock.lock();

        // Re-check: another thread may have initialized while we waited.
        if let Some(memory) = self.user_memories.get(user_id) {
            return Ok(memory);
        }

        // Per-user config rooted at base_path/<user_id>.
        let user_path = self.base_path.join(user_id);
        let config = MemoryConfig {
            storage_path: user_path,
            ..self.default_config.clone()
        };

        let mut memory_system = MemorySystem::new(config, Some(&self.shared_rocksdb_cache))
            .with_context(|| format!("Failed to initialize memory system for user '{user_id}'"))?;
        // Wire in the user's graph and the shared feedback store.
        let graph = self.get_user_graph(user_id)?;
        memory_system.set_graph_memory(graph);
        memory_system.set_feedback_store(self.feedback_store.clone());

        let memory_arc = Arc::new(parking_lot::RwLock::new(memory_system));

        self.user_memories
            .insert(user_id.to_string(), memory_arc.clone());

        info!("Created memory system for user: {}", user_id);

        Ok(memory_arc)
    }
807
808 pub fn evict_user(&self, user_id: &str) {
811 self.user_memories.invalidate(user_id);
812 self.graph_memories.invalidate(user_id);
813 self.user_memories.run_pending_tasks();
814 self.graph_memories.run_pending_tasks();
815
816 #[cfg(target_os = "windows")]
817 {
818 std::thread::sleep(std::time::Duration::from_millis(200));
820 self.user_memories.run_pending_tasks();
821 self.graph_memories.run_pending_tasks();
822 }
823
824 tracing::info!(user_id = user_id, "Evicted user caches for restore");
825 }
826
    /// Permanently deletes all data for a user: caches, shared-DB entries,
    /// todo vectors, pending feedback, and the on-disk directory (with retries
    /// for lingering file handles).
    ///
    /// # Errors
    /// Fails if the shared-DB purge fails or the user directory still cannot
    /// be removed after all retries.
    pub fn forget_user(&self, user_id: &str) -> Result<()> {
        // Drop caches first so nothing holds the user's DB files open.
        self.user_memories.invalidate(user_id);
        self.graph_memories.invalidate(user_id);

        self.user_memories.run_pending_tasks();
        self.graph_memories.run_pending_tasks();

        // Windows only: sleep briefly and drain again — presumably to let
        // evicted file handles close before the directory delete below.
        #[cfg(target_os = "windows")]
        {
            std::thread::sleep(std::time::Duration::from_millis(200));
            self.user_memories.run_pending_tasks();
            self.graph_memories.run_pending_tasks();
        }

        self.purge_user_from_shared_db(user_id)?;

        self.todo_store.purge_user_vectors(user_id);

        // Discard any feedback queued for this user.
        {
            let mut fb = self.feedback_store.write();
            fb.take_pending(user_id);
        }

        // Remove the user's directory, retrying with exponential backoff
        // (100ms doubling up to 1600ms) for transient file locks.
        let user_path = self.base_path.join(user_id);
        if user_path.exists() {
            let mut attempts = 0;
            let max_attempts = 10;
            while attempts < max_attempts {
                match std::fs::remove_dir_all(&user_path) {
                    Ok(_) => break,
                    Err(e) if attempts < max_attempts - 1 => {
                        let delay = 100 * (1 << attempts.min(4));
                        tracing::debug!(
                            "Delete retry {} for {} (waiting {}ms): {}",
                            attempts + 1,
                            user_id,
                            delay,
                            e
                        );
                        std::thread::sleep(std::time::Duration::from_millis(delay));
                        attempts += 1;
                    }
                    Err(e) => {
                        return Err(anyhow::anyhow!(
                            "Failed to delete user data after {max_attempts} retries: {e}"
                        ))
                    }
                }
            }
        }

        info!("Deleted all data for user: {}", user_id);
        Ok(())
    }
891
892 fn delete_by_prefix(db: &rocksdb::DB, cf: &rocksdb::ColumnFamily, prefix: &[u8]) -> usize {
894 let mut batch = rocksdb::WriteBatch::default();
895 let mut count = 0;
896 let iter = db.prefix_iterator_cf(cf, prefix);
897 for item in iter.flatten() {
898 let (key, _) = item;
899 if !key.starts_with(prefix) {
900 break;
901 }
902 batch.delete_cf(cf, &key);
903 count += 1;
904 }
905 if count > 0 {
906 let _ = db.write(batch);
907 }
908 count
909 }
910
911 fn purge_user_from_shared_db(&self, user_id: &str) -> Result<()> {
913 let prefix = format!("{user_id}:");
914 let prefix_bytes = prefix.as_bytes();
915
916 let cf_names = ["todos", "projects", "prospective"];
918 for name in &cf_names {
919 if let Some(cf) = self.shared_db.cf_handle(name) {
920 let n = Self::delete_by_prefix(&self.shared_db, cf, prefix_bytes);
921 if n > 0 {
922 tracing::debug!("GDPR: purged {n} entries from {name} CF for {user_id}");
923 }
924 }
925 }
926
927 if let Some(cf) = self.shared_db.cf_handle("todo_index") {
929 let prefixes = [
930 format!("user:{user_id}:"),
931 format!("status:Backlog:{user_id}:"),
932 format!("status:Todo:{user_id}:"),
933 format!("status:InProgress:{user_id}:"),
934 format!("status:Blocked:{user_id}:"),
935 format!("status:Done:{user_id}:"),
936 format!("status:Cancelled:{user_id}:"),
937 format!("vector_id:{user_id}:"),
938 format!("todo_vector:{user_id}:"),
939 ];
940 for p in &prefixes {
941 Self::delete_by_prefix(&self.shared_db, cf, p.as_bytes());
942 }
943 let mut batch = rocksdb::WriteBatch::default();
946 let iter = self.shared_db.iterator_cf(cf, rocksdb::IteratorMode::Start);
947 for item in iter.flatten() {
948 let (key, _) = item;
949 if let Ok(key_str) = std::str::from_utf8(&key) {
950 if key_str.contains(&prefix) {
951 batch.delete_cf(cf, &key);
952 }
953 }
954 }
955 let _ = self.shared_db.write(batch);
956 }
957
958 if let Some(cf) = self.shared_db.cf_handle("prospective_index") {
959 let prefixes = [
960 format!("user:{user_id}:"),
961 format!("status:Pending:{user_id}:"),
962 format!("status:Triggered:{user_id}:"),
963 format!("status:Dismissed:{user_id}:"),
964 ];
965 for p in &prefixes {
966 Self::delete_by_prefix(&self.shared_db, cf, p.as_bytes());
967 }
968 let mut batch = rocksdb::WriteBatch::default();
970 let iter = self.shared_db.iterator_cf(cf, rocksdb::IteratorMode::Start);
971 for item in iter.flatten() {
972 let (key, _) = item;
973 if let Ok(key_str) = std::str::from_utf8(&key) {
974 if key_str.contains(&prefix) {
975 batch.delete_cf(cf, &key);
976 }
977 }
978 }
979 let _ = self.shared_db.write(batch);
980 }
981
982 if let Some(cf) = self.shared_db.cf_handle("files") {
984 Self::delete_by_prefix(&self.shared_db, cf, prefix_bytes);
985 }
986 if let Some(cf) = self.shared_db.cf_handle("file_index") {
987 let idx_prefix = format!("file_idx:{user_id}:");
988 Self::delete_by_prefix(&self.shared_db, cf, idx_prefix.as_bytes());
989 let mut batch = rocksdb::WriteBatch::default();
991 let iter = self.shared_db.iterator_cf(cf, rocksdb::IteratorMode::Start);
992 for item in iter.flatten() {
993 let (key, _) = item;
994 if let Ok(key_str) = std::str::from_utf8(&key) {
995 if key_str.contains(&prefix) {
996 batch.delete_cf(cf, &key);
997 }
998 }
999 }
1000 let _ = self.shared_db.write(batch);
1001 }
1002
1003 if let Some(cf) = self.shared_db.cf_handle("feedback") {
1005 let pending_key = format!("pending:{user_id}");
1006 let _ = self.shared_db.delete_cf(cf, pending_key.as_bytes());
1007 }
1008
1009 if let Some(cf) = self.shared_db.cf_handle("audit") {
1011 Self::delete_by_prefix(&self.shared_db, cf, prefix_bytes);
1012 }
1013
1014 self.audit_logs.remove(user_id);
1016
1017 Ok(())
1018 }
1019
1020 pub fn get_stats(&self, user_id: &str) -> Result<MemoryStats> {
1022 let memory = self.get_user_memory(user_id)?;
1023 let memory_guard = memory.read();
1024 let mut stats = memory_guard.stats();
1025
1026 if let Ok(graph) = self.get_user_graph(user_id) {
1027 let graph_guard = graph.read();
1028 if let Ok(graph_stats) = graph_guard.get_stats() {
1029 stats.graph_nodes = graph_stats.entity_count;
1030 stats.graph_edges = graph_stats.relationship_count;
1031 }
1032 }
1033
1034 Ok(stats)
1035 }
1036
1037 pub fn list_users(&self) -> Vec<String> {
1039 let mut users = Vec::new();
1040 if let Ok(entries) = std::fs::read_dir(&self.base_path) {
1041 for entry in entries.flatten() {
1042 if let Ok(file_type) = entry.file_type() {
1043 if file_type.is_dir() {
1044 if let Some(name) = entry.file_name().to_str() {
1045 if name != "audit_logs"
1047 && name != "audit_logs.pre_cf_migration"
1048 && name != "backups"
1049 && name != "feedback"
1050 && name != "feedback.pre_cf_migration"
1051 && name != "semantic_facts"
1052 && name != "files"
1053 && name != "files.pre_cf_migration"
1054 && name != "prospective"
1055 && name != "prospective.pre_cf_migration"
1056 && name != "todos"
1057 && name != "todos.pre_cf_migration"
1058 && name != "shared"
1059 {
1060 users.push(name.to_string());
1061 }
1062 }
1063 }
1064 }
1065 }
1066 }
1067 users.sort();
1068 users
1069 }
1070
1071 pub fn list_cached_users(&self) -> Vec<String> {
1073 self.user_memories
1074 .iter()
1075 .map(|(id, _)| id.to_string())
1076 .collect()
1077 }
1078
1079 pub fn get_audit_logs(&self, user_id: &str, limit: usize) -> Vec<AuditEvent> {
1081 let mut events: Vec<AuditEvent> = Vec::new();
1082 let prefix = format!("{user_id}:");
1083 let audit = self.audit_cf();
1084 let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
1085 for (key, value) in iter.flatten() {
1086 if let Ok(key_str) = std::str::from_utf8(&key) {
1087 if !key_str.starts_with(&prefix) {
1088 break;
1089 }
1090 if let Ok((event, _)) = bincode::serde::decode_from_slice::<AuditEvent, _>(
1091 &value,
1092 bincode::config::standard(),
1093 ) {
1094 events.push(event);
1095 }
1096 }
1097 }
1098 events.reverse();
1099 events.truncate(limit);
1100 events
1101 }
1102
1103 pub fn flush_all_databases(&self) -> Result<()> {
1105 info!("Flushing all databases to disk...");
1106
1107 self.shared_db
1109 .flush()
1110 .map_err(|e| anyhow::anyhow!("Failed to flush shared database: {e}"))?;
1111 info!(" Shared database flushed (todos, prospective, files, feedback, audit)");
1112
1113 let user_entries: Vec<(String, Arc<parking_lot::RwLock<MemorySystem>>)> = self
1114 .user_memories
1115 .iter()
1116 .map(|(k, v)| (k.to_string(), v.clone()))
1117 .collect();
1118
1119 let mut flushed = 0;
1120 for (user_id, memory_system) in user_entries {
1121 if let Some(guard) = memory_system.try_read() {
1122 if let Err(e) = guard.flush_storage() {
1123 tracing::warn!(" Failed to flush database for user {}: {}", user_id, e);
1124 } else {
1125 flushed += 1;
1126 }
1127 } else {
1128 tracing::warn!(" Could not acquire lock for user: {}", user_id);
1129 }
1130 }
1131
1132 info!(
1133 "All databases flushed: shared (5 stores), {} user memories",
1134 flushed
1135 );
1136
1137 Ok(())
1138 }
1139
1140 pub fn save_all_vector_indices(&self) -> Result<()> {
1142 info!("Saving vector indices to disk...");
1143
1144 let user_entries: Vec<(String, Arc<parking_lot::RwLock<MemorySystem>>)> = self
1145 .user_memories
1146 .iter()
1147 .map(|(k, v)| (k.to_string(), v.clone()))
1148 .collect();
1149
1150 let mut saved = 0;
1151 for (user_id, memory_system) in user_entries {
1152 if let Some(guard) = memory_system.try_read() {
1153 let index_path = self.base_path.join(&user_id).join("vector_index");
1154 if let Err(e) = guard.save_vector_index(&index_path) {
1155 tracing::warn!(" Failed to save vector index for user {}: {}", user_id, e);
1156 } else {
1157 info!(" Saved vector index for user: {}", user_id);
1158 saved += 1;
1159 }
1160 } else {
1161 tracing::warn!(" Could not acquire lock for user: {}", user_id);
1162 }
1163 }
1164
1165 info!("Saved {} vector indices", saved);
1166 Ok(())
1167 }
1168
1169 fn rotate_all_audit_logs(&self) -> Result<()> {
1171 let mut total_removed = 0;
1172
1173 let mut user_ids = std::collections::HashSet::new();
1174 let audit = self.audit_cf();
1175 let iter = self
1176 .shared_db
1177 .iterator_cf(audit, rocksdb::IteratorMode::Start);
1178
1179 for (key, _) in iter.flatten() {
1180 if let Ok(key_str) = std::str::from_utf8(&key) {
1181 if let Some(user_id) = key_str.split(':').next() {
1182 user_ids.insert(user_id.to_string());
1183 }
1184 }
1185 }
1186
1187 let helper = MultiUserMemoryManagerRotationHelper {
1188 shared_db: self.shared_db.clone(),
1189 audit_logs: self.audit_logs.clone(),
1190 audit_retention_days: self.server_config.audit_retention_days as i64,
1191 audit_max_entries: self.server_config.audit_max_entries_per_user,
1192 };
1193
1194 for user_id in user_ids {
1195 match helper.rotate_user_audit_logs(&user_id) {
1196 Ok(removed) => {
1197 if removed > 0 {
1198 info!(
1199 " Rotated audit logs for user {}: removed {} old entries",
1200 user_id, removed
1201 );
1202 total_removed += removed;
1203 }
1204 }
1205 Err(e) => {
1206 tracing::warn!(" Failed to rotate audit logs for user {}: {}", user_id, e);
1207 }
1208 }
1209 }
1210
1211 if total_removed > 0 {
1212 info!(
1213 "Audit log rotation complete: removed {} total entries",
1214 total_removed
1215 );
1216 }
1217
1218 Ok(())
1219 }
1220
1221 pub fn get_neural_ner(&self) -> Arc<NeuralNer> {
1223 self.neural_ner.clone()
1224 }
1225
1226 pub fn get_keyword_extractor(&self) -> Arc<KeywordExtractor> {
1228 self.keyword_extractor.clone()
1229 }
1230
    /// Return the (lazily created) graph memory for `user_id`.
    ///
    /// Uses double-checked initialization: a fast-path lookup, then a
    /// per-user init mutex, then a second lookup under that mutex so two
    /// concurrent callers cannot both construct a `GraphMemory` for the
    /// same user. The statement order here is load-bearing — do not
    /// reorder the checks relative to the lock acquisition.
    pub fn get_user_graph(&self, user_id: &str) -> Result<Arc<parking_lot::RwLock<GraphMemory>>> {
        // Fast path: already cached.
        if let Some(graph) = self.graph_memories.get(user_id) {
            return Ok(graph);
        }

        // Serialize initialization per user via a dedicated mutex entry.
        let lock = self
            .user_graph_init_locks
            .entry(user_id.to_string())
            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
            .clone();
        let _guard = lock.lock();

        // Re-check under the init lock: another thread may have won the race.
        if let Some(graph) = self.graph_memories.get(user_id) {
            return Ok(graph);
        }

        // Open/create the on-disk graph, sharing the RocksDB block cache.
        let graph_path = self.base_path.join(user_id).join("graph");
        let graph_memory = GraphMemory::new(&graph_path, Some(&self.shared_rocksdb_cache))?;
        let graph_arc = Arc::new(parking_lot::RwLock::new(graph_memory));

        self.graph_memories
            .insert(user_id.to_string(), graph_arc.clone());

        info!("Created graph memory for user: {}", user_id);

        Ok(graph_arc)
    }
1266
1267 pub fn get_user_graph_stats(&self, user_id: &str) -> Result<GraphStats> {
1269 let graph = self.get_user_graph(user_id)?;
1270 let graph_guard = graph.read();
1271 graph_guard.get_stats()
1272 }
1273
    /// Run one maintenance cycle over every cached user.
    ///
    /// Every 6th cycle is "heavy" (full graph decay, fact extraction inside
    /// `run_maintenance`, prospective-task cleanup, database flush, and
    /// pruning of bookkeeping maps for evicted users); all other cycles are
    /// light, in-memory passes. Returns the total number of memories
    /// processed (decayed) across all users.
    ///
    /// Lock discipline: per-user memory and graph locks are taken one at a
    /// time and released between phases — the phase order and guard scopes
    /// here are intentional; do not merge the blocks.
    pub fn run_maintenance_all_users(&self) -> usize {
        // Monotonic cycle counter; relaxed ordering is fine for a statistic.
        let cycle = self
            .maintenance_cycle
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        // Heavy work only once every 6 cycles.
        let is_heavy = cycle % 6 == 0;

        if is_heavy {
            tracing::info!(
                "Maintenance cycle {} (HEAVY — graph decay + fact extraction + flush)",
                cycle
            );
        } else {
            tracing::debug!("Maintenance cycle {} (light — in-memory only)", cycle);
        }

        let decay_factor = self.server_config.activation_decay_factor;
        let mut total_processed = 0;

        // Snapshot user ids so we don't iterate the cache while locking users.
        let user_ids: Vec<String> = self
            .user_memories
            .iter()
            .map(|(id, _)| id.to_string())
            .collect();

        let user_count = user_ids.len();
        let mut edges_decayed = 0;
        let mut edges_strengthened = 0;
        let mut entity_edges_strengthened = 0;
        let mut total_facts_extracted = 0;
        let mut total_facts_reinforced = 0;

        for user_id in user_ids {
            // Phase 1: per-memory maintenance (decay, fact extraction when
            // heavy). The read guard is dropped before the graph phases below.
            let maintenance_result = if let Ok(memory_lock) = self.get_user_memory(&user_id) {
                let memory = memory_lock.read();
                match memory.run_maintenance(decay_factor, &user_id, is_heavy) {
                    Ok(result) => {
                        total_processed += result.decayed_count;
                        total_facts_extracted += result.facts_extracted;
                        total_facts_reinforced += result.facts_reinforced;
                        Some(result)
                    }
                    Err(e) => {
                        tracing::warn!("Maintenance failed for user {}: {}", user_id, e);
                        None
                    }
                }
            } else {
                None
            };

            // Phase 2: apply edge boosts to the graph; promotions feed back
            // into the memory system as importance boosts.
            if let Some(ref result) = maintenance_result {
                if !result.edge_boosts.is_empty() {
                    if let Ok(graph) = self.get_user_graph(&user_id) {
                        let graph_guard = graph.read();
                        match graph_guard.strengthen_memory_edges(&result.edge_boosts) {
                            Ok((count, promotion_boosts)) => {
                                edges_strengthened += count;

                                if !promotion_boosts.is_empty() {
                                    if let Ok(memory_lock) = self.get_user_memory(&user_id) {
                                        let memory = memory_lock.read();
                                        match memory.apply_edge_promotion_boosts(&promotion_boosts)
                                        {
                                            Ok(boosted) => {
                                                tracing::debug!(
                                                    user_id = %user_id,
                                                    boosted,
                                                    promotions = promotion_boosts.len(),
                                                    "Applied edge promotion boosts"
                                                );
                                            }
                                            Err(e) => {
                                                tracing::debug!(
                                                    "Edge promotion boost failed for user {}: {}",
                                                    user_id,
                                                    e
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::debug!(
                                    "Edge boost application failed for user {}: {}",
                                    user_id,
                                    e
                                );
                            }
                        }
                    }
                }
            }

            // Phase 3: strengthen episode→entity edges for replayed memories.
            if let Some(ref result) = maintenance_result {
                if !result.replay_memory_ids.is_empty() {
                    if let Ok(graph) = self.get_user_graph(&user_id) {
                        let graph_guard = graph.read();
                        for mem_id_str in &result.replay_memory_ids {
                            if let Ok(uuid) = uuid::Uuid::parse_str(mem_id_str) {
                                match graph_guard.strengthen_episode_entity_edges(&uuid) {
                                    Ok(count) => entity_edges_strengthened += count,
                                    Err(e) => {
                                        tracing::debug!(
                                            "Entity edge strengthening failed for memory {}: {}",
                                            mem_id_str,
                                            e
                                        );
                                    }
                                }
                            }
                        }
                    }
                }
            }

            // Phase 4: flush lazily-accumulated graph maintenance; orphaned
            // entities get compensated on the memory side.
            if let Ok(graph) = self.get_user_graph(&user_id) {
                let graph_guard = graph.read();
                match graph_guard.flush_pending_maintenance() {
                    Ok(decay_result) => {
                        edges_decayed += decay_result.pruned_count;

                        if !decay_result.orphaned_entity_ids.is_empty() {
                            if let Ok(memory_lock) = self.get_user_memory(&user_id) {
                                let memory = memory_lock.read();
                                match memory
                                    .compensate_orphaned_memories(&decay_result.orphaned_entity_ids)
                                {
                                    Ok(compensated) => {
                                        tracing::debug!(
                                            user_id = %user_id,
                                            compensated,
                                            orphaned = decay_result.orphaned_entity_ids.len(),
                                            "Compensated orphaned memories"
                                        );
                                    }
                                    Err(e) => {
                                        tracing::debug!(
                                            "Orphan compensation failed for user {}: {}",
                                            user_id,
                                            e
                                        );
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::debug!("Graph lazy pruning failed for user {}: {}", user_id, e);
                    }
                }
            }

            // Phase 5 (heavy only): full graph decay pass. Compensation errors
            // here are deliberately ignored (best-effort).
            if is_heavy {
                if let Ok(graph) = self.get_user_graph(&user_id) {
                    let graph_guard = graph.read();
                    match graph_guard.apply_decay() {
                        Ok(decay_result) => {
                            if decay_result.pruned_count > 0 {
                                edges_decayed += decay_result.pruned_count;
                                tracing::debug!(
                                    user_id = %user_id,
                                    pruned = decay_result.pruned_count,
                                    orphaned = decay_result.orphaned_entity_ids.len(),
                                    "Full graph decay applied"
                                );
                            }

                            if !decay_result.orphaned_entity_ids.is_empty() {
                                if let Ok(memory_lock) = self.get_user_memory(&user_id) {
                                    let memory = memory_lock.read();
                                    let _ = memory.compensate_orphaned_memories(
                                        &decay_result.orphaned_entity_ids,
                                    );
                                }
                            }
                        }
                        Err(e) => {
                            tracing::debug!("Full graph decay failed for user {}: {}", user_id, e);
                        }
                    }
                }
            }
        }

        // Heavy only: purge prospective tasks older than 30 days per user.
        if is_heavy {
            for (user_id_arc, _) in self.user_memories.iter() {
                let user_id = user_id_arc.as_ref();
                match self.prospective_store.cleanup_old_tasks(user_id, 30) {
                    Ok(deleted) if deleted > 0 => {
                        tracing::info!(
                            user_id = %user_id,
                            deleted = deleted,
                            "Cleaned up old prospective tasks (>30 days)"
                        );
                    }
                    Err(e) => {
                        tracing::debug!(
                            user_id = %user_id,
                            error = %e,
                            "Prospective task cleanup failed"
                        );
                    }
                    _ => {}
                }
            }
        }

        // Heavy only: flush databases and drop bookkeeping entries for users
        // that have been evicted from the memory cache.
        if is_heavy {
            if let Err(e) = self.flush_all_databases() {
                tracing::warn!("Periodic flush failed: {}", e);
            }

            let active_users: std::collections::HashSet<String> = self
                .user_memories
                .iter()
                .map(|(id, _)| id.to_string())
                .collect();
            self.user_memory_init_locks
                .retain(|user_id, _| active_users.contains(user_id));
            self.user_graph_init_locks
                .retain(|user_id, _| active_users.contains(user_id));
            let pre_audit = self.audit_logs.len();
            self.audit_logs
                .retain(|user_id, _| active_users.contains(user_id));
            let pruned_audit = pre_audit.saturating_sub(self.audit_logs.len());
            if pruned_audit > 0 {
                tracing::info!(
                    "Pruned audit logs for {} evicted users ({} active)",
                    pruned_audit,
                    self.audit_logs.len()
                );
            }
        }

        tracing::info!(
            "Maintenance complete (cycle {}, {}): {} memories processed, {} edges strengthened, {} entity edges strengthened, {} weak edges pruned, {} facts extracted, {} facts reinforced across {} users",
            cycle,
            if is_heavy { "heavy" } else { "light" },
            total_processed,
            edges_strengthened,
            entity_edges_strengthened,
            edges_decayed,
            total_facts_extracted,
            total_facts_reinforced,
            user_count
        );

        total_processed
    }
1551
    /// Shared handle to the streaming memory extractor.
    pub fn streaming_extractor(&self) -> &Arc<streaming::StreamingMemoryExtractor> {
        &self.streaming_extractor
    }
1556
    /// Shared handle to the backup engine.
    pub fn backup_engine(&self) -> &Arc<backup::ShodhBackupEngine> {
        &self.backup_engine
    }
1561
    /// Shared handle to the A/B test manager.
    pub fn ab_test_manager(&self) -> &Arc<ab_testing::ABTestManager> {
        &self.ab_test_manager
    }
1566
    /// Shared handle to the todo store.
    pub fn todo_store(&self) -> &Arc<TodoStore> {
        &self.todo_store
    }
1571
    /// Shared handle to the prospective (reminder/task) store.
    pub fn prospective_store(&self) -> &Arc<ProspectiveStore> {
        &self.prospective_store
    }
1576
    /// Shared handle to the file memory store.
    pub fn file_store(&self) -> &Arc<FileMemoryStore> {
        &self.file_store
    }
1581
    /// Shared handle to the feedback store (guarded by a read-write lock).
    pub fn feedback_store(&self) -> &Arc<parking_lot::RwLock<FeedbackStore>> {
        &self.feedback_store
    }
1586
    /// Shared handle to the session store.
    pub fn session_store(&self) -> &Arc<SessionStore> {
        &self.session_store
    }
1591
    /// Shared map of per-session context status.
    pub fn context_sessions(&self) -> &Arc<ContextSessions> {
        &self.context_sessions
    }
1596
    /// Subscribe to context-status broadcasts; each receiver gets every
    /// status sent after subscription (tokio broadcast semantics).
    pub fn subscribe_context(&self) -> tokio::sync::broadcast::Receiver<ContextStatus> {
        self.context_broadcaster.subscribe()
    }
1601
    /// Broadcast a context status to all subscribers. The send result is
    /// deliberately ignored: it only errs when there are no receivers.
    pub fn broadcast_context(&self, status: ContextStatus) {
        let _ = self.context_broadcaster.send(status);
    }
1606
    /// Borrow the server configuration.
    pub fn server_config(&self) -> &ServerConfig {
        &self.server_config
    }
1611
    /// Borrow the root directory under which per-user data is stored.
    pub fn base_path(&self) -> &std::path::Path {
        &self.base_path
    }
1616
    /// Number of users evicted from the memory cache so far.
    /// Relaxed load — a point-in-time statistic, not synchronized with
    /// concurrent evictions.
    pub fn user_evictions(&self) -> usize {
        self.user_evictions
            .load(std::sync::atomic::Ordering::Relaxed)
    }
1622
    /// Number of user memory systems currently resident in the cache.
    /// NOTE(review): `entry_count` suggests a moka-style cache, whose count
    /// may lag pending maintenance — treat as approximate; confirm.
    pub fn users_in_cache(&self) -> usize {
        self.user_memories.entry_count() as usize
    }
1627
1628 pub fn check_and_emit_due_reminders(&self) -> usize {
1634 let due_tasks = match self.prospective_store.get_all_due_tasks() {
1635 Ok(tasks) => tasks,
1636 Err(e) => {
1637 tracing::debug!("Active reminder check failed: {}", e);
1638 return 0;
1639 }
1640 };
1641
1642 let mut triggered = 0;
1643 for (user_id, task) in &due_tasks {
1644 match self.prospective_store.mark_triggered(user_id, &task.id) {
1645 Ok(true) => {} Ok(false) => {
1647 tracing::debug!(
1649 user_id = %user_id,
1650 reminder_id = %task.id.0,
1651 "Reminder already triggered (scheduler race)"
1652 );
1653 continue;
1654 }
1655 Err(e) => {
1656 tracing::warn!(
1657 user_id = %user_id,
1658 reminder_id = %task.id.0,
1659 error = %e,
1660 "Failed to mark reminder triggered in scheduler"
1661 );
1662 continue;
1663 }
1664 }
1665
1666 self.emit_event(MemoryEvent {
1667 event_type: "REMINDER_DUE".to_string(),
1668 timestamp: chrono::Utc::now(),
1669 user_id: user_id.clone(),
1670 memory_id: Some(task.id.0.to_string()),
1671 content_preview: Some(task.content.chars().take(100).collect()),
1672 memory_type: Some("reminder".to_string()),
1673 importance: Some(task.priority as f32 / 5.0),
1674 count: None,
1675 results: None,
1676 });
1677
1678 tracing::info!(
1679 user_id = %user_id,
1680 reminder_id = %task.id.0,
1681 content = %task.content.chars().take(50).collect::<String>(),
1682 "Reminder triggered (active)"
1683 );
1684
1685 triggered += 1;
1686 }
1687
1688 triggered
1689 }
1690
1691 pub fn collect_secondary_store_refs(&self) -> Vec<(String, std::sync::Arc<rocksdb::DB>)> {
1695 vec![("shared".to_string(), std::sync::Arc::clone(&self.shared_db))]
1696 }
1697
    /// Walk every user directory under `base_path` and create a comprehensive
    /// backup (user DB + shared secondary stores + graph DB when present),
    /// then purge backups beyond `max_backups` per user.
    ///
    /// Returns the number of users successfully backed up; all failures are
    /// logged and skipped. Directory-scan errors silently yield 0.
    pub fn run_backup_all_users(&self, max_backups: usize) -> usize {
        let mut backed_up = 0;

        let users_path = &self.base_path;
        if let Ok(entries) = std::fs::read_dir(users_path) {
            for entry in entries.flatten() {
                let path = entry.path();
                if !path.is_dir() {
                    continue;
                }
                // Skip hidden directories and non-user bookkeeping dirs.
                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
                if name.starts_with('.') || name == "audit_logs" || name == "backups" {
                    continue;
                }

                // Only directories containing a memory DB are real users.
                let db_path = path.join("memory.db");
                if !db_path.exists() {
                    continue;
                }

                if let Ok(memory_lock) = self.get_user_memory(name) {
                    // The `memory` read guard and `graph_guard` below must stay
                    // alive across the backup call — `db` and `graph_db_ref`
                    // borrow through them.
                    let memory = memory_lock.read();
                    let db = memory.get_db();
                    let secondary_refs = self.collect_secondary_store_refs();
                    let store_refs: Vec<crate::backup::SecondaryStoreRef<'_>> = secondary_refs
                        .iter()
                        .map(|(n, d)| crate::backup::SecondaryStoreRef { name: n, db: d })
                        .collect();
                    let graph_lock = self.get_user_graph(name).ok();
                    let graph_guard = graph_lock.as_ref().map(|g| g.read());
                    let graph_db_ref = graph_guard.as_ref().map(|g| g.get_db());
                    match self.backup_engine.create_comprehensive_backup_with_graph(
                        &db,
                        name,
                        &store_refs,
                        graph_db_ref,
                    ) {
                        Ok(metadata) => {
                            tracing::info!(
                                user_id = name,
                                backup_id = metadata.backup_id,
                                size_mb = metadata.size_bytes / 1024 / 1024,
                                "Backup created successfully"
                            );
                            backed_up += 1;

                            // Enforce the retention cap; purge failure is
                            // non-fatal.
                            if let Err(e) = self.backup_engine.purge_old_backups(name, max_backups)
                            {
                                tracing::warn!(
                                    user_id = name,
                                    error = %e,
                                    "Failed to purge old backups"
                                );
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                user_id = name,
                                error = %e,
                                "Failed to create backup"
                            );
                        }
                    }
                }
            }
        }

        backed_up
    }
1768
    /// Extract entities from an experience and persist them into the user's
    /// graph memory as entity nodes, one episodic node, and pairwise
    /// `RelatedTo` edges between all co-mentioned entities.
    ///
    /// Entity sources, in priority order:
    ///   1. pre-extracted typed NER records on the experience,
    ///   2. bare entity names on the experience (typed MISC, confidence 0.8),
    ///   3. on-the-fly neural NER over the content (failure is non-fatal).
    /// These are supplemented with tag-, ALL-CAPS-, issue-ID-, and
    /// verb-derived entities, deduplicated by name via `known_names`,
    /// ranked by salience and capped at `max_entities_per_memory`.
    pub fn process_experience_into_graph(
        &self,
        user_id: &str,
        experience: &Experience,
        memory_id: &MemoryId,
    ) -> Result<()> {
        let graph = self.get_user_graph(user_id)?;

        let now = chrono::Utc::now();

        // Common English function words filtered out of every entity source.
        let stop_words: std::collections::HashSet<&str> = [
            "the", "and", "for", "that", "this", "with", "from", "have", "been", "are", "was",
            "were", "will", "would", "could", "should", "may", "might",
        ]
        .iter()
        .cloned()
        .collect();

        let extracted_entities = if !experience.ner_entities.is_empty() {
            // Source 1: typed NER records already computed by the handler.
            tracing::debug!(
                "Using {} pre-extracted NER entities from handler",
                experience.ner_entities.len()
            );
            experience
                .ner_entities
                .iter()
                .map(|record| crate::embeddings::ner::NerEntity {
                    text: record.text.clone(),
                    entity_type: match record.entity_type.as_str() {
                        "PER" => NerEntityType::Person,
                        "ORG" => NerEntityType::Organization,
                        "LOC" => NerEntityType::Location,
                        _ => NerEntityType::Misc,
                    },
                    confidence: record.confidence,
                    start: record.start_char.unwrap_or(0),
                    // NOTE(review): fallback uses text.len() (a byte length)
                    // where end_char looks char-based — confirm offset units.
                    end: record.end_char.unwrap_or(record.text.len()),
                })
                .collect()
        } else if !experience.entities.is_empty() {
            // Source 2: untyped names; default to MISC with fixed confidence.
            tracing::debug!(
                "Using {} pre-extracted entity names (no NER types available)",
                experience.entities.len()
            );
            experience
                .entities
                .iter()
                .map(|name| crate::embeddings::ner::NerEntity {
                    text: name.clone(),
                    entity_type: NerEntityType::Misc,
                    confidence: 0.8,
                    start: 0,
                    end: name.len(),
                })
                .collect()
        } else {
            // Source 3: run neural NER; extraction failure degrades to no
            // entities rather than aborting the whole ingest.
            match self.neural_ner.extract(&experience.content) {
                Ok(entities) => {
                    tracing::debug!(
                        "NER extracted {} entities: {:?}",
                        entities.len(),
                        entities.iter().map(|e| e.text.as_str()).collect::<Vec<_>>()
                    );
                    entities
                }
                Err(e) => {
                    tracing::debug!("NER extraction failed: {}. Continuing without entities.", e);
                    Vec::new()
                }
            }
        };

        // Quality filter: drop short, low-confidence, or stop-word entities.
        let filtered_entities: Vec<_> = extracted_entities
            .into_iter()
            .filter(|e| {
                let name = e.text.trim();
                if name.len() < 3 {
                    return false;
                }
                // All-lowercase names need higher confidence to survive.
                if !name.chars().any(|c| c.is_uppercase()) && e.confidence < 0.7 {
                    return false;
                }
                if stop_words.contains(name.to_lowercase().as_str()) {
                    return false;
                }
                // Very short names require near-certain confidence.
                if name.len() < 5 && e.confidence < 0.8 {
                    return false;
                }
                true
            })
            .collect();

        tracing::debug!(
            "After filtering: {} entities: {:?}",
            filtered_entities.len(),
            filtered_entities
                .iter()
                .map(|e| e.text.as_str())
                .collect::<Vec<_>>()
        );

        // NER-derived nodes: salience comes from the model confidence;
        // MISC entities are not treated as proper nouns.
        let ner_entities: Vec<(String, EntityNode)> = filtered_entities
            .into_iter()
            .map(|ner_entity| {
                let label = match ner_entity.entity_type {
                    NerEntityType::Person => EntityLabel::Person,
                    NerEntityType::Organization => EntityLabel::Organization,
                    NerEntityType::Location => EntityLabel::Location,
                    NerEntityType::Misc => EntityLabel::Other("MISC".to_string()),
                };
                let node = EntityNode {
                    uuid: uuid::Uuid::new_v4(),
                    name: ner_entity.text.clone(),
                    labels: vec![label],
                    created_at: now,
                    last_seen_at: now,
                    mention_count: 1,
                    summary: String::new(),
                    attributes: HashMap::new(),
                    name_embedding: None,
                    salience: ner_entity.confidence,
                    is_proper_noun: !matches!(ner_entity.entity_type, NerEntityType::Misc),
                };
                (ner_entity.text, node)
            })
            .collect();

        // Tag-derived nodes: tags become Technology entities at 0.6 salience.
        let tag_entities: Vec<(String, EntityNode)> = experience
            .tags
            .iter()
            .filter_map(|tag| {
                let tag_name = tag.trim();
                if tag_name.len() >= 2 && !stop_words.contains(tag_name.to_lowercase().as_str()) {
                    Some((
                        tag_name.to_string(),
                        EntityNode {
                            uuid: uuid::Uuid::new_v4(),
                            name: tag_name.to_string(),
                            labels: vec![EntityLabel::Technology],
                            created_at: now,
                            last_seen_at: now,
                            mention_count: 1,
                            summary: String::new(),
                            attributes: HashMap::new(),
                            name_embedding: None,
                            salience: 0.6,
                            is_proper_noun: false,
                        },
                    ))
                } else {
                    None
                }
            })
            .collect();

        // Running dedup list: each later source skips names already claimed
        // by an earlier one (case-insensitive for the regex-based sources).
        let mut known_names: Vec<String> = ner_entities
            .iter()
            .map(|(name, _)| name.clone())
            .chain(tag_entities.iter().map(|(name, _)| name.clone()))
            .collect();

        // ALL-CAPS tokens (acronyms etc.) become Technology entities.
        let allcaps_entities: Vec<(String, EntityNode)> = allcaps_regex()
            .find_iter(&experience.content)
            .filter_map(|cap| {
                let term = cap.as_str();
                if known_names
                    .iter()
                    .any(|name| name.eq_ignore_ascii_case(term))
                {
                    return None;
                }
                if stop_words.contains(term.to_lowercase().as_str()) {
                    return None;
                }
                known_names.push(term.to_string());
                Some((
                    term.to_string(),
                    EntityNode {
                        uuid: uuid::Uuid::new_v4(),
                        name: term.to_string(),
                        labels: vec![EntityLabel::Technology],
                        created_at: now,
                        last_seen_at: now,
                        mention_count: 1,
                        summary: String::new(),
                        attributes: HashMap::new(),
                        name_embedding: None,
                        salience: 0.5,
                        is_proper_noun: true,
                    },
                ))
            })
            .collect();

        // Issue-tracker identifiers (e.g. ABC-123) become "Issue" entities.
        let issue_entities: Vec<(String, EntityNode)> = issue_regex()
            .find_iter(&experience.content)
            .filter_map(|issue| {
                let issue_id = issue.as_str();
                if known_names.iter().any(|name| name == issue_id) {
                    return None;
                }
                known_names.push(issue_id.to_string());
                Some((
                    issue_id.to_string(),
                    EntityNode {
                        uuid: uuid::Uuid::new_v4(),
                        name: issue_id.to_string(),
                        labels: vec![EntityLabel::Other("Issue".to_string())],
                        created_at: now,
                        last_seen_at: now,
                        mention_count: 1,
                        summary: String::new(),
                        attributes: HashMap::new(),
                        name_embedding: None,
                        salience: 0.7,
                        is_proper_noun: true,
                    },
                ))
            })
            .collect();

        // Relational verbs from the query parser become low-salience "Verb"
        // entities (both surface form and stem, deduped).
        let analysis = query_parser::analyze_query(&experience.content);
        let mut verb_entities: Vec<(String, EntityNode)> = Vec::new();
        for verb in &analysis.relational_context {
            let verb_text = verb.text.as_str();
            let verb_stem = verb.stem.as_str();

            if known_names
                .iter()
                .any(|name| name.eq_ignore_ascii_case(verb_text))
            {
                continue;
            }
            if stop_words.contains(verb_text.to_lowercase().as_str()) {
                continue;
            }
            if verb_text.len() < 3 {
                continue;
            }

            for name in [verb_text, verb_stem] {
                if name.len() < 3 {
                    continue;
                }
                if known_names.iter().any(|n| n.eq_ignore_ascii_case(name)) {
                    continue;
                }
                known_names.push(name.to_string());
                verb_entities.push((
                    name.to_string(),
                    EntityNode {
                        uuid: uuid::Uuid::new_v4(),
                        name: name.to_string(),
                        labels: vec![EntityLabel::Other("Verb".to_string())],
                        created_at: now,
                        last_seen_at: now,
                        mention_count: 1,
                        summary: String::new(),
                        attributes: HashMap::new(),
                        name_embedding: None,
                        salience: 0.4,
                        is_proper_noun: false,
                    },
                ));
            }
        }

        // Merge all sources, keep only the most salient up to the config cap.
        let mut all_entities: Vec<(String, EntityNode)> = ner_entities
            .into_iter()
            .chain(tag_entities)
            .chain(allcaps_entities)
            .chain(issue_entities)
            .chain(verb_entities)
            .collect();
        all_entities.sort_by(|a, b| b.1.salience.total_cmp(&a.1.salience));
        let entity_cap = self.server_config.max_entities_per_memory;
        all_entities.truncate(entity_cap);

        // Writes below go through a read guard — GraphMemory evidently uses
        // interior mutability for add_entity/add_episode/add_relationship.
        let graph_guard = graph.read();

        let mut entity_uuids = Vec::new();

        for (name, entity) in all_entities {
            match graph_guard.add_entity(entity) {
                Ok(uuid) => entity_uuids.push((name, uuid)),
                Err(e) => tracing::debug!("Failed to add entity {}: {}", name, e),
            }
        }

        tracing::debug!(
            "Creating episode for memory {} with {} entities: {:?}",
            &memory_id.0.to_string()[..8],
            entity_uuids.len(),
            entity_uuids
                .iter()
                .map(|(name, _)| name.as_str())
                .collect::<Vec<_>>()
        );

        // Episodic node reuses the memory's UUID so episode and memory stay
        // linked one-to-one.
        let episode = EpisodicNode {
            uuid: memory_id.0,
            name: format!("Memory {}", &memory_id.0.to_string()[..8]),
            content: experience.content.clone(),
            valid_at: now,
            created_at: now,
            entity_refs: entity_uuids.iter().map(|(_, uuid)| *uuid).collect(),
            source: EpisodeSource::Message,
            metadata: experience.metadata.clone(),
        };

        match graph_guard.add_episode(episode) {
            Ok(uuid) => {
                tracing::debug!(
                    "Episode {} added with {} entity refs",
                    &uuid.to_string()[..8],
                    entity_uuids.len()
                );
            }
            Err(e) => {
                tracing::warn!("Failed to add episode: {}", e);
            }
        }

        // Fully connect co-mentioned entities (O(n^2) pairs, bounded by the
        // entity cap above) with weak L1 working-tier edges.
        let truncated_context: String = experience.content.chars().take(150).collect();
        for i in 0..entity_uuids.len() {
            for j in (i + 1)..entity_uuids.len() {
                let edge = RelationshipEdge {
                    uuid: uuid::Uuid::new_v4(),
                    from_entity: entity_uuids[i].1,
                    to_entity: entity_uuids[j].1,
                    relation_type: RelationType::RelatedTo,
                    strength: EdgeTier::L1Working.initial_weight(),
                    created_at: now,
                    valid_at: now,
                    invalidated_at: None,
                    source_episode_id: Some(memory_id.0),
                    context: truncated_context.clone(),
                    last_activated: now,
                    activation_count: 1,
                    ltp_status: LtpStatus::None,
                    tier: EdgeTier::L1Working,
                    activation_timestamps: None,
                    entity_confidence: None,
                };

                if let Err(e) = graph_guard.add_relationship(edge) {
                    tracing::debug!("Failed to add relationship: {}", e);
                }
            }
        }
        Ok(())
    }
2160}