Skip to main content

shodh_memory/handlers/
state.rs

1//! Multi-User Memory Manager - Core State Management
2//!
3//! This module contains the central state manager for the shodh-memory server.
4//! It handles per-user memory systems, graph memories, audit logs, and all
5//! subsidiary stores (todos, reminders, files, etc.).
6
7use anyhow::{Context, Result};
8use dashmap::DashMap;
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, OnceLock};
11use tracing::info;
12
13/// Static regex for extracting all-caps terms (API, TUI, NER, REST, etc.)
14fn allcaps_regex() -> &'static regex::Regex {
15    static RE: OnceLock<regex::Regex> = OnceLock::new();
16    RE.get_or_init(|| regex::Regex::new(r"\b[A-Z]{2,}[A-Z0-9]*\b").unwrap())
17}
18
19/// Static regex for extracting issue IDs (SHO-XX, JIRA-123, etc.)
20fn issue_regex() -> &'static regex::Regex {
21    static RE: OnceLock<regex::Regex> = OnceLock::new();
22    RE.get_or_init(|| regex::Regex::new(r"\b([A-Z]{2,10}-\d+)\b").unwrap())
23}
24
25use crate::ab_testing;
26use crate::backup;
27use crate::config::ServerConfig;
28use crate::embeddings::{
29    are_ner_models_downloaded, download_ner_models, get_ner_models_dir, ner::NerEntityType,
30    KeywordExtractor, NerConfig, NeuralNer,
31};
32use crate::graph_memory::{
33    EdgeTier, EntityLabel, EntityNode, EpisodeSource, EpisodicNode, GraphMemory, GraphStats,
34    LtpStatus, RelationType, RelationshipEdge,
35};
36use crate::memory::{
37    query_parser, Experience, FeedbackStore, FileMemoryStore, MemoryConfig, MemoryId, MemoryStats,
38    MemorySystem, ProspectiveStore, SessionStore, TodoStore,
39};
40use crate::relevance::RelevanceEngine;
41use crate::streaming;
42
43use super::types::{AuditEvent, ContextStatus, MemoryEvent};
44
/// Type alias for context sessions map: session identifier (string key) →
/// latest [`ContextStatus`] reported for that session.
pub type ContextSessions = DashMap<String, ContextStatus>;
47
/// Helper struct for audit log rotation (allows spawn_blocking with minimal clone)
///
/// Built ad hoc inside `log_event`'s rotation branch from clones of the
/// manager's handles, so the blocking task owns everything it needs without
/// cloning the whole `MultiUserMemoryManager`.
struct MultiUserMemoryManagerRotationHelper {
    // Shared RocksDB instance holding the audit column family.
    shared_db: Arc<rocksdb::DB>,
    // In-memory per-user audit caches, kept in sync after DB deletes.
    audit_logs: Arc<DashMap<String, Arc<parking_lot::RwLock<VecDeque<AuditEvent>>>>>,
    // Entries older than this many days are deleted during rotation.
    audit_retention_days: i64,
    // Hard cap on retained entries per user; oldest beyond this are deleted.
    audit_max_entries: usize,
}
55
/// Name of the audit-event column family inside the shared RocksDB instance.
const CF_AUDIT: &str = "audit";
57
impl MultiUserMemoryManagerRotationHelper {
    /// Resolve the audit column family handle from the shared DB.
    ///
    /// Panics if the CF is missing — it is created unconditionally when the
    /// shared DB is opened, so absence indicates a broken startup invariant.
    fn audit_cf(&self) -> &rocksdb::ColumnFamily {
        self.shared_db
            .cf_handle(CF_AUDIT)
            .expect("audit CF must exist")
    }

    /// Rotate audit logs for a user - delete old entries and enforce max count.
    ///
    /// Keys are `{user_id}:{timestamp_nanos:020}` so RocksDB returns them in
    /// ascending timestamp order. Streaming 2-pass approach to bound memory:
    /// - pass 1 counts entries to compute the excess over `audit_max_entries`
    /// - pass 2 deletes expired/excess keys in bounded `WriteBatch`es
    ///
    /// Returns the number of entries deleted from the DB.
    fn rotate_user_audit_logs(&self, user_id: &str) -> Result<usize> {
        // Entries strictly older than this nanosecond timestamp are deleted.
        let cutoff_time = chrono::Utc::now() - chrono::Duration::days(self.audit_retention_days);
        let cutoff_nanos = cutoff_time.timestamp_nanos_opt().unwrap_or_else(|| {
            tracing::warn!("audit cutoff timestamp outside i64 nanos range, using 0");
            0
        });
        let prefix = format!("{user_id}:");
        let audit = self.audit_cf();

        // Pass 1: count total entries to determine excess.
        // NOTE(review): non-UTF8 keys are skipped here (not counted) but DO
        // advance `position` in pass 2 below, so `excess_count` can be
        // slightly misaligned if such keys ever occur — in practice keys are
        // always `format!`-produced UTF-8. Confirm if key format changes.
        let mut total_count = 0usize;
        let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
        for (key, _) in iter.flatten() {
            if let Ok(key_str) = std::str::from_utf8(&key) {
                // prefix_iterator can run past the prefix range; stop there.
                if !key_str.starts_with(&prefix) {
                    break;
                }
                total_count += 1;
            }
        }

        if total_count == 0 {
            return Ok(0);
        }

        // How many of the oldest entries must go purely due to the count cap.
        let excess_count = total_count.saturating_sub(self.audit_max_entries);

        // Pass 2: stream through keys, deleting those that are too old or excess.
        // Flush WriteBatch every 10K deletes to bound memory.
        const BATCH_FLUSH_SIZE: usize = 10_000;
        let mut batch = rocksdb::WriteBatch::default();
        let mut removed_count = 0usize;
        let mut position = 0usize;

        let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
        for (key, _) in iter.flatten() {
            let key_str = match std::str::from_utf8(&key) {
                Ok(s) => s,
                Err(_) => {
                    position += 1;
                    continue;
                }
            };
            if !key_str.starts_with(&prefix) {
                break;
            }

            let ts = key_str
                .strip_prefix(&prefix)
                .and_then(|s| s.parse::<i64>().ok())
                .unwrap_or(0); // Malformed keys sort first → get deleted

            // Delete if past retention OR among the oldest `excess_count`
            // entries (keys iterate in ascending timestamp order).
            if ts < cutoff_nanos || position < excess_count {
                batch.delete_cf(audit, &key);
                removed_count += 1;

                if removed_count % BATCH_FLUSH_SIZE == 0 {
                    self.shared_db
                        .write(std::mem::take(&mut batch))
                        .map_err(|e| anyhow::anyhow!("Failed to write rotation batch: {e}"))?;
                    batch = rocksdb::WriteBatch::default();
                }
            }

            position += 1;
        }

        // Flush remaining deletes. When removed_count is an exact multiple of
        // BATCH_FLUSH_SIZE (including 0) the batch was just flushed / is empty.
        if removed_count % BATCH_FLUSH_SIZE != 0 {
            self.shared_db
                .write(batch)
                .map_err(|e| anyhow::anyhow!("Failed to write rotation batch: {e}"))?;
        }

        // Sync the in-memory cache with the same retention + cap rules so
        // readers don't see entries that were just deleted from the DB.
        if removed_count > 0 {
            if let Some(log) = self.audit_logs.get(user_id) {
                let mut log_guard = log.write();

                log_guard.retain(|event| {
                    let event_nanos = event.timestamp.timestamp_nanos_opt().unwrap_or(0);
                    event_nanos >= cutoff_nanos
                });

                while log_guard.len() > self.audit_max_entries {
                    log_guard.pop_front();
                }
            }
        }

        Ok(removed_count)
    }
}
164
/// Multi-user memory manager - central state for the server
///
/// One instance lives for the lifetime of the server; all fields are either
/// `Arc`-shared or internally synchronized so the struct can be shared across
/// request handlers.
pub struct MultiUserMemoryManager {
    /// Per-user memory systems with LRU eviction.
    /// The eviction listener persists the user's vector index to disk on a
    /// background thread (see `new`).
    pub user_memories: moka::sync::Cache<String, Arc<parking_lot::RwLock<MemorySystem>>>,

    /// Per-user audit logs (in-memory cache; the durable copy lives in the
    /// shared DB's audit CF)
    pub audit_logs: Arc<DashMap<String, Arc<parking_lot::RwLock<VecDeque<AuditEvent>>>>>,

    /// Shared DB for all global stores (todos, reminders, files, feedback, audit)
    pub shared_db: Arc<rocksdb::DB>,

    /// Base storage path
    pub base_path: std::path::PathBuf,

    /// Default config
    pub default_config: MemoryConfig,

    /// Counter for audit log rotation checks (incremented on every
    /// `log_event`; rotation runs every N events)
    pub audit_log_counter: Arc<std::sync::atomic::AtomicUsize>,

    /// Per-user graph memory systems
    pub graph_memories: moka::sync::Cache<String, Arc<parking_lot::RwLock<GraphMemory>>>,

    /// Neural NER for automatic entity extraction
    pub neural_ner: Arc<NeuralNer>,

    /// Statistical keyword extraction for graph population
    pub keyword_extractor: Arc<KeywordExtractor>,

    /// User eviction counter for metrics
    pub user_evictions: Arc<std::sync::atomic::AtomicUsize>,

    /// Server configuration
    pub server_config: ServerConfig,

    /// SSE event broadcaster for real-time dashboard updates
    pub event_broadcaster: tokio::sync::broadcast::Sender<MemoryEvent>,

    /// Streaming memory extractor for implicit learning
    pub streaming_extractor: Arc<streaming::StreamingMemoryExtractor>,

    /// Prospective memory store for reminders/intentions
    pub prospective_store: Arc<ProspectiveStore>,

    /// GTD-style todo store
    pub todo_store: Arc<TodoStore>,

    /// File memory store for codebase integration
    pub file_store: Arc<FileMemoryStore>,

    /// Implicit feedback store for memory reinforcement
    pub feedback_store: Arc<parking_lot::RwLock<FeedbackStore>>,

    /// Backup engine for automated and manual backups
    pub backup_engine: Arc<backup::ShodhBackupEngine>,

    /// Context status from Claude Code sessions
    pub context_sessions: Arc<ContextSessions>,

    /// SSE broadcaster for context status updates
    pub context_broadcaster: tokio::sync::broadcast::Sender<ContextStatus>,

    /// A/B testing manager for relevance scoring experiments
    pub ab_test_manager: Arc<ab_testing::ABTestManager>,

    /// Session tracking store
    pub session_store: Arc<SessionStore>,

    /// Shared relevance engine for proactive memory surfacing (entity cache + learned weights persist)
    pub relevance_engine: Arc<RelevanceEngine>,

    /// Maintenance cycle counter: cycles 0..5 are lightweight (in-memory only),
    /// cycle 0 (mod 6) is heavyweight (graph decay, fact extraction, flush).
    /// At 300s intervals, heavy cycles fire every 30 minutes.
    maintenance_cycle: std::sync::atomic::AtomicU64,

    /// Per-user creation locks to prevent TOCTOU races in get_user_memory.
    /// Without this, concurrent first-access requests for the same user_id can both
    /// miss the cache check, both try to open RocksDB, and the second open fails
    /// because RocksDB holds an exclusive file lock.
    user_memory_init_locks: DashMap<String, Arc<parking_lot::Mutex<()>>>,

    /// Separate per-user creation locks for graph memory.
    /// Must be separate from user_memory_init_locks because get_user_memory()
    /// calls get_user_graph() while holding its lock, and parking_lot::Mutex
    /// is not re-entrant — sharing a single lock map would deadlock.
    user_graph_init_locks: DashMap<String, Arc<parking_lot::Mutex<()>>>,

    /// Shared RocksDB block cache across all per-user DB instances.
    /// Single LRU cache provides a hard memory ceiling regardless of user count.
    /// Without this, each user's MemoryStorage + GraphMemory allocates ~96MB in
    /// independent caches — 6 users = 576MB just in block caches alone.
    /// Kept in the struct so the cache outlives every DB that references it.
    shared_rocksdb_cache: rocksdb::Cache,
}
259
260impl MultiUserMemoryManager {
261    pub fn new(base_path: std::path::PathBuf, server_config: ServerConfig) -> Result<Self> {
262        std::fs::create_dir_all(&base_path)?;
263
264        let (event_broadcaster, _) = tokio::sync::broadcast::channel(1024);
265
266        let ner_dir = get_ner_models_dir();
267        tracing::debug!("Checking for NER models at {:?}", ner_dir);
268        let neural_ner = if are_ner_models_downloaded() {
269            tracing::debug!("NER models found, using existing files");
270            let config = NerConfig {
271                model_path: ner_dir.join("model.onnx"),
272                tokenizer_path: ner_dir.join("tokenizer.json"),
273                max_length: 128,
274                confidence_threshold: 0.5,
275            };
276            match NeuralNer::new(config) {
277                Ok(ner) => {
278                    info!("Neural NER initialized (TinyBERT model at {:?})", ner_dir);
279                    Arc::new(ner)
280                }
281                Err(e) => {
282                    tracing::warn!("Failed to initialize neural NER: {}. Using fallback.", e);
283                    Arc::new(NeuralNer::new_fallback(NerConfig::default()))
284                }
285            }
286        } else {
287            tracing::debug!("NER models not found at {:?}, will download", ner_dir);
288            info!("Downloading NER models (TinyBERT-NER, ~15MB)...");
289            match download_ner_models(Some(std::sync::Arc::new(|downloaded, total| {
290                if total > 0 {
291                    let percent = (downloaded as f64 / total as f64 * 100.0) as u32;
292                    if percent % 20 == 0 {
293                        tracing::info!("NER model download: {}%", percent);
294                    }
295                }
296            }))) {
297                Ok(ner_dir) => {
298                    info!("NER models downloaded to {:?}", ner_dir);
299                    let config = NerConfig {
300                        model_path: ner_dir.join("model.onnx"),
301                        tokenizer_path: ner_dir.join("tokenizer.json"),
302                        max_length: 128,
303                        confidence_threshold: 0.5,
304                    };
305                    match NeuralNer::new(config) {
306                        Ok(ner) => {
307                            info!("Neural NER initialized after download");
308                            Arc::new(ner)
309                        }
310                        Err(e) => {
311                            tracing::warn!(
312                                "Failed to initialize downloaded NER: {}. Using fallback.",
313                                e
314                            );
315                            Arc::new(NeuralNer::new_fallback(NerConfig::default()))
316                        }
317                    }
318                }
319                Err(e) => {
320                    tracing::warn!(
321                        "Failed to download NER models: {}. Using rule-based fallback.",
322                        e
323                    );
324                    Arc::new(NeuralNer::new_fallback(NerConfig::default()))
325                }
326            }
327        };
328
329        let user_evictions = Arc::new(std::sync::atomic::AtomicUsize::new(0));
330        let evictions_clone = user_evictions.clone();
331        let max_cache = server_config.max_users_in_memory;
332        let eviction_base_path = base_path.clone();
333
334        let user_memories = moka::sync::Cache::builder()
335            .max_capacity(server_config.max_users_in_memory as u64)
336            .time_to_idle(std::time::Duration::from_secs(1800))
337            .eviction_listener(move |key: Arc<String>, value: Arc<parking_lot::RwLock<MemorySystem>>, cause| {
338                if matches!(cause, moka::notification::RemovalCause::Size | moka::notification::RemovalCause::Expired) {
339                    evictions_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
340
341                    let cause_label = if cause == moka::notification::RemovalCause::Expired { "idle-timeout" } else { "LRU" };
342
343                    // Spawn blocking task to persist vector index without holding the lock
344                    // during I/O. The eviction listener runs synchronously inside moka,
345                    // so we must not block here for disk writes.
346                    let index_path = eviction_base_path.join(key.as_str()).join("vector_index");
347                    let user_key = key.clone();
348                    std::thread::spawn(move || {
349                        if let Some(guard) = value.try_read() {
350                            match guard.save_vector_index(&index_path) {
351                                Ok(()) => {
352                                    info!(
353                                        "Evicted user '{}' from memory cache ({}, cache_size={}) - vector index saved",
354                                        user_key, cause_label, max_cache
355                                    );
356                                }
357                                Err(e) => {
358                                    tracing::warn!(
359                                        "Evicted user '{}' from memory cache ({}) - failed to save vector index: {}",
360                                        user_key, cause_label, e
361                                    );
362                                }
363                            }
364                        } else {
365                            tracing::warn!(
366                                "Evicted user '{}' from memory cache ({}) - could not acquire lock to save index",
367                                user_key, cause_label
368                            );
369                        }
370                    });
371                }
372            })
373            .build();
374
375        let graph_memories = moka::sync::Cache::builder()
376            .max_capacity(server_config.max_users_in_memory as u64)
377            .time_to_idle(std::time::Duration::from_secs(1800))
378            .eviction_listener(move |key: Arc<String>, _value, cause| {
379                let cause_label = if cause == moka::notification::RemovalCause::Expired {
380                    "idle-timeout"
381                } else {
382                    "LRU"
383                };
384                info!(
385                    "Evicted graph for user '{}' from memory cache ({})",
386                    key, cause_label
387                );
388            })
389            .build();
390
391        // Single shared LRU block cache for ALL RocksDB instances (per-user memory DBs,
392        // per-user graph DBs, and the global shared DB). Provides a hard memory ceiling
393        // regardless of how many users are active. Without this, each user allocates
394        // ~96MB in independent caches — the shared cache collapses that to a single
395        // 256MB pool with LRU eviction of the coldest blocks across all users.
396        let shared_rocksdb_cache =
397            rocksdb::Cache::new_lru_cache(crate::constants::ROCKSDB_SHARED_CACHE_BYTES);
398        info!(
399            "Shared RocksDB block cache initialized ({}MB)",
400            crate::constants::ROCKSDB_SHARED_CACHE_BYTES / (1024 * 1024)
401        );
402
403        // Open a single shared DB for all global stores (todos, reminders, files, feedback, audit).
404        // This dramatically reduces file descriptor usage compared to separate DBs per store.
405        let shared_db = {
406            use rocksdb::{BlockBasedOptions, ColumnFamilyDescriptor, Options as RocksOptions};
407            let shared_db_path = base_path.join("shared");
408            std::fs::create_dir_all(&shared_db_path)?;
409
410            let mut db_opts = RocksOptions::default();
411            db_opts.create_if_missing(true);
412            db_opts.create_missing_column_families(true);
413            db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
414            db_opts.set_max_write_buffer_number(2);
415            db_opts.set_write_buffer_size(8 * 1024 * 1024); // 8MB (shared DB is low-throughput)
416
417            // Wire shared DB into the shared block cache
418            let mut block_opts = BlockBasedOptions::default();
419            block_opts.set_block_cache(&shared_rocksdb_cache);
420            block_opts.set_cache_index_and_filter_blocks(true);
421            db_opts.set_block_based_table_factory(&block_opts);
422
423            // Collect CF descriptors from all stores + audit
424            let mut cfs = vec![ColumnFamilyDescriptor::new("default", {
425                let mut o = RocksOptions::default();
426                o.create_if_missing(true);
427                o
428            })];
429            cfs.extend(TodoStore::cf_descriptors());
430            cfs.extend(ProspectiveStore::column_family_descriptors());
431            cfs.extend(FileMemoryStore::cf_descriptors());
432            // Feedback CF
433            cfs.push(ColumnFamilyDescriptor::new(
434                crate::memory::feedback::CF_FEEDBACK,
435                {
436                    let mut o = RocksOptions::default();
437                    o.create_if_missing(true);
438                    o.set_compression_type(rocksdb::DBCompressionType::Lz4);
439                    o
440                },
441            ));
442            // Audit CF
443            cfs.push(ColumnFamilyDescriptor::new("audit", {
444                let mut o = RocksOptions::default();
445                o.create_if_missing(true);
446                o.set_compression_type(rocksdb::DBCompressionType::Lz4);
447                o
448            }));
449
450            Arc::new(
451                rocksdb::DB::open_cf_descriptors(&db_opts, &shared_db_path, cfs)
452                    .context("Failed to open shared DB with column families")?,
453            )
454        };
455
456        // Migrate old audit_logs DB into shared DB audit CF
457        Self::migrate_audit_db(&base_path, &shared_db)?;
458
459        let prospective_store = Arc::new(ProspectiveStore::new(shared_db.clone(), &base_path)?);
460        info!("Prospective memory store initialized");
461
462        let todo_store = Arc::new(TodoStore::new(shared_db.clone(), &base_path)?);
463        if let Err(e) = todo_store.load_vector_indices() {
464            tracing::warn!("Failed to load todo vector indices: {}, semantic todo search will rebuild on first use", e);
465        }
466        info!("Todo store initialized");
467
468        let file_store = Arc::new(FileMemoryStore::new(shared_db.clone(), &base_path)?);
469        info!("File memory store initialized");
470
471        let feedback_store = Arc::new(parking_lot::RwLock::new(
472            FeedbackStore::with_shared_db(shared_db.clone(), &base_path).unwrap_or_else(|e| {
473                tracing::warn!("Failed to load feedback store: {}, using in-memory", e);
474                FeedbackStore::new()
475            }),
476        ));
477        info!("Feedback store initialized");
478
479        // PIPE-9: StreamingMemoryExtractor no longer needs FeedbackStore
480        // Feedback momentum is now applied in the MemorySystem pipeline
481        let streaming_extractor =
482            Arc::new(streaming::StreamingMemoryExtractor::new(neural_ner.clone()));
483        info!("Streaming memory extractor initialized");
484
485        let keyword_extractor = Arc::new(KeywordExtractor::new());
486        info!("Keyword extractor initialized (YAKE)");
487
488        let relevance_engine = Arc::new(RelevanceEngine::new(neural_ner.clone()));
489        info!("Relevance engine initialized (entity cache + learned weights)");
490
491        let backup_path = base_path.join("backups");
492        let backup_engine = Arc::new(backup::ShodhBackupEngine::new(backup_path)?);
493        if server_config.backup_enabled {
494            info!(
495                "Backup engine initialized (interval: {}h, keep: {})",
496                server_config.backup_interval_secs / 3600,
497                server_config.backup_max_count
498            );
499        } else {
500            info!("Backup engine initialized (auto-backup disabled)");
501        }
502
503        let broadcast_capacity = (server_config.max_users_in_memory * 4).max(64);
504
505        let manager = Self {
506            user_memories,
507            audit_logs: Arc::new(DashMap::new()),
508            shared_db,
509            base_path,
510            default_config: MemoryConfig::default(),
511            audit_log_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
512            graph_memories,
513            neural_ner,
514            keyword_extractor,
515            user_evictions,
516            server_config,
517            event_broadcaster,
518            streaming_extractor,
519            prospective_store,
520            todo_store,
521            file_store,
522            feedback_store,
523            backup_engine,
524            context_sessions: Arc::new(DashMap::new()),
525            context_broadcaster: {
526                let (tx, _) = tokio::sync::broadcast::channel(broadcast_capacity);
527                tx
528            },
529            ab_test_manager: Arc::new(ab_testing::ABTestManager::new()),
530            session_store: Arc::new(SessionStore::new()),
531            relevance_engine,
532            maintenance_cycle: std::sync::atomic::AtomicU64::new(0),
533            user_memory_init_locks: DashMap::new(),
534            user_graph_init_locks: DashMap::new(),
535            shared_rocksdb_cache,
536        };
537
538        info!("Running initial audit log rotation...");
539        if let Err(e) = manager.rotate_all_audit_logs() {
540            tracing::warn!("Failed to rotate audit logs on startup: {}", e);
541        }
542
543        Ok(manager)
544    }
545
546    /// Get the audit column family handle from the shared DB
547    fn audit_cf(&self) -> &rocksdb::ColumnFamily {
548        self.shared_db
549            .cf_handle(CF_AUDIT)
550            .expect("audit CF must exist in shared DB")
551    }
552
553    /// Migrate old standalone audit_logs DB into the shared DB's audit CF.
554    /// Old directory is renamed to `audit_logs.pre_cf_migration` for rollback safety.
555    fn migrate_audit_db(base_path: &std::path::Path, shared_db: &rocksdb::DB) -> Result<()> {
556        let old_dir = base_path.join("audit_logs");
557        if !old_dir.exists() {
558            return Ok(());
559        }
560
561        let audit_cf = shared_db
562            .cf_handle(CF_AUDIT)
563            .expect("audit CF must exist in shared DB");
564
565        // Check if CF already has data (migration already done)
566        let mut has_data = false;
567        let mut iter = shared_db.raw_iterator_cf(audit_cf);
568        iter.seek_to_first();
569        if iter.valid() {
570            has_data = true;
571        }
572        if has_data {
573            tracing::info!(
574                "Audit CF already has data, skipping migration from {:?}",
575                old_dir
576            );
577            return Ok(());
578        }
579
580        tracing::info!("Migrating audit_logs from standalone DB to shared DB audit CF...");
581
582        let old_opts = rocksdb::Options::default();
583        let old_db = rocksdb::DB::open_for_read_only(&old_opts, &old_dir, false)
584            .context("Failed to open old audit_logs DB for migration")?;
585
586        let mut batch = rocksdb::WriteBatch::default();
587        let mut count = 0usize;
588        const BATCH_SIZE: usize = 10_000;
589
590        let iter = old_db.iterator(rocksdb::IteratorMode::Start);
591        for item in iter {
592            let (key, value) =
593                item.map_err(|e| anyhow::anyhow!("audit migration iter error: {e}"))?;
594            batch.put_cf(audit_cf, &key, &value);
595            count += 1;
596
597            if count % BATCH_SIZE == 0 {
598                shared_db
599                    .write(std::mem::take(&mut batch))
600                    .map_err(|e| anyhow::anyhow!("audit migration batch write error: {e}"))?;
601                batch = rocksdb::WriteBatch::default();
602            }
603        }
604
605        if count % BATCH_SIZE != 0 {
606            shared_db
607                .write(batch)
608                .map_err(|e| anyhow::anyhow!("audit migration final batch error: {e}"))?;
609        }
610
611        drop(old_db);
612
613        let renamed = old_dir.with_file_name("audit_logs.pre_cf_migration");
614        if renamed.exists() {
615            let _ = std::fs::remove_dir_all(&renamed);
616        }
617        std::fs::rename(&old_dir, &renamed)
618            .context("Failed to rename old audit_logs dir after migration")?;
619
620        tracing::info!(
621            "Migrated {} audit entries from standalone DB to shared CF, old dir renamed to {:?}",
622            count,
623            renamed
624        );
625
626        Ok(())
627    }
628
    /// Log audit event (non-blocking with background persistence)
    ///
    /// The event is serialized and persisted to the shared DB's audit CF on a
    /// blocking task (so the caller never waits on disk I/O), mirrored into
    /// the in-memory per-user cache with a max-entries cap, and every
    /// `audit_rotation_check_interval` calls a background rotation pass is
    /// scheduled for this user.
    ///
    /// NOTE(review): uses `tokio::task::spawn_blocking`, so this must be
    /// called from within a Tokio runtime context — confirm all call sites.
    pub fn log_event(&self, user_id: &str, event_type: &str, memory_id: &str, details: &str) {
        let event = AuditEvent {
            timestamp: chrono::Utc::now(),
            event_type: event_type.to_string(),
            memory_id: memory_id.to_string(),
            details: details.to_string(),
        };

        // Key layout `{user_id}:{timestamp_nanos:020}` — zero-padding keeps
        // keys in ascending timestamp order under byte-wise comparison.
        let key = format!(
            "{}:{:020}",
            user_id,
            event.timestamp.timestamp_nanos_opt().unwrap_or_else(|| {
                tracing::warn!("audit event timestamp outside i64 nanos range, using 0");
                0
            })
        );
        if let Ok(serialized) = bincode::serde::encode_to_vec(&event, bincode::config::standard()) {
            let db = self.shared_db.clone();
            let key_bytes = key.into_bytes();

            // Persist off the hot path; failures are logged, not propagated.
            tokio::task::spawn_blocking(move || {
                let audit = db.cf_handle(CF_AUDIT).expect("audit CF must exist");
                if let Err(e) = db.put_cf(&audit, &key_bytes, &serialized) {
                    tracing::error!("Failed to persist audit log: {}", e);
                }
            });
        }

        // Mirror into the in-memory cache, enforcing the per-user cap by
        // dropping the oldest entries from the front.
        let max_entries = self.server_config.audit_max_entries_per_user;
        let log = self
            .audit_logs
            .entry(user_id.to_string())
            .or_insert_with(|| Arc::new(parking_lot::RwLock::new(VecDeque::new())))
            .clone();
        {
            let mut entries = log.write();
            entries.push_back(event);
            while entries.len() > max_entries {
                entries.pop_front();
            }
        }

        // `fetch_add` returns the previous value, so `count > 0` skips the
        // very first logged event.
        let count = self
            .audit_log_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        if count % self.server_config.audit_rotation_check_interval == 0 && count > 0 {
            // Clone only the handles the rotation helper needs — avoids
            // cloning the whole manager into the blocking task.
            let shared_db = self.shared_db.clone();
            let audit_logs = self.audit_logs.clone();
            let user_id_clone = user_id.to_string();

            let audit_retention_days = self.server_config.audit_retention_days as i64;
            let audit_max_entries = self.server_config.audit_max_entries_per_user;

            tokio::task::spawn_blocking(move || {
                let manager = MultiUserMemoryManagerRotationHelper {
                    shared_db,
                    audit_logs,
                    audit_retention_days,
                    audit_max_entries,
                };
                if let Err(e) = manager.rotate_user_audit_logs(&user_id_clone) {
                    tracing::debug!("Audit log rotation check for user {}: {}", user_id_clone, e);
                }
            });
        }
    }
697
698    /// Emit SSE event to all connected dashboard clients
699    pub fn emit_event(&self, event: MemoryEvent) {
700        let _ = self.event_broadcaster.send(event);
701    }
702
703    /// Subscribe to SSE events
704    pub fn subscribe_events(&self) -> tokio::sync::broadcast::Receiver<MemoryEvent> {
705        self.event_broadcaster.subscribe()
706    }
707
    /// Get audit history for user
    ///
    /// Served from the in-memory per-user audit cache when it is populated;
    /// otherwise falls back to a prefix scan of the shared "audit" column
    /// family and warms the cache with the loaded events. When `memory_id`
    /// is `Some`, only events for that memory are returned.
    pub fn get_history(&self, user_id: &str, memory_id: Option<&str>) -> Vec<AuditEvent> {
        // Fast path: cached audit log. Only used when non-empty, so an
        // empty cache entry still falls through to the DB scan below.
        if let Some(log) = self.audit_logs.get(user_id) {
            let events = log.read();
            if !events.is_empty() {
                return if let Some(mid) = memory_id {
                    events
                        .iter()
                        .filter(|e| e.memory_id == mid)
                        .cloned()
                        .collect()
                } else {
                    events.iter().cloned().collect()
                };
            }
        }

        // Slow path: scan shared DB. Audit keys are `{user_id}:{...}`.
        let mut events = Vec::new();
        let prefix = format!("{user_id}:");

        let audit = self.audit_cf();
        let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
        for (key, value) in iter.flatten() {
            if let Ok(key_str) = std::str::from_utf8(&key) {
                // Prefix iterators can yield keys past the logical prefix;
                // stop at the first key outside our range.
                if !key_str.starts_with(&prefix) {
                    break;
                }

                // Undecodable entries are silently skipped (best-effort read).
                if let Ok((event, _)) = bincode::serde::decode_from_slice::<AuditEvent, _>(
                    &value,
                    bincode::config::standard(),
                ) {
                    events.push(event);
                }
            }
        }

        // Warm the in-memory cache for subsequent calls; `or_insert_with`
        // preserves an entry another thread may have created meanwhile.
        if !events.is_empty() {
            self.audit_logs
                .entry(user_id.to_string())
                .or_insert_with(|| {
                    Arc::new(parking_lot::RwLock::new(VecDeque::from(events.clone())))
                });
        }

        // Apply the memory_id filter only after caching the full set.
        if let Some(mid) = memory_id {
            events.into_iter().filter(|e| e.memory_id == mid).collect()
        } else {
            events
        }
    }
759
760    /// Get or create memory system for a user
761    ///
762    /// Uses double-checked locking to prevent TOCTOU races where concurrent
763    /// first-access requests both miss the cache and try to open RocksDB.
764    /// RocksDB holds exclusive file locks, so the second open would fail.
765    pub fn get_user_memory(&self, user_id: &str) -> Result<Arc<parking_lot::RwLock<MemorySystem>>> {
766        // Fast path: already cached
767        if let Some(memory) = self.user_memories.get(user_id) {
768            return Ok(memory);
769        }
770
771        // Acquire per-user creation lock to serialize initialization
772        let lock = self
773            .user_memory_init_locks
774            .entry(user_id.to_string())
775            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
776            .clone();
777        let _guard = lock.lock();
778
779        // Re-check after acquiring lock (another thread may have created it)
780        if let Some(memory) = self.user_memories.get(user_id) {
781            return Ok(memory);
782        }
783
784        let user_path = self.base_path.join(user_id);
785        let config = MemoryConfig {
786            storage_path: user_path,
787            ..self.default_config.clone()
788        };
789
790        let mut memory_system = MemorySystem::new(config, Some(&self.shared_rocksdb_cache))
791            .with_context(|| format!("Failed to initialize memory system for user '{user_id}'"))?;
792        // Wire up GraphMemory for Layer 2 (spreading activation) and Layer 5 (Hebbian learning)
793        let graph = self.get_user_graph(user_id)?;
794        memory_system.set_graph_memory(graph);
795        // Wire up FeedbackStore for PIPE-9 (feedback momentum in all retrieval paths)
796        memory_system.set_feedback_store(self.feedback_store.clone());
797
798        let memory_arc = Arc::new(parking_lot::RwLock::new(memory_system));
799
800        self.user_memories
801            .insert(user_id.to_string(), memory_arc.clone());
802
803        info!("Created memory system for user: {}", user_id);
804
805        Ok(memory_arc)
806    }
807
    /// Evict a user's memory and graph from in-memory caches (releases DB handles).
    /// Does NOT delete data — used before restore to release file locks.
    pub fn evict_user(&self, user_id: &str) {
        // Drop the cached handles, then force the caches to run their
        // pending tasks so the evicted entries are actually dropped
        // (and the underlying DB handles closed) before we return.
        self.user_memories.invalidate(user_id);
        self.graph_memories.invalidate(user_id);
        self.user_memories.run_pending_tasks();
        self.graph_memories.run_pending_tasks();

        #[cfg(target_os = "windows")]
        {
            // Windows needs extra time to release file handles, so wait
            // briefly and drain the eviction queues a second time.
            std::thread::sleep(std::time::Duration::from_millis(200));
            self.user_memories.run_pending_tasks();
            self.graph_memories.run_pending_tasks();
        }

        tracing::info!(user_id = user_id, "Evicted user caches for restore");
    }
826
    /// Delete user data (GDPR compliance)
    ///
    /// Cleans up:
    /// 1. In-memory caches (user_memories, graph_memories)
    /// 2. Shared RocksDB: todos, projects, todo indices, reminders, files, feedback, audit
    /// 3. Per-user filesystem: per-user RocksDB, graph DB, vector indices
    ///
    /// # Errors
    /// Returns an error if the shared-DB purge fails or the user's directory
    /// cannot be removed after repeated retries.
    pub fn forget_user(&self, user_id: &str) -> Result<()> {
        // Evict cached handles first so the per-user RocksDB instances are
        // dropped (releasing their file locks) before the directory delete.
        self.user_memories.invalidate(user_id);
        self.graph_memories.invalidate(user_id);

        self.user_memories.run_pending_tasks();
        self.graph_memories.run_pending_tasks();

        #[cfg(target_os = "windows")]
        {
            // Windows releases file handles asynchronously; wait briefly
            // and drain the eviction queues again.
            std::thread::sleep(std::time::Duration::from_millis(200));
            self.user_memories.run_pending_tasks();
            self.graph_memories.run_pending_tasks();
        }

        // Clean up all user data from shared RocksDB column families
        self.purge_user_from_shared_db(user_id)?;

        // Clean up todo vector indices
        self.todo_store.purge_user_vectors(user_id);

        // Clean up in-memory feedback state
        {
            let mut fb = self.feedback_store.write();
            fb.take_pending(user_id);
        }

        // Delete per-user filesystem (memories DB, graph DB, vector index files).
        // Retried with exponential backoff: lingering file handles (especially
        // on Windows) can make the first attempts fail transiently.
        let user_path = self.base_path.join(user_id);
        if user_path.exists() {
            let mut attempts = 0;
            let max_attempts = 10;
            while attempts < max_attempts {
                match std::fs::remove_dir_all(&user_path) {
                    Ok(_) => break,
                    Err(e) if attempts < max_attempts - 1 => {
                        // Backoff doubles per attempt, capped at 1600ms.
                        let delay = 100 * (1 << attempts.min(4));
                        tracing::debug!(
                            "Delete retry {} for {} (waiting {}ms): {}",
                            attempts + 1,
                            user_id,
                            delay,
                            e
                        );
                        std::thread::sleep(std::time::Duration::from_millis(delay));
                        attempts += 1;
                    }
                    // Final attempt failed: surface the error to the caller.
                    Err(e) => {
                        return Err(anyhow::anyhow!(
                            "Failed to delete user data after {max_attempts} retries: {e}"
                        ))
                    }
                }
            }
        }

        info!("Deleted all data for user: {}", user_id);
        Ok(())
    }
891
892    /// Prefix-scan and batch-delete all keys starting with `{user_id}:` from a column family
893    fn delete_by_prefix(db: &rocksdb::DB, cf: &rocksdb::ColumnFamily, prefix: &[u8]) -> usize {
894        let mut batch = rocksdb::WriteBatch::default();
895        let mut count = 0;
896        let iter = db.prefix_iterator_cf(cf, prefix);
897        for item in iter.flatten() {
898            let (key, _) = item;
899            if !key.starts_with(prefix) {
900                break;
901            }
902            batch.delete_cf(cf, &key);
903            count += 1;
904        }
905        if count > 0 {
906            let _ = db.write(batch);
907        }
908        count
909    }
910
911    /// Purge all user data from shared RocksDB (todos, reminders, files, feedback, audit)
912    fn purge_user_from_shared_db(&self, user_id: &str) -> Result<()> {
913        let prefix = format!("{user_id}:");
914        let prefix_bytes = prefix.as_bytes();
915
916        // Shared CF names that use `{user_id}:` as key prefix
917        let cf_names = ["todos", "projects", "prospective"];
918        for name in &cf_names {
919            if let Some(cf) = self.shared_db.cf_handle(name) {
920                let n = Self::delete_by_prefix(&self.shared_db, cf, prefix_bytes);
921                if n > 0 {
922                    tracing::debug!("GDPR: purged {n} entries from {name} CF for {user_id}");
923                }
924            }
925        }
926
927        // Index CFs use varied key prefixes — scan all relevant patterns
928        if let Some(cf) = self.shared_db.cf_handle("todo_index") {
929            let prefixes = [
930                format!("user:{user_id}:"),
931                format!("status:Backlog:{user_id}:"),
932                format!("status:Todo:{user_id}:"),
933                format!("status:InProgress:{user_id}:"),
934                format!("status:Blocked:{user_id}:"),
935                format!("status:Done:{user_id}:"),
936                format!("status:Cancelled:{user_id}:"),
937                format!("vector_id:{user_id}:"),
938                format!("todo_vector:{user_id}:"),
939            ];
940            for p in &prefixes {
941                Self::delete_by_prefix(&self.shared_db, cf, p.as_bytes());
942            }
943            // Priority and due/context keys also contain user_id but at varying positions.
944            // Full scan of index CF to catch them all.
945            let mut batch = rocksdb::WriteBatch::default();
946            let iter = self.shared_db.iterator_cf(cf, rocksdb::IteratorMode::Start);
947            for item in iter.flatten() {
948                let (key, _) = item;
949                if let Ok(key_str) = std::str::from_utf8(&key) {
950                    if key_str.contains(&prefix) {
951                        batch.delete_cf(cf, &key);
952                    }
953                }
954            }
955            let _ = self.shared_db.write(batch);
956        }
957
958        if let Some(cf) = self.shared_db.cf_handle("prospective_index") {
959            let prefixes = [
960                format!("user:{user_id}:"),
961                format!("status:Pending:{user_id}:"),
962                format!("status:Triggered:{user_id}:"),
963                format!("status:Dismissed:{user_id}:"),
964            ];
965            for p in &prefixes {
966                Self::delete_by_prefix(&self.shared_db, cf, p.as_bytes());
967            }
968            // Context keyword indices: `context:{keyword}:{user_id}:{id}`
969            let mut batch = rocksdb::WriteBatch::default();
970            let iter = self.shared_db.iterator_cf(cf, rocksdb::IteratorMode::Start);
971            for item in iter.flatten() {
972                let (key, _) = item;
973                if let Ok(key_str) = std::str::from_utf8(&key) {
974                    if key_str.contains(&prefix) {
975                        batch.delete_cf(cf, &key);
976                    }
977                }
978            }
979            let _ = self.shared_db.write(batch);
980        }
981
982        // Files
983        if let Some(cf) = self.shared_db.cf_handle("files") {
984            Self::delete_by_prefix(&self.shared_db, cf, prefix_bytes);
985        }
986        if let Some(cf) = self.shared_db.cf_handle("file_index") {
987            let idx_prefix = format!("file_idx:{user_id}:");
988            Self::delete_by_prefix(&self.shared_db, cf, idx_prefix.as_bytes());
989            // Also catch other patterns
990            let mut batch = rocksdb::WriteBatch::default();
991            let iter = self.shared_db.iterator_cf(cf, rocksdb::IteratorMode::Start);
992            for item in iter.flatten() {
993                let (key, _) = item;
994                if let Ok(key_str) = std::str::from_utf8(&key) {
995                    if key_str.contains(&prefix) {
996                        batch.delete_cf(cf, &key);
997                    }
998                }
999            }
1000            let _ = self.shared_db.write(batch);
1001        }
1002
1003        // Feedback: `pending:{user_id}`
1004        if let Some(cf) = self.shared_db.cf_handle("feedback") {
1005            let pending_key = format!("pending:{user_id}");
1006            let _ = self.shared_db.delete_cf(cf, pending_key.as_bytes());
1007        }
1008
1009        // Audit logs
1010        if let Some(cf) = self.shared_db.cf_handle("audit") {
1011            Self::delete_by_prefix(&self.shared_db, cf, prefix_bytes);
1012        }
1013
1014        // Clear in-memory audit log cache
1015        self.audit_logs.remove(user_id);
1016
1017        Ok(())
1018    }
1019
1020    /// Get statistics for a user
1021    pub fn get_stats(&self, user_id: &str) -> Result<MemoryStats> {
1022        let memory = self.get_user_memory(user_id)?;
1023        let memory_guard = memory.read();
1024        let mut stats = memory_guard.stats();
1025
1026        if let Ok(graph) = self.get_user_graph(user_id) {
1027            let graph_guard = graph.read();
1028            if let Ok(graph_stats) = graph_guard.get_stats() {
1029                stats.graph_nodes = graph_stats.entity_count;
1030                stats.graph_edges = graph_stats.relationship_count;
1031            }
1032        }
1033
1034        Ok(stats)
1035    }
1036
1037    /// List all users
1038    pub fn list_users(&self) -> Vec<String> {
1039        let mut users = Vec::new();
1040        if let Ok(entries) = std::fs::read_dir(&self.base_path) {
1041            for entry in entries.flatten() {
1042                if let Ok(file_type) = entry.file_type() {
1043                    if file_type.is_dir() {
1044                        if let Some(name) = entry.file_name().to_str() {
1045                            // Filter out system directories
1046                            if name != "audit_logs"
1047                                && name != "audit_logs.pre_cf_migration"
1048                                && name != "backups"
1049                                && name != "feedback"
1050                                && name != "feedback.pre_cf_migration"
1051                                && name != "semantic_facts"
1052                                && name != "files"
1053                                && name != "files.pre_cf_migration"
1054                                && name != "prospective"
1055                                && name != "prospective.pre_cf_migration"
1056                                && name != "todos"
1057                                && name != "todos.pre_cf_migration"
1058                                && name != "shared"
1059                            {
1060                                users.push(name.to_string());
1061                            }
1062                        }
1063                    }
1064                }
1065            }
1066        }
1067        users.sort();
1068        users
1069    }
1070
1071    /// List users currently loaded in the Moka cache (no filesystem scan)
1072    pub fn list_cached_users(&self) -> Vec<String> {
1073        self.user_memories
1074            .iter()
1075            .map(|(id, _)| id.to_string())
1076            .collect()
1077    }
1078
1079    /// Get audit logs for a user
1080    pub fn get_audit_logs(&self, user_id: &str, limit: usize) -> Vec<AuditEvent> {
1081        let mut events: Vec<AuditEvent> = Vec::new();
1082        let prefix = format!("{user_id}:");
1083        let audit = self.audit_cf();
1084        let iter = self.shared_db.prefix_iterator_cf(audit, prefix.as_bytes());
1085        for (key, value) in iter.flatten() {
1086            if let Ok(key_str) = std::str::from_utf8(&key) {
1087                if !key_str.starts_with(&prefix) {
1088                    break;
1089                }
1090                if let Ok((event, _)) = bincode::serde::decode_from_slice::<AuditEvent, _>(
1091                    &value,
1092                    bincode::config::standard(),
1093                ) {
1094                    events.push(event);
1095                }
1096            }
1097        }
1098        events.reverse();
1099        events.truncate(limit);
1100        events
1101    }
1102
1103    /// Flush all RocksDB databases
1104    pub fn flush_all_databases(&self) -> Result<()> {
1105        info!("Flushing all databases to disk...");
1106
1107        // Single flush covers all shared stores (todos, prospective, files, feedback, audit)
1108        self.shared_db
1109            .flush()
1110            .map_err(|e| anyhow::anyhow!("Failed to flush shared database: {e}"))?;
1111        info!("  Shared database flushed (todos, prospective, files, feedback, audit)");
1112
1113        let user_entries: Vec<(String, Arc<parking_lot::RwLock<MemorySystem>>)> = self
1114            .user_memories
1115            .iter()
1116            .map(|(k, v)| (k.to_string(), v.clone()))
1117            .collect();
1118
1119        let mut flushed = 0;
1120        for (user_id, memory_system) in user_entries {
1121            if let Some(guard) = memory_system.try_read() {
1122                if let Err(e) = guard.flush_storage() {
1123                    tracing::warn!("  Failed to flush database for user {}: {}", user_id, e);
1124                } else {
1125                    flushed += 1;
1126                }
1127            } else {
1128                tracing::warn!("  Could not acquire lock for user: {}", user_id);
1129            }
1130        }
1131
1132        info!(
1133            "All databases flushed: shared (5 stores), {} user memories",
1134            flushed
1135        );
1136
1137        Ok(())
1138    }
1139
1140    /// Save all vector indices to disk
1141    pub fn save_all_vector_indices(&self) -> Result<()> {
1142        info!("Saving vector indices to disk...");
1143
1144        let user_entries: Vec<(String, Arc<parking_lot::RwLock<MemorySystem>>)> = self
1145            .user_memories
1146            .iter()
1147            .map(|(k, v)| (k.to_string(), v.clone()))
1148            .collect();
1149
1150        let mut saved = 0;
1151        for (user_id, memory_system) in user_entries {
1152            if let Some(guard) = memory_system.try_read() {
1153                let index_path = self.base_path.join(&user_id).join("vector_index");
1154                if let Err(e) = guard.save_vector_index(&index_path) {
1155                    tracing::warn!("  Failed to save vector index for user {}: {}", user_id, e);
1156                } else {
1157                    info!("  Saved vector index for user: {}", user_id);
1158                    saved += 1;
1159                }
1160            } else {
1161                tracing::warn!("  Could not acquire lock for user: {}", user_id);
1162            }
1163        }
1164
1165        info!("Saved {} vector indices", saved);
1166        Ok(())
1167    }
1168
1169    /// Rotate audit logs for all users
1170    fn rotate_all_audit_logs(&self) -> Result<()> {
1171        let mut total_removed = 0;
1172
1173        let mut user_ids = std::collections::HashSet::new();
1174        let audit = self.audit_cf();
1175        let iter = self
1176            .shared_db
1177            .iterator_cf(audit, rocksdb::IteratorMode::Start);
1178
1179        for (key, _) in iter.flatten() {
1180            if let Ok(key_str) = std::str::from_utf8(&key) {
1181                if let Some(user_id) = key_str.split(':').next() {
1182                    user_ids.insert(user_id.to_string());
1183                }
1184            }
1185        }
1186
1187        let helper = MultiUserMemoryManagerRotationHelper {
1188            shared_db: self.shared_db.clone(),
1189            audit_logs: self.audit_logs.clone(),
1190            audit_retention_days: self.server_config.audit_retention_days as i64,
1191            audit_max_entries: self.server_config.audit_max_entries_per_user,
1192        };
1193
1194        for user_id in user_ids {
1195            match helper.rotate_user_audit_logs(&user_id) {
1196                Ok(removed) => {
1197                    if removed > 0 {
1198                        info!(
1199                            "  Rotated audit logs for user {}: removed {} old entries",
1200                            user_id, removed
1201                        );
1202                        total_removed += removed;
1203                    }
1204                }
1205                Err(e) => {
1206                    tracing::warn!("  Failed to rotate audit logs for user {}: {}", user_id, e);
1207                }
1208            }
1209        }
1210
1211        if total_removed > 0 {
1212            info!(
1213                "Audit log rotation complete: removed {} total entries",
1214                total_removed
1215            );
1216        }
1217
1218        Ok(())
1219    }
1220
1221    /// Get neural NER for entity extraction
1222    pub fn get_neural_ner(&self) -> Arc<NeuralNer> {
1223        self.neural_ner.clone()
1224    }
1225
1226    /// Get keyword extractor for statistical term extraction
1227    pub fn get_keyword_extractor(&self) -> Arc<KeywordExtractor> {
1228        self.keyword_extractor.clone()
1229    }
1230
1231    /// Get or create graph memory for a user
1232    ///
1233    /// Uses the same per-user creation lock as get_user_memory to prevent
1234    /// concurrent RocksDB open races on the graph directory.
1235    pub fn get_user_graph(&self, user_id: &str) -> Result<Arc<parking_lot::RwLock<GraphMemory>>> {
1236        // Fast path: already cached
1237        if let Some(graph) = self.graph_memories.get(user_id) {
1238            return Ok(graph);
1239        }
1240
1241        // Acquire per-user graph creation lock (separate from memory lock
1242        // to avoid deadlock when get_user_memory() calls get_user_graph())
1243        let lock = self
1244            .user_graph_init_locks
1245            .entry(user_id.to_string())
1246            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
1247            .clone();
1248        let _guard = lock.lock();
1249
1250        // Re-check after acquiring lock
1251        if let Some(graph) = self.graph_memories.get(user_id) {
1252            return Ok(graph);
1253        }
1254
1255        let graph_path = self.base_path.join(user_id).join("graph");
1256        let graph_memory = GraphMemory::new(&graph_path, Some(&self.shared_rocksdb_cache))?;
1257        let graph_arc = Arc::new(parking_lot::RwLock::new(graph_memory));
1258
1259        self.graph_memories
1260            .insert(user_id.to_string(), graph_arc.clone());
1261
1262        info!("Created graph memory for user: {}", user_id);
1263
1264        Ok(graph_arc)
1265    }
1266
1267    /// Get graph statistics for a user
1268    pub fn get_user_graph_stats(&self, user_id: &str) -> Result<GraphStats> {
1269        let graph = self.get_user_graph(user_id)?;
1270        let graph_guard = graph.read();
1271        graph_guard.get_stats()
1272    }
1273
1274    /// Run maintenance on all cached user memories
1275    pub fn run_maintenance_all_users(&self) -> usize {
1276        let cycle = self
1277            .maintenance_cycle
1278            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1279
1280        // Heavy cycle every 6th iteration (6 hours at 3600s intervals).
1281        // Heavy cycles run replay, entity-entity strengthening, fact extraction (full memory scan),
1282        // and flush databases (triggers compaction). Light cycles only touch in-memory data.
1283        let is_heavy = cycle % 6 == 0;
1284
1285        if is_heavy {
1286            tracing::info!(
1287                "Maintenance cycle {} (HEAVY — graph decay + fact extraction + flush)",
1288                cycle
1289            );
1290        } else {
1291            tracing::debug!("Maintenance cycle {} (light — in-memory only)", cycle);
1292        }
1293
1294        let decay_factor = self.server_config.activation_decay_factor;
1295        let mut total_processed = 0;
1296
1297        let user_ids: Vec<String> = self
1298            .user_memories
1299            .iter()
1300            .map(|(id, _)| id.to_string())
1301            .collect();
1302
1303        let user_count = user_ids.len();
1304        let mut edges_decayed = 0;
1305        let mut edges_strengthened = 0;
1306        let mut entity_edges_strengthened = 0;
1307        let mut total_facts_extracted = 0;
1308        let mut total_facts_reinforced = 0;
1309
1310        for user_id in user_ids {
1311            let maintenance_result = if let Ok(memory_lock) = self.get_user_memory(&user_id) {
1312                let memory = memory_lock.read();
1313                match memory.run_maintenance(decay_factor, &user_id, is_heavy) {
1314                    Ok(result) => {
1315                        total_processed += result.decayed_count;
1316                        total_facts_extracted += result.facts_extracted;
1317                        total_facts_reinforced += result.facts_reinforced;
1318                        Some(result)
1319                    }
1320                    Err(e) => {
1321                        tracing::warn!("Maintenance failed for user {}: {}", user_id, e);
1322                        None
1323                    }
1324                }
1325            } else {
1326                None
1327            };
1328
1329            // Direction 1: Edge strengthening + promotion boost propagation
1330            if let Some(ref result) = maintenance_result {
1331                if !result.edge_boosts.is_empty() {
1332                    if let Ok(graph) = self.get_user_graph(&user_id) {
1333                        let graph_guard = graph.read();
1334                        match graph_guard.strengthen_memory_edges(&result.edge_boosts) {
1335                            Ok((count, promotion_boosts)) => {
1336                                edges_strengthened += count;
1337
1338                                // Direction 1: Apply edge promotion boosts to memory importance
1339                                if !promotion_boosts.is_empty() {
1340                                    if let Ok(memory_lock) = self.get_user_memory(&user_id) {
1341                                        let memory = memory_lock.read();
1342                                        match memory.apply_edge_promotion_boosts(&promotion_boosts)
1343                                        {
1344                                            Ok(boosted) => {
1345                                                tracing::debug!(
1346                                                    user_id = %user_id,
1347                                                    boosted,
1348                                                    promotions = promotion_boosts.len(),
1349                                                    "Applied edge promotion boosts"
1350                                                );
1351                                            }
1352                                            Err(e) => {
1353                                                tracing::debug!(
1354                                                    "Edge promotion boost failed for user {}: {}",
1355                                                    user_id,
1356                                                    e
1357                                                );
1358                                            }
1359                                        }
1360                                    }
1361                                }
1362                            }
1363                            Err(e) => {
1364                                tracing::debug!(
1365                                    "Edge boost application failed for user {}: {}",
1366                                    user_id,
1367                                    e
1368                                );
1369                            }
1370                        }
1371                    }
1372                }
1373            }
1374
1375            // Direction 3: Entity-entity Hebbian reinforcement for replayed memories
1376            // During replay, memories are re-activated — strengthen edges between entities
1377            // that co-occur in the same episode, reinforcing semantic associations.
1378            if let Some(ref result) = maintenance_result {
1379                if !result.replay_memory_ids.is_empty() {
1380                    if let Ok(graph) = self.get_user_graph(&user_id) {
1381                        let graph_guard = graph.read();
1382                        for mem_id_str in &result.replay_memory_ids {
1383                            if let Ok(uuid) = uuid::Uuid::parse_str(mem_id_str) {
1384                                match graph_guard.strengthen_episode_entity_edges(&uuid) {
1385                                    Ok(count) => entity_edges_strengthened += count,
1386                                    Err(e) => {
1387                                        tracing::debug!(
1388                                            "Entity edge strengthening failed for memory {}: {}",
1389                                            mem_id_str,
1390                                            e
1391                                        );
1392                                    }
1393                                }
1394                            }
1395                        }
1396                    }
1397                }
1398            }
1399
1400            // Direction 2: Lazy decay — flush opportunistic pruning queue
1401            // Instead of scanning all 34k+ edges (apply_decay), we queue edges found
1402            // below threshold during normal reads and batch-delete them here.
1403            // Runs every cycle since it's just targeted deletes, not a full scan.
1404            if let Ok(graph) = self.get_user_graph(&user_id) {
1405                let graph_guard = graph.read();
1406                match graph_guard.flush_pending_maintenance() {
1407                    Ok(decay_result) => {
1408                        edges_decayed += decay_result.pruned_count;
1409
1410                        // Direction 2: Compensate memories that lost all graph edges
1411                        if !decay_result.orphaned_entity_ids.is_empty() {
1412                            if let Ok(memory_lock) = self.get_user_memory(&user_id) {
1413                                let memory = memory_lock.read();
1414                                match memory
1415                                    .compensate_orphaned_memories(&decay_result.orphaned_entity_ids)
1416                                {
1417                                    Ok(compensated) => {
1418                                        tracing::debug!(
1419                                            user_id = %user_id,
1420                                            compensated,
1421                                            orphaned = decay_result.orphaned_entity_ids.len(),
1422                                            "Compensated orphaned memories"
1423                                        );
1424                                    }
1425                                    Err(e) => {
1426                                        tracing::debug!(
1427                                            "Orphan compensation failed for user {}: {}",
1428                                            user_id,
1429                                            e
1430                                        );
1431                                    }
1432                                }
1433                            }
1434                        }
1435                    }
1436                    Err(e) => {
1437                        tracing::debug!("Graph lazy pruning failed for user {}: {}", user_id, e);
1438                    }
1439                }
1440            }
1441
1442            // Direction 4: Full graph decay on heavy cycles
1443            // Lazy pruning (above) only processes edges found below threshold during reads.
1444            // Edges that are never read still need decay applied. Run full apply_decay()
1445            // every heavy cycle (6 hours) to ensure no edge escapes time-based weakening.
1446            if is_heavy {
1447                if let Ok(graph) = self.get_user_graph(&user_id) {
1448                    let graph_guard = graph.read();
1449                    match graph_guard.apply_decay() {
1450                        Ok(decay_result) => {
1451                            if decay_result.pruned_count > 0 {
1452                                edges_decayed += decay_result.pruned_count;
1453                                tracing::debug!(
1454                                    user_id = %user_id,
1455                                    pruned = decay_result.pruned_count,
1456                                    orphaned = decay_result.orphaned_entity_ids.len(),
1457                                    "Full graph decay applied"
1458                                );
1459                            }
1460
1461                            if !decay_result.orphaned_entity_ids.is_empty() {
1462                                if let Ok(memory_lock) = self.get_user_memory(&user_id) {
1463                                    let memory = memory_lock.read();
1464                                    let _ = memory.compensate_orphaned_memories(
1465                                        &decay_result.orphaned_entity_ids,
1466                                    );
1467                                }
1468                            }
1469                        }
1470                        Err(e) => {
1471                            tracing::debug!("Full graph decay failed for user {}: {}", user_id, e);
1472                        }
1473                    }
1474                }
1475            }
1476        }
1477
1478        // Heavy cycle: clean up old triggered/dismissed reminders (C4 fix)
1479        if is_heavy {
1480            for (user_id_arc, _) in self.user_memories.iter() {
1481                let user_id = user_id_arc.as_ref();
1482                match self.prospective_store.cleanup_old_tasks(user_id, 30) {
1483                    Ok(deleted) if deleted > 0 => {
1484                        tracing::info!(
1485                            user_id = %user_id,
1486                            deleted = deleted,
1487                            "Cleaned up old prospective tasks (>30 days)"
1488                        );
1489                    }
1490                    Err(e) => {
1491                        tracing::debug!(
1492                            user_id = %user_id,
1493                            error = %e,
1494                            "Prospective task cleanup failed"
1495                        );
1496                    }
1497                    _ => {}
1498                }
1499            }
1500        }
1501
1502        // Flush databases only on heavy cycles — flush triggers RocksDB compaction
1503        // which allocates significant C++ memory through Windows CRT
1504        if is_heavy {
1505            if let Err(e) = self.flush_all_databases() {
1506                tracing::warn!("Periodic flush failed: {}", e);
1507            }
1508
1509            // Prune init locks: remove entries for users no longer in cache.
1510            // This prevents unbounded growth of the DashMaps over time.
1511            let active_users: std::collections::HashSet<String> = self
1512                .user_memories
1513                .iter()
1514                .map(|(id, _)| id.to_string())
1515                .collect();
1516            self.user_memory_init_locks
1517                .retain(|user_id, _| active_users.contains(user_id));
1518            self.user_graph_init_locks
1519                .retain(|user_id, _| active_users.contains(user_id));
1520            // Prune audit logs for evicted users to prevent unbounded DashMap growth.
1521            // Each user's log can hold up to audit_max_entries_per_user entries (~2-5MB),
1522            // and without pruning, entries persist long after the user's memory/graph are evicted.
1523            let pre_audit = self.audit_logs.len();
1524            self.audit_logs
1525                .retain(|user_id, _| active_users.contains(user_id));
1526            let pruned_audit = pre_audit.saturating_sub(self.audit_logs.len());
1527            if pruned_audit > 0 {
1528                tracing::info!(
1529                    "Pruned audit logs for {} evicted users ({} active)",
1530                    pruned_audit,
1531                    self.audit_logs.len()
1532                );
1533            }
1534        }
1535
1536        tracing::info!(
1537            "Maintenance complete (cycle {}, {}): {} memories processed, {} edges strengthened, {} entity edges strengthened, {} weak edges pruned, {} facts extracted, {} facts reinforced across {} users",
1538            cycle,
1539            if is_heavy { "heavy" } else { "light" },
1540            total_processed,
1541            edges_strengthened,
1542            entity_edges_strengthened,
1543            edges_decayed,
1544            total_facts_extracted,
1545            total_facts_reinforced,
1546            user_count
1547        );
1548
1549        total_processed
1550    }
1551
    /// Get the streaming extractor.
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn streaming_extractor(&self) -> &Arc<streaming::StreamingMemoryExtractor> {
        &self.streaming_extractor
    }
1556
    /// Get the backup engine.
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn backup_engine(&self) -> &Arc<backup::ShodhBackupEngine> {
        &self.backup_engine
    }
1561
    /// Get the A/B test manager.
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn ab_test_manager(&self) -> &Arc<ab_testing::ABTestManager> {
        &self.ab_test_manager
    }
1566
    /// Get the todo store.
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn todo_store(&self) -> &Arc<TodoStore> {
        &self.todo_store
    }
1571
    /// Get the prospective store (reminders / scheduled tasks).
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn prospective_store(&self) -> &Arc<ProspectiveStore> {
        &self.prospective_store
    }
1576
    /// Get the file store.
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn file_store(&self) -> &Arc<FileMemoryStore> {
        &self.file_store
    }
1581
    /// Get the feedback store.
    ///
    /// Unlike the other stores, this one is wrapped in a `parking_lot::RwLock`;
    /// callers must acquire a read or write guard before use.
    pub fn feedback_store(&self) -> &Arc<parking_lot::RwLock<FeedbackStore>> {
        &self.feedback_store
    }
1586
    /// Get the session store.
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn session_store(&self) -> &Arc<SessionStore> {
        &self.session_store
    }
1591
    /// Get context sessions (concurrent map of session id → `ContextStatus`).
    ///
    /// Returned as `&Arc` so callers can either borrow it or clone a shared handle.
    pub fn context_sessions(&self) -> &Arc<ContextSessions> {
        &self.context_sessions
    }
1596
    /// Subscribe to context status updates.
    ///
    /// Each call creates a fresh broadcast receiver; updates sent via
    /// `broadcast_context` after this point will be delivered to it.
    pub fn subscribe_context(&self) -> tokio::sync::broadcast::Receiver<ContextStatus> {
        self.context_broadcaster.subscribe()
    }
1601
    /// Broadcast context status update.
    ///
    /// Best-effort: the send result is deliberately ignored — a broadcast
    /// `send` errors only when there are no active subscribers, which is fine.
    pub fn broadcast_context(&self, status: ContextStatus) {
        let _ = self.context_broadcaster.send(status);
    }
1606
    /// Get server config (read-only borrow of the manager's configuration).
    pub fn server_config(&self) -> &ServerConfig {
        &self.server_config
    }
1611
    /// Get base path (root directory under which per-user data lives).
    pub fn base_path(&self) -> &std::path::Path {
        &self.base_path
    }
1616
1617    /// Get user evictions count
1618    pub fn user_evictions(&self) -> usize {
1619        self.user_evictions
1620            .load(std::sync::atomic::Ordering::Relaxed)
1621    }
1622
    /// Get users in cache count.
    ///
    /// NOTE(review): `entry_count()` presumably returns a wider integer (u64);
    /// the `as usize` cast assumes the count fits — confirm on 32-bit targets.
    pub fn users_in_cache(&self) -> usize {
        self.user_memories.entry_count() as usize
    }
1627
1628    /// Active reminder check: scan all users for due reminders, mark them triggered,
1629    /// and emit `REMINDER_DUE` events to the broadcast channel.
1630    ///
1631    /// Called by the dedicated 60-second reminder scheduler in main.rs.
1632    /// Returns the number of reminders triggered.
1633    pub fn check_and_emit_due_reminders(&self) -> usize {
1634        let due_tasks = match self.prospective_store.get_all_due_tasks() {
1635            Ok(tasks) => tasks,
1636            Err(e) => {
1637                tracing::debug!("Active reminder check failed: {}", e);
1638                return 0;
1639            }
1640        };
1641
1642        let mut triggered = 0;
1643        for (user_id, task) in &due_tasks {
1644            match self.prospective_store.mark_triggered(user_id, &task.id) {
1645                Ok(true) => {} // successfully triggered
1646                Ok(false) => {
1647                    // Already triggered by concurrent call — skip event emission
1648                    tracing::debug!(
1649                        user_id = %user_id,
1650                        reminder_id = %task.id.0,
1651                        "Reminder already triggered (scheduler race)"
1652                    );
1653                    continue;
1654                }
1655                Err(e) => {
1656                    tracing::warn!(
1657                        user_id = %user_id,
1658                        reminder_id = %task.id.0,
1659                        error = %e,
1660                        "Failed to mark reminder triggered in scheduler"
1661                    );
1662                    continue;
1663                }
1664            }
1665
1666            self.emit_event(MemoryEvent {
1667                event_type: "REMINDER_DUE".to_string(),
1668                timestamp: chrono::Utc::now(),
1669                user_id: user_id.clone(),
1670                memory_id: Some(task.id.0.to_string()),
1671                content_preview: Some(task.content.chars().take(100).collect()),
1672                memory_type: Some("reminder".to_string()),
1673                importance: Some(task.priority as f32 / 5.0),
1674                count: None,
1675                results: None,
1676            });
1677
1678            tracing::info!(
1679                user_id = %user_id,
1680                reminder_id = %task.id.0,
1681                content = %task.content.chars().take(50).collect::<String>(),
1682                "Reminder triggered (active)"
1683            );
1684
1685            triggered += 1;
1686        }
1687
1688        triggered
1689    }
1690
1691    /// Collect references to all secondary store databases for comprehensive backup.
1692    /// All shared stores (todos, prospective, files, feedback, audit) share a single DB,
1693    /// so we return one reference. BackupEngine handles all CFs automatically.
1694    pub fn collect_secondary_store_refs(&self) -> Vec<(String, std::sync::Arc<rocksdb::DB>)> {
1695        vec![("shared".to_string(), std::sync::Arc::clone(&self.shared_db))]
1696    }
1697
1698    /// Run backups for all active users
1699    pub fn run_backup_all_users(&self, max_backups: usize) -> usize {
1700        let mut backed_up = 0;
1701
1702        let users_path = &self.base_path;
1703        if let Ok(entries) = std::fs::read_dir(users_path) {
1704            for entry in entries.flatten() {
1705                let path = entry.path();
1706                if !path.is_dir() {
1707                    continue;
1708                }
1709                let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
1710                if name.starts_with('.') || name == "audit_logs" || name == "backups" {
1711                    continue;
1712                }
1713
1714                let db_path = path.join("memory.db");
1715                if !db_path.exists() {
1716                    continue;
1717                }
1718
1719                if let Ok(memory_lock) = self.get_user_memory(name) {
1720                    let memory = memory_lock.read();
1721                    let db = memory.get_db();
1722                    let secondary_refs = self.collect_secondary_store_refs();
1723                    let store_refs: Vec<crate::backup::SecondaryStoreRef<'_>> = secondary_refs
1724                        .iter()
1725                        .map(|(n, d)| crate::backup::SecondaryStoreRef { name: n, db: d })
1726                        .collect();
1727                    let graph_lock = self.get_user_graph(name).ok();
1728                    let graph_guard = graph_lock.as_ref().map(|g| g.read());
1729                    let graph_db_ref = graph_guard.as_ref().map(|g| g.get_db());
1730                    match self.backup_engine.create_comprehensive_backup_with_graph(
1731                        &db,
1732                        name,
1733                        &store_refs,
1734                        graph_db_ref,
1735                    ) {
1736                        Ok(metadata) => {
1737                            tracing::info!(
1738                                user_id = name,
1739                                backup_id = metadata.backup_id,
1740                                size_mb = metadata.size_bytes / 1024 / 1024,
1741                                "Backup created successfully"
1742                            );
1743                            backed_up += 1;
1744
1745                            if let Err(e) = self.backup_engine.purge_old_backups(name, max_backups)
1746                            {
1747                                tracing::warn!(
1748                                    user_id = name,
1749                                    error = %e,
1750                                    "Failed to purge old backups"
1751                                );
1752                            }
1753                        }
1754                        Err(e) => {
1755                            tracing::warn!(
1756                                user_id = name,
1757                                error = %e,
1758                                "Failed to create backup"
1759                            );
1760                        }
1761                    }
1762                }
1763            }
1764        }
1765
1766        backed_up
1767    }
1768
    /// Process an experience and extract entities/relationships into the graph
    ///
    /// SHO-102: Improved graph building with:
    /// - Neural NER entities
    /// - Tags as Technology/Concept entities
    /// - All-caps terms (API, TUI, NER, etc.)
    /// - Issue IDs (SHO-XX pattern)
    /// - Semantic similarity edges between memories
    ///
    /// Entity sources are tried in order of precedence: pre-extracted NER
    /// records on the experience, then plain pre-extracted entity names, then
    /// on-the-fly neural NER over the content.
    ///
    /// # Errors
    ///
    /// Fails only when the user's graph cannot be obtained (`get_user_graph`);
    /// every downstream failure (NER, entity/episode/edge insertion) is logged
    /// at debug/warn level and swallowed so a partial graph update never fails
    /// the caller.
    pub fn process_experience_into_graph(
        &self,
        user_id: &str,
        experience: &Experience,
        memory_id: &MemoryId,
    ) -> Result<()> {
        let graph = self.get_user_graph(user_id)?;

        // =====================================================================
        // PHASE 1: CPU-INTENSIVE WORK (NO LOCK)
        // All NER, regex, query parsing happens here to minimize lock hold time.
        // Was 100-400ms under lock, now only fast I/O under lock (~10-30ms).
        // =====================================================================

        // Single timestamp reused for every node/edge created in this call.
        let now = chrono::Utc::now();

        // Stop words for filtering
        let stop_words: std::collections::HashSet<&str> = [
            "the", "and", "for", "that", "this", "with", "from", "have", "been", "are", "was",
            "were", "will", "would", "could", "should", "may", "might",
        ]
        .iter()
        .cloned()
        .collect();

        // Use pre-extracted NER records for proper entity labels when available
        // This avoids redundant NER inference — the handler already ran NER in Pass 1
        let extracted_entities = if !experience.ner_entities.is_empty() {
            tracing::debug!(
                "Using {} pre-extracted NER entities from handler",
                experience.ner_entities.len()
            );
            experience
                .ner_entities
                .iter()
                .map(|record| crate::embeddings::ner::NerEntity {
                    text: record.text.clone(),
                    // Map CoNLL-style string codes to typed labels; anything
                    // unrecognized falls back to Misc.
                    entity_type: match record.entity_type.as_str() {
                        "PER" => NerEntityType::Person,
                        "ORG" => NerEntityType::Organization,
                        "LOC" => NerEntityType::Location,
                        _ => NerEntityType::Misc,
                    },
                    confidence: record.confidence,
                    // Char offsets may be absent on stored records; default to
                    // covering the whole text.
                    start: record.start_char.unwrap_or(0),
                    end: record.end_char.unwrap_or(record.text.len()),
                })
                .collect()
        } else if !experience.entities.is_empty() {
            tracing::debug!(
                "Using {} pre-extracted entity names (no NER types available)",
                experience.entities.len()
            );
            experience
                .entities
                .iter()
                .map(|name| crate::embeddings::ner::NerEntity {
                    text: name.clone(),
                    entity_type: NerEntityType::Misc,
                    // Fixed confidence for untyped names: high enough to pass
                    // the 0.7/0.8 noise filters below only for longer names.
                    confidence: 0.8,
                    start: 0,
                    end: name.len(),
                })
                .collect()
        } else {
            // No pre-extracted data — run neural NER inline (CPU-heavy, but
            // still outside the graph lock).
            match self.neural_ner.extract(&experience.content) {
                Ok(entities) => {
                    tracing::debug!(
                        "NER extracted {} entities: {:?}",
                        entities.len(),
                        entities.iter().map(|e| e.text.as_str()).collect::<Vec<_>>()
                    );
                    entities
                }
                Err(e) => {
                    tracing::debug!("NER extraction failed: {}. Continuing without entities.", e);
                    Vec::new()
                }
            }
        };

        // Filter noise entities
        let filtered_entities: Vec<_> = extracted_entities
            .into_iter()
            .filter(|e| {
                let name = e.text.trim();
                // Too short to be a meaningful entity name.
                if name.len() < 3 {
                    return false;
                }
                // All-lowercase names need higher confidence to survive.
                if !name.chars().any(|c| c.is_uppercase()) && e.confidence < 0.7 {
                    return false;
                }
                if stop_words.contains(name.to_lowercase().as_str()) {
                    return false;
                }
                // Short names (3-4 chars) need near-certain confidence.
                if name.len() < 5 && e.confidence < 0.8 {
                    return false;
                }
                true
            })
            .collect();

        tracing::debug!(
            "After filtering: {} entities: {:?}",
            filtered_entities.len(),
            filtered_entities
                .iter()
                .map(|e| e.text.as_str())
                .collect::<Vec<_>>()
        );

        // Build NER entity nodes
        let ner_entities: Vec<(String, EntityNode)> = filtered_entities
            .into_iter()
            .map(|ner_entity| {
                let label = match ner_entity.entity_type {
                    NerEntityType::Person => EntityLabel::Person,
                    NerEntityType::Organization => EntityLabel::Organization,
                    NerEntityType::Location => EntityLabel::Location,
                    NerEntityType::Misc => EntityLabel::Other("MISC".to_string()),
                };
                let node = EntityNode {
                    uuid: uuid::Uuid::new_v4(),
                    name: ner_entity.text.clone(),
                    labels: vec![label],
                    created_at: now,
                    last_seen_at: now,
                    mention_count: 1,
                    summary: String::new(),
                    attributes: HashMap::new(),
                    name_embedding: None,
                    // NER confidence doubles as initial salience.
                    salience: ner_entity.confidence,
                    // Only PER, ORG, LOC are proper nouns; MISC includes non-proper
                    // nouns like nationalities, events, etc.
                    is_proper_noun: !matches!(ner_entity.entity_type, NerEntityType::Misc),
                };
                (ner_entity.text, node)
            })
            .collect();

        // Build tag entity nodes
        // NOTE(review): tags are not deduped against NER entity names at this
        // point; presumably add_entity dedupes by name downstream — confirm.
        let tag_entities: Vec<(String, EntityNode)> = experience
            .tags
            .iter()
            .filter_map(|tag| {
                let tag_name = tag.trim();
                if tag_name.len() >= 2 && !stop_words.contains(tag_name.to_lowercase().as_str()) {
                    Some((
                        tag_name.to_string(),
                        EntityNode {
                            uuid: uuid::Uuid::new_v4(),
                            name: tag_name.to_string(),
                            labels: vec![EntityLabel::Technology],
                            created_at: now,
                            last_seen_at: now,
                            mention_count: 1,
                            summary: String::new(),
                            attributes: HashMap::new(),
                            name_embedding: None,
                            // Tags rank below NER hits but above regex finds.
                            salience: 0.6,
                            is_proper_noun: false,
                        },
                    ))
                } else {
                    None
                }
            })
            .collect();

        // Collect names already covered (for dedup in regex/verb phases)
        let mut known_names: Vec<String> = ner_entities
            .iter()
            .map(|(name, _)| name.clone())
            .chain(tag_entities.iter().map(|(name, _)| name.clone()))
            .collect();

        // Extract all-caps terms (API, TUI, NER, REST, etc.)
        let allcaps_entities: Vec<(String, EntityNode)> = allcaps_regex()
            .find_iter(&experience.content)
            .filter_map(|cap| {
                let term = cap.as_str();
                // Skip anything already captured by NER/tags (case-insensitive).
                if known_names
                    .iter()
                    .any(|name| name.eq_ignore_ascii_case(term))
                {
                    return None;
                }
                if stop_words.contains(term.to_lowercase().as_str()) {
                    return None;
                }
                known_names.push(term.to_string());
                Some((
                    term.to_string(),
                    EntityNode {
                        uuid: uuid::Uuid::new_v4(),
                        name: term.to_string(),
                        labels: vec![EntityLabel::Technology],
                        created_at: now,
                        last_seen_at: now,
                        mention_count: 1,
                        summary: String::new(),
                        attributes: HashMap::new(),
                        name_embedding: None,
                        salience: 0.5,
                        is_proper_noun: true,
                    },
                ))
            })
            .collect();

        // Extract issue IDs (SHO-XX, JIRA-123, etc.)
        let issue_entities: Vec<(String, EntityNode)> = issue_regex()
            .find_iter(&experience.content)
            .filter_map(|issue| {
                let issue_id = issue.as_str();
                // Exact-match dedup: issue IDs are case-sensitive identifiers.
                if known_names.iter().any(|name| name == issue_id) {
                    return None;
                }
                known_names.push(issue_id.to_string());
                Some((
                    issue_id.to_string(),
                    EntityNode {
                        uuid: uuid::Uuid::new_v4(),
                        name: issue_id.to_string(),
                        labels: vec![EntityLabel::Other("Issue".to_string())],
                        created_at: now,
                        last_seen_at: now,
                        mention_count: 1,
                        summary: String::new(),
                        attributes: HashMap::new(),
                        name_embedding: None,
                        salience: 0.7,
                        is_proper_noun: true,
                    },
                ))
            })
            .collect();

        // Extract verbs for multi-hop reasoning
        let analysis = query_parser::analyze_query(&experience.content);
        let mut verb_entities: Vec<(String, EntityNode)> = Vec::new();
        for verb in &analysis.relational_context {
            let verb_text = verb.text.as_str();
            let verb_stem = verb.stem.as_str();

            if known_names
                .iter()
                .any(|name| name.eq_ignore_ascii_case(verb_text))
            {
                continue;
            }
            if stop_words.contains(verb_text.to_lowercase().as_str()) {
                continue;
            }
            if verb_text.len() < 3 {
                continue;
            }

            // Index both the surface form and the stem as separate nodes.
            for name in [verb_text, verb_stem] {
                if name.len() < 3 {
                    continue;
                }
                if known_names.iter().any(|n| n.eq_ignore_ascii_case(name)) {
                    continue;
                }
                known_names.push(name.to_string());
                verb_entities.push((
                    name.to_string(),
                    EntityNode {
                        uuid: uuid::Uuid::new_v4(),
                        name: name.to_string(),
                        labels: vec![EntityLabel::Other("Verb".to_string())],
                        created_at: now,
                        last_seen_at: now,
                        mention_count: 1,
                        summary: String::new(),
                        attributes: HashMap::new(),
                        name_embedding: None,
                        // Verbs are the weakest signal; lowest salience.
                        salience: 0.4,
                        is_proper_noun: false,
                    },
                ));
            }
        }

        // Combine all entity groups for insertion, capped at
        // server_config.max_entities_per_memory to limit O(n²) edge explosion
        // (e.g. a cap of 10 entities → at most 45 pairwise edges).
        // Highest-salience entities win the cap.
        let mut all_entities: Vec<(String, EntityNode)> = ner_entities
            .into_iter()
            .chain(tag_entities)
            .chain(allcaps_entities)
            .chain(issue_entities)
            .chain(verb_entities)
            .collect();
        all_entities.sort_by(|a, b| b.1.salience.total_cmp(&a.1.salience));
        let entity_cap = self.server_config.max_entities_per_memory;
        all_entities.truncate(entity_cap);

        // =====================================================================
        // PHASE 2: GRAPH INSERTION (WITH LOCK)
        // Only fast I/O operations happen here.
        // =====================================================================

        // NOTE(review): mutations below go through a read guard, so GraphMemory
        // is assumed to use interior mutability (e.g. RocksDB handles) — confirm.
        let graph_guard = graph.read();

        let mut entity_uuids = Vec::new();

        // Insert all pre-built entities
        for (name, entity) in all_entities {
            match graph_guard.add_entity(entity) {
                Ok(uuid) => entity_uuids.push((name, uuid)),
                Err(e) => tracing::debug!("Failed to add entity {}: {}", name, e),
            }
        }

        // Create episodic node
        tracing::debug!(
            "Creating episode for memory {} with {} entities: {:?}",
            &memory_id.0.to_string()[..8],
            entity_uuids.len(),
            entity_uuids
                .iter()
                .map(|(name, _)| name.as_str())
                .collect::<Vec<_>>()
        );

        // The episode shares the memory's UUID, linking graph and memory store.
        let episode = EpisodicNode {
            uuid: memory_id.0,
            name: format!("Memory {}", &memory_id.0.to_string()[..8]),
            content: experience.content.clone(),
            valid_at: now,
            created_at: now,
            entity_refs: entity_uuids.iter().map(|(_, uuid)| *uuid).collect(),
            source: EpisodeSource::Message,
            metadata: experience.metadata.clone(),
        };

        match graph_guard.add_episode(episode) {
            Ok(uuid) => {
                tracing::debug!(
                    "Episode {} added with {} entity refs",
                    &uuid.to_string()[..8],
                    entity_uuids.len()
                );
            }
            Err(e) => {
                tracing::warn!("Failed to add episode: {}", e);
            }
        }

        // Create relationships between co-occurring entities
        // Pre-compute truncated context once (avoids re-allocating per edge)
        let truncated_context: String = experience.content.chars().take(150).collect();
        for i in 0..entity_uuids.len() {
            for j in (i + 1)..entity_uuids.len() {
                // Every pair of entities in this memory gets a generic
                // RelatedTo edge starting in the L1 working tier.
                let edge = RelationshipEdge {
                    uuid: uuid::Uuid::new_v4(),
                    from_entity: entity_uuids[i].1,
                    to_entity: entity_uuids[j].1,
                    relation_type: RelationType::RelatedTo,
                    strength: EdgeTier::L1Working.initial_weight(),
                    created_at: now,
                    valid_at: now,
                    invalidated_at: None,
                    source_episode_id: Some(memory_id.0),
                    context: truncated_context.clone(),
                    last_activated: now,
                    activation_count: 1,
                    ltp_status: LtpStatus::None,
                    tier: EdgeTier::L1Working,
                    activation_timestamps: None,
                    entity_confidence: None,
                };

                if let Err(e) = graph_guard.add_relationship(edge) {
                    tracing::debug!("Failed to add relationship: {}", e);
                }
            }
        }
        // Lock released here

        Ok(())
    }
2160}