// shodh_memory/memory/storage.rs
1//! Storage backend for the memory system
2
3use anyhow::{anyhow, Context, Result};
4use bincode;
5use chrono::{DateTime, Utc};
6use rocksdb::{
7    ColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, WriteBatch, WriteOptions, DB,
8};
9use serde::{Deserialize, Serialize};
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12
13use super::types::*;
14
15/// Helper trait to safely iterate over RocksDB results with error logging.
16/// Unlike `.flatten()` which silently ignores errors, this logs them.
17trait LogErrors<T> {
18    fn log_errors(self) -> impl Iterator<Item = T>;
19}
20
21impl<I, T, E> LogErrors<T> for I
22where
23    I: Iterator<Item = Result<T, E>>,
24    E: std::fmt::Display,
25{
26    fn log_errors(self) -> impl Iterator<Item = T> {
27        self.filter_map(|r| match r {
28            Ok(v) => Some(v),
29            Err(e) => {
30                tracing::warn!("RocksDB iterator error (continuing): {}", e);
31                None
32            }
33        })
34    }
35}
36
/// Write mode for storage operations
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    /// Sync writes - fsync() on every write (durable but slow: 2-10ms per write)
    /// Use for: shutdown, critical data, compliance requirements
    Sync,
    /// Async writes - no fsync(), data buffered in OS page cache (fast: <1ms per write)
    /// Use for: robotics, edge, high-throughput scenarios
    /// Data survives process crashes but NOT power loss before next fsync
    Async,
}

impl Default for WriteMode {
    /// Resolve the default write mode from the environment.
    ///
    /// Defaults to [`WriteMode::Async`] for robotics-grade latency.
    /// Set `SHODH_WRITE_MODE=sync` (matched case-insensitively) for
    /// durability-critical deployments.
    ///
    /// NOTE: the environment variable is re-read on every call.
    fn default() -> Self {
        match std::env::var("SHODH_WRITE_MODE") {
            // `eq_ignore_ascii_case` avoids the per-call String allocation
            // that `to_lowercase()` performed just for a comparison.
            Ok(mode) if mode.eq_ignore_ascii_case("sync") => WriteMode::Sync,
            _ => WriteMode::Async,
        }
    }
}
59
// ============================================================================
// BACKWARD-COMPATIBLE DESERIALIZATION
// Handles versioned format (SHO magic + checksum), current, and legacy formats
// ============================================================================

/// Magic prefix of the versioned on-disk record layout:
/// `SHO` + 1 version byte + payload + 4-byte CRC32 trailer (little-endian).
const STORAGE_MAGIC: &[u8; 3] = b"SHO";

use std::collections::HashMap;

/// Default experience type for legacy deserialization: records that predate
/// the `experience_type` field are treated as plain observations.
fn default_legacy_experience_type() -> ExperienceType {
    ExperienceType::Observation
}
73
/// Minimal memory format - EXACT match for hex pattern: UUID (16 bytes) + varint string length + string
/// This is the simplest possible format with no extra fields. bincode doesn't support #[serde(default)]
/// so we can't have optional fields - the struct must match the binary data exactly.
#[derive(Deserialize)]
struct MinimalMemory {
    id: MemoryId,    // bytes 0-15: raw UUID
    content: String, // varint length prefix + UTF-8 bytes
}
82
/// Memory with experience type prefix - format: UUID + u8 (unknown) + u8 (experience_type) + String
/// Matches the failing entries that have 2 extra bytes before content:
/// - Byte 16: u8 value (seen as 28/0x1c)
/// - Byte 17: u8 experience type index (seen as 7 = Task)
/// - Byte 18+: varint length + string content
#[derive(Deserialize)]
struct MemoryWithTypePrefix {
    id: MemoryId,        // bytes 0-15: raw UUID
    _unknown_field: u8,  // byte 16 - purpose unclear, maybe version?
    experience_type: u8, // byte 17 - experience type enum index (decoded in into_memory)
    content: String,     // byte 18+
}
95
/// Memory with 3-byte header - format: UUID + 3 bytes + raw content
/// Hex analysis shows: bytes 16-18 are header, byte 19+ is UTF-8 content starting with `[`
/// - Byte 16: 0x1c (28) - unknown
/// - Byte 17: 0x07 (7) - possibly experience type
/// - Byte 18: 0xa4 (164) - unknown (NOT valid UTF-8 start)
/// - Byte 19+: UTF-8 content
#[derive(Deserialize)]
struct MemoryWith3ByteHeader {
    id: MemoryId,    // bytes 0-15: raw UUID
    _header1: u8,    // byte 16 (discarded on conversion)
    _header2: u8,    // byte 17 (discarded on conversion)
    _header3: u8,    // byte 18 (discarded on conversion)
    content: String, // byte 19+
}
110
impl MemoryWith3ByteHeader {
    /// Wrap the recovered content in a default-valued `Memory`.
    /// The header bytes are discarded; all metadata is synthesized because
    /// this legacy layout carried none.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            0.5,   // importance (neutral default)
            0,     // access_count
            now,   // created_at
            now,   // last_accessed
            false, // compressed
            MemoryTier::LongTerm,
            Vec::new(), // entity_refs
            1.0,        // activation
            None,       // last_retrieval_id
            None,       // agent_id
            None,       // run_id
            None,       // actor_id
            0.0,        // temporal_relevance
            None,       // score
            None,       // external_id
            1,          // version
            Vec::new(), // history
            Vec::new(), // related_todo_ids
        )
    }
}
143
/// Try to parse as raw bytes: UUID (16) + skip header bytes + raw UTF-8 string
/// This is a last-resort fallback for entries that don't match any standard format
fn try_raw_memory_parse(data: &[u8]) -> Option<Memory> {
    // Need at least the 16-byte UUID, a minimal header, and some content.
    if data.len() < 20 {
        return None;
    }

    // Extract UUID from first 16 bytes
    let uuid_bytes: [u8; 16] = data[0..16].try_into().ok()?;
    let id = MemoryId(uuid::Uuid::from_bytes(uuid_bytes));

    // Try different header sizes (2 through 6 bytes) and find valid UTF-8
    for header_skip in [2, 3, 4, 5, 6] {
        let content_start = 16 + header_skip;
        if content_start >= data.len() {
            continue;
        }
        // Accept only if the remainder is valid UTF-8 AND starts with a
        // printable ASCII character — filters out binary false positives.
        if let Ok(content) = std::str::from_utf8(&data[content_start..]) {
            if !content.is_empty()
                && content
                    .chars()
                    .next()
                    .map(|c| c.is_ascii_graphic())
                    .unwrap_or(false)
            {
                let now = Utc::now();
                let experience = Experience {
                    experience_type: ExperienceType::Observation,
                    content: content.to_string(),
                    ..Default::default()
                };
                // Log only at debug level to avoid spam
                tracing::debug!(
                    "Recovered memory with raw parsing (header_skip={}, content_len={})",
                    header_skip,
                    content.len()
                );
                return Some(Memory::from_legacy(
                    id,
                    experience,
                    0.5,   // importance (neutral default)
                    0,     // access_count
                    now,   // created_at
                    now,   // last_accessed
                    false, // compressed
                    MemoryTier::LongTerm,
                    Vec::new(), // entity_refs
                    1.0,        // activation
                    None,       // last_retrieval_id
                    None,       // agent_id
                    None,       // run_id
                    None,       // actor_id
                    0.0,        // temporal_relevance
                    None,       // score
                    None,       // external_id
                    1,          // version
                    Vec::new(), // history
                    Vec::new(), // related_todo_ids
                ));
            }
        }
    }
    None
}
208
impl MemoryWithTypePrefix {
    /// Convert to the current `Memory`, decoding the raw experience-type byte.
    /// Unknown indices fall back to `Observation`; all other metadata is
    /// synthesized with defaults since this legacy layout carried none.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        // Map the serialized enum index back to an ExperienceType variant.
        let exp_type = match self.experience_type {
            0 => ExperienceType::Observation,
            1 => ExperienceType::Decision,
            2 => ExperienceType::Learning,
            3 => ExperienceType::Error,
            4 => ExperienceType::Discovery,
            5 => ExperienceType::Pattern,
            6 => ExperienceType::Context,
            7 => ExperienceType::Task,
            8 => ExperienceType::CodeEdit,
            9 => ExperienceType::FileAccess,
            10 => ExperienceType::Search,
            11 => ExperienceType::Command,
            12 => ExperienceType::Conversation,
            _ => ExperienceType::Observation, // unknown index — safest default
        };
        let experience = Experience {
            experience_type: exp_type,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            0.5,   // importance (neutral default)
            0,     // access_count
            now,   // created_at
            now,   // last_accessed
            false, // compressed
            MemoryTier::LongTerm,
            Vec::new(), // entity_refs
            1.0,        // activation
            None,       // last_retrieval_id
            None,       // agent_id
            None,       // run_id
            None,       // actor_id
            0.0,        // temporal_relevance
            None,       // score
            None,       // external_id
            1,          // version
            Vec::new(), // history
            Vec::new(), // related_todo_ids
        )
    }
}
257
impl MinimalMemory {
    /// Convert to the current `Memory`; everything except id/content is
    /// synthesized with defaults because the minimal layout stored nothing else.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            0.5, // default importance
            0,   // access_count
            now, // created_at
            now, // last_accessed
            false, // compressed
            MemoryTier::LongTerm,
            Vec::new(), // entity_refs
            1.0,        // activation
            None,       // last_retrieval_id
            None,       // agent_id
            None,       // run_id
            None,       // actor_id
            0.0,        // temporal_relevance
            None,       // score
            None,       // external_id
            1,          // version
            Vec::new(), // history
            Vec::new(), // related_todo_ids
        )
    }
}
290
/// Very simple legacy Memory - some early versions stored content directly without Experience wrapper
/// This matches the hex pattern: UUID (16 bytes) + varint string length + string content
///
/// NOTE: the `#[serde(default)]` attributes only take effect with
/// self-describing formats (e.g. the MessagePack fallback); positional
/// bincode decoding requires the bytes to actually be present.
#[derive(Deserialize)]
struct SimpleLegacyMemory {
    id: MemoryId,
    content: String, // Direct content field, no Experience wrapper
    #[serde(default)]
    importance: f32, // 0.0 when absent — mapped to 0.5 in into_memory
    #[serde(default)]
    access_count: u32,
    #[serde(default)]
    created_at: Option<DateTime<Utc>>,
    #[serde(default)]
    last_accessed: Option<DateTime<Utc>>,
    #[serde(default)]
    compressed: bool,
    #[serde(default)]
    agent_id: Option<String>,
    #[serde(default)]
    run_id: Option<String>,
    #[serde(default)]
    actor_id: Option<String>,
    #[serde(default)]
    temporal_relevance: f32,
    #[serde(default)]
    score: Option<f32>,
}
318
impl SimpleLegacyMemory {
    /// Convert to the current `Memory`, wrapping the bare content in an
    /// `Observation` experience and defaulting whatever the record lacked.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            // Non-positive importance means the field was missing/defaulted
            // (serde default is 0.0) — use the 0.5 neutral value instead.
            if self.importance > 0.0 {
                self.importance
            } else {
                0.5
            },
            self.access_count,
            self.created_at.unwrap_or(now),
            self.last_accessed.unwrap_or(now),
            self.compressed,
            MemoryTier::LongTerm,
            Vec::new(), // entity_refs
            1.0,        // activation
            None,       // last_retrieval_id
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,       // external_id
            1,          // version
            Vec::new(), // history
            Vec::new(), // related_todo_ids
        )
    }
}
355
/// Legacy Experience type from v0.1.0 - EXACT match for bincode 1.x deserialization
/// Includes all fields that were in the original Experience struct in EXACT ORDER.
/// bincode 1.x serializes fields positionally, so order matters!
///
/// NOTE: `#[serde(default)]` only helps with self-describing formats
/// (the MessagePack fallbacks); it does not make fields optional for bincode.
#[derive(Deserialize)]
struct LegacyExperienceV1 {
    // Core fields (always present)
    #[serde(default = "default_legacy_experience_type")]
    experience_type: ExperienceType,
    content: String,
    #[serde(default)]
    context: Option<RichContext>,
    #[serde(default)]
    entities: Vec<String>,
    #[serde(default)]
    metadata: HashMap<String, String>,
    #[serde(default)]
    embeddings: Option<Vec<f32>>,
    #[serde(default)]
    related_memories: Vec<MemoryId>,
    #[serde(default)]
    causal_chain: Vec<MemoryId>,
    #[serde(default)]
    outcomes: Vec<String>,
    // Robotics fields
    #[serde(default)]
    robot_id: Option<String>,
    #[serde(default)]
    mission_id: Option<String>,
    #[serde(default)]
    geo_location: Option<[f64; 3]>,
    #[serde(default)]
    local_position: Option<[f32; 3]>,
    #[serde(default)]
    heading: Option<f32>,
    #[serde(default)]
    action_type: Option<String>,
    #[serde(default)]
    reward: Option<f32>,
    #[serde(default)]
    sensor_data: HashMap<String, f64>,
    // Decision & learning fields
    #[serde(default)]
    decision_context: Option<HashMap<String, String>>,
    #[serde(default)]
    action_params: Option<HashMap<String, String>>,
    #[serde(default)]
    outcome_type: Option<String>,
    #[serde(default)]
    outcome_details: Option<String>,
    #[serde(default)]
    confidence: Option<f32>,
    #[serde(default)]
    alternatives_considered: Vec<String>,
    // Environmental context
    #[serde(default)]
    weather: Option<HashMap<String, String>>,
    #[serde(default)]
    terrain_type: Option<String>,
    #[serde(default)]
    lighting: Option<String>,
    #[serde(default)]
    nearby_agents: Vec<HashMap<String, String>>,
    // Failure & anomaly tracking
    #[serde(default)]
    is_failure: bool,
    #[serde(default)]
    is_anomaly: bool,
    #[serde(default)]
    severity: Option<String>,
    #[serde(default)]
    recovery_action: Option<String>,
    #[serde(default)]
    root_cause: Option<String>,
    // Learned patterns & predictions
    #[serde(default)]
    pattern_id: Option<String>,
    #[serde(default)]
    predicted_outcome: Option<String>,
    #[serde(default)]
    prediction_accurate: Option<bool>,
    #[serde(default)]
    tags: Vec<String>,
}
439
impl LegacyExperienceV1 {
    /// Convert the legacy experience into the current `Experience`,
    /// passing every v1 field through unchanged and defaulting the fields
    /// that were added after v0.1.0 (multimodal embeddings, media refs,
    /// and extraction results).
    fn into_experience(self) -> Experience {
        Experience {
            experience_type: self.experience_type,
            content: self.content,
            context: self.context,
            entities: self.entities,
            metadata: self.metadata,
            embeddings: self.embeddings,
            // Multimodal fields did not exist in the v1 format.
            image_embeddings: None,
            audio_embeddings: None,
            video_embeddings: None,
            media_refs: Vec::new(),
            related_memories: self.related_memories,
            causal_chain: self.causal_chain,
            outcomes: self.outcomes,
            robot_id: self.robot_id,
            mission_id: self.mission_id,
            geo_location: self.geo_location,
            local_position: self.local_position,
            heading: self.heading,
            action_type: self.action_type,
            reward: self.reward,
            sensor_data: self.sensor_data,
            decision_context: self.decision_context,
            action_params: self.action_params,
            outcome_type: self.outcome_type,
            outcome_details: self.outcome_details,
            confidence: self.confidence,
            alternatives_considered: self.alternatives_considered,
            weather: self.weather,
            terrain_type: self.terrain_type,
            lighting: self.lighting,
            nearby_agents: self.nearby_agents,
            is_failure: self.is_failure,
            is_anomaly: self.is_anomaly,
            severity: self.severity,
            recovery_action: self.recovery_action,
            root_cause: self.root_cause,
            pattern_id: self.pattern_id,
            predicted_outcome: self.predicted_outcome,
            prediction_accurate: self.prediction_accurate,
            tags: self.tags,
            // Extraction results introduced after v0.1.0.
            temporal_refs: Vec::new(),
            ner_entities: Vec::new(),
            cooccurrence_pairs: Vec::new(),
        }
    }
}
489
/// Legacy v0.1.0 Memory with full Experience - for bincode 1.x deserialization
///
/// NOTE(review): this struct is field-for-field identical to `LegacyMemoryV1`
/// below, so a decode attempt with one can never succeed where the other
/// failed — candidate for consolidation; verify against the fallback chain's
/// branch metrics before removing.
#[derive(Deserialize)]
struct LegacyMemoryV1Full {
    #[serde(rename = "memory_id")]
    id: MemoryId,
    experience: LegacyExperienceV1,
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
}
507
impl LegacyMemoryV1Full {
    /// Convert to the current `Memory`, defaulting every field that v1
    /// did not store (tier, entity refs, activation, external linking).
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            MemoryTier::LongTerm, // tier not stored in v1
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,       // external_id
            1,          // version
            Vec::new(), // history
            Vec::new(), // related_todo_ids
        )
    }
}
534
/// Legacy v0.1.0 format - matches the initial release serialization
/// Uses LegacyExperienceV1 (no multimodal fields) because bincode is positional
#[derive(Deserialize)]
struct LegacyMemoryV1 {
    #[serde(rename = "memory_id")]
    id: MemoryId,
    experience: LegacyExperienceV1, // Must use legacy Experience - bincode ignores #[serde(default)]
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
}
553
impl LegacyMemoryV1 {
    /// Convert legacy v1 format to current Memory format, defaulting every
    /// field v1 did not store (tier, entity refs, activation, external linking).
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            MemoryTier::LongTerm, // tier not stored in v1
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,       // external_id
            1,          // version
            Vec::new(), // history
            Vec::new(), // related_todo_ids
        )
    }
}
581
/// Legacy v2 format - after cognitive extensions but before external linking
/// Has tier, entity_refs, activation but no external_id/version/history
#[derive(Deserialize)]
struct LegacyMemoryV2 {
    id: MemoryId, // no serde rename here, unlike v1
    experience: LegacyExperienceV1, // bincode is positional - must match original fields
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    tier: MemoryTier,
    entity_refs: Vec<EntityRef>,
    activation: f32,
    last_retrieval_id: Option<uuid::Uuid>,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
}
603
impl LegacyMemoryV2 {
    /// Convert legacy v2 format to current Memory format.
    /// v2 already carried tier/entity_refs/activation, so those pass through;
    /// only the external-linking fields are synthesized.
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            self.tier,
            self.entity_refs,
            self.activation,
            self.last_retrieval_id,
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,       // external_id not in v2
            1,          // version
            Vec::new(), // history not in v2
            Vec::new(), // related_todo_ids not in v2
        )
    }
}
631
632/// Deserialize memory with multi-version fallback for backwards compatibility
633///
634/// Tries formats in order from newest to oldest:
635/// 1. Current format (with external linking, todos)
636/// 2. Legacy v2 (cognitive extensions, no external linking)
637/// 3. Legacy v1 (original v0.1.0 format)
638///
639/// Returns (Memory, needs_migration) where needs_migration=true means the data
640/// was in a legacy format and should be re-written for future performance.
641fn deserialize_memory(data: &[u8]) -> Result<(Memory, bool)> {
642    // Check for versioned format: SHO + version byte + payload + 4-byte CRC32
643    if data.len() >= 8 && &data[0..3] == STORAGE_MAGIC {
644        let version = data[3];
645        let payload_end = data.len() - 4;
646        let stored_checksum = u32::from_le_bytes([
647            data[payload_end],
648            data[payload_end + 1],
649            data[payload_end + 2],
650            data[payload_end + 3],
651        ]);
652        let computed_checksum = crc32_simple(&data[..payload_end]);
653        if stored_checksum != computed_checksum {
654            tracing::warn!(
655                "Checksum mismatch: stored={:08x} computed={:08x}",
656                stored_checksum,
657                computed_checksum
658            );
659        }
660        let payload = &data[4..payload_end];
661        // Try current format first, then fallback to legacy formats
662        deserialize_with_fallback(payload).map_err(|e| anyhow!("v{version} decode failed: {e}"))
663    } else {
664        // Legacy format: raw bincode (no SHO header)
665        deserialize_with_fallback(data).map_err(|e| anyhow!("legacy decode failed: {e}"))
666    }
667}
668
/// Legacy MemoryFlat for bincode 2.x data written BEFORE multimodal Experience fields
/// This matches the format at MIF commit (dee4b03) - has tier/entity_refs but Experience without multimodal
#[derive(Deserialize)]
struct LegacyMemoryFlatV2 {
    id: MemoryId,
    experience: LegacyExperienceV1, // No multimodal fields
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    tier: MemoryTier,
    entity_refs: Vec<EntityRef>,
    activation: f32,
    last_retrieval_id: Option<uuid::Uuid>,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
    external_id: Option<String>,
    version: u32,
    history: Vec<MemoryRevision>,
    // Default only applies to self-describing formats; bincode is positional.
    #[serde(default)]
    related_todo_ids: Vec<TodoId>,
}
695
impl LegacyMemoryFlatV2 {
    /// Convert to the current `Memory`. Every field maps 1:1 — only the
    /// Experience payload (missing multimodal fields) differs from current.
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            self.tier,
            self.entity_refs,
            self.activation,
            self.last_retrieval_id,
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            self.external_id,
            self.version,
            self.history,
            self.related_todo_ids,
        )
    }
}
722
723/// Try deserializing with multiple format fallbacks
724/// Supports bincode 2.x (current), MessagePack, and bincode 1.x (legacy) wire formats
725///
726/// Returns (Memory, is_legacy) where is_legacy=true means the data was in an old format
727/// and should be re-written to current format for future performance.
728fn deserialize_with_fallback(data: &[u8]) -> Result<(Memory, bool)> {
729    fn record_branch(branch: &str) {
730        crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL
731            .with_label_values(&[branch])
732            .inc();
733    }
734
735    // Try current format first (bincode 2.x with current Memory/Experience)
736    // This is the hot path — avoid any allocations before this check.
737    match bincode::serde::decode_from_slice::<Memory, _>(data, bincode::config::standard()) {
738        Ok((memory, _)) => {
739            return Ok((memory, false));
740        } // Current format, no migration needed
741        Err(e) => {
742            // Current format failed — enter fallback chain.
743            // From here on we collect errors for diagnostics.
744            return deserialize_legacy_fallback(data, e, record_branch);
745        }
746    }
747}
748
749/// Fallback deserialization chain for legacy memory formats.
750///
751/// Separated from `deserialize_with_fallback` so the hot path (current format)
752/// stays allocation-free and the compiler can inline/optimize it independently.
753fn deserialize_legacy_fallback(
754    data: &[u8],
755    first_error: bincode::error::DecodeError,
756    record_branch: fn(&str),
757) -> Result<(Memory, bool)> {
758    // Log detailed errors for first entry only to help debug format issues
759    static DEBUG_ENTRY_LOGGED: std::sync::atomic::AtomicBool =
760        std::sync::atomic::AtomicBool::new(false);
761    let is_first_failure = !DEBUG_ENTRY_LOGGED.load(std::sync::atomic::Ordering::Relaxed);
762
763    // Collect all errors for debugging
764    let mut errors: Vec<(&str, String)> = Vec::new();
765    errors.push(("bincode2 Memory", first_error.to_string()));
766
767    // Try bincode 2.x MINIMAL format (just UUID + content string)
768    // This matches the hex pattern: 16-byte UUID + varint length + string bytes
769    match bincode::serde::decode_from_slice::<MinimalMemory, _>(data, bincode::config::standard()) {
770        Ok((minimal, _)) => {
771            tracing::debug!("Migrated memory from bincode 2.x minimal format");
772            record_branch("bincode2_minimal");
773            return Ok((minimal.into_memory(), true));
774        }
775        Err(e) => errors.push(("bincode2 MinimalMemory", e.to_string())),
776    }
777
778    // Try bincode 2.x with type prefix: UUID + u8 + u8 (experience_type) + String
779    // Matches entries with 2 extra bytes before content (byte 16=unknown, byte 17=exp_type)
780    match bincode::serde::decode_from_slice::<MemoryWithTypePrefix, _>(
781        data,
782        bincode::config::standard(),
783    ) {
784        Ok((typed, _)) => {
785            tracing::debug!("Migrated memory from bincode 2.x with type prefix");
786            record_branch("bincode2_type_prefix");
787            return Ok((typed.into_memory(), true));
788        }
789        Err(e) => errors.push(("bincode2 MemoryWithTypePrefix", e.to_string())),
790    }
791
792    // Try bincode 2.x with OLD Experience (before multimodal fields were added)
793    match bincode::serde::decode_from_slice::<LegacyMemoryFlatV2, _>(
794        data,
795        bincode::config::standard(),
796    ) {
797        Ok((legacy, _)) => {
798            tracing::debug!("Migrated memory from bincode 2.x pre-multimodal format");
799            record_branch("bincode2_legacy_flat_v2");
800            return Ok((legacy.into_memory(), true));
801        }
802        Err(e) => errors.push(("bincode2 LegacyMemoryFlatV2", e.to_string())),
803    }
804
805    // Try bincode 1.x with LegacyMemoryV1 (v0.1.0 format)
806    match bincode1::deserialize::<LegacyMemoryV1>(data) {
807        Ok(legacy) => {
808            tracing::debug!("Migrated memory from bincode 1.x v0.1.0 format");
809            record_branch("bincode1_legacy_v1");
810            return Ok((legacy.into_memory(), true));
811        }
812        Err(e) => errors.push(("bincode1 LegacyMemoryV1", e.to_string())),
813    }
814
815    // Try bincode 1.x MINIMAL format (just UUID + content)
816    match bincode1::deserialize::<MinimalMemory>(data) {
817        Ok(minimal) => {
818            tracing::debug!("Migrated memory from bincode 1.x minimal format");
819            record_branch("bincode1_minimal");
820            return Ok((minimal.into_memory(), true));
821        }
822        Err(e) => errors.push(("bincode1 MinimalMemory", e.to_string())),
823    }
824
825    // Try bincode 1.x with SIMPLE legacy format (content as direct field, no Experience wrapper)
826    match bincode1::deserialize::<SimpleLegacyMemory>(data) {
827        Ok(legacy) => {
828            tracing::debug!("Migrated memory from bincode 1.x simple format");
829            record_branch("bincode1_simple");
830            return Ok((legacy.into_memory(), true));
831        }
832        Err(e) => errors.push(("bincode1 SimpleLegacyMemory", e.to_string())),
833    }
834
835    // Try bincode 1.x with fixint encoding (u64 lengths instead of varint)
836    use bincode1::Options;
837    let fixint_config = bincode1::options()
838        .with_fixint_encoding()
839        .allow_trailing_bytes();
840
841    // Try bincode 1.x fixint MinimalMemory
842    match fixint_config.deserialize::<MinimalMemory>(data) {
843        Ok(minimal) => {
844            tracing::debug!("Migrated memory from bincode 1.x fixint minimal format");
845            record_branch("bincode1_fixint_minimal");
846            return Ok((minimal.into_memory(), true));
847        }
848        Err(e) => errors.push(("bincode1 fixint MinimalMemory", e.to_string())),
849    }
850
851    match fixint_config.deserialize::<SimpleLegacyMemory>(data) {
852        Ok(legacy) => {
853            tracing::debug!("Migrated memory from bincode 1.x fixint simple format");
854            record_branch("bincode1_fixint_simple");
855            return Ok((legacy.into_memory(), true));
856        }
857        Err(e) => errors.push(("bincode1 fixint SimpleLegacyMemory", e.to_string())),
858    }
859
860    // Try MessagePack minimal format
861    match rmp_serde::from_slice::<MinimalMemory>(data) {
862        Ok(minimal) => {
863            tracing::debug!("Migrated memory from MessagePack minimal format");
864            record_branch("msgpack_minimal");
865            return Ok((minimal.into_memory(), true));
866        }
867        Err(e) => errors.push(("msgpack MinimalMemory", e.to_string())),
868    }
869
870    // Try MessagePack format (rmp-serde) - self-describing format
871    match rmp_serde::from_slice::<SimpleLegacyMemory>(data) {
872        Ok(legacy) => {
873            tracing::debug!("Migrated memory from MessagePack simple format");
874            record_branch("msgpack_simple");
875            return Ok((legacy.into_memory(), true));
876        }
877        Err(e) => errors.push(("msgpack SimpleLegacyMemory", e.to_string())),
878    }
879
880    // Try bincode 1.x format with original Experience (no multimodal fields)
881    match bincode1::deserialize::<LegacyMemoryV1Full>(data) {
882        Ok(legacy) => {
883            tracing::debug!("Migrated memory from bincode 1.x v1 full format");
884            record_branch("bincode1_legacy_v1_full");
885            return Ok((legacy.into_memory(), true));
886        }
887        Err(e) => errors.push(("bincode1 LegacyMemoryV1Full", e.to_string())),
888    }
889
890    // Try bincode 1.x with fixint encoding for full legacy format
891    match fixint_config.deserialize::<LegacyMemoryV1Full>(data) {
892        Ok(legacy) => {
893            tracing::debug!("Migrated memory from bincode 1.x fixint v1 full format");
894            record_branch("bincode1_fixint_legacy_v1_full");
895            return Ok((legacy.into_memory(), true));
896        }
897        Err(e) => errors.push(("bincode1 fixint LegacyMemoryV1Full", e.to_string())),
898    }
899
900    // Try MessagePack with full legacy format
901    match rmp_serde::from_slice::<LegacyMemoryV1Full>(data) {
902        Ok(legacy) => {
903            tracing::debug!("Migrated memory from MessagePack v1 full format");
904            record_branch("msgpack_legacy_v1_full");
905            return Ok((legacy.into_memory(), true));
906        }
907        Err(e) => errors.push(("msgpack LegacyMemoryV1Full", e.to_string())),
908    }
909
910    // Try bincode 1.x format (used in versions prior to bincode 2.0 migration)
911    match bincode1::deserialize::<LegacyMemoryV1>(data) {
912        Ok(legacy) => {
913            tracing::debug!("Migrated memory from bincode 1.x format");
914            record_branch("bincode1_legacy_v1_repeat");
915            return Ok((legacy.into_memory(), true));
916        }
917        Err(_) => {} // Already tried above
918    }
919
920    // Try legacy v2 format with bincode 1.x (cognitive extensions era)
921    match bincode1::deserialize::<LegacyMemoryV2>(data) {
922        Ok(legacy) => {
923            tracing::debug!("Migrated memory from bincode 1.x v2 format");
924            record_branch("bincode1_legacy_v2");
925            return Ok((legacy.into_memory(), true));
926        }
927        Err(e) => errors.push(("bincode1 LegacyMemoryV2", e.to_string())),
928    }
929
930    // Try bincode 2.x with 3-byte header: UUID + 3 bytes + String
931    // Hex analysis shows content starts at byte 19 (byte 18 is 0xa4, not valid UTF-8 start)
932    match bincode::serde::decode_from_slice::<MemoryWith3ByteHeader, _>(
933        data,
934        bincode::config::standard(),
935    ) {
936        Ok((mem, _)) => {
937            tracing::debug!("Migrated memory from bincode 2.x with 3-byte header");
938            record_branch("bincode2_3byte_header");
939            return Ok((mem.into_memory(), true));
940        }
941        Err(e) => errors.push(("bincode2 MemoryWith3ByteHeader", e.to_string())),
942    }
943
944    // LAST RESORT: Try raw byte parsing with different header skip sizes
945    // This handles non-standard formats by finding where valid UTF-8 content starts
946    if let Some(memory) = try_raw_memory_parse(data) {
947        record_branch("raw_parse");
948        return Ok((memory, true));
949    }
950    errors.push(("raw parse", "no valid UTF-8 content found".to_string()));
951
952    // Log debug info on first failure only (at debug level to reduce noise)
953    if is_first_failure {
954        DEBUG_ENTRY_LOGGED.store(true, std::sync::atomic::Ordering::Relaxed);
955
956        let hex_preview: String = data
957            .iter()
958            .take(32)
959            .map(|b| format!("{:02x}", b))
960            .collect::<Vec<_>>()
961            .join(" ");
962        tracing::debug!(
963            "Unknown memory format ({} bytes): {}...",
964            data.len(),
965            hex_preview
966        );
967    }
968
969    // All formats failed
970    record_branch("decode_failed");
971    Err(anyhow!(
972        "Failed to deserialize memory: incompatible format ({} bytes)",
973        data.len()
974    ))
975}
976
/// Simple CRC32 implementation (IEEE 802.3 polynomial, reflected form 0xEDB88320).
///
/// Standard parameters: init 0xFFFFFFFF, final XOR 0xFFFFFFFF. Bitwise
/// (table-free) variant — fine for the small inputs it is used on.
fn crc32_simple(data: &[u8]) -> u32 {
    !data.iter().fold(0xFFFF_FFFF_u32, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |crc, _| {
            // Branch-free step: mask is all-ones iff the LSB is set.
            let mask = (crc & 1).wrapping_neg();
            (crc >> 1) ^ (0xEDB8_8320 & mask)
        })
    })
}
992
/// Column family name for secondary indices (tags, types, timestamps, etc.)
/// written by `update_indices` and read back via prefix scans (e.g. `find_by_external_id`).
const CF_INDEX: &str = "memory_index";
995
/// Storage engine for long-term memory persistence
///
/// Uses a single RocksDB instance with 2 column families:
/// - default: main memory data (also shared by LearningHistoryStore, TemporalFactStore, etc. via key prefixes)
/// - `memory_index`: secondary indices for tag/type/timestamp queries
pub struct MemoryStorage {
    /// Shared handle to the unified RocksDB instance (holds both column families).
    db: Arc<DB>,
    /// Base storage path for all memory data. Note: the RocksDB files themselves
    /// live under `{storage_path}/storage` (see `new`); this field keeps the base.
    storage_path: PathBuf,
    /// Write mode (sync vs async) - affects latency vs durability tradeoff
    write_mode: WriteMode,
}
1008
1009impl MemoryStorage {
1010    /// CF accessor for the memory_index column family
1011    fn index_cf(&self) -> &ColumnFamily {
1012        self.db
1013            .cf_handle(CF_INDEX)
1014            .expect("memory_index CF must exist")
1015    }
1016
    /// Create a new memory storage.
    ///
    /// If `shared_cache` is provided, all block-cache reads are charged against
    /// the shared LRU cache (recommended for multi-tenant server mode). When
    /// `None`, a small per-instance cache is created (standalone / test use).
    ///
    /// The RocksDB files live under `{path}/storage`; `path` itself is retained
    /// as the base storage path returned by `path()`.
    pub fn new(path: &Path, shared_cache: Option<&rocksdb::Cache>) -> Result<Self> {
        use crate::constants::ROCKSDB_MEMORY_WRITE_BUFFER_BYTES;

        // Create directories if they don't exist
        let storage_path = path.join("storage");
        std::fs::create_dir_all(&storage_path)?;

        // Configure RocksDB options for PRODUCTION durability + performance
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

        // ========================================================================
        // DURABILITY SETTINGS - Critical for data persistence across restarts
        // ========================================================================
        //
        // RocksDB data flow: Write → WAL → Memtable → SST files
        // Without proper sync, data in memtable can be lost on crash/restart
        //
        // Per-write fsync is governed by `WriteMode` (SHODH_WRITE_MODE env var):
        // the default is Async, where writes survive process crashes (WAL is
        // still written) but not power loss before the next fsync. Set
        // SHODH_WRITE_MODE=sync for a full WAL fsync on every write.
        // ========================================================================

        // WAL stays in default location (same as data dir) - avoids corruption issues
        opts.set_manual_wal_flush(false); // Auto-flush WAL entries

        // Write performance — sized for edge deployment (tune up via env for heavy workloads)
        opts.set_max_write_buffer_number(2);
        opts.set_write_buffer_size(ROCKSDB_MEMORY_WRITE_BUFFER_BYTES);
        opts.set_level_zero_file_num_compaction_trigger(4);
        opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB SST files
        opts.set_max_bytes_for_level_base(256 * 1024 * 1024); // 256MB L1
        opts.set_max_background_jobs(4);
        opts.set_level_compaction_dynamic_level_bytes(true);

        // Read performance — shared block cache for multi-tenant, small local for standalone
        use rocksdb::{BlockBasedOptions, Cache};
        let mut block_opts = BlockBasedOptions::default();
        block_opts.set_bloom_filter(10.0, false); // 10 bits/key = ~1% FPR
        // `local_cache` is declared in the outer scope so the borrow in `cache`
        // outlives the match arm.
        let local_cache;
        let cache = match shared_cache {
            Some(c) => c,
            None => {
                local_cache = Cache::new_lru_cache(16 * 1024 * 1024); // 16MB standalone
                &local_cache
            }
        };
        block_opts.set_block_cache(cache);
        block_opts.set_cache_index_and_filter_blocks(true);
        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); // Pin L0 for fast reads

        opts.set_block_based_table_factory(&block_opts);

        // Open single database with column families (index CF uses lighter settings).
        // Descriptors are built by a closure because ColumnFamilyDescriptor is not
        // Clone and open_or_repair_cf may need to rebuild them after a repair.
        let main_opts = opts.clone();
        let db = Arc::new(Self::open_or_repair_cf(&opts, &storage_path, move || {
            vec![
                ColumnFamilyDescriptor::new("default", main_opts.clone()),
                ColumnFamilyDescriptor::new(CF_INDEX, {
                    let mut idx_opts = Options::default();
                    idx_opts.create_if_missing(true);
                    idx_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
                    idx_opts.set_max_write_buffer_number(2);
                    idx_opts.set_write_buffer_size(ROCKSDB_MEMORY_WRITE_BUFFER_BYTES);
                    idx_opts
                }),
            ]
        })?);

        // Migrate from old separate-DB layout if needed
        Self::migrate_from_separate_dbs(path, &db)?;

        let write_mode = WriteMode::default();
        tracing::info!(
            "Storage initialized with {:?} write mode (latency: {})",
            write_mode,
            if write_mode == WriteMode::Sync {
                "2-10ms per write"
            } else {
                "<1ms per write"
            }
        );

        Ok(Self {
            db,
            storage_path: path.to_path_buf(),
            write_mode,
        })
    }
1111
1112    /// Open a RocksDB database with column families, automatically repairing if corruption is detected.
1113    ///
1114    /// On hard kills (ONNX deadlock, OOM, kill -9), RocksDB SST files can be left
1115    /// in a partially written state. This attempts repair before giving up.
1116    ///
1117    /// Takes a builder closure because `ColumnFamilyDescriptor` is not `Clone` —
1118    /// we need to rebuild descriptors for the retry path after repair.
1119    fn open_or_repair_cf<F>(opts: &Options, path: &Path, build_cfs: F) -> Result<DB>
1120    where
1121        F: Fn() -> Vec<ColumnFamilyDescriptor>,
1122    {
1123        match DB::open_cf_descriptors(opts, path, build_cfs()) {
1124            Ok(db) => Ok(db),
1125            Err(open_err) => {
1126                let err_str = open_err.to_string();
1127                // Only attempt repair for corruption-related errors
1128                if err_str.contains("Corruption")
1129                    || err_str.contains("bad block")
1130                    || err_str.contains("checksum mismatch")
1131                    || err_str.contains("MANIFEST")
1132                    || err_str.contains("CURRENT")
1133                {
1134                    tracing::warn!(
1135                        error = %open_err,
1136                        "RocksDB corruption detected in memory storage, attempting repair"
1137                    );
1138                    if let Err(repair_err) = DB::repair(opts, path) {
1139                        tracing::error!(
1140                            error = %repair_err,
1141                            "RocksDB repair failed for memory storage"
1142                        );
1143                        return Err(anyhow::anyhow!(
1144                            "Failed to open or repair memory storage: open={open_err}, repair={repair_err}"
1145                        ));
1146                    }
1147                    tracing::info!("RocksDB repair succeeded for memory storage, reopening");
1148                    DB::open_cf_descriptors(opts, path, build_cfs()).map_err(|e| {
1149                        anyhow::anyhow!("Failed to open memory storage after repair: {e}")
1150                    })
1151                } else {
1152                    Err(anyhow::anyhow!("Failed to open memory storage: {open_err}"))
1153                }
1154            }
1155        }
1156    }
1157
1158    /// Migrate from old separate-DB layout (memories/ + memory_index/) to single CF-based DB.
1159    ///
1160    /// Detects whether old directories exist and the new CFs are empty, then bulk-copies
1161    /// all data into the unified DB. Old directories are renamed to *.pre_cf_migration
1162    /// for rollback safety.
1163    fn migrate_from_separate_dbs(base_path: &Path, db: &DB) -> Result<()> {
1164        let old_memories_dir = base_path.join("memories");
1165        let old_index_dir = base_path.join("memory_index");
1166
1167        let has_old_memories = old_memories_dir.is_dir();
1168        let has_old_index = old_index_dir.is_dir();
1169
1170        if !has_old_memories && !has_old_index {
1171            return Ok(());
1172        }
1173
1174        tracing::info!("Detected old separate-DB layout, migrating to column families...");
1175        let mut total_migrated = 0usize;
1176
1177        // Migrate main memories → default CF
1178        if has_old_memories {
1179            let old_opts = Options::default();
1180            match DB::open_for_read_only(&old_opts, &old_memories_dir, false) {
1181                Ok(old_db) => {
1182                    let mut batch = WriteBatch::default();
1183                    let mut count = 0usize;
1184                    for item in old_db.iterator(IteratorMode::Start) {
1185                        if let Ok((key, value)) = item {
1186                            batch.put(&key, &value);
1187                            count += 1;
1188                            if count % 10_000 == 0 {
1189                                db.write(std::mem::take(&mut batch))?;
1190                                tracing::info!("  memories: migrated {count} entries...");
1191                            }
1192                        }
1193                    }
1194                    if !batch.is_empty() {
1195                        db.write(batch)?;
1196                    }
1197                    drop(old_db);
1198                    total_migrated += count;
1199                    tracing::info!("  memories: migrated {count} entries to default CF");
1200
1201                    let backup_name = base_path.join("memories.pre_cf_migration");
1202                    if backup_name.exists() {
1203                        let _ = std::fs::remove_dir_all(&backup_name);
1204                    }
1205                    if let Err(e) = std::fs::rename(&old_memories_dir, &backup_name) {
1206                        tracing::warn!("Could not rename old memories dir: {e}");
1207                    }
1208                }
1209                Err(e) => {
1210                    tracing::warn!("Could not open old memories DB for migration: {e}");
1211                }
1212            }
1213        }
1214
1215        // Migrate index → memory_index CF
1216        if has_old_index {
1217            let index_cf = db
1218                .cf_handle(CF_INDEX)
1219                .expect("memory_index CF must exist during migration");
1220
1221            let old_opts = Options::default();
1222            match DB::open_for_read_only(&old_opts, &old_index_dir, false) {
1223                Ok(old_db) => {
1224                    let mut batch = WriteBatch::default();
1225                    let mut count = 0usize;
1226                    for item in old_db.iterator(IteratorMode::Start) {
1227                        if let Ok((key, value)) = item {
1228                            batch.put_cf(&index_cf, &key, &value);
1229                            count += 1;
1230                            if count % 10_000 == 0 {
1231                                db.write(std::mem::take(&mut batch))?;
1232                                tracing::info!("  index: migrated {count} entries...");
1233                            }
1234                        }
1235                    }
1236                    if !batch.is_empty() {
1237                        db.write(batch)?;
1238                    }
1239                    drop(old_db);
1240                    total_migrated += count;
1241                    tracing::info!("  index: migrated {count} entries to {CF_INDEX} CF");
1242
1243                    let backup_name = base_path.join("memory_index.pre_cf_migration");
1244                    if backup_name.exists() {
1245                        let _ = std::fs::remove_dir_all(&backup_name);
1246                    }
1247                    if let Err(e) = std::fs::rename(&old_index_dir, &backup_name) {
1248                        tracing::warn!("Could not rename old memory_index dir: {e}");
1249                    }
1250                }
1251                Err(e) => {
1252                    tracing::warn!("Could not open old memory_index DB for migration: {e}");
1253                }
1254            }
1255        }
1256
1257        if total_migrated > 0 {
1258            tracing::info!(
1259                "Memory storage migration complete: {total_migrated} total entries migrated"
1260            );
1261        }
1262
1263        Ok(())
1264    }
1265
1266    /// Get the base storage path
1267    pub fn path(&self) -> &Path {
1268        &self.storage_path
1269    }
1270
1271    /// Store a memory with configurable write durability
1272    ///
1273    /// ROBOTICS OPTIMIZATION: Write mode is configurable via SHODH_WRITE_MODE env var.
1274    /// - Async (default): <1ms per write, data survives process crashes
1275    /// - Sync: 2-10ms per write, data survives power loss
1276    ///
1277    /// For robotics/edge: Use async mode + periodic flush() calls for best latency.
1278    /// For compliance/critical: Set SHODH_WRITE_MODE=sync for full durability.
1279    pub fn store(&self, memory: &Memory) -> Result<()> {
1280        let key = memory.id.0.as_bytes();
1281
1282        // Serialize memory
1283        let value = bincode::serde::encode_to_vec(memory, bincode::config::standard())
1284            .context(format!("Failed to serialize memory {}", memory.id.0))?;
1285
1286        // Use write mode based on configuration
1287        let mut write_opts = WriteOptions::default();
1288        write_opts.set_sync(self.write_mode == WriteMode::Sync);
1289
1290        // Store in main database
1291        self.db
1292            .put_opt(key, &value, &write_opts)
1293            .context(format!("Failed to put memory {} in RocksDB", memory.id.0))?;
1294
1295        // Update indices - rollback main write on failure for consistency
1296        if let Err(e) = self.update_indices(memory) {
1297            // Rollback: delete the memory we just wrote
1298            if let Err(del_err) = self.db.delete(key) {
1299                tracing::error!(
1300                    "Index write failed AND rollback failed for memory {}: index_err={}, delete_err={}",
1301                    memory.id.0, e, del_err
1302                );
1303            }
1304            return Err(e.context(format!(
1305                "Failed to update indices for memory {} (rolled back)",
1306                memory.id.0
1307            )));
1308        }
1309
1310        Ok(())
1311    }
1312
    /// Update secondary indices for efficient retrieval.
    ///
    /// Writes one entry per indexed attribute into the `memory_index` CF as a
    /// single atomic `WriteBatch`. Key formats here MUST stay in lockstep with
    /// `remove_from_indices`, which reconstructs the same keys to delete them.
    /// Most entries use the dummy value `b"1"` (the key alone carries the data);
    /// the content-hash and external-id entries store the memory UUID as value.
    fn update_indices(&self, memory: &Memory) -> Result<()> {
        let idx = self.index_cf();
        let mut batch = WriteBatch::default();

        // === Standard Indices ===

        // Index by date (for temporal queries)
        // BUG-001 FIX: Include memory_id in key to allow multiple memories per day
        // Old format: date:YYYYMMDD (overwrites on same day)
        // New format: date:YYYYMMDD:uuid (unique per memory)
        let date_key = format!(
            "date:{}:{}",
            memory.created_at.format("%Y%m%d"),
            memory.id.0
        );
        batch.put_cf(idx, date_key.as_bytes(), b"1");

        // Index by type (Debug formatting of the enum variant is the key segment)
        let type_key = format!(
            "type:{:?}:{}",
            memory.experience.experience_type, memory.id.0
        );
        batch.put_cf(idx, type_key.as_bytes(), b"1");

        // Index by importance (quantized into buckets; importance*10 truncated,
        // so e.g. 0.0-0.099 → bucket 0, 1.0 → bucket 10)
        let importance_bucket = (memory.importance() * 10.0) as u32;
        let importance_key = format!("importance:{}:{}", importance_bucket, memory.id.0);
        batch.put_cf(idx, importance_key.as_bytes(), b"1");

        // Index by entities (lowercased for case-insensitive tag search compatibility)
        for entity in &memory.experience.entities {
            let normalized_entity = entity.to_lowercase();
            let entity_key = format!("entity:{}:{}", normalized_entity, memory.id.0);
            batch.put_cf(idx, entity_key.as_bytes(), b"1");
        }

        // Index by tags (separate from entities for explicit tag queries;
        // same lowercase normalization as entities)
        for tag in &memory.experience.tags {
            let normalized_tag = tag.to_lowercase();
            let tag_key = format!("tag:{}:{}", normalized_tag, memory.id.0);
            batch.put_cf(idx, tag_key.as_bytes(), b"1");
        }

        // Index by episode_id (for temporal/episodic retrieval)
        // Episode is the primary temporal grouping - memories in same episode are highly related
        if let Some(ctx) = &memory.experience.context {
            if let Some(episode_id) = &ctx.episode.episode_id {
                let episode_key = format!("episode:{}:{}", episode_id, memory.id.0);
                batch.put_cf(idx, episode_key.as_bytes(), b"1");

                // Also index by sequence within episode for temporal ordering
                if let Some(seq) = ctx.episode.sequence_number {
                    let seq_key = format!("episode_seq:{}:{}:{}", episode_id, seq, memory.id.0);
                    batch.put_cf(idx, seq_key.as_bytes(), b"1");
                }
            }
        }

        // === Robotics Indices ===

        // Index by robot_id (for multi-robot systems)
        if let Some(ref robot_id) = memory.experience.robot_id {
            let robot_key = format!("robot:{}:{}", robot_id, memory.id.0);
            batch.put_cf(idx, robot_key.as_bytes(), b"1");
        }

        // Index by mission_id (for mission context retrieval)
        if let Some(ref mission_id) = memory.experience.mission_id {
            let mission_key = format!("mission:{}:{}", mission_id, memory.id.0);
            batch.put_cf(idx, mission_key.as_bytes(), b"1");
        }

        // Index by geo_location (for spatial queries) using geohash
        // Key format: geo:GEOHASH:memory_id (geohash at precision 10 = ~1.2m x 60cm)
        // Geohash enables efficient prefix-based spatial queries
        if let Some(geo) = memory.experience.geo_location {
            let lat = geo[0];
            let lon = geo[1];
            // Use precision 10 for warehouse-level accuracy (~1.2m cells)
            let geohash = super::types::geohash_encode(lat, lon, 10);
            let geo_key = format!("geo:{}:{}", geohash, memory.id.0);
            batch.put_cf(idx, geo_key.as_bytes(), b"1");
        }

        // Index by action_type (for action-based retrieval)
        if let Some(ref action_type) = memory.experience.action_type {
            let action_key = format!("action:{}:{}", action_type, memory.id.0);
            batch.put_cf(idx, action_key.as_bytes(), b"1");
        }

        // Index by reward (bucketed, for RL-style queries)
        // Bucket: reward clamped to [-1.0, 1.0], then mapped linearly to 0-20
        if let Some(reward) = memory.experience.reward {
            let clamped_reward = reward.clamp(-1.0, 1.0);
            let reward_bucket = ((clamped_reward + 1.0) * 10.0) as i32;
            let reward_key = format!("reward:{}:{}", reward_bucket, memory.id.0);
            batch.put_cf(idx, reward_key.as_bytes(), b"1");
        }

        // === Content Hash Index (idempotency) ===
        // Index by SHA256 content hash for dedup (issue #109)
        // Key format: content_hash:{hex} -> memory_id (16 bytes UUID)
        // Enables O(1) duplicate detection on remember()
        {
            let content_hash = Self::sha256_content_hash(&memory.experience.content);
            let hash_key = format!("content_hash:{}", content_hash);
            batch.put_cf(idx, hash_key.as_bytes(), memory.id.0.as_bytes());
        }

        // === External Linking Index ===
        // Index by external_id for upsert operations (Linear, GitHub, etc.)
        // Key format: external:{external_id}:{memory_id} -> memory_id
        // (external_id itself already encodes the source, e.g. "linear:SHO-39")
        // Enables O(1) lookup when syncing from external systems
        if let Some(ref external_id) = memory.external_id {
            let external_key = format!("external:{}:{}", external_id, memory.id.0);
            // Store memory_id as value for direct lookup
            batch.put_cf(idx, external_key.as_bytes(), memory.id.0.as_bytes());
        }

        // === Hierarchy Index ===
        // Index by parent_id for tree queries (list children, build tree)
        // Key format: parent:{parent_id}:{child_id} -> 1
        // Enables O(1) lookup of all children for a parent
        if let Some(ref parent_id) = memory.parent_id {
            let parent_key = format!("parent:{}:{}", parent_id.0, memory.id.0);
            batch.put_cf(idx, parent_key.as_bytes(), b"1");
        }

        // Use write mode based on configuration
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db.write_opt(batch, &write_opts)?;
        Ok(())
    }
1448
1449    /// Compute SHA256 hex digest for content dedup indexing (issue #109)
1450    fn sha256_content_hash(content: &str) -> String {
1451        use sha2::{Digest, Sha256};
1452        let mut hasher = Sha256::new();
1453        hasher.update(content.as_bytes());
1454        hex::encode(hasher.finalize())
1455    }
1456
1457    /// Look up an existing memory by content hash (idempotency dedup, issue #109).
1458    /// Returns the MemoryId if identical content was already stored.
1459    pub fn get_by_content_hash(&self, content: &str) -> Option<MemoryId> {
1460        let content_hash = Self::sha256_content_hash(content);
1461        let hash_key = format!("content_hash:{}", content_hash);
1462        let idx = self.index_cf();
1463        match self.db.get_cf(idx, hash_key.as_bytes()) {
1464            Ok(Some(value)) if value.len() == 16 => {
1465                let uuid = uuid::Uuid::from_slice(&value).ok()?;
1466                // Verify the memory still exists (might have been deleted)
1467                let memory_id = MemoryId(uuid);
1468                match self.get(&memory_id) {
1469                    Ok(_) => Some(memory_id),
1470                    Err(_) => {
1471                        // Stale index entry — memory was deleted, clean up
1472                        let _ = self.db.delete_cf(idx, hash_key.as_bytes());
1473                        None
1474                    }
1475                }
1476            }
1477            _ => None,
1478        }
1479    }
1480
1481    /// Retrieve a memory by ID
1482    ///
1483    /// Performs lazy migration: if memory is in legacy format, re-writes it
1484    /// in current format for faster future reads.
1485    pub fn get(&self, id: &MemoryId) -> Result<Memory> {
1486        let key = id.0.as_bytes();
1487        match self.db.get(key)? {
1488            Some(value) => {
1489                let (memory, needs_migration) = deserialize_memory(&value).with_context(|| {
1490                    format!(
1491                        "Failed to deserialize memory {} ({} bytes)",
1492                        id.0,
1493                        value.len()
1494                    )
1495                })?;
1496
1497                // Lazy migration: re-write legacy formats in current format
1498                if needs_migration {
1499                    if let Err(e) = self.migrate_memory_format(&memory) {
1500                        // Migration failure is non-fatal - log and continue
1501                        tracing::debug!("Lazy migration skipped for memory {}: {}", memory.id.0, e);
1502                    }
1503                }
1504
1505                Ok(memory)
1506            }
1507            None => Err(anyhow!("Memory not found: {id:?}")),
1508        }
1509    }
1510
1511    /// Re-write a memory in current format (lazy migration helper)
1512    fn migrate_memory_format(&self, memory: &Memory) -> Result<()> {
1513        let key = memory.id.0.as_bytes();
1514        let value = bincode::serde::encode_to_vec(memory, bincode::config::standard())
1515            .context("Failed to serialize for migration")?;
1516
1517        let mut write_opts = WriteOptions::default();
1518        write_opts.set_sync(false); // Async is fine for migration
1519
1520        self.db.put_opt(key, &value, &write_opts)?;
1521        tracing::debug!("Migrated memory {} to current format", memory.id.0);
1522        Ok(())
1523    }
1524
1525    /// Find a memory by its external_id (e.g., "linear:SHO-39", "github:pr-123")
1526    ///
1527    /// Returns the memory if found, None if no memory with this external_id exists.
1528    /// Used for upsert operations when syncing from external sources.
1529    pub fn find_by_external_id(&self, external_id: &str) -> Result<Option<Memory>> {
1530        // Index key format: external:{external_id}:{memory_id}
1531        let prefix = format!("external:{external_id}:");
1532
1533        let iter = self.db.iterator_cf(
1534            self.index_cf(),
1535            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1536        );
1537
1538        for (key, _value) in iter.log_errors() {
1539            let key_str = String::from_utf8_lossy(&key);
1540            if !key_str.starts_with(&prefix) {
1541                break;
1542            }
1543            // Extract memory_id from key (format: external:{external_id}:{memory_id})
1544            if let Some(id_str) = key_str.strip_prefix(&prefix) {
1545                if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1546                    return Ok(Some(self.get(&MemoryId(uuid))?));
1547                }
1548            }
1549        }
1550
1551        Ok(None)
1552    }
1553
1554    /// Update an existing memory
1555    ///
1556    /// ALGO-004 FIX: Re-indexes memory after update to handle importance drift.
1557    /// When Hebbian feedback changes importance, the old bucket index becomes stale.
1558    /// We remove old indices before storing to ensure index consistency.
1559    pub fn update(&self, memory: &Memory) -> Result<()> {
1560        // Remove old indices first (they may have stale importance buckets)
1561        self.remove_from_indices(&memory.id)?;
1562        // Store with fresh indices
1563        self.store(memory)
1564    }
1565
1566    /// Delete a memory with configurable durability
1567    #[allow(unused)] // Public API - available for memory management
1568    pub fn delete(&self, id: &MemoryId) -> Result<()> {
1569        // Clean up indices FIRST while the memory still exists in the main DB,
1570        // since remove_from_indices() needs to read the memory to reconstruct index keys.
1571        self.remove_from_indices(id)?;
1572
1573        // Then delete from main database
1574        let key = id.0.as_bytes();
1575        let mut write_opts = WriteOptions::default();
1576        write_opts.set_sync(self.write_mode == WriteMode::Sync);
1577        self.db.delete_opt(key, &write_opts)?;
1578
1579        // Delete vector mapping if present
1580        let mapping_key = format!("vmapping:{}", id.0);
1581        let _ = self.db.delete_opt(mapping_key.as_bytes(), &write_opts);
1582
1583        Ok(())
1584    }
1585
    /// Remove memory from all indices
    /// BUG-005 FIX: Direct key deletion instead of O(n) scan with contains()
    /// We reconstruct index keys from memory metadata for O(k) deletion
    ///
    /// All deletes are batched into a single atomic WriteBatch against the
    /// index column family. If the memory record itself is already gone this
    /// is a no-op (the index keys cannot be reconstructed without it).
    ///
    /// NOTE(review): every key format below must stay byte-for-byte in sync
    /// with the corresponding writes in update_indices (not visible in this
    /// chunk) — confirm both sides on any format change.
    fn remove_from_indices(&self, id: &MemoryId) -> Result<()> {
        // Fetch memory to reconstruct index keys
        let memory = match self.get(id) {
            Ok(m) => m,
            Err(_) => {
                tracing::debug!("Memory {} not found, skipping index cleanup", id.0);
                return Ok(());
            }
        };

        let idx = self.index_cf();
        let mut batch = WriteBatch::default();

        // Reconstruct and delete all index keys directly (O(k) instead of O(n))

        // Date index
        let date_key = format!("date:{}:{}", memory.created_at.format("%Y%m%d"), id.0);
        batch.delete_cf(idx, date_key.as_bytes());

        // Type index
        let type_key = format!("type:{:?}:{}", memory.experience.experience_type, id.0);
        batch.delete_cf(idx, type_key.as_bytes());

        // Importance index (bucketed in tenths: 0.0..=1.0 -> buckets 0..=10)
        let importance_bucket = (memory.importance() * 10.0) as u32;
        let importance_key = format!("importance:{}:{}", importance_bucket, id.0);
        batch.delete_cf(idx, importance_key.as_bytes());

        // Entity indices (must match the to_lowercase() normalization in update_indices)
        for entity in &memory.experience.entities {
            let normalized_entity = entity.to_lowercase();
            let entity_key = format!("entity:{}:{}", normalized_entity, id.0);
            batch.delete_cf(idx, entity_key.as_bytes());
        }

        // Tag indices (lowercased, same normalization as entities)
        for tag in &memory.experience.tags {
            let normalized_tag = tag.to_lowercase();
            let tag_key = format!("tag:{}:{}", normalized_tag, id.0);
            batch.delete_cf(idx, tag_key.as_bytes());
        }

        // Episode indices (plain membership key plus optional sequence key)
        if let Some(ctx) = &memory.experience.context {
            if let Some(episode_id) = &ctx.episode.episode_id {
                let episode_key = format!("episode:{}:{}", episode_id, id.0);
                batch.delete_cf(idx, episode_key.as_bytes());

                if let Some(seq) = ctx.episode.sequence_number {
                    let seq_key = format!("episode_seq:{}:{}:{}", episode_id, seq, id.0);
                    batch.delete_cf(idx, seq_key.as_bytes());
                }
            }
        }

        // Robot index
        if let Some(ref robot_id) = memory.experience.robot_id {
            let robot_key = format!("robot:{}:{}", robot_id, id.0);
            batch.delete_cf(idx, robot_key.as_bytes());
        }

        // Mission index
        if let Some(ref mission_id) = memory.experience.mission_id {
            let mission_key = format!("mission:{}:{}", mission_id, id.0);
            batch.delete_cf(idx, mission_key.as_bytes());
        }

        // Geo index (precision-10 geohash, same as the stored key)
        if let Some(geo) = memory.experience.geo_location {
            let geohash = super::types::geohash_encode(geo[0], geo[1], 10);
            let geo_key = format!("geo:{}:{}", geohash, id.0);
            batch.delete_cf(idx, geo_key.as_bytes());
        }

        // Action index
        if let Some(ref action_type) = memory.experience.action_type {
            let action_key = format!("action:{}:{}", action_type, id.0);
            batch.delete_cf(idx, action_key.as_bytes());
        }

        // Reward index (must match the clamp in update_indices)
        if let Some(reward) = memory.experience.reward {
            let clamped_reward = reward.clamp(-1.0, 1.0);
            let reward_bucket = ((clamped_reward + 1.0) * 10.0) as i32;
            let reward_key = format!("reward:{}:{}", reward_bucket, id.0);
            batch.delete_cf(idx, reward_key.as_bytes());
        }

        // Content hash index (idempotency dedup)
        {
            let content_hash = Self::sha256_content_hash(&memory.experience.content);
            let hash_key = format!("content_hash:{}", content_hash);
            batch.delete_cf(idx, hash_key.as_bytes());
        }

        // External linking index
        if let Some(ref external_id) = memory.external_id {
            let external_key = format!("external:{}:{}", external_id, id.0);
            batch.delete_cf(idx, external_key.as_bytes());
        }

        // Parent index (hierarchy)
        if let Some(ref parent_id) = memory.parent_id {
            let parent_key = format!("parent:{}:{}", parent_id.0, id.0);
            batch.delete_cf(idx, parent_key.as_bytes());
        }

        // Use write mode based on configuration
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db.write_opt(batch, &write_opts)?;
        Ok(())
    }
1702
1703    /// Search memories by various criteria
1704    pub fn search(&self, criteria: SearchCriteria) -> Result<Vec<Memory>> {
1705        let mut memory_ids = Vec::new();
1706
1707        match criteria {
1708            // === Standard Criteria ===
1709            SearchCriteria::ByDate { start, end } => {
1710                memory_ids = self.search_by_date_range(start, end)?;
1711            }
1712            SearchCriteria::ByType(exp_type) => {
1713                memory_ids = self.search_by_type(exp_type)?;
1714            }
1715            SearchCriteria::ByImportance { min, max } => {
1716                memory_ids = self.search_by_importance(min, max)?;
1717            }
1718            SearchCriteria::ByEntity(entity) => {
1719                memory_ids = self.search_by_entity(&entity)?;
1720            }
1721            SearchCriteria::ByTags(tags) => {
1722                memory_ids = self.search_by_tags(&tags)?;
1723            }
1724
1725            // === Temporal/Episode Criteria ===
1726            SearchCriteria::ByEpisode(episode_id) => {
1727                memory_ids = self.search_by_episode(&episode_id)?;
1728            }
1729            SearchCriteria::ByEpisodeSequence {
1730                episode_id,
1731                min_sequence,
1732                max_sequence,
1733            } => {
1734                memory_ids =
1735                    self.search_by_episode_sequence(&episode_id, min_sequence, max_sequence)?;
1736            }
1737
1738            // === Robotics Criteria ===
1739            SearchCriteria::ByRobot(robot_id) => {
1740                memory_ids = self.search_by_robot(&robot_id)?;
1741            }
1742            SearchCriteria::ByMission(mission_id) => {
1743                memory_ids = self.search_by_mission(&mission_id)?;
1744            }
1745            SearchCriteria::ByLocation {
1746                lat,
1747                lon,
1748                radius_meters,
1749            } => {
1750                memory_ids = self.search_by_location(lat, lon, radius_meters)?;
1751            }
1752            SearchCriteria::ByActionType(action_type) => {
1753                memory_ids = self.search_by_action_type(&action_type)?;
1754            }
1755            SearchCriteria::ByReward { min, max } => {
1756                memory_ids = self.search_by_reward(min, max)?;
1757            }
1758
1759            // === Compound Criteria ===
1760            SearchCriteria::Combined(criterias) => {
1761                // Intersection of all criteria results
1762                // Use HashSet for O(1) lookups instead of O(n) Vec::contains
1763                use std::collections::HashSet;
1764                let mut result_sets: Vec<HashSet<MemoryId>> = Vec::new();
1765                for c in criterias {
1766                    result_sets.push(
1767                        self.search(c)?
1768                            .into_iter()
1769                            .map(|m| m.id)
1770                            .collect::<HashSet<_>>(),
1771                    );
1772                }
1773
1774                if !result_sets.is_empty() {
1775                    let first_set = result_sets.remove(0);
1776                    memory_ids = first_set
1777                        .into_iter()
1778                        .filter(|id| result_sets.iter().all(|set| set.contains(id)))
1779                        .collect();
1780                }
1781            }
1782
1783            // === Hierarchy Criteria ===
1784            SearchCriteria::ByParent(parent_id) => {
1785                memory_ids = self.search_by_parent(&parent_id)?;
1786            }
1787            SearchCriteria::RootsOnly => {
1788                memory_ids = self.search_roots()?;
1789            }
1790        }
1791
1792        // Fetch full memories, filtering out forgotten ones
1793        let mut memories = Vec::new();
1794        for id in memory_ids {
1795            if let Ok(memory) = self.get(&id) {
1796                if !memory.is_forgotten() {
1797                    memories.push(memory);
1798                }
1799            }
1800        }
1801
1802        Ok(memories)
1803    }
1804
1805    fn search_by_date_range(
1806        &self,
1807        start: DateTime<Utc>,
1808        end: DateTime<Utc>,
1809    ) -> Result<Vec<MemoryId>> {
1810        let mut ids = Vec::new();
1811        let start_key = format!("date:{}", start.format("%Y%m%d"));
1812        // BUG-001 FIX: End key needs ~ suffix to include all UUIDs for that date
1813        // Keys are: date:YYYYMMDD:uuid, so date:20251207~ comes after all Dec 7 entries
1814        let end_key = format!("date:{}~", end.format("%Y%m%d"));
1815
1816        let iter = self.db.iterator_cf(
1817            self.index_cf(),
1818            IteratorMode::From(start_key.as_bytes(), rocksdb::Direction::Forward),
1819        );
1820        for (key, _value) in iter.log_errors() {
1821            let key_str = String::from_utf8_lossy(&key);
1822            if &*key_str > end_key.as_str() {
1823                break;
1824            }
1825            // BUG-001 FIX: Extract memory_id from key (format: date:YYYYMMDD:uuid)
1826            if key_str.starts_with("date:") {
1827                let parts: Vec<&str> = key_str.split(':').collect();
1828                if parts.len() >= 3 {
1829                    // parts[0] = "date", parts[1] = "YYYYMMDD", parts[2] = uuid
1830                    if let Ok(uuid) = uuid::Uuid::parse_str(parts[2]) {
1831                        ids.push(MemoryId(uuid));
1832                    }
1833                }
1834            }
1835        }
1836
1837        Ok(ids)
1838    }
1839
1840    fn search_by_type(&self, exp_type: ExperienceType) -> Result<Vec<MemoryId>> {
1841        let mut ids = Vec::new();
1842        let prefix = format!("type:{exp_type:?}:");
1843
1844        let iter = self.db.iterator_cf(
1845            self.index_cf(),
1846            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1847        );
1848        for (key, _) in iter.log_errors() {
1849            let key_str = String::from_utf8_lossy(&key);
1850            if !key_str.starts_with(&prefix) {
1851                break;
1852            }
1853            // Extract ID from key
1854            if let Some(id_str) = key_str.strip_prefix(&prefix) {
1855                if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1856                    ids.push(MemoryId(uuid));
1857                }
1858            }
1859        }
1860
1861        Ok(ids)
1862    }
1863
1864    fn search_by_importance(&self, min: f32, max: f32) -> Result<Vec<MemoryId>> {
1865        let mut ids = Vec::new();
1866        let min_bucket = (min * 10.0) as u32;
1867        let max_bucket = (max * 10.0) as u32;
1868
1869        for bucket in min_bucket..=max_bucket {
1870            let prefix = format!("importance:{bucket}:");
1871            let iter = self.db.iterator_cf(
1872                self.index_cf(),
1873                IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1874            );
1875
1876            for (key, _) in iter.log_errors() {
1877                let key_str = String::from_utf8_lossy(&key);
1878                if !key_str.starts_with(&prefix) {
1879                    break;
1880                }
1881                // Extract ID from key
1882                if let Some(id_str) = key_str.strip_prefix(&prefix) {
1883                    if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1884                        ids.push(MemoryId(uuid));
1885                    }
1886                }
1887            }
1888        }
1889
1890        Ok(ids)
1891    }
1892
1893    fn search_by_entity(&self, entity: &str) -> Result<Vec<MemoryId>> {
1894        let mut ids = Vec::new();
1895        // Normalize to lowercase for case-insensitive matching
1896        let normalized_entity = entity.to_lowercase();
1897        let prefix = format!("entity:{normalized_entity}:");
1898
1899        let iter = self.db.iterator_cf(
1900            self.index_cf(),
1901            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1902        );
1903        for (key, _) in iter.log_errors() {
1904            let key_str = String::from_utf8_lossy(&key);
1905            if !key_str.starts_with(&prefix) {
1906                break;
1907            }
1908            // Extract ID from key
1909            if let Some(id_str) = key_str.strip_prefix(&prefix) {
1910                if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1911                    ids.push(MemoryId(uuid));
1912                }
1913            }
1914        }
1915
1916        Ok(ids)
1917    }
1918
1919    /// Search memories by tags (returns memories matching ANY of the provided tags)
1920    fn search_by_tags(&self, tags: &[String]) -> Result<Vec<MemoryId>> {
1921        use std::collections::HashSet;
1922
1923        // Union of all tag matches
1924        let mut all_ids = HashSet::new();
1925
1926        for tag in tags {
1927            // Normalize to lowercase for case-insensitive matching
1928            let normalized_tag = tag.to_lowercase();
1929            let prefix = format!("tag:{normalized_tag}:");
1930            let iter = self.db.iterator_cf(
1931                self.index_cf(),
1932                IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1933            );
1934            for (key, _) in iter.log_errors() {
1935                let key_str = String::from_utf8_lossy(&key);
1936                if !key_str.starts_with(&prefix) {
1937                    break;
1938                }
1939                if let Some(id_str) = key_str.strip_prefix(&prefix) {
1940                    if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1941                        all_ids.insert(MemoryId(uuid));
1942                    }
1943                }
1944            }
1945        }
1946
1947        Ok(all_ids.into_iter().collect())
1948    }
1949
1950    /// Search memories by episode ID
1951    /// Returns all memories in the specified episode
1952    fn search_by_episode(&self, episode_id: &str) -> Result<Vec<MemoryId>> {
1953        let mut ids = Vec::new();
1954        let prefix = format!("episode:{episode_id}:");
1955
1956        let iter = self.db.iterator_cf(
1957            self.index_cf(),
1958            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1959        );
1960        for (key, _) in iter.log_errors() {
1961            let key_str = String::from_utf8_lossy(&key);
1962            if !key_str.starts_with(&prefix) {
1963                break;
1964            }
1965            if let Some(id_str) = key_str.strip_prefix(&prefix) {
1966                if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1967                    ids.push(MemoryId(uuid));
1968                }
1969            }
1970        }
1971
1972        Ok(ids)
1973    }
1974
1975    /// Search memories by episode with sequence filtering
1976    /// Returns memories in temporal order within the episode
1977    fn search_by_episode_sequence(
1978        &self,
1979        episode_id: &str,
1980        min_sequence: Option<u32>,
1981        max_sequence: Option<u32>,
1982    ) -> Result<Vec<MemoryId>> {
1983        let mut results: Vec<(u32, MemoryId)> = Vec::new();
1984
1985        // Scan the episode_seq index which has format: episode_seq:{episode_id}:{seq}:{memory_id}
1986        let prefix = format!("episode_seq:{episode_id}:");
1987
1988        let iter = self.db.iterator_cf(
1989            self.index_cf(),
1990            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1991        );
1992
1993        for (key, _) in iter.log_errors() {
1994            let key_str = String::from_utf8_lossy(&key);
1995            if !key_str.starts_with(&prefix) {
1996                break;
1997            }
1998
1999            // Parse: episode_seq:{episode_id}:{seq}:{memory_id}
2000            if let Some(rest) = key_str.strip_prefix(&prefix) {
2001                let parts: Vec<&str> = rest.splitn(2, ':').collect();
2002                if parts.len() == 2 {
2003                    if let (Ok(seq), Ok(uuid)) =
2004                        (parts[0].parse::<u32>(), uuid::Uuid::parse_str(parts[1]))
2005                    {
2006                        // Apply sequence filters
2007                        let passes_min = min_sequence.map_or(true, |min| seq >= min);
2008                        let passes_max = max_sequence.map_or(true, |max| seq <= max);
2009
2010                        if passes_min && passes_max {
2011                            results.push((seq, MemoryId(uuid)));
2012                        }
2013                    }
2014                }
2015            }
2016        }
2017
2018        // Sort by sequence number for temporal ordering
2019        results.sort_by_key(|(seq, _)| *seq);
2020
2021        Ok(results.into_iter().map(|(_, id)| id).collect())
2022    }
2023
2024    // ========================================================================
2025    // ROBOTICS SEARCH METHODS
2026    // ========================================================================
2027
2028    /// Search memories by robot/drone identifier
2029    fn search_by_robot(&self, robot_id: &str) -> Result<Vec<MemoryId>> {
2030        let mut ids = Vec::new();
2031        let prefix = format!("robot:{robot_id}:");
2032
2033        let iter = self.db.iterator_cf(
2034            self.index_cf(),
2035            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2036        );
2037        for (key, _) in iter.log_errors() {
2038            let key_str = String::from_utf8_lossy(&key);
2039            if !key_str.starts_with(&prefix) {
2040                break;
2041            }
2042            if let Some(id_str) = key_str.strip_prefix(&prefix) {
2043                if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2044                    ids.push(MemoryId(uuid));
2045                }
2046            }
2047        }
2048
2049        Ok(ids)
2050    }
2051
2052    /// Search memories by mission identifier
2053    fn search_by_mission(&self, mission_id: &str) -> Result<Vec<MemoryId>> {
2054        let mut ids = Vec::new();
2055        let prefix = format!("mission:{mission_id}:");
2056
2057        let iter = self.db.iterator_cf(
2058            self.index_cf(),
2059            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2060        );
2061        for (key, _) in iter.log_errors() {
2062            let key_str = String::from_utf8_lossy(&key);
2063            if !key_str.starts_with(&prefix) {
2064                break;
2065            }
2066            if let Some(id_str) = key_str.strip_prefix(&prefix) {
2067                if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2068                    ids.push(MemoryId(uuid));
2069                }
2070            }
2071        }
2072
2073        Ok(ids)
2074    }
2075
2076    /// Search memories by geographic location using geohash prefix scanning
2077    ///
2078    /// Performance: O(k) where k = memories in ~9 geohash cells covering the radius
2079    /// Previous approach was O(n) where n = all geo-indexed memories
2080    fn search_by_location(
2081        &self,
2082        center_lat: f64,
2083        center_lon: f64,
2084        radius_meters: f64,
2085    ) -> Result<Vec<MemoryId>> {
2086        use super::types::{geohash_decode, geohash_search_prefixes, GeoFilter};
2087
2088        let geo_filter = GeoFilter::new(center_lat, center_lon, radius_meters);
2089        let mut ids = Vec::new();
2090
2091        // Get geohash prefixes for center + neighbors at appropriate precision
2092        let prefixes = geohash_search_prefixes(center_lat, center_lon, radius_meters);
2093
2094        // Scan only the relevant geohash cells (9 cells = center + 8 neighbors)
2095        for geohash_prefix in prefixes {
2096            // Use prefix WITHOUT trailing colon so shorter search prefixes (e.g. precision 6)
2097            // still match longer stored geohashes (precision 10).
2098            // Stored keys: "geo:s02jksd91f:{uuid}", search prefix: "geo:s02jks"
2099            let prefix = format!("geo:{}", geohash_prefix);
2100            let iter = self.db.iterator_cf(
2101                self.index_cf(),
2102                IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2103            );
2104
2105            for (key, _value) in iter.log_errors() {
2106                let key_str = String::from_utf8_lossy(&key);
2107                if !key_str.starts_with(&prefix) {
2108                    break;
2109                }
2110
2111                // Key format: geo:GEOHASH:memory_id
2112                let parts: Vec<&str> = key_str.split(':').collect();
2113                if parts.len() >= 3 {
2114                    let geohash = parts[1];
2115                    // Decode geohash to get approximate lat/lon for distance check
2116                    let (min_lat, min_lon, max_lat, max_lon) = geohash_decode(geohash);
2117                    let approx_lat = (min_lat + max_lat) / 2.0;
2118                    let approx_lon = (min_lon + max_lon) / 2.0;
2119
2120                    // Final haversine check for edge cases at cell boundaries
2121                    if geo_filter.contains(approx_lat, approx_lon) {
2122                        if let Ok(uuid) = uuid::Uuid::parse_str(parts[2]) {
2123                            ids.push(MemoryId(uuid));
2124                        }
2125                    }
2126                }
2127            }
2128        }
2129
2130        Ok(ids)
2131    }
2132
2133    /// Search memories by action type
2134    fn search_by_action_type(&self, action_type: &str) -> Result<Vec<MemoryId>> {
2135        let mut ids = Vec::new();
2136        let prefix = format!("action:{action_type}:");
2137
2138        let iter = self.db.iterator_cf(
2139            self.index_cf(),
2140            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2141        );
2142        for (key, _) in iter.log_errors() {
2143            let key_str = String::from_utf8_lossy(&key);
2144            if !key_str.starts_with(&prefix) {
2145                break;
2146            }
2147            if let Some(id_str) = key_str.strip_prefix(&prefix) {
2148                if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2149                    ids.push(MemoryId(uuid));
2150                }
2151            }
2152        }
2153
2154        Ok(ids)
2155    }
2156
2157    /// Search memories by reward range (for RL-style queries)
2158    fn search_by_reward(&self, min: f32, max: f32) -> Result<Vec<MemoryId>> {
2159        let mut ids = Vec::new();
2160
2161        // Reward is bucketed similar to importance (-10 to 10 buckets)
2162        // Clamp to prevent bucket overflow from out-of-range values
2163        let clamped_min = min.clamp(-1.0, 1.0);
2164        let clamped_max = max.clamp(-1.0, 1.0);
2165        let min_bucket = ((clamped_min + 1.0) * 10.0) as i32; // -1.0 -> 0, 1.0 -> 20
2166        let max_bucket = ((clamped_max + 1.0) * 10.0) as i32;
2167
2168        for bucket in min_bucket..=max_bucket {
2169            let prefix = format!("reward:{bucket}:");
2170            let iter = self.db.iterator_cf(
2171                self.index_cf(),
2172                IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2173            );
2174
2175            for (key, _) in iter.log_errors() {
2176                let key_str = String::from_utf8_lossy(&key);
2177                if !key_str.starts_with(&prefix) {
2178                    break;
2179                }
2180                if let Some(id_str) = key_str.strip_prefix(&prefix) {
2181                    if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2182                        ids.push(MemoryId(uuid));
2183                    }
2184                }
2185            }
2186        }
2187
2188        Ok(ids)
2189    }
2190
2191    // =========================================================================
2192    // HIERARCHY SEARCH METHODS
2193    // =========================================================================
2194
2195    /// Get all children of a parent memory
2196    fn search_by_parent(&self, parent_id: &MemoryId) -> Result<Vec<MemoryId>> {
2197        let mut ids = Vec::new();
2198        let prefix = format!("parent:{}:", parent_id.0);
2199
2200        let iter = self.db.iterator_cf(
2201            self.index_cf(),
2202            IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2203        );
2204
2205        for (key, _) in iter.log_errors() {
2206            let key_str = String::from_utf8_lossy(&key);
2207            if !key_str.starts_with(&prefix) {
2208                break;
2209            }
2210            // Key format: parent:{parent_uuid}:{child_uuid}
2211            if let Some(child_id_str) = key_str.strip_prefix(&prefix) {
2212                if let Ok(uuid) = uuid::Uuid::parse_str(child_id_str) {
2213                    ids.push(MemoryId(uuid));
2214                }
2215            }
2216        }
2217
2218        Ok(ids)
2219    }
2220
2221    /// Get all root memories (memories with no parent)
2222    fn search_roots(&self) -> Result<Vec<MemoryId>> {
2223        let mut roots = Vec::new();
2224
2225        // Iterate all memories and check for parent_id = None
2226        let iter = self.db.iterator(IteratorMode::Start);
2227        for item in iter {
2228            if let Ok((key, value)) = item {
2229                if key.len() != 16 {
2230                    continue;
2231                }
2232                if let Ok((memory, _)) = deserialize_memory(&value) {
2233                    if memory.parent_id.is_none() {
2234                        roots.push(memory.id);
2235                    }
2236                }
2237            }
2238        }
2239
2240        Ok(roots)
2241    }
2242
2243    /// Get children of a memory (public API)
2244    pub fn get_children(&self, parent_id: &MemoryId) -> Result<Vec<Memory>> {
2245        let child_ids = self.search_by_parent(parent_id)?;
2246        let mut children = Vec::new();
2247        for id in child_ids {
2248            if let Ok(memory) = self.get(&id) {
2249                children.push(memory);
2250            }
2251        }
2252        Ok(children)
2253    }
2254
2255    /// Get the parent chain (ancestors) of a memory
2256    pub fn get_ancestors(&self, memory_id: &MemoryId) -> Result<Vec<Memory>> {
2257        let mut ancestors = Vec::new();
2258        let mut current_id = memory_id.clone();
2259
2260        // Walk up the parent chain (max 100 to prevent infinite loops)
2261        for _ in 0..100 {
2262            let memory = self.get(&current_id)?;
2263            if let Some(parent_id) = &memory.parent_id {
2264                let parent = self.get(parent_id)?;
2265                ancestors.push(parent.clone());
2266                current_id = parent_id.clone();
2267            } else {
2268                break; // Reached root
2269            }
2270        }
2271
2272        Ok(ancestors)
2273    }
2274
2275    /// Get the full hierarchy context for a memory
2276    /// Returns (ancestors, memory, children)
2277    pub fn get_hierarchy_context(
2278        &self,
2279        memory_id: &MemoryId,
2280    ) -> Result<(Vec<Memory>, Memory, Vec<Memory>)> {
2281        let memory = self.get(memory_id)?;
2282        let ancestors = self.get_ancestors(memory_id)?;
2283        let children = self.get_children(memory_id)?;
2284        Ok((ancestors, memory, children))
2285    }
2286
2287    /// Get all memories in a subtree rooted at the given memory
2288    pub fn get_subtree(&self, root_id: &MemoryId, max_depth: usize) -> Result<Vec<Memory>> {
2289        let mut result = Vec::new();
2290        let mut queue = vec![(root_id.clone(), 0usize)];
2291
2292        while let Some((id, depth)) = queue.pop() {
2293            if depth > max_depth {
2294                continue;
2295            }
2296            if let Ok(memory) = self.get(&id) {
2297                result.push(memory);
2298                // Add children to queue
2299                if depth < max_depth {
2300                    let child_ids = self.search_by_parent(&id)?;
2301                    for child_id in child_ids {
2302                        queue.push((child_id, depth + 1));
2303                    }
2304                }
2305            }
2306        }
2307
2308        Ok(result)
2309    }
2310
2311    /// Get all memory IDs without loading full Memory objects.
2312    ///
2313    /// Returns only 16-byte UUID keys (lightweight). Use with `get()` to load
2314    /// individual memories in batches to avoid OOM on large datasets.
2315    pub fn get_all_ids(&self) -> Result<Vec<MemoryId>> {
2316        let mut ids = Vec::new();
2317        let mut read_opts = rocksdb::ReadOptions::default();
2318        read_opts.fill_cache(false);
2319        let iter = self.db.iterator_opt(IteratorMode::Start, read_opts);
2320        for item in iter {
2321            if let Ok((key, _)) = item {
2322                if key.len() == 16 {
2323                    let uuid_bytes: [u8; 16] = key[..16].try_into().unwrap();
2324                    ids.push(MemoryId(uuid::Uuid::from_bytes(uuid_bytes)));
2325                }
2326            }
2327        }
2328        Ok(ids)
2329    }
2330
2331    /// Get all memories from long-term storage
2332    ///
2333    /// Only returns entries with valid 16-byte UUID keys (consistent with get_stats)
2334    pub fn get_all(&self) -> Result<Vec<Memory>> {
2335        let mut memories = Vec::new();
2336
2337        // fill_cache(false) prevents this maintenance scan from polluting
2338        // the block cache with cold data, reducing C++ peak memory
2339        let mut read_opts = rocksdb::ReadOptions::default();
2340        read_opts.fill_cache(false);
2341        let iter = self.db.iterator_opt(IteratorMode::Start, read_opts);
2342        for item in iter {
2343            if let Ok((key, value)) = item {
2344                // Only process valid 16-byte UUID keys (consistent with get_stats)
2345                if key.len() != 16 {
2346                    continue;
2347                }
2348                if let Ok((memory, _)) = deserialize_memory(&value) {
2349                    if !memory.is_forgotten() {
2350                        memories.push(memory);
2351                    }
2352                }
2353            }
2354        }
2355
2356        Ok(memories)
2357    }
2358
2359    pub fn get_uncompressed_older_than(&self, cutoff: DateTime<Utc>) -> Result<Vec<Memory>> {
2360        let mut memories = Vec::new();
2361
2362        // Iterate through all memories
2363        let iter = self.db.iterator(IteratorMode::Start);
2364        for item in iter {
2365            if let Ok((key, value)) = item {
2366                // Only process valid 16-byte UUID keys
2367                if key.len() != 16 {
2368                    continue;
2369                }
2370                if let Ok((memory, _)) = deserialize_memory(&value) {
2371                    if !memory.compressed && !memory.is_forgotten() && memory.created_at < cutoff {
2372                        memories.push(memory);
2373                    }
2374                }
2375            }
2376        }
2377
2378        Ok(memories)
2379    }
2380
2381    /// Mark memories as forgotten (soft delete) with atomic batch write.
2382    /// Returns the IDs of memories that were flagged, so callers can clean up
2383    /// secondary indices (vector, BM25, graph).
2384    pub fn mark_forgotten_by_age(&self, cutoff: DateTime<Utc>) -> Result<Vec<MemoryId>> {
2385        let mut batch = rocksdb::WriteBatch::default();
2386        let mut flagged_ids = Vec::new();
2387        let now = Utc::now().to_rfc3339();
2388
2389        let iter = self.db.iterator(IteratorMode::Start);
2390        for item in iter {
2391            if let Ok((key, value)) = item {
2392                if key.len() != 16 {
2393                    continue;
2394                }
2395                if let Ok((mut memory, _)) = deserialize_memory(&value) {
2396                    if memory.is_forgotten() {
2397                        continue;
2398                    }
2399                    if memory.created_at < cutoff {
2400                        flagged_ids.push(memory.id.clone());
2401                        memory
2402                            .experience
2403                            .metadata
2404                            .insert("forgotten".to_string(), "true".to_string());
2405                        memory
2406                            .experience
2407                            .metadata
2408                            .insert("forgotten_at".to_string(), now.clone());
2409
2410                        let updated_value =
2411                            bincode::serde::encode_to_vec(&memory, bincode::config::standard())?;
2412                        batch.put(&key, updated_value);
2413                    }
2414                }
2415            }
2416        }
2417
2418        if !flagged_ids.is_empty() {
2419            let mut write_opts = WriteOptions::default();
2420            write_opts.set_sync(true);
2421            self.db.write_opt(batch, &write_opts)?;
2422        }
2423
2424        Ok(flagged_ids)
2425    }
2426
2427    /// Mark memories with low importance as forgotten with atomic batch write.
2428    /// Returns the IDs of memories that were flagged, so callers can clean up
2429    /// secondary indices (vector, BM25, graph).
2430    pub fn mark_forgotten_by_importance(&self, threshold: f32) -> Result<Vec<MemoryId>> {
2431        let mut batch = rocksdb::WriteBatch::default();
2432        let mut flagged_ids = Vec::new();
2433        let now = Utc::now().to_rfc3339();
2434
2435        let iter = self.db.iterator(IteratorMode::Start);
2436        for item in iter {
2437            if let Ok((key, value)) = item {
2438                if key.len() != 16 {
2439                    continue;
2440                }
2441                if let Ok((mut memory, _)) = deserialize_memory(&value) {
2442                    if memory.is_forgotten() {
2443                        continue;
2444                    }
2445                    if memory.importance() < threshold {
2446                        flagged_ids.push(memory.id.clone());
2447                        memory
2448                            .experience
2449                            .metadata
2450                            .insert("forgotten".to_string(), "true".to_string());
2451                        memory
2452                            .experience
2453                            .metadata
2454                            .insert("forgotten_at".to_string(), now.clone());
2455
2456                        let updated_value =
2457                            bincode::serde::encode_to_vec(&memory, bincode::config::standard())?;
2458                        batch.put(&key, updated_value);
2459                    }
2460                }
2461            }
2462        }
2463
2464        if !flagged_ids.is_empty() {
2465            let mut write_opts = WriteOptions::default();
2466            write_opts.set_sync(true);
2467            self.db.write_opt(batch, &write_opts)?;
2468        }
2469
2470        Ok(flagged_ids)
2471    }
2472
2473    /// Remove memories matching a pattern with durable writes
2474    pub fn remove_matching(&self, regex: &regex::Regex) -> Result<usize> {
2475        let mut count = 0;
2476        let mut to_delete: Vec<MemoryId> = Vec::new();
2477
2478        let iter = self.db.iterator(IteratorMode::Start);
2479        for item in iter {
2480            if let Ok((key, value)) = item {
2481                // Only process valid 16-byte UUID keys
2482                if key.len() != 16 {
2483                    continue;
2484                }
2485                if let Ok((memory, _)) = deserialize_memory(&value) {
2486                    if regex.is_match(&memory.experience.content) {
2487                        to_delete.push(memory.id);
2488                        count += 1;
2489                    }
2490                }
2491            }
2492        }
2493
2494        // Delete each memory through the proper delete() path which cleans up indices first
2495        for memory_id in to_delete {
2496            if let Err(e) = self.delete(&memory_id) {
2497                tracing::warn!("Failed to delete matching memory {}: {}", memory_id.0, e);
2498            }
2499        }
2500
2501        Ok(count)
2502    }
2503
2504    /// Update access count for a memory
2505    pub fn update_access(&self, id: &MemoryId) -> Result<()> {
2506        if let Ok(memory) = self.get(id) {
2507            // ZERO-COPY: Update metadata through interior mutability
2508            memory.update_access();
2509
2510            // Persist updated metadata
2511            self.update(&memory)?;
2512        }
2513        Ok(())
2514    }
2515
2516    /// Get statistics about stored memories
2517    pub fn get_stats(&self) -> Result<StorageStats> {
2518        let mut stats = StorageStats::default();
2519        let mut raw_count = 0;
2520        let mut skipped_non_memory = 0;
2521        let mut deserialize_errors = 0;
2522        let stats_prefix = b"stats:";
2523
2524        let iter = self.db.iterator(IteratorMode::Start);
2525        for item in iter {
2526            match item {
2527                Ok((key, value)) => {
2528                    raw_count += 1;
2529
2530                    // Skip stats entries - they use a different format
2531                    if key.starts_with(stats_prefix) {
2532                        skipped_non_memory += 1;
2533                        continue;
2534                    }
2535
2536                    // Valid memory keys should be exactly 16 bytes (UUID bytes)
2537                    if key.len() != 16 {
2538                        skipped_non_memory += 1;
2539                        continue;
2540                    }
2541
2542                    match deserialize_memory(&value) {
2543                        Ok((memory, _)) => {
2544                            if memory.is_forgotten() {
2545                                continue;
2546                            }
2547                            stats.total_count += 1;
2548                            stats.total_size_bytes += value.len();
2549                            if memory.compressed {
2550                                stats.compressed_count += 1;
2551                            }
2552                            stats.importance_sum += memory.importance();
2553                        }
2554                        Err(e) => {
2555                            deserialize_errors += 1;
2556                            tracing::warn!(
2557                                "Corrupted memory entry (key len: {}, value len: {}): {}",
2558                                key.len(),
2559                                value.len(),
2560                                e
2561                            );
2562                        }
2563                    }
2564                }
2565                Err(e) => {
2566                    tracing::error!("Iterator error: {}", e);
2567                }
2568            }
2569        }
2570
2571        tracing::debug!(
2572            "get_stats: raw_count={}, memories={}, skipped={}, corrupted={}",
2573            raw_count,
2574            stats.total_count,
2575            skipped_non_memory,
2576            deserialize_errors
2577        );
2578
2579        if stats.total_count > 0 {
2580            stats.average_importance = stats.importance_sum / stats.total_count as f32;
2581        }
2582
2583        // Load persisted retrieval counter
2584        stats.total_retrievals = self.get_retrieval_count().unwrap_or(0);
2585
2586        Ok(stats)
2587    }
2588
2589    /// Get the persisted retrieval counter
2590    pub fn get_retrieval_count(&self) -> Result<usize> {
2591        const RETRIEVAL_KEY: &[u8] = b"stats:total_retrievals";
2592        match self.db.get(RETRIEVAL_KEY)? {
2593            Some(data) => {
2594                if data.len() >= 8 {
2595                    Ok(usize::from_le_bytes(data[..8].try_into().unwrap_or([0; 8])))
2596                } else {
2597                    Ok(0)
2598                }
2599            }
2600            None => Ok(0),
2601        }
2602    }
2603
2604    /// Increment and persist the retrieval counter, returns new value
2605    pub fn increment_retrieval_count(&self) -> Result<usize> {
2606        const RETRIEVAL_KEY: &[u8] = b"stats:total_retrievals";
2607        let current = self.get_retrieval_count().unwrap_or(0);
2608        let new_count = current + 1;
2609        self.db.put(RETRIEVAL_KEY, new_count.to_le_bytes())?;
2610        Ok(new_count)
2611    }
2612
2613    /// Remove corrupted memories that fail to deserialize even with legacy fallbacks
2614    /// Returns the number of entries deleted
2615    ///
2616    /// This function safely cleans up:
2617    /// 1. Entries with keys that are not valid 16-byte UUIDs (corrupted/misplaced)
2618    /// 2. Entries with valid UUID keys but corrupted values that fail ALL format fallbacks
2619    ///
2620    /// It preserves:
2621    /// - Valid Memory entries (any format - current or legacy)
2622    /// - Stats entries (keys starting with "stats:")
2623    pub fn cleanup_corrupted(&self) -> Result<usize> {
2624        let mut to_delete = Vec::new();
2625
2626        // Known non-memory prefixes in the default CF that must be preserved.
2627        // These are legitimate data entries stored by subsystems that share the
2628        // default column family via storage.db(): SemanticFactStore, TemporalFactStore,
2629        // LineageGraph, LearningHistoryStore, plus vector mappings and interference.
2630        let skip_prefixes: &[&[u8]] = &[
2631            b"stats:",
2632            b"vmapping:",
2633            b"interference:",
2634            b"interference_meta:",
2635            b"_watermark:",
2636            b"facts:",
2637            b"facts_by_entity:",
2638            b"facts_by_type:",
2639            b"facts_embedding:",
2640            b"temporal_facts:",
2641            b"temporal_by_time:",
2642            b"temporal_by_entity:",
2643            b"lineage:",
2644            b"learning:",
2645            b"geo:",
2646        ];
2647
2648        let iter = self.db.iterator(IteratorMode::Start);
2649        for item in iter {
2650            if let Ok((key, value)) = item {
2651                // Skip known non-memory prefixed entries
2652                if skip_prefixes.iter().any(|p| key.starts_with(p)) {
2653                    continue;
2654                }
2655
2656                // Valid memory keys should be exactly 16 bytes (UUID bytes)
2657                let is_valid_memory_key = key.len() == 16;
2658
2659                if !is_valid_memory_key {
2660                    // Key is not a valid UUID - this is a corrupted or misplaced entry
2661                    tracing::debug!(
2662                        "Marking for deletion: invalid key length {} (expected 16)",
2663                        key.len()
2664                    );
2665                    to_delete.push(key.to_vec());
2666                } else if deserialize_memory(&value).is_err() {
2667                    // Key is valid but value fails all format fallbacks - truly corrupted
2668                    tracing::debug!(
2669                        "Marking for deletion: valid key but corrupted value ({} bytes)",
2670                        value.len()
2671                    );
2672                    to_delete.push(key.to_vec());
2673                }
2674            }
2675        }
2676
2677        let count = to_delete.len();
2678        if count > 0 {
2679            tracing::info!("Cleaning up {} corrupted memory entries", count);
2680
2681            let mut write_opts = WriteOptions::default();
2682            write_opts.set_sync(self.write_mode == WriteMode::Sync);
2683
2684            for key in to_delete {
2685                if let Err(e) = self.db.delete_opt(&key, &write_opts) {
2686                    tracing::warn!("Failed to delete corrupted entry: {}", e);
2687                }
2688            }
2689
2690            // Flush to persist deletions
2691            self.flush()?;
2692        }
2693
2694        Ok(count)
2695    }
2696
2697    /// Migrate legacy memories to current format for improved performance
2698    /// Returns (migrated_count, already_current_count, failed_count)
2699    ///
2700    /// This function:
2701    /// 1. Iterates all memories in storage
2702    /// 2. Attempts to deserialize with format fallback
2703    /// 3. Re-saves successfully deserialized legacy memories in current format
2704    /// 4. Reports migration statistics
2705    pub fn migrate_legacy(&self) -> Result<(usize, usize, usize)> {
2706        let mut migrated = 0;
2707        let mut already_current = 0;
2708        let mut failed = 0;
2709        let stats_prefix = b"stats:";
2710
2711        let iter = self.db.iterator(IteratorMode::Start);
2712        let mut to_migrate = Vec::new();
2713
2714        for item in iter {
2715            if let Ok((key, value)) = item {
2716                // Skip stats entries
2717                if key.starts_with(stats_prefix) {
2718                    continue;
2719                }
2720
2721                // Skip non-UUID keys
2722                if key.len() != 16 {
2723                    continue;
2724                }
2725
2726                // Try current format first (quick check)
2727                let is_current = bincode::serde::decode_from_slice::<Memory, _>(
2728                    &value,
2729                    bincode::config::standard(),
2730                )
2731                .is_ok();
2732
2733                if is_current {
2734                    already_current += 1;
2735                    continue;
2736                }
2737
2738                // Not current format - try with fallback
2739                match deserialize_memory(&value) {
2740                    Ok((memory, _)) => {
2741                        // Successfully deserialized legacy format - queue for migration
2742                        to_migrate.push((key.to_vec(), memory));
2743                    }
2744                    Err(_) => {
2745                        failed += 1;
2746                    }
2747                }
2748            }
2749        }
2750
2751        // Re-save migrated memories in current format
2752        if !to_migrate.is_empty() {
2753            tracing::info!(
2754                "Migrating {} legacy memories to current format",
2755                to_migrate.len()
2756            );
2757
2758            let mut write_opts = WriteOptions::default();
2759            write_opts.set_sync(self.write_mode == WriteMode::Sync);
2760
2761            for (key, memory) in to_migrate {
2762                match bincode::serde::encode_to_vec(&memory, bincode::config::standard()) {
2763                    Ok(serialized) => {
2764                        if let Err(e) = self.db.put_opt(&key, &serialized, &write_opts) {
2765                            tracing::warn!("Failed to migrate memory: {e}");
2766                            failed += 1;
2767                        } else {
2768                            migrated += 1;
2769                        }
2770                    }
2771                    Err(e) => {
2772                        tracing::warn!("Failed to serialize migrated memory: {e}");
2773                        failed += 1;
2774                    }
2775                }
2776            }
2777
2778            // Flush to persist migrations
2779            self.flush()?;
2780        }
2781
2782        tracing::info!(
2783            "Migration complete: {} migrated, {} already current, {} failed",
2784            migrated,
2785            already_current,
2786            failed
2787        );
2788
2789        Ok((migrated, already_current, failed))
2790    }
2791
2792    /// Flush all column families to ensure data is persisted (critical for graceful shutdown)
2793    pub fn flush(&self) -> Result<()> {
2794        use rocksdb::FlushOptions;
2795
2796        let mut flush_opts = FlushOptions::default();
2797        flush_opts.set_wait(true); // Block until flush is complete
2798
2799        // Single DB flush covers both default and index CFs
2800        self.db
2801            .flush_opt(&flush_opts)
2802            .map_err(|e| anyhow::anyhow!("Failed to flush memory storage: {e}"))?;
2803
2804        // Explicitly flush the index CF (RocksDB flush_opt only flushes default CF)
2805        self.db
2806            .flush_cf_opt(self.index_cf(), &flush_opts)
2807            .map_err(|e| anyhow::anyhow!("Failed to flush index CF: {e}"))?;
2808
2809        Ok(())
2810    }
2811
2812    /// Get a reference to the underlying RocksDB instance
2813    ///
2814    /// Used by SemanticFactStore to share the same database for fact storage.
2815    /// Facts use a different key prefix ("facts:") to avoid collisions.
2816    pub fn db(&self) -> Arc<DB> {
2817        self.db.clone()
2818    }
2819}
2820
/// Search criteria for memory retrieval
///
/// Variants can be composed via [`SearchCriteria::Combined`] to express
/// conjunctions of filters.
#[derive(Debug, Clone)]
pub enum SearchCriteria {
    // === Standard Criteria ===
    /// Filter by creation-time window
    /// (bound inclusivity depends on the evaluator - confirm at call site)
    ByDate {
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    },
    /// Filter by experience type
    ByType(ExperienceType),
    /// Filter by importance score range
    ByImportance {
        min: f32,
        max: f32,
    },
    /// Filter by a named entity referenced by the memory
    ByEntity(String),
    /// Filter by tags (matches memories containing ANY of these tags)
    ByTags(Vec<String>),

    // === Temporal/Episode Criteria ===
    /// Filter by episode ID - memories in the same episode are highly related
    ByEpisode(String),
    /// Filter by episode with sequence ordering - returns memories in temporal order
    ByEpisodeSequence {
        episode_id: String,
        /// If provided, only return memories with sequence >= this value
        min_sequence: Option<u32>,
        /// If provided, only return memories with sequence <= this value
        max_sequence: Option<u32>,
    },

    // === Robotics Criteria ===
    /// Filter by robot/drone identifier
    ByRobot(String),
    /// Filter by mission identifier
    ByMission(String),
    /// Spatial filter: memories within radius of (lat, lon)
    ByLocation {
        lat: f64,
        lon: f64,
        radius_meters: f64,
    },
    /// Filter by action type
    ByActionType(String),
    /// Filter by reward range (for RL-style queries)
    ByReward {
        min: f32,
        max: f32,
    },

    // === Compound Criteria ===
    /// All nested criteria must hold (conjunction) - confirm semantics
    /// against the evaluator before relying on AND vs OR
    Combined(Vec<SearchCriteria>),

    // === Hierarchy Criteria ===
    /// Filter by parent memory ID - returns all children of a memory
    ByParent(MemoryId),
    /// Filter for root memories (no parent)
    RootsOnly,
}
2878
/// Storage statistics
///
/// Aggregated by [`MemoryStorage::get_stats`] over a full scan of the default
/// column family; forgotten memories are excluded from the counts.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct StorageStats {
    // Number of live (non-forgotten) memories
    pub total_count: usize,
    // Subset of total_count that is stored compressed
    pub compressed_count: usize,
    // Sum of serialized value sizes for live memories
    pub total_size_bytes: usize,
    // importance_sum / total_count (0.0 when there are no memories)
    pub average_importance: f32,
    // Running sum used to derive average_importance
    pub importance_sum: f32,
    /// Total number of recall/retrieval operations (persisted)
    #[serde(default)]
    pub total_retrievals: usize,
}
2891
2892// =============================================================================
2893// ATOMIC VECTOR INDEX MAPPING STORAGE
2894// =============================================================================
2895//
2896// This module provides atomic storage for vector index mappings alongside memory data.
2897// By storing IdMapping in RocksDB (not separate files), we ensure:
2898//
2899// 1. ATOMIC WRITES: Memory + vector mapping written in single WriteBatch
2900// 2. NO ORPHANS: If memory exists, its vector mapping exists (or can be rebuilt)
2901// 3. CRASH SAFETY: RocksDB WAL protects both memory data and mappings
2902// 4. SINGLE SOURCE OF TRUTH: RocksDB is THE authority, Vamana is just a cache
2903//
2904// MULTIMODALITY READY:
2905// - Each modality (text, image, audio, video) has separate vector space
2906// - Text: 384-dim MiniLM (current)
2907// - Image: 1024-dim ImageBind (future)
2908// - Audio: 1024-dim ImageBind (future)
2909// - Video: 1024-dim ImageBind (future)
2910// - Cross-modal search possible via ImageBind's unified embedding space
2911//
2912// Key format: "vmapping:{memory_id}" -> bincode(VectorMappingEntry)
2913// =============================================================================
2914
/// Supported embedding modalities
///
/// When adding a new modality:
/// 1. Add variant here
/// 2. Create corresponding Vamana index with correct dimension
/// 3. Implement embedder for the modality
/// 4. Update search to include the modality
///
/// Hash + Eq derives allow use as a `HashMap` key in `VectorMappingEntry`;
/// serde derives control how the variant is persisted inside mappings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Modality {
    /// Text embeddings (MiniLM-L6-v2, 384-dim)
    Text,
    /// Image embeddings (future: ImageBind, 1024-dim)
    Image,
    /// Audio embeddings (future: ImageBind, 1024-dim)
    Audio,
    /// Video embeddings (future: ImageBind, 1024-dim)
    Video,
    /// Multi-modal unified embeddings (future: ImageBind, 1024-dim)
    /// Used when content has multiple modalities fused together
    Unified,
}
2936
2937impl Modality {
2938    /// Get embedding dimension for this modality
2939    pub fn dimension(&self) -> usize {
2940        match self {
2941            Modality::Text => 384, // MiniLM-L6-v2
2942            // ImageBind projects all modalities to 1024-dim shared space
2943            Modality::Image => 1024,
2944            Modality::Audio => 1024,
2945            Modality::Video => 1024,
2946            Modality::Unified => 1024,
2947        }
2948    }
2949
2950    /// Get the string key for storage
2951    pub fn as_str(&self) -> &'static str {
2952        match self {
2953            Modality::Text => "text",
2954            Modality::Image => "image",
2955            Modality::Audio => "audio",
2956            Modality::Video => "video",
2957            Modality::Unified => "unified",
2958        }
2959    }
2960}
2961
2962impl std::fmt::Display for Modality {
2963    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2964        write!(f, "{}", self.as_str())
2965    }
2966}
2967
/// Vector IDs for a specific modality
///
/// One instance per modality inside a `VectorMappingEntry`; `Default` yields
/// an empty entry (no vectors, dimension 0, no chunking).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ModalityVectors {
    /// Vector IDs in this modality's Vamana index
    pub vector_ids: Vec<u32>,
    /// Embedding dimension (for validation)
    pub dimension: usize,
    /// Chunk boundaries (for long content)
    /// Each entry is (start_char, end_char) in original content
    /// NOTE(review): presumably parallel to `vector_ids` when present - confirm
    pub chunk_ranges: Option<Vec<(usize, usize)>>,
}
2979
/// Vector mapping entry for a single memory - MULTIMODALITY READY
///
/// Stores vector IDs for each modality separately, allowing:
/// - Text-only memories (current)
/// - Image-only memories (future)
/// - Multi-modal memories (text + image + audio)
/// - Cross-modal search via unified embeddings
///
/// Persisted in RocksDB under "vmapping:{memory_id}" as bincode.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorMappingEntry {
    /// Vector IDs per modality
    /// Key: Modality enum (serializes as string)
    /// Value: Vector IDs + metadata for that modality
    pub modalities: HashMap<Modality, ModalityVectors>,
    /// Timestamp when mapping was created (for debugging)
    /// Unix epoch milliseconds (see Default impl)
    pub created_at: i64,
    /// Schema version for forward compatibility
    pub version: u8,
}
2998
2999impl Default for VectorMappingEntry {
3000    fn default() -> Self {
3001        Self {
3002            modalities: HashMap::new(),
3003            created_at: chrono::Utc::now().timestamp_millis(),
3004            version: 1,
3005        }
3006    }
3007}
3008
3009impl VectorMappingEntry {
3010    /// Create a new mapping with text vectors (most common case)
3011    pub fn with_text(vector_ids: Vec<u32>) -> Self {
3012        let mut modalities = HashMap::new();
3013        modalities.insert(
3014            Modality::Text,
3015            ModalityVectors {
3016                vector_ids,
3017                dimension: 384,
3018                chunk_ranges: None,
3019            },
3020        );
3021        Self {
3022            modalities,
3023            created_at: chrono::Utc::now().timestamp_millis(),
3024            version: 1,
3025        }
3026    }
3027
3028    /// Get text vector IDs (convenience method for current text-only usage)
3029    pub fn text_vectors(&self) -> Option<&Vec<u32>> {
3030        self.modalities.get(&Modality::Text).map(|m| &m.vector_ids)
3031    }
3032
3033    /// Get all vector IDs across all modalities (for deletion)
3034    pub fn all_vector_ids(&self) -> Vec<(Modality, u32)> {
3035        self.modalities
3036            .iter()
3037            .flat_map(|(modality, mv)| mv.vector_ids.iter().map(|id| (*modality, *id)))
3038            .collect()
3039    }
3040
3041    /// Check if this entry has any vectors
3042    pub fn is_empty(&self) -> bool {
3043        self.modalities.values().all(|mv| mv.vector_ids.is_empty())
3044    }
3045
3046    /// Add vectors for a modality
3047    pub fn add_modality(&mut self, modality: Modality, vector_ids: Vec<u32>) {
3048        self.modalities.insert(
3049            modality,
3050            ModalityVectors {
3051                dimension: modality.dimension(),
3052                vector_ids,
3053                chunk_ranges: None,
3054            },
3055        );
3056    }
3057
3058    /// Future: Add image vectors
3059    #[allow(dead_code)]
3060    pub fn with_image(mut self, vector_ids: Vec<u32>) -> Self {
3061        self.add_modality(Modality::Image, vector_ids);
3062        self
3063    }
3064
3065    /// Future: Add audio vectors
3066    #[allow(dead_code)]
3067    pub fn with_audio(mut self, vector_ids: Vec<u32>) -> Self {
3068        self.add_modality(Modality::Audio, vector_ids);
3069        self
3070    }
3071
3072    /// Future: Add video vectors
3073    #[allow(dead_code)]
3074    pub fn with_video(mut self, vector_ids: Vec<u32>) -> Self {
3075        self.add_modality(Modality::Video, vector_ids);
3076        self
3077    }
3078}
3079
3080impl MemoryStorage {
3081    // =========================================================================
3082    // ATOMIC VECTOR MAPPING OPERATIONS
3083    // =========================================================================
3084
    /// Store memory and its text vector mapping atomically
    ///
    /// Uses WriteBatch to ensure both operations succeed or both fail.
    /// This is the ONLY way orphaned memories can be prevented.
    ///
    /// For text-only memories (current implementation). Use store_with_multimodal_vectors
    /// for memories with image/audio/video content.
    ///
    /// Thin convenience wrapper: delegates to `store_with_multimodal_vectors`
    /// with `Modality::Text`.
    pub fn store_with_vectors(&self, memory: &Memory, vector_ids: Vec<u32>) -> Result<()> {
        self.store_with_multimodal_vectors(memory, Modality::Text, vector_ids)
    }
3095
    /// Store memory with vectors for a specific modality
    ///
    /// MULTIMODALITY READY: Supports text, image, audio, video modalities.
    /// Each modality is stored separately, allowing cross-modal search.
    ///
    /// Writes the serialized memory and its vector mapping in a single
    /// WriteBatch; secondary index updates happen afterwards and are
    /// best-effort (failure is logged, not returned).
    ///
    /// NOTE(review): the existing mapping is read BEFORE the batch write
    /// (read-modify-write outside the batch). Two concurrent stores for the
    /// same memory with different modalities could lose one modality -
    /// confirm the single-writer assumption holds for callers.
    pub fn store_with_multimodal_vectors(
        &self,
        memory: &Memory,
        modality: Modality,
        vector_ids: Vec<u32>,
    ) -> Result<()> {
        let mut batch = WriteBatch::default();

        // 1. Serialize memory (key = raw 16-byte UUID)
        let memory_key = memory.id.0.as_bytes();
        let memory_value = bincode::serde::encode_to_vec(memory, bincode::config::standard())
            .context(format!("Failed to serialize memory {}", memory.id.0))?;
        batch.put(memory_key, &memory_value);

        // 2. Serialize vector mapping with modality support
        let mapping_key = format!("vmapping:{}", memory.id.0);

        // Load existing mapping (for adding new modality to existing memory)
        let mut mapping_entry = self.get_vector_mapping(&memory.id)?.unwrap_or_default();

        // Add/update the modality vectors (replaces any prior vectors for it)
        mapping_entry.add_modality(modality, vector_ids);

        let mapping_value =
            bincode::serde::encode_to_vec(&mapping_entry, bincode::config::standard())
                .context("Failed to serialize vector mapping")?;
        batch.put(mapping_key.as_bytes(), &mapping_value);

        // 3. Atomic write - both succeed or both fail
        // Sync behavior follows the storage-wide write mode (see WriteMode)
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db
            .write_opt(batch, &write_opts)
            .context("Atomic write of memory + vector mapping failed")?;

        // 4. Update secondary indices (separate operation, but non-critical)
        if let Err(e) = self.update_indices(memory) {
            tracing::warn!("Secondary index update failed (non-fatal): {}", e);
        }

        Ok(())
    }
3142
3143    /// Get vector mapping for a memory
3144    pub fn get_vector_mapping(&self, memory_id: &MemoryId) -> Result<Option<VectorMappingEntry>> {
3145        let mapping_key = format!("vmapping:{}", memory_id.0);
3146        match self.db.get(mapping_key.as_bytes())? {
3147            Some(data) => {
3148                let (entry, _): (VectorMappingEntry, _) =
3149                    bincode::serde::decode_from_slice(&data, bincode::config::standard())
3150                        .context("Failed to deserialize vector mapping")?;
3151                Ok(Some(entry))
3152            }
3153            None => Ok(None),
3154        }
3155    }
3156
3157    /// Get all vector mappings (for rebuilding Vamana index on startup)
3158    ///
3159    /// Returns iterator-style results to avoid loading everything into memory at once.
3160    /// Sorted by memory_id for deterministic Vamana rebuilding.
3161    pub fn get_all_vector_mappings(&self) -> Result<Vec<(MemoryId, VectorMappingEntry)>> {
3162        let mut mappings = Vec::new();
3163        let prefix = b"vmapping:";
3164
3165        let iter = self
3166            .db
3167            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));
3168
3169        for item in iter {
3170            match item {
3171                Ok((key, value)) => {
3172                    let key_str = String::from_utf8_lossy(&key);
3173                    if !key_str.starts_with("vmapping:") {
3174                        break;
3175                    }
3176
3177                    // Extract memory_id from key
3178                    if let Some(id_str) = key_str.strip_prefix("vmapping:") {
3179                        if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
3180                            if let Ok((entry, _)) =
3181                                bincode::serde::decode_from_slice::<VectorMappingEntry, _>(
3182                                    &value,
3183                                    bincode::config::standard(),
3184                                )
3185                            {
3186                                mappings.push((MemoryId(uuid), entry));
3187                            }
3188                        }
3189                    }
3190                }
3191                Err(e) => {
3192                    tracing::warn!("Error reading vector mapping: {}", e);
3193                }
3194            }
3195        }
3196
3197        Ok(mappings)
3198    }
3199
3200    /// Delete vector mapping for a memory (called when deleting memory)
3201    pub fn delete_vector_mapping(&self, memory_id: &MemoryId) -> Result<()> {
3202        let mapping_key = format!("vmapping:{}", memory_id.0);
3203        let mut write_opts = WriteOptions::default();
3204        write_opts.set_sync(self.write_mode == WriteMode::Sync);
3205        self.db.delete_opt(mapping_key.as_bytes(), &write_opts)?;
3206        Ok(())
3207    }
3208
3209    /// Update text vector mapping for a memory (for reindex operations)
3210    ///
3211    /// Convenience method for text-only reindexing.
3212    pub fn update_vector_mapping(&self, memory_id: &MemoryId, vector_ids: Vec<u32>) -> Result<()> {
3213        self.update_modality_vectors(memory_id, Modality::Text, vector_ids)
3214    }
3215
3216    /// Update vector mapping for a specific modality
3217    ///
3218    /// MULTIMODALITY READY: Preserves vectors for other modalities while updating one.
3219    pub fn update_modality_vectors(
3220        &self,
3221        memory_id: &MemoryId,
3222        modality: Modality,
3223        vector_ids: Vec<u32>,
3224    ) -> Result<()> {
3225        let mapping_key = format!("vmapping:{}", memory_id.0);
3226
3227        // Load existing mapping to preserve other modalities
3228        let mut mapping_entry = self.get_vector_mapping(memory_id)?.unwrap_or_default();
3229
3230        // Update the specific modality
3231        mapping_entry.add_modality(modality, vector_ids);
3232
3233        let mapping_value =
3234            bincode::serde::encode_to_vec(&mapping_entry, bincode::config::standard())?;
3235
3236        let mut write_opts = WriteOptions::default();
3237        write_opts.set_sync(self.write_mode == WriteMode::Sync);
3238        self.db
3239            .put_opt(mapping_key.as_bytes(), &mapping_value, &write_opts)?;
3240        Ok(())
3241    }
3242
3243    /// Delete memory and its vector mapping atomically
3244    pub fn delete_with_vectors(&self, id: &MemoryId) -> Result<()> {
3245        let mut batch = WriteBatch::default();
3246
3247        // 1. Delete memory
3248        batch.delete(id.0.as_bytes());
3249
3250        // 2. Delete vector mapping
3251        let mapping_key = format!("vmapping:{}", id.0);
3252        batch.delete(mapping_key.as_bytes());
3253
3254        // 3. Atomic delete
3255        let mut write_opts = WriteOptions::default();
3256        write_opts.set_sync(self.write_mode == WriteMode::Sync);
3257        self.db.write_opt(batch, &write_opts)?;
3258
3259        // 4. Clean up indices (non-critical)
3260        if let Err(e) = self.remove_from_indices(id) {
3261            tracing::warn!("Index cleanup failed (non-fatal): {}", e);
3262        }
3263
3264        Ok(())
3265    }
3266
3267    /// Count memories with vector mappings (for health checks)
3268    pub fn count_vector_mappings(&self) -> usize {
3269        let prefix = b"vmapping:";
3270        let iter = self
3271            .db
3272            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));
3273
3274        let mut count = 0;
3275        for item in iter {
3276            if let Ok((key, _)) = item {
3277                if key.starts_with(prefix) {
3278                    count += 1;
3279                } else {
3280                    break;
3281                }
3282            }
3283        }
3284        count
3285    }
3286
3287    /// Check integrity: find memories without vector mappings
3288    ///
3289    /// Returns memories that have embeddings but no corresponding vector mapping.
3290    /// These need to be reindexed.
3291    pub fn find_memories_without_mappings(&self) -> Result<Vec<MemoryId>> {
3292        let mut orphans = Vec::new();
3293
3294        let iter = self.db.iterator(IteratorMode::Start);
3295        for item in iter {
3296            if let Ok((key, value)) = item {
3297                // Skip non-memory keys
3298                if key.len() != 16 {
3299                    continue;
3300                }
3301
3302                // Try to deserialize as memory
3303                if let Ok((memory, _)) = deserialize_memory(&value) {
3304                    // Check if vector mapping exists and has text vectors
3305                    let has_mapping = match self.get_vector_mapping(&memory.id) {
3306                        Ok(Some(entry)) => entry.text_vectors().is_some_and(|v| !v.is_empty()),
3307                        _ => false,
3308                    };
3309
3310                    // Memory has embeddings but no mapping - needs reindex
3311                    if !has_mapping && memory.experience.embeddings.is_some() {
3312                        orphans.push(memory.id);
3313                    }
3314                }
3315            }
3316        }
3317
3318        Ok(orphans)
3319    }
3320
3321    /// Get all text vector IDs from mappings (for Vamana statistics)
3322    pub fn get_all_text_vector_ids(&self) -> Result<Vec<u32>> {
3323        let mut all_ids = Vec::new();
3324        let mappings = self.get_all_vector_mappings()?;
3325
3326        for (_, entry) in mappings {
3327            if let Some(text_vecs) = entry.text_vectors() {
3328                all_ids.extend(text_vecs.iter().copied());
3329            }
3330        }
3331
3332        Ok(all_ids)
3333    }
3334
3335    /// Get vector count per modality (for health monitoring)
3336    pub fn get_modality_stats(&self) -> Result<HashMap<Modality, usize>> {
3337        let mut stats: HashMap<Modality, usize> = HashMap::new();
3338        let mappings = self.get_all_vector_mappings()?;
3339
3340        for (_, entry) in mappings {
3341            for (modality, mv) in entry.modalities {
3342                *stats.entry(modality).or_insert(0) += mv.vector_ids.len();
3343            }
3344        }
3345
3346        Ok(stats)
3347    }
3348
3349    // =========================================================================
3350    // INTERFERENCE PERSISTENCE (SHO-106 RIF)
3351    // =========================================================================
3352    //
3353    // Persists InterferenceDetector state to the main RocksDB database using
3354    // key prefix "interference:{memory_id}" for per-memory records and
3355    // "interference_meta:total" for the aggregate event counter.
3356    //
3357    // This ensures retrieval-induced forgetting history survives server restarts.
3358    // =========================================================================
3359
3360    /// Persist interference records for a single memory
3361    ///
3362    /// Key format: `interference:{memory_id}` → JSON `Vec<InterferenceRecord>`
3363    pub fn save_interference_records(
3364        &self,
3365        memory_id: &str,
3366        records: &[super::replay::InterferenceRecord],
3367    ) -> Result<()> {
3368        let key = format!("interference:{memory_id}");
3369        let value =
3370            serde_json::to_vec(records).context("Failed to serialize interference records")?;
3371
3372        let mut write_opts = WriteOptions::default();
3373        write_opts.set_sync(self.write_mode == WriteMode::Sync);
3374        self.db
3375            .put_opt(key.as_bytes(), &value, &write_opts)
3376            .context("Failed to persist interference records")?;
3377
3378        Ok(())
3379    }
3380
    /// Load all interference records from storage on startup
    ///
    /// Scans all `interference:` prefixed keys and deserializes records.
    /// Returns `(history_map, total_event_count)` for bulk-loading into InterferenceDetector.
    ///
    /// The returned count is `max(persisted counter, sum of loaded records)`:
    /// the persisted `interference_meta:total` may exceed the record sum
    /// (e.g. after eviction), while the record sum wins if the counter write
    /// lagged behind the records.
    pub fn load_all_interference_records(
        &self,
    ) -> Result<(
        HashMap<String, Vec<super::replay::InterferenceRecord>>,
        usize,
    )> {
        let prefix = b"interference:";
        let mut history: HashMap<String, Vec<super::replay::InterferenceRecord>> = HashMap::new();
        let mut total_events: usize = 0;

        let iter = self
            .db
            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));

        // `log_errors` logs RocksDB read failures and keeps scanning.
        for item in iter.log_errors() {
            let (key, value) = item;
            let key_str = String::from_utf8_lossy(&key);

            // Keys are sorted: the first key past the "interference:" range
            // ends the scan. (Note "interference_meta:total" sorts after this
            // range, since '_' > ':' in ASCII, so it is never loaded here.)
            if !key_str.starts_with("interference:") {
                break;
            }

            if let Some(memory_id) = key_str.strip_prefix("interference:") {
                match serde_json::from_slice::<Vec<super::replay::InterferenceRecord>>(&value) {
                    Ok(records) => {
                        total_events += records.len();
                        history.insert(memory_id.to_string(), records);
                    }
                    Err(e) => {
                        // Corrupt entries are skipped so one bad record can't
                        // block startup.
                        tracing::warn!(
                            key = %key_str,
                            error = %e,
                            "Failed to deserialize interference records, skipping"
                        );
                    }
                }
            }
        }

        // Load persisted total count (may be higher than sum of records due to eviction)
        // Value is an 8-byte little-endian u64; any other length is ignored.
        let persisted_total = self
            .db
            .get(b"interference_meta:total")
            .ok()
            .flatten()
            .and_then(|v| {
                if v.len() == 8 {
                    // unwrap is safe: length checked to be exactly 8 above.
                    Some(u64::from_le_bytes(v[..8].try_into().unwrap()) as usize)
                } else {
                    None
                }
            })
            .unwrap_or(total_events);

        Ok((history, persisted_total.max(total_events)))
    }
3441
3442    /// Delete interference records for a single memory (called on forget/delete)
3443    pub fn delete_interference_records(&self, memory_id: &str) -> Result<()> {
3444        let key = format!("interference:{memory_id}");
3445        let mut write_opts = WriteOptions::default();
3446        write_opts.set_sync(self.write_mode == WriteMode::Sync);
3447        self.db
3448            .delete_opt(key.as_bytes(), &write_opts)
3449            .context("Failed to delete interference records")?;
3450        Ok(())
3451    }
3452
3453    /// Persist the total interference event count
3454    ///
3455    /// Key: `interference_meta:total` → 8-byte little-endian u64
3456    pub fn save_interference_event_count(&self, count: usize) -> Result<()> {
3457        let mut write_opts = WriteOptions::default();
3458        write_opts.set_sync(self.write_mode == WriteMode::Sync);
3459        self.db
3460            .put_opt(
3461                b"interference_meta:total",
3462                &(count as u64).to_le_bytes(),
3463                &write_opts,
3464            )
3465            .context("Failed to persist interference event count")?;
3466        Ok(())
3467    }
3468
3469    /// Load the fact extraction watermark for a user.
3470    ///
3471    /// Key: `_watermark:fact_extraction:{user_id}` → 8-byte little-endian i64 (unix millis)
3472    /// Returns None if no watermark has been persisted yet.
3473    pub fn get_fact_watermark(&self, user_id: &str) -> Option<i64> {
3474        let key = format!("_watermark:fact_extraction:{user_id}");
3475        match self.db.get(key.as_bytes()) {
3476            Ok(Some(bytes)) if bytes.len() == 8 => {
3477                Some(i64::from_le_bytes(bytes[..8].try_into().unwrap()))
3478            }
3479            _ => None,
3480        }
3481    }
3482
3483    /// Persist the fact extraction watermark for a user.
3484    ///
3485    /// Key: `_watermark:fact_extraction:{user_id}` → 8-byte little-endian i64 (unix millis)
3486    pub fn set_fact_watermark(&self, user_id: &str, timestamp_millis: i64) {
3487        let key = format!("_watermark:fact_extraction:{user_id}");
3488        let mut write_opts = WriteOptions::default();
3489        write_opts.set_sync(self.write_mode == WriteMode::Sync);
3490        if let Err(e) =
3491            self.db
3492                .put_opt(key.as_bytes(), &timestamp_millis.to_le_bytes(), &write_opts)
3493        {
3494            tracing::warn!("Failed to persist fact extraction watermark: {e}");
3495        }
3496    }
3497
3498    /// Delete ALL interference records (GDPR forget_all)
3499    ///
3500    /// Batch-deletes all `interference:` and `interference_meta:` keys.
3501    pub fn clear_all_interference_records(&self) -> Result<usize> {
3502        let prefix = b"interference";
3503        let mut batch = WriteBatch::default();
3504        let mut count = 0;
3505
3506        let iter = self
3507            .db
3508            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));
3509
3510        for item in iter.log_errors() {
3511            let (key, _) = item;
3512            let key_str = String::from_utf8_lossy(&key);
3513            if !key_str.starts_with("interference") {
3514                break;
3515            }
3516            batch.delete(&key);
3517            count += 1;
3518        }
3519
3520        if count > 0 {
3521            let mut write_opts = WriteOptions::default();
3522            write_opts.set_sync(self.write_mode == WriteMode::Sync);
3523            self.db.write_opt(batch, &write_opts)?;
3524        }
3525
3526        Ok(count)
3527    }
3528}
3529
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Serialize;

    // Minimal stand-in for a legacy on-disk record: only the fields the
    // fallback decoder must recover (id + content).
    #[derive(Serialize)]
    struct LegacyMinimalFixture {
        id: MemoryId,
        content: String,
    }

    // Builds a bare long-term Memory with the given id/content and neutral
    // defaults for every other `from_legacy` field.
    fn sample_memory(id: MemoryId, content: &str) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: content.to_string(),
            ..Default::default()
        };
        Memory::from_legacy(
            id,
            experience,
            0.5,
            0,
            now,
            now,
            false,
            MemoryTier::LongTerm,
            Vec::new(),
            1.0,
            None,
            None,
            None,
            None,
            0.0,
            None,
            None,
            1,
            Vec::new(),
            Vec::new(),
        )
    }

    // Current-format bytes must decode directly (no fallback) and must NOT
    // bump the legacy-fallback metric.
    #[test]
    fn test_deserialize_with_fallback_records_current_bincode2_branch() {
        let id = MemoryId(uuid::Uuid::new_v4());
        let memory = sample_memory(id.clone(), "current format memory");
        let bytes = bincode::serde::encode_to_vec(&memory, bincode::config::standard()).unwrap();

        let counter =
            crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL.with_label_values(&["bincode2_memory"]);
        let before = counter.get();

        let (decoded, is_legacy) = deserialize_with_fallback(&bytes).unwrap();
        let after = counter.get();

        assert_eq!(decoded.id, id);
        assert!(!is_legacy);
        // Current format is not a fallback — metric should NOT increment
        assert_eq!(after, before);
    }

    // bincode1-encoded minimal fixture must be recovered via the
    // "bincode1_minimal" fallback branch and increment its metric by one.
    #[test]
    fn test_deserialize_with_fallback_bincode1_minimal_fixture() {
        let id = MemoryId(uuid::Uuid::new_v4());
        let fixture = LegacyMinimalFixture {
            id: id.clone(),
            content: "legacy bincode1 minimal".to_string(),
        };
        let bytes = bincode1::serialize(&fixture).unwrap();

        let counter =
            crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL.with_label_values(&["bincode1_minimal"]);
        let before = counter.get();

        let (decoded, is_legacy) = deserialize_with_fallback(&bytes).unwrap();
        let after = counter.get();

        assert_eq!(decoded.id, id);
        assert!(is_legacy);
        assert_eq!(after, before + 1);
    }

    // MessagePack-encoded minimal fixture must be recovered via the
    // "msgpack_minimal" fallback branch and increment its metric by one.
    #[test]
    fn test_deserialize_with_fallback_msgpack_minimal_fixture() {
        let id = MemoryId(uuid::Uuid::new_v4());
        let fixture = LegacyMinimalFixture {
            id: id.clone(),
            content: "legacy msgpack minimal".to_string(),
        };
        let bytes = rmp_serde::to_vec(&fixture).unwrap();

        let counter =
            crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL.with_label_values(&["msgpack_minimal"]);
        let before = counter.get();

        let (decoded, is_legacy) = deserialize_with_fallback(&bytes).unwrap();
        let after = counter.get();

        assert_eq!(decoded.id, id);
        assert!(is_legacy);
        assert_eq!(after, before + 1);
    }

    // With the env var unset, WriteMode::default() must fall back to Async.
    // NOTE(review): mutates process-wide env — could race with a parallel
    // test that sets SHODH_WRITE_MODE; none exists in this module today.
    #[test]
    fn test_write_mode_default_async() {
        std::env::remove_var("SHODH_WRITE_MODE");
        let mode = WriteMode::default();
        assert_eq!(mode, WriteMode::Async);
    }

    // crc32_simple must be deterministic and input-sensitive.
    #[test]
    fn test_crc32_simple() {
        let data = b"test data for CRC32";
        let crc1 = crc32_simple(data);
        let crc2 = crc32_simple(data);

        assert_eq!(crc1, crc2);
        assert_ne!(crc1, 0);

        let crc3 = crc32_simple(b"different data");
        assert_ne!(crc1, crc3);
    }

    #[test]
    fn test_crc32_empty() {
        let crc = crc32_simple(b"");
        assert_eq!(
            crc, 0,
            "IEEE CRC32 of empty input is 0 (init 0xFFFFFFFF XOR final 0xFFFFFFFF)"
        );
    }

    // Pin the embedding dimensionality contract per modality.
    #[test]
    fn test_modality_dimension() {
        assert_eq!(Modality::Text.dimension(), 384);
        // ImageBind projects all non-text modalities to 1024-dim shared space
        assert_eq!(Modality::Image.dimension(), 1024);
        assert_eq!(Modality::Audio.dimension(), 1024);
        assert_eq!(Modality::Video.dimension(), 1024);
        assert_eq!(Modality::Unified.dimension(), 1024);
    }

    #[test]
    fn test_modality_as_str() {
        assert_eq!(Modality::Text.as_str(), "text");
        assert_eq!(Modality::Image.as_str(), "image");
        assert_eq!(Modality::Audio.as_str(), "audio");
        assert_eq!(Modality::Video.as_str(), "video");
    }

    #[test]
    fn test_vector_mapping_entry_with_text() {
        let entry = VectorMappingEntry::with_text(vec![1, 2, 3]);

        assert_eq!(entry.text_vectors(), Some(&vec![1, 2, 3]));
        assert!(!entry.is_empty());
    }

    // Builder-style modality chaining must surface every (modality, id) pair.
    #[test]
    fn test_vector_mapping_entry_multimodal() {
        let entry = VectorMappingEntry::with_text(vec![1])
            .with_image(vec![2])
            .with_audio(vec![3])
            .with_video(vec![4]);

        let all = entry.all_vector_ids();
        assert_eq!(all.len(), 4);

        assert!(all.contains(&(Modality::Text, 1)));
        assert!(all.contains(&(Modality::Image, 2)));
        assert!(all.contains(&(Modality::Audio, 3)));
        assert!(all.contains(&(Modality::Video, 4)));
    }

    #[test]
    fn test_vector_mapping_entry_empty() {
        let entry = VectorMappingEntry::default();

        assert!(entry.is_empty());
        assert!(entry.text_vectors().is_none());
        assert!(entry.all_vector_ids().is_empty());
    }

    #[test]
    fn test_vector_mapping_entry_add_modality() {
        let mut entry = VectorMappingEntry::default();
        entry.add_modality(Modality::Text, vec![1, 2]);

        assert_eq!(entry.text_vectors(), Some(&vec![1, 2]));
    }

    #[test]
    fn test_storage_stats_default() {
        let stats = StorageStats::default();

        assert_eq!(stats.total_count, 0);
        assert_eq!(stats.compressed_count, 0);
        assert_eq!(stats.total_size_bytes, 0);
        assert_eq!(stats.total_retrievals, 0);
    }

    // Smoke-check that each SearchCriteria variant constructs and matches.
    #[test]
    fn test_search_criteria_variants() {
        let criteria1 = SearchCriteria::ByEntity("test".to_string());
        let criteria2 = SearchCriteria::ByImportance { min: 0.5, max: 1.0 };
        let criteria3 = SearchCriteria::ByType(ExperienceType::Observation);

        assert!(matches!(criteria1, SearchCriteria::ByEntity(_)));
        assert!(matches!(criteria2, SearchCriteria::ByImportance { .. }));
        assert!(matches!(criteria3, SearchCriteria::ByType(_)));
    }

    #[test]
    fn test_search_criteria_by_date() {
        let now = Utc::now();
        let start = now - chrono::Duration::days(7);
        let criteria = SearchCriteria::ByDate { start, end: now };

        if let SearchCriteria::ByDate { start: s, end: e } = criteria {
            assert!(s < e);
        } else {
            panic!("Expected ByDate");
        }
    }

    #[test]
    fn test_search_criteria_combined() {
        let criteria = SearchCriteria::Combined(vec![
            SearchCriteria::ByEntity("test".to_string()),
            SearchCriteria::ByImportance { min: 0.5, max: 1.0 },
        ]);

        if let SearchCriteria::Combined(inner) = criteria {
            assert_eq!(inner.len(), 2);
        } else {
            panic!("Expected Combined");
        }
    }

    #[test]
    fn test_modality_vectors_struct() {
        let mv = ModalityVectors {
            vector_ids: vec![1, 2, 3],
            dimension: 384,
            chunk_ranges: None,
        };

        assert_eq!(mv.vector_ids.len(), 3);
        assert_eq!(mv.dimension, 384);
        assert!(mv.chunk_ranges.is_none());
    }
}