1use anyhow::{anyhow, Context, Result};
4use bincode;
5use chrono::{DateTime, Utc};
6use rocksdb::{
7 ColumnFamily, ColumnFamilyDescriptor, IteratorMode, Options, WriteBatch, WriteOptions, DB,
8};
9use serde::{Deserialize, Serialize};
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12
13use super::types::*;
14
/// Extension trait for iterators of `Result`s: log each `Err` item and keep
/// iterating over the `Ok` values (used when scanning RocksDB iterators,
/// where a single bad entry should not abort the whole scan).
trait LogErrors<T> {
    /// Returns an iterator over the `Ok` values, logging every error at
    /// `warn` level and skipping it.
    fn log_errors(self) -> impl Iterator<Item = T>;
}
20
21impl<I, T, E> LogErrors<T> for I
22where
23 I: Iterator<Item = Result<T, E>>,
24 E: std::fmt::Display,
25{
26 fn log_errors(self) -> impl Iterator<Item = T> {
27 self.filter_map(|r| match r {
28 Ok(v) => Some(v),
29 Err(e) => {
30 tracing::warn!("RocksDB iterator error (continuing): {}", e);
31 None
32 }
33 })
34 }
35}
36
/// Durability mode applied to RocksDB writes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    /// fsync on every write (durable; ~2-10ms per write).
    Sync,
    /// No per-write fsync (fast; <1ms per write).
    Async,
}
48
49impl Default for WriteMode {
50 fn default() -> Self {
51 match std::env::var("SHODH_WRITE_MODE") {
54 Ok(mode) if mode.to_lowercase() == "sync" => WriteMode::Sync,
55 _ => WriteMode::Async,
56 }
57 }
58}
59
/// Magic prefix that marks a framed, versioned, checksummed memory record.
const STORAGE_MAGIC: &[u8; 3] = b"SHO";
66
67use std::collections::HashMap;
68
/// Serde default for legacy records that carry no experience type.
fn default_legacy_experience_type() -> ExperienceType {
    ExperienceType::Observation
}
73
/// Smallest legacy on-disk shape: an id followed directly by the content.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct MinimalMemory {
    id: MemoryId,
    content: String,
}
82
/// Legacy shape with one unknown byte and a numeric experience-type tag
/// between the id and the content.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct MemoryWithTypePrefix {
    id: MemoryId,
    _unknown_field: u8,
    // Numeric tag mapped to `ExperienceType` in `into_memory`.
    experience_type: u8,
    content: String,
}
95
/// Legacy shape with an opaque 3-byte header between the id and the content.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct MemoryWith3ByteHeader {
    id: MemoryId,
    _header1: u8,
    _header2: u8,
    _header3: u8,
    content: String,
}
110
impl MemoryWith3ByteHeader {
    /// Upgrades this legacy record to a full `Memory`, filling every field
    /// the old format did not store with a neutral default.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            0.5,                  // importance
            0,                    // access_count
            now,                  // created_at (unknown; use "now")
            now,                  // last_accessed
            false,                // compressed
            MemoryTier::LongTerm, // tier
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            None,                 // agent_id
            None,                 // run_id
            None,                 // actor_id
            0.0,                  // temporal_relevance
            None,                 // score
            None,                 // external_id
            1,                    // version
            Vec::new(),           // history
            Vec::new(),           // related_todo_ids
        )
    }
}
143
144fn try_raw_memory_parse(data: &[u8]) -> Option<Memory> {
147 if data.len() < 20 {
148 return None;
149 }
150
151 let uuid_bytes: [u8; 16] = data[0..16].try_into().ok()?;
153 let id = MemoryId(uuid::Uuid::from_bytes(uuid_bytes));
154
155 for header_skip in [2, 3, 4, 5, 6] {
157 let content_start = 16 + header_skip;
158 if content_start >= data.len() {
159 continue;
160 }
161 if let Ok(content) = std::str::from_utf8(&data[content_start..]) {
162 if !content.is_empty()
163 && content
164 .chars()
165 .next()
166 .map(|c| c.is_ascii_graphic())
167 .unwrap_or(false)
168 {
169 let now = Utc::now();
170 let experience = Experience {
171 experience_type: ExperienceType::Observation,
172 content: content.to_string(),
173 ..Default::default()
174 };
175 tracing::debug!(
177 "Recovered memory with raw parsing (header_skip={}, content_len={})",
178 header_skip,
179 content.len()
180 );
181 return Some(Memory::from_legacy(
182 id,
183 experience,
184 0.5,
185 0,
186 now,
187 now,
188 false,
189 MemoryTier::LongTerm,
190 Vec::new(),
191 1.0,
192 None,
193 None,
194 None,
195 None,
196 0.0,
197 None,
198 None,
199 1,
200 Vec::new(),
201 Vec::new(),
202 ));
203 }
204 }
205 }
206 None
207}
208
impl MemoryWithTypePrefix {
    /// Upgrades this legacy record to a full `Memory`, decoding the numeric
    /// experience-type tag and defaulting everything the format lacks.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        // Numeric tag -> enum; unknown tags degrade to Observation.
        let exp_type = match self.experience_type {
            0 => ExperienceType::Observation,
            1 => ExperienceType::Decision,
            2 => ExperienceType::Learning,
            3 => ExperienceType::Error,
            4 => ExperienceType::Discovery,
            5 => ExperienceType::Pattern,
            6 => ExperienceType::Context,
            7 => ExperienceType::Task,
            8 => ExperienceType::CodeEdit,
            9 => ExperienceType::FileAccess,
            10 => ExperienceType::Search,
            11 => ExperienceType::Command,
            12 => ExperienceType::Conversation,
            _ => ExperienceType::Observation,
        };
        let experience = Experience {
            experience_type: exp_type,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            0.5,                  // importance
            0,                    // access_count
            now,                  // created_at (unknown; use "now")
            now,                  // last_accessed
            false,                // compressed
            MemoryTier::LongTerm, // tier
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            None,                 // agent_id
            None,                 // run_id
            None,                 // actor_id
            0.0,                  // temporal_relevance
            None,                 // score
            None,                 // external_id
            1,                    // version
            Vec::new(),           // history
            Vec::new(),           // related_todo_ids
        )
    }
}
257
impl MinimalMemory {
    /// Upgrades this minimal legacy record to a full `Memory` with neutral
    /// defaults for everything the old format did not store.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            0.5,                  // importance
            0,                    // access_count
            now,                  // created_at (unknown; use "now")
            now,                  // last_accessed
            false,                // compressed
            MemoryTier::LongTerm, // tier
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            None,                 // agent_id
            None,                 // run_id
            None,                 // actor_id
            0.0,                  // temporal_relevance
            None,                 // score
            None,                 // external_id
            1,                    // version
            Vec::new(),           // history
            Vec::new(),           // related_todo_ids
        )
    }
}
290
/// Legacy flat shape: id + content followed by optional bookkeeping fields.
/// All trailing fields are serde-defaulted so shorter records still decode.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct SimpleLegacyMemory {
    id: MemoryId,
    content: String,
    #[serde(default)]
    importance: f32,
    #[serde(default)]
    access_count: u32,
    #[serde(default)]
    created_at: Option<DateTime<Utc>>,
    #[serde(default)]
    last_accessed: Option<DateTime<Utc>>,
    #[serde(default)]
    compressed: bool,
    #[serde(default)]
    agent_id: Option<String>,
    #[serde(default)]
    run_id: Option<String>,
    #[serde(default)]
    actor_id: Option<String>,
    #[serde(default)]
    temporal_relevance: f32,
    #[serde(default)]
    score: Option<f32>,
}
318
impl SimpleLegacyMemory {
    /// Upgrades this legacy record to a full `Memory`, preserving the
    /// bookkeeping fields it does carry and defaulting the rest.
    fn into_memory(self) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: self.content,
            ..Default::default()
        };
        Memory::from_legacy(
            self.id,
            experience,
            // A serde-defaulted importance of 0.0 means "not stored";
            // substitute the neutral 0.5 in that case.
            if self.importance > 0.0 {
                self.importance
            } else {
                0.5
            },
            self.access_count,
            self.created_at.unwrap_or(now),
            self.last_accessed.unwrap_or(now),
            self.compressed,
            MemoryTier::LongTerm, // tier
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,                 // external_id
            1,                    // version
            Vec::new(),           // history
            Vec::new(),           // related_todo_ids
        )
    }
}
355
/// Legacy (v1) experience payload. Every field after `content` is
/// serde-defaulted so that records written by older builds — which stored
/// progressively fewer fields — still decode.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct LegacyExperienceV1 {
    #[serde(default = "default_legacy_experience_type")]
    experience_type: ExperienceType,
    content: String,
    #[serde(default)]
    context: Option<RichContext>,
    #[serde(default)]
    entities: Vec<String>,
    #[serde(default)]
    metadata: HashMap<String, String>,
    #[serde(default)]
    embeddings: Option<Vec<f32>>,
    #[serde(default)]
    related_memories: Vec<MemoryId>,
    #[serde(default)]
    causal_chain: Vec<MemoryId>,
    #[serde(default)]
    outcomes: Vec<String>,
    // Robotics / telemetry fields.
    #[serde(default)]
    robot_id: Option<String>,
    #[serde(default)]
    mission_id: Option<String>,
    #[serde(default)]
    geo_location: Option<[f64; 3]>,
    #[serde(default)]
    local_position: Option<[f32; 3]>,
    #[serde(default)]
    heading: Option<f32>,
    #[serde(default)]
    action_type: Option<String>,
    #[serde(default)]
    reward: Option<f32>,
    #[serde(default)]
    sensor_data: HashMap<String, f64>,
    // Decision/outcome fields.
    #[serde(default)]
    decision_context: Option<HashMap<String, String>>,
    #[serde(default)]
    action_params: Option<HashMap<String, String>>,
    #[serde(default)]
    outcome_type: Option<String>,
    #[serde(default)]
    outcome_details: Option<String>,
    #[serde(default)]
    confidence: Option<f32>,
    #[serde(default)]
    alternatives_considered: Vec<String>,
    // Environment fields.
    #[serde(default)]
    weather: Option<HashMap<String, String>>,
    #[serde(default)]
    terrain_type: Option<String>,
    #[serde(default)]
    lighting: Option<String>,
    #[serde(default)]
    nearby_agents: Vec<HashMap<String, String>>,
    // Failure/anomaly fields.
    #[serde(default)]
    is_failure: bool,
    #[serde(default)]
    is_anomaly: bool,
    #[serde(default)]
    severity: Option<String>,
    #[serde(default)]
    recovery_action: Option<String>,
    #[serde(default)]
    root_cause: Option<String>,
    // Pattern/prediction fields.
    #[serde(default)]
    pattern_id: Option<String>,
    #[serde(default)]
    predicted_outcome: Option<String>,
    #[serde(default)]
    prediction_accurate: Option<bool>,
    #[serde(default)]
    tags: Vec<String>,
}
439
impl LegacyExperienceV1 {
    /// Converts into the current `Experience`, carrying over every legacy
    /// field and defaulting the fields added since v1 (multimodal
    /// embeddings, media refs, temporal/NER/co-occurrence data).
    fn into_experience(self) -> Experience {
        Experience {
            experience_type: self.experience_type,
            content: self.content,
            context: self.context,
            entities: self.entities,
            metadata: self.metadata,
            embeddings: self.embeddings,
            // Fields introduced after v1 — not present in legacy records.
            image_embeddings: None,
            audio_embeddings: None,
            video_embeddings: None,
            media_refs: Vec::new(),
            related_memories: self.related_memories,
            causal_chain: self.causal_chain,
            outcomes: self.outcomes,
            robot_id: self.robot_id,
            mission_id: self.mission_id,
            geo_location: self.geo_location,
            local_position: self.local_position,
            heading: self.heading,
            action_type: self.action_type,
            reward: self.reward,
            sensor_data: self.sensor_data,
            decision_context: self.decision_context,
            action_params: self.action_params,
            outcome_type: self.outcome_type,
            outcome_details: self.outcome_details,
            confidence: self.confidence,
            alternatives_considered: self.alternatives_considered,
            weather: self.weather,
            terrain_type: self.terrain_type,
            lighting: self.lighting,
            nearby_agents: self.nearby_agents,
            is_failure: self.is_failure,
            is_anomaly: self.is_anomaly,
            severity: self.severity,
            recovery_action: self.recovery_action,
            root_cause: self.root_cause,
            pattern_id: self.pattern_id,
            predicted_outcome: self.predicted_outcome,
            prediction_accurate: self.prediction_accurate,
            tags: self.tags,
            // Post-v1 analysis fields.
            temporal_refs: Vec::new(),
            ner_entities: Vec::new(),
            cooccurrence_pairs: Vec::new(),
        }
    }
}
489
/// Legacy v1 memory with all bookkeeping fields present (non-defaulted).
/// The id was serialized under the name `memory_id` in this format.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct LegacyMemoryV1Full {
    #[serde(rename = "memory_id")]
    id: MemoryId,
    experience: LegacyExperienceV1,
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
}
507
impl LegacyMemoryV1Full {
    /// Upgrades this v1 record to a full `Memory`; tier/activation and the
    /// versioning fields did not exist in v1 and are defaulted.
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            MemoryTier::LongTerm, // tier
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,                 // external_id
            1,                    // version
            Vec::new(),           // history
            Vec::new(),           // related_todo_ids
        )
    }
}
534
/// Legacy v1 memory record (id serialized as `memory_id`).
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct LegacyMemoryV1 {
    #[serde(rename = "memory_id")]
    id: MemoryId,
    experience: LegacyExperienceV1,
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
}
553
impl LegacyMemoryV1 {
    /// Upgrades this v1 record to a full `Memory`; tier/activation and the
    /// versioning fields did not exist in v1 and are defaulted.
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            MemoryTier::LongTerm, // tier
            Vec::new(),           // entity_refs
            1.0,                  // activation
            None,                 // last_retrieval_id
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,                 // external_id
            1,                    // version
            Vec::new(),           // history
            Vec::new(),           // related_todo_ids
        )
    }
}
581
/// Legacy v2 memory record: adds tier, entity refs, activation and
/// retrieval tracking over v1, but predates external-id/versioning/history.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct LegacyMemoryV2 {
    id: MemoryId,
    experience: LegacyExperienceV1,
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    tier: MemoryTier,
    entity_refs: Vec<EntityRef>,
    activation: f32,
    last_retrieval_id: Option<uuid::Uuid>,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
}
603
impl LegacyMemoryV2 {
    /// Upgrades this v2 record to a full `Memory`; only the fields added
    /// after v2 (external_id, version, history, related todos) default.
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            self.tier,
            self.entity_refs,
            self.activation,
            self.last_retrieval_id,
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            None,       // external_id
            1,          // version
            Vec::new(), // history
            Vec::new(), // related_todo_ids
        )
    }
}
631
632fn deserialize_memory(data: &[u8]) -> Result<(Memory, bool)> {
642 if data.len() >= 8 && &data[0..3] == STORAGE_MAGIC {
644 let version = data[3];
645 let payload_end = data.len() - 4;
646 let stored_checksum = u32::from_le_bytes([
647 data[payload_end],
648 data[payload_end + 1],
649 data[payload_end + 2],
650 data[payload_end + 3],
651 ]);
652 let computed_checksum = crc32_simple(&data[..payload_end]);
653 if stored_checksum != computed_checksum {
654 tracing::warn!(
655 "Checksum mismatch: stored={:08x} computed={:08x}",
656 stored_checksum,
657 computed_checksum
658 );
659 }
660 let payload = &data[4..payload_end];
661 deserialize_with_fallback(payload).map_err(|e| anyhow!("v{version} decode failed: {e}"))
663 } else {
664 deserialize_with_fallback(data).map_err(|e| anyhow!("legacy decode failed: {e}"))
666 }
667}
668
/// Pre-multimodal flat record: matches the current `Memory` layout except
/// for the legacy experience payload. The only serde-defaulted field is
/// `related_todo_ids`, which was added last.
/// NOTE: field order is significant for bincode decoding — do not reorder.
#[derive(Deserialize)]
struct LegacyMemoryFlatV2 {
    id: MemoryId,
    experience: LegacyExperienceV1,
    importance: f32,
    access_count: u32,
    created_at: DateTime<Utc>,
    last_accessed: DateTime<Utc>,
    compressed: bool,
    tier: MemoryTier,
    entity_refs: Vec<EntityRef>,
    activation: f32,
    last_retrieval_id: Option<uuid::Uuid>,
    agent_id: Option<String>,
    run_id: Option<String>,
    actor_id: Option<String>,
    temporal_relevance: f32,
    score: Option<f32>,
    external_id: Option<String>,
    version: u32,
    history: Vec<MemoryRevision>,
    #[serde(default)]
    related_todo_ids: Vec<TodoId>,
}
695
impl LegacyMemoryFlatV2 {
    /// Upgrades to a full `Memory`; every field maps 1:1, only the
    /// experience payload needs conversion.
    fn into_memory(self) -> Memory {
        Memory::from_legacy(
            self.id,
            self.experience.into_experience(),
            self.importance,
            self.access_count,
            self.created_at,
            self.last_accessed,
            self.compressed,
            self.tier,
            self.entity_refs,
            self.activation,
            self.last_retrieval_id,
            self.agent_id,
            self.run_id,
            self.actor_id,
            self.temporal_relevance,
            self.score,
            self.external_id,
            self.version,
            self.history,
            self.related_todo_ids,
        )
    }
}
722
723fn deserialize_with_fallback(data: &[u8]) -> Result<(Memory, bool)> {
729 fn record_branch(branch: &str) {
730 crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL
731 .with_label_values(&[branch])
732 .inc();
733 }
734
735 match bincode::serde::decode_from_slice::<Memory, _>(data, bincode::config::standard()) {
738 Ok((memory, _)) => {
739 return Ok((memory, false));
740 } Err(e) => {
742 return deserialize_legacy_fallback(data, e, record_branch);
745 }
746 }
747}
748
749fn deserialize_legacy_fallback(
754 data: &[u8],
755 first_error: bincode::error::DecodeError,
756 record_branch: fn(&str),
757) -> Result<(Memory, bool)> {
758 static DEBUG_ENTRY_LOGGED: std::sync::atomic::AtomicBool =
760 std::sync::atomic::AtomicBool::new(false);
761 let is_first_failure = !DEBUG_ENTRY_LOGGED.load(std::sync::atomic::Ordering::Relaxed);
762
763 let mut errors: Vec<(&str, String)> = Vec::new();
765 errors.push(("bincode2 Memory", first_error.to_string()));
766
767 match bincode::serde::decode_from_slice::<MinimalMemory, _>(data, bincode::config::standard()) {
770 Ok((minimal, _)) => {
771 tracing::debug!("Migrated memory from bincode 2.x minimal format");
772 record_branch("bincode2_minimal");
773 return Ok((minimal.into_memory(), true));
774 }
775 Err(e) => errors.push(("bincode2 MinimalMemory", e.to_string())),
776 }
777
778 match bincode::serde::decode_from_slice::<MemoryWithTypePrefix, _>(
781 data,
782 bincode::config::standard(),
783 ) {
784 Ok((typed, _)) => {
785 tracing::debug!("Migrated memory from bincode 2.x with type prefix");
786 record_branch("bincode2_type_prefix");
787 return Ok((typed.into_memory(), true));
788 }
789 Err(e) => errors.push(("bincode2 MemoryWithTypePrefix", e.to_string())),
790 }
791
792 match bincode::serde::decode_from_slice::<LegacyMemoryFlatV2, _>(
794 data,
795 bincode::config::standard(),
796 ) {
797 Ok((legacy, _)) => {
798 tracing::debug!("Migrated memory from bincode 2.x pre-multimodal format");
799 record_branch("bincode2_legacy_flat_v2");
800 return Ok((legacy.into_memory(), true));
801 }
802 Err(e) => errors.push(("bincode2 LegacyMemoryFlatV2", e.to_string())),
803 }
804
805 match bincode1::deserialize::<LegacyMemoryV1>(data) {
807 Ok(legacy) => {
808 tracing::debug!("Migrated memory from bincode 1.x v0.1.0 format");
809 record_branch("bincode1_legacy_v1");
810 return Ok((legacy.into_memory(), true));
811 }
812 Err(e) => errors.push(("bincode1 LegacyMemoryV1", e.to_string())),
813 }
814
815 match bincode1::deserialize::<MinimalMemory>(data) {
817 Ok(minimal) => {
818 tracing::debug!("Migrated memory from bincode 1.x minimal format");
819 record_branch("bincode1_minimal");
820 return Ok((minimal.into_memory(), true));
821 }
822 Err(e) => errors.push(("bincode1 MinimalMemory", e.to_string())),
823 }
824
825 match bincode1::deserialize::<SimpleLegacyMemory>(data) {
827 Ok(legacy) => {
828 tracing::debug!("Migrated memory from bincode 1.x simple format");
829 record_branch("bincode1_simple");
830 return Ok((legacy.into_memory(), true));
831 }
832 Err(e) => errors.push(("bincode1 SimpleLegacyMemory", e.to_string())),
833 }
834
835 use bincode1::Options;
837 let fixint_config = bincode1::options()
838 .with_fixint_encoding()
839 .allow_trailing_bytes();
840
841 match fixint_config.deserialize::<MinimalMemory>(data) {
843 Ok(minimal) => {
844 tracing::debug!("Migrated memory from bincode 1.x fixint minimal format");
845 record_branch("bincode1_fixint_minimal");
846 return Ok((minimal.into_memory(), true));
847 }
848 Err(e) => errors.push(("bincode1 fixint MinimalMemory", e.to_string())),
849 }
850
851 match fixint_config.deserialize::<SimpleLegacyMemory>(data) {
852 Ok(legacy) => {
853 tracing::debug!("Migrated memory from bincode 1.x fixint simple format");
854 record_branch("bincode1_fixint_simple");
855 return Ok((legacy.into_memory(), true));
856 }
857 Err(e) => errors.push(("bincode1 fixint SimpleLegacyMemory", e.to_string())),
858 }
859
860 match rmp_serde::from_slice::<MinimalMemory>(data) {
862 Ok(minimal) => {
863 tracing::debug!("Migrated memory from MessagePack minimal format");
864 record_branch("msgpack_minimal");
865 return Ok((minimal.into_memory(), true));
866 }
867 Err(e) => errors.push(("msgpack MinimalMemory", e.to_string())),
868 }
869
870 match rmp_serde::from_slice::<SimpleLegacyMemory>(data) {
872 Ok(legacy) => {
873 tracing::debug!("Migrated memory from MessagePack simple format");
874 record_branch("msgpack_simple");
875 return Ok((legacy.into_memory(), true));
876 }
877 Err(e) => errors.push(("msgpack SimpleLegacyMemory", e.to_string())),
878 }
879
880 match bincode1::deserialize::<LegacyMemoryV1Full>(data) {
882 Ok(legacy) => {
883 tracing::debug!("Migrated memory from bincode 1.x v1 full format");
884 record_branch("bincode1_legacy_v1_full");
885 return Ok((legacy.into_memory(), true));
886 }
887 Err(e) => errors.push(("bincode1 LegacyMemoryV1Full", e.to_string())),
888 }
889
890 match fixint_config.deserialize::<LegacyMemoryV1Full>(data) {
892 Ok(legacy) => {
893 tracing::debug!("Migrated memory from bincode 1.x fixint v1 full format");
894 record_branch("bincode1_fixint_legacy_v1_full");
895 return Ok((legacy.into_memory(), true));
896 }
897 Err(e) => errors.push(("bincode1 fixint LegacyMemoryV1Full", e.to_string())),
898 }
899
900 match rmp_serde::from_slice::<LegacyMemoryV1Full>(data) {
902 Ok(legacy) => {
903 tracing::debug!("Migrated memory from MessagePack v1 full format");
904 record_branch("msgpack_legacy_v1_full");
905 return Ok((legacy.into_memory(), true));
906 }
907 Err(e) => errors.push(("msgpack LegacyMemoryV1Full", e.to_string())),
908 }
909
910 match bincode1::deserialize::<LegacyMemoryV1>(data) {
912 Ok(legacy) => {
913 tracing::debug!("Migrated memory from bincode 1.x format");
914 record_branch("bincode1_legacy_v1_repeat");
915 return Ok((legacy.into_memory(), true));
916 }
917 Err(_) => {} }
919
920 match bincode1::deserialize::<LegacyMemoryV2>(data) {
922 Ok(legacy) => {
923 tracing::debug!("Migrated memory from bincode 1.x v2 format");
924 record_branch("bincode1_legacy_v2");
925 return Ok((legacy.into_memory(), true));
926 }
927 Err(e) => errors.push(("bincode1 LegacyMemoryV2", e.to_string())),
928 }
929
930 match bincode::serde::decode_from_slice::<MemoryWith3ByteHeader, _>(
933 data,
934 bincode::config::standard(),
935 ) {
936 Ok((mem, _)) => {
937 tracing::debug!("Migrated memory from bincode 2.x with 3-byte header");
938 record_branch("bincode2_3byte_header");
939 return Ok((mem.into_memory(), true));
940 }
941 Err(e) => errors.push(("bincode2 MemoryWith3ByteHeader", e.to_string())),
942 }
943
944 if let Some(memory) = try_raw_memory_parse(data) {
947 record_branch("raw_parse");
948 return Ok((memory, true));
949 }
950 errors.push(("raw parse", "no valid UTF-8 content found".to_string()));
951
952 if is_first_failure {
954 DEBUG_ENTRY_LOGGED.store(true, std::sync::atomic::Ordering::Relaxed);
955
956 let hex_preview: String = data
957 .iter()
958 .take(32)
959 .map(|b| format!("{:02x}", b))
960 .collect::<Vec<_>>()
961 .join(" ");
962 tracing::debug!(
963 "Unknown memory format ({} bytes): {}...",
964 data.len(),
965 hex_preview
966 );
967 }
968
969 record_branch("decode_failed");
971 Err(anyhow!(
972 "Failed to deserialize memory: incompatible format ({} bytes)",
973 data.len()
974 ))
975}
976
/// Bit-by-bit CRC-32 (reflected polynomial 0xEDB88320, as in IEEE 802.3).
/// Standard check value: `crc32_simple(b"123456789") == 0xCBF43926`.
fn crc32_simple(data: &[u8]) -> u32 {
    !data.iter().fold(0xFFFF_FFFF_u32, |acc, &byte| {
        let mut crc = acc ^ u32::from(byte);
        for _ in 0..8 {
            let lsb_set = crc & 1 != 0;
            crc >>= 1;
            if lsb_set {
                crc ^= 0xEDB8_8320;
            }
        }
        crc
    })
}
992
/// Column family holding the secondary-index keys (date:, type:, tag:, ...).
const CF_INDEX: &str = "memory_index";
995
/// RocksDB-backed persistent store for memories: records live in the
/// default column family, secondary indices in `memory_index`.
pub struct MemoryStorage {
    // Shared handle to the single DB holding both column families.
    db: Arc<DB>,
    // The base path passed to `new` (not the `storage` subdirectory).
    storage_path: PathBuf,
    // Durability mode applied to every put/batch write.
    write_mode: WriteMode,
}
1008
1009impl MemoryStorage {
1010 fn index_cf(&self) -> &ColumnFamily {
1012 self.db
1013 .cf_handle(CF_INDEX)
1014 .expect("memory_index CF must exist")
1015 }
1016
    /// Opens (or creates) the storage under `path/storage`, configuring
    /// compression, compaction and block cache, repairing the DB if it is
    /// corrupted, and migrating any old separate-DB layout into column
    /// families.
    ///
    /// `shared_cache` lets several stores share one RocksDB block cache;
    /// when absent a private 16 MiB LRU cache is created.
    pub fn new(path: &Path, shared_cache: Option<&rocksdb::Cache>) -> Result<Self> {
        use crate::constants::ROCKSDB_MEMORY_WRITE_BUFFER_BYTES;

        let storage_path = path.join("storage");
        std::fs::create_dir_all(&storage_path)?;

        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

        // WAL is flushed automatically; durability per write is governed by
        // `WriteMode` via WriteOptions::set_sync.
        opts.set_manual_wal_flush(false);
        opts.set_max_write_buffer_number(2);
        opts.set_write_buffer_size(ROCKSDB_MEMORY_WRITE_BUFFER_BYTES);
        opts.set_level_zero_file_num_compaction_trigger(4);
        opts.set_target_file_size_base(64 * 1024 * 1024);
        opts.set_max_bytes_for_level_base(256 * 1024 * 1024);
        opts.set_max_background_jobs(4);
        opts.set_level_compaction_dynamic_level_bytes(true);

        use rocksdb::{BlockBasedOptions, Cache};
        let mut block_opts = BlockBasedOptions::default();
        block_opts.set_bloom_filter(10.0, false);
        // Keep a locally-owned cache alive when no shared one is supplied.
        let local_cache;
        let cache = match shared_cache {
            Some(c) => c,
            None => {
                local_cache = Cache::new_lru_cache(16 * 1024 * 1024);
                &local_cache
            }
        };
        block_opts.set_block_cache(cache);
        block_opts.set_cache_index_and_filter_blocks(true);
        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
        opts.set_block_based_table_factory(&block_opts);

        // The CF-descriptor closure may be called again after a repair, so
        // it owns a clone of the options.
        let main_opts = opts.clone();
        let db = Arc::new(Self::open_or_repair_cf(&opts, &storage_path, move || {
            vec![
                ColumnFamilyDescriptor::new("default", main_opts.clone()),
                ColumnFamilyDescriptor::new(CF_INDEX, {
                    let mut idx_opts = Options::default();
                    idx_opts.create_if_missing(true);
                    idx_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
                    idx_opts.set_max_write_buffer_number(2);
                    idx_opts.set_write_buffer_size(ROCKSDB_MEMORY_WRITE_BUFFER_BYTES);
                    idx_opts
                }),
            ]
        })?);

        Self::migrate_from_separate_dbs(path, &db)?;

        let write_mode = WriteMode::default();
        tracing::info!(
            "Storage initialized with {:?} write mode (latency: {})",
            write_mode,
            if write_mode == WriteMode::Sync {
                "2-10ms per write"
            } else {
                "<1ms per write"
            }
        );

        Ok(Self {
            db,
            // NOTE: stores the base path, not the `storage` subdirectory.
            storage_path: path.to_path_buf(),
            write_mode,
        })
    }
1111
1112 fn open_or_repair_cf<F>(opts: &Options, path: &Path, build_cfs: F) -> Result<DB>
1120 where
1121 F: Fn() -> Vec<ColumnFamilyDescriptor>,
1122 {
1123 match DB::open_cf_descriptors(opts, path, build_cfs()) {
1124 Ok(db) => Ok(db),
1125 Err(open_err) => {
1126 let err_str = open_err.to_string();
1127 if err_str.contains("Corruption")
1129 || err_str.contains("bad block")
1130 || err_str.contains("checksum mismatch")
1131 || err_str.contains("MANIFEST")
1132 || err_str.contains("CURRENT")
1133 {
1134 tracing::warn!(
1135 error = %open_err,
1136 "RocksDB corruption detected in memory storage, attempting repair"
1137 );
1138 if let Err(repair_err) = DB::repair(opts, path) {
1139 tracing::error!(
1140 error = %repair_err,
1141 "RocksDB repair failed for memory storage"
1142 );
1143 return Err(anyhow::anyhow!(
1144 "Failed to open or repair memory storage: open={open_err}, repair={repair_err}"
1145 ));
1146 }
1147 tracing::info!("RocksDB repair succeeded for memory storage, reopening");
1148 DB::open_cf_descriptors(opts, path, build_cfs()).map_err(|e| {
1149 anyhow::anyhow!("Failed to open memory storage after repair: {e}")
1150 })
1151 } else {
1152 Err(anyhow::anyhow!("Failed to open memory storage: {open_err}"))
1153 }
1154 }
1155 }
1156 }
1157
1158 fn migrate_from_separate_dbs(base_path: &Path, db: &DB) -> Result<()> {
1164 let old_memories_dir = base_path.join("memories");
1165 let old_index_dir = base_path.join("memory_index");
1166
1167 let has_old_memories = old_memories_dir.is_dir();
1168 let has_old_index = old_index_dir.is_dir();
1169
1170 if !has_old_memories && !has_old_index {
1171 return Ok(());
1172 }
1173
1174 tracing::info!("Detected old separate-DB layout, migrating to column families...");
1175 let mut total_migrated = 0usize;
1176
1177 if has_old_memories {
1179 let old_opts = Options::default();
1180 match DB::open_for_read_only(&old_opts, &old_memories_dir, false) {
1181 Ok(old_db) => {
1182 let mut batch = WriteBatch::default();
1183 let mut count = 0usize;
1184 for item in old_db.iterator(IteratorMode::Start) {
1185 if let Ok((key, value)) = item {
1186 batch.put(&key, &value);
1187 count += 1;
1188 if count % 10_000 == 0 {
1189 db.write(std::mem::take(&mut batch))?;
1190 tracing::info!(" memories: migrated {count} entries...");
1191 }
1192 }
1193 }
1194 if !batch.is_empty() {
1195 db.write(batch)?;
1196 }
1197 drop(old_db);
1198 total_migrated += count;
1199 tracing::info!(" memories: migrated {count} entries to default CF");
1200
1201 let backup_name = base_path.join("memories.pre_cf_migration");
1202 if backup_name.exists() {
1203 let _ = std::fs::remove_dir_all(&backup_name);
1204 }
1205 if let Err(e) = std::fs::rename(&old_memories_dir, &backup_name) {
1206 tracing::warn!("Could not rename old memories dir: {e}");
1207 }
1208 }
1209 Err(e) => {
1210 tracing::warn!("Could not open old memories DB for migration: {e}");
1211 }
1212 }
1213 }
1214
1215 if has_old_index {
1217 let index_cf = db
1218 .cf_handle(CF_INDEX)
1219 .expect("memory_index CF must exist during migration");
1220
1221 let old_opts = Options::default();
1222 match DB::open_for_read_only(&old_opts, &old_index_dir, false) {
1223 Ok(old_db) => {
1224 let mut batch = WriteBatch::default();
1225 let mut count = 0usize;
1226 for item in old_db.iterator(IteratorMode::Start) {
1227 if let Ok((key, value)) = item {
1228 batch.put_cf(&index_cf, &key, &value);
1229 count += 1;
1230 if count % 10_000 == 0 {
1231 db.write(std::mem::take(&mut batch))?;
1232 tracing::info!(" index: migrated {count} entries...");
1233 }
1234 }
1235 }
1236 if !batch.is_empty() {
1237 db.write(batch)?;
1238 }
1239 drop(old_db);
1240 total_migrated += count;
1241 tracing::info!(" index: migrated {count} entries to {CF_INDEX} CF");
1242
1243 let backup_name = base_path.join("memory_index.pre_cf_migration");
1244 if backup_name.exists() {
1245 let _ = std::fs::remove_dir_all(&backup_name);
1246 }
1247 if let Err(e) = std::fs::rename(&old_index_dir, &backup_name) {
1248 tracing::warn!("Could not rename old memory_index dir: {e}");
1249 }
1250 }
1251 Err(e) => {
1252 tracing::warn!("Could not open old memory_index DB for migration: {e}");
1253 }
1254 }
1255 }
1256
1257 if total_migrated > 0 {
1258 tracing::info!(
1259 "Memory storage migration complete: {total_migrated} total entries migrated"
1260 );
1261 }
1262
1263 Ok(())
1264 }
1265
1266 pub fn path(&self) -> &Path {
1268 &self.storage_path
1269 }
1270
1271 pub fn store(&self, memory: &Memory) -> Result<()> {
1280 let key = memory.id.0.as_bytes();
1281
1282 let value = bincode::serde::encode_to_vec(memory, bincode::config::standard())
1284 .context(format!("Failed to serialize memory {}", memory.id.0))?;
1285
1286 let mut write_opts = WriteOptions::default();
1288 write_opts.set_sync(self.write_mode == WriteMode::Sync);
1289
1290 self.db
1292 .put_opt(key, &value, &write_opts)
1293 .context(format!("Failed to put memory {} in RocksDB", memory.id.0))?;
1294
1295 if let Err(e) = self.update_indices(memory) {
1297 if let Err(del_err) = self.db.delete(key) {
1299 tracing::error!(
1300 "Index write failed AND rollback failed for memory {}: index_err={}, delete_err={}",
1301 memory.id.0, e, del_err
1302 );
1303 }
1304 return Err(e.context(format!(
1305 "Failed to update indices for memory {} (rolled back)",
1306 memory.id.0
1307 )));
1308 }
1309
1310 Ok(())
1311 }
1312
    /// Writes all secondary-index keys for `memory` into the index CF as a
    /// single atomic batch. Index entries are "<prefix>:<value>:<memory id>"
    /// keys with a dummy value, except content_hash/external which map to
    /// the memory id.
    fn update_indices(&self, memory: &Memory) -> Result<()> {
        let idx = self.index_cf();
        let mut batch = WriteBatch::default();

        // Creation-date index (day granularity).
        let date_key = format!(
            "date:{}:{}",
            memory.created_at.format("%Y%m%d"),
            memory.id.0
        );
        batch.put_cf(idx, date_key.as_bytes(), b"1");

        // Experience-type index (uses the Debug rendering of the enum).
        let type_key = format!(
            "type:{:?}:{}",
            memory.experience.experience_type, memory.id.0
        );
        batch.put_cf(idx, type_key.as_bytes(), b"1");

        // Importance index, bucketed to one decimal (0..=10).
        let importance_bucket = (memory.importance() * 10.0) as u32;
        let importance_key = format!("importance:{}:{}", importance_bucket, memory.id.0);
        batch.put_cf(idx, importance_key.as_bytes(), b"1");

        // Entity index, case-insensitive via lowercasing.
        for entity in &memory.experience.entities {
            let normalized_entity = entity.to_lowercase();
            let entity_key = format!("entity:{}:{}", normalized_entity, memory.id.0);
            batch.put_cf(idx, entity_key.as_bytes(), b"1");
        }

        // Tag index, case-insensitive via lowercasing.
        for tag in &memory.experience.tags {
            let normalized_tag = tag.to_lowercase();
            let tag_key = format!("tag:{}:{}", normalized_tag, memory.id.0);
            batch.put_cf(idx, tag_key.as_bytes(), b"1");
        }

        // Episode membership and in-episode ordering indices.
        if let Some(ctx) = &memory.experience.context {
            if let Some(episode_id) = &ctx.episode.episode_id {
                let episode_key = format!("episode:{}:{}", episode_id, memory.id.0);
                batch.put_cf(idx, episode_key.as_bytes(), b"1");

                if let Some(seq) = ctx.episode.sequence_number {
                    let seq_key = format!("episode_seq:{}:{}:{}", episode_id, seq, memory.id.0);
                    batch.put_cf(idx, seq_key.as_bytes(), b"1");
                }
            }
        }

        // Robotics indices: robot and mission identifiers.
        if let Some(ref robot_id) = memory.experience.robot_id {
            let robot_key = format!("robot:{}:{}", robot_id, memory.id.0);
            batch.put_cf(idx, robot_key.as_bytes(), b"1");
        }

        if let Some(ref mission_id) = memory.experience.mission_id {
            let mission_key = format!("mission:{}:{}", mission_id, memory.id.0);
            batch.put_cf(idx, mission_key.as_bytes(), b"1");
        }

        // Spatial index keyed by a 10-character geohash of lat/lon
        // (geo_location[2] — presumably altitude — is not indexed).
        if let Some(geo) = memory.experience.geo_location {
            let lat = geo[0];
            let lon = geo[1];
            let geohash = super::types::geohash_encode(lat, lon, 10);
            let geo_key = format!("geo:{}:{}", geohash, memory.id.0);
            batch.put_cf(idx, geo_key.as_bytes(), b"1");
        }

        if let Some(ref action_type) = memory.experience.action_type {
            let action_key = format!("action:{}:{}", action_type, memory.id.0);
            batch.put_cf(idx, action_key.as_bytes(), b"1");
        }

        // Reward index: clamp to [-1, 1] then bucket into 0..=20.
        if let Some(reward) = memory.experience.reward {
            let clamped_reward = reward.clamp(-1.0, 1.0);
            let reward_bucket = ((clamped_reward + 1.0) * 10.0) as i32;
            let reward_key = format!("reward:{}:{}", reward_bucket, memory.id.0);
            batch.put_cf(idx, reward_key.as_bytes(), b"1");
        }

        // Content-hash index for exact-duplicate detection; value is the
        // memory id, so later writers with identical content overwrite it.
        {
            let content_hash = Self::sha256_content_hash(&memory.experience.content);
            let hash_key = format!("content_hash:{}", content_hash);
            batch.put_cf(idx, hash_key.as_bytes(), memory.id.0.as_bytes());
        }

        // External-id lookup index; value is the memory id.
        if let Some(ref external_id) = memory.external_id {
            let external_key = format!("external:{}:{}", external_id, memory.id.0);
            batch.put_cf(idx, external_key.as_bytes(), memory.id.0.as_bytes());
        }

        // Parent -> child lookup index.
        if let Some(ref parent_id) = memory.parent_id {
            let parent_key = format!("parent:{}:{}", parent_id.0, memory.id.0);
            batch.put_cf(idx, parent_key.as_bytes(), b"1");
        }

        // Honor the configured durability mode, like the primary write.
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db.write_opt(batch, &write_opts)?;
        Ok(())
    }
1448
1449 fn sha256_content_hash(content: &str) -> String {
1451 use sha2::{Digest, Sha256};
1452 let mut hasher = Sha256::new();
1453 hasher.update(content.as_bytes());
1454 hex::encode(hasher.finalize())
1455 }
1456
1457 pub fn get_by_content_hash(&self, content: &str) -> Option<MemoryId> {
1460 let content_hash = Self::sha256_content_hash(content);
1461 let hash_key = format!("content_hash:{}", content_hash);
1462 let idx = self.index_cf();
1463 match self.db.get_cf(idx, hash_key.as_bytes()) {
1464 Ok(Some(value)) if value.len() == 16 => {
1465 let uuid = uuid::Uuid::from_slice(&value).ok()?;
1466 let memory_id = MemoryId(uuid);
1468 match self.get(&memory_id) {
1469 Ok(_) => Some(memory_id),
1470 Err(_) => {
1471 let _ = self.db.delete_cf(idx, hash_key.as_bytes());
1473 None
1474 }
1475 }
1476 }
1477 _ => None,
1478 }
1479 }
1480
1481 pub fn get(&self, id: &MemoryId) -> Result<Memory> {
1486 let key = id.0.as_bytes();
1487 match self.db.get(key)? {
1488 Some(value) => {
1489 let (memory, needs_migration) = deserialize_memory(&value).with_context(|| {
1490 format!(
1491 "Failed to deserialize memory {} ({} bytes)",
1492 id.0,
1493 value.len()
1494 )
1495 })?;
1496
1497 if needs_migration {
1499 if let Err(e) = self.migrate_memory_format(&memory) {
1500 tracing::debug!("Lazy migration skipped for memory {}: {}", memory.id.0, e);
1502 }
1503 }
1504
1505 Ok(memory)
1506 }
1507 None => Err(anyhow!("Memory not found: {id:?}")),
1508 }
1509 }
1510
1511 fn migrate_memory_format(&self, memory: &Memory) -> Result<()> {
1513 let key = memory.id.0.as_bytes();
1514 let value = bincode::serde::encode_to_vec(memory, bincode::config::standard())
1515 .context("Failed to serialize for migration")?;
1516
1517 let mut write_opts = WriteOptions::default();
1518 write_opts.set_sync(false); self.db.put_opt(key, &value, &write_opts)?;
1521 tracing::debug!("Migrated memory {} to current format", memory.id.0);
1522 Ok(())
1523 }
1524
1525 pub fn find_by_external_id(&self, external_id: &str) -> Result<Option<Memory>> {
1530 let prefix = format!("external:{external_id}:");
1532
1533 let iter = self.db.iterator_cf(
1534 self.index_cf(),
1535 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1536 );
1537
1538 for (key, _value) in iter.log_errors() {
1539 let key_str = String::from_utf8_lossy(&key);
1540 if !key_str.starts_with(&prefix) {
1541 break;
1542 }
1543 if let Some(id_str) = key_str.strip_prefix(&prefix) {
1545 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1546 return Ok(Some(self.get(&MemoryId(uuid))?));
1547 }
1548 }
1549 }
1550
1551 Ok(None)
1552 }
1553
1554 pub fn update(&self, memory: &Memory) -> Result<()> {
1560 self.remove_from_indices(&memory.id)?;
1562 self.store(memory)
1564 }
1565
1566 #[allow(unused)] pub fn delete(&self, id: &MemoryId) -> Result<()> {
1569 self.remove_from_indices(id)?;
1572
1573 let key = id.0.as_bytes();
1575 let mut write_opts = WriteOptions::default();
1576 write_opts.set_sync(self.write_mode == WriteMode::Sync);
1577 self.db.delete_opt(key, &write_opts)?;
1578
1579 let mapping_key = format!("vmapping:{}", id.0);
1581 let _ = self.db.delete_opt(mapping_key.as_bytes(), &write_opts);
1582
1583 Ok(())
1584 }
1585
    /// Delete every secondary-index entry for memory `id` in one atomic batch.
    ///
    /// Rebuilds each key from the *current* stored record, so it must mirror
    /// `update_indices` exactly; if the record is missing (already deleted or
    /// unreadable) cleanup is skipped and `Ok(())` is returned.
    fn remove_from_indices(&self, id: &MemoryId) -> Result<()> {
        let memory = match self.get(id) {
            Ok(m) => m,
            Err(_) => {
                // Nothing to rebuild keys from; leave any orphans in place.
                tracing::debug!("Memory {} not found, skipping index cleanup", id.0);
                return Ok(());
            }
        };

        let idx = self.index_cf();
        let mut batch = WriteBatch::default();

        // Creation-date index.
        let date_key = format!("date:{}:{}", memory.created_at.format("%Y%m%d"), id.0);
        batch.delete_cf(idx, date_key.as_bytes());

        // Experience-type index (Debug formatting, same as on write).
        let type_key = format!("type:{:?}:{}", memory.experience.experience_type, id.0);
        batch.delete_cf(idx, type_key.as_bytes());

        // Importance bucket (0.1 resolution, truncating cast).
        let importance_bucket = (memory.importance() * 10.0) as u32;
        let importance_key = format!("importance:{}:{}", importance_bucket, id.0);
        batch.delete_cf(idx, importance_key.as_bytes());

        // Entity and tag entries are stored lower-cased.
        for entity in &memory.experience.entities {
            let normalized_entity = entity.to_lowercase();
            let entity_key = format!("entity:{}:{}", normalized_entity, id.0);
            batch.delete_cf(idx, entity_key.as_bytes());
        }

        for tag in &memory.experience.tags {
            let normalized_tag = tag.to_lowercase();
            let tag_key = format!("tag:{}:{}", normalized_tag, id.0);
            batch.delete_cf(idx, tag_key.as_bytes());
        }

        // Episode membership plus the sequence-ordered variant.
        if let Some(ctx) = &memory.experience.context {
            if let Some(episode_id) = &ctx.episode.episode_id {
                let episode_key = format!("episode:{}:{}", episode_id, id.0);
                batch.delete_cf(idx, episode_key.as_bytes());

                if let Some(seq) = ctx.episode.sequence_number {
                    let seq_key = format!("episode_seq:{}:{}:{}", episode_id, seq, id.0);
                    batch.delete_cf(idx, seq_key.as_bytes());
                }
            }
        }

        // Optional robot / mission provenance.
        if let Some(ref robot_id) = memory.experience.robot_id {
            let robot_key = format!("robot:{}:{}", robot_id, id.0);
            batch.delete_cf(idx, robot_key.as_bytes());
        }

        if let Some(ref mission_id) = memory.experience.mission_id {
            let mission_key = format!("mission:{}:{}", mission_id, id.0);
            batch.delete_cf(idx, mission_key.as_bytes());
        }

        // Spatial entry: geohash precision must match the write side (10).
        if let Some(geo) = memory.experience.geo_location {
            let geohash = super::types::geohash_encode(geo[0], geo[1], 10);
            let geo_key = format!("geo:{}:{}", geohash, id.0);
            batch.delete_cf(idx, geo_key.as_bytes());
        }

        if let Some(ref action_type) = memory.experience.action_type {
            let action_key = format!("action:{}:{}", action_type, id.0);
            batch.delete_cf(idx, action_key.as_bytes());
        }

        // Reward bucket, same clamp + shift/scale as on write.
        if let Some(reward) = memory.experience.reward {
            let clamped_reward = reward.clamp(-1.0, 1.0);
            let reward_bucket = ((clamped_reward + 1.0) * 10.0) as i32;
            let reward_key = format!("reward:{}:{}", reward_bucket, id.0);
            batch.delete_cf(idx, reward_key.as_bytes());
        }

        // Content-hash dedup entry.
        {
            let content_hash = Self::sha256_content_hash(&memory.experience.content);
            let hash_key = format!("content_hash:{}", content_hash);
            batch.delete_cf(idx, hash_key.as_bytes());
        }

        if let Some(ref external_id) = memory.external_id {
            let external_key = format!("external:{}:{}", external_id, id.0);
            batch.delete_cf(idx, external_key.as_bytes());
        }

        // Parent -> child hierarchy edge.
        if let Some(ref parent_id) = memory.parent_id {
            let parent_key = format!("parent:{}:{}", parent_id.0, id.0);
            batch.delete_cf(idx, parent_key.as_bytes());
        }

        // Single atomic write; durability follows the configured write mode.
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db.write_opt(batch, &write_opts)?;
        Ok(())
    }
1702
1703 pub fn search(&self, criteria: SearchCriteria) -> Result<Vec<Memory>> {
1705 let mut memory_ids = Vec::new();
1706
1707 match criteria {
1708 SearchCriteria::ByDate { start, end } => {
1710 memory_ids = self.search_by_date_range(start, end)?;
1711 }
1712 SearchCriteria::ByType(exp_type) => {
1713 memory_ids = self.search_by_type(exp_type)?;
1714 }
1715 SearchCriteria::ByImportance { min, max } => {
1716 memory_ids = self.search_by_importance(min, max)?;
1717 }
1718 SearchCriteria::ByEntity(entity) => {
1719 memory_ids = self.search_by_entity(&entity)?;
1720 }
1721 SearchCriteria::ByTags(tags) => {
1722 memory_ids = self.search_by_tags(&tags)?;
1723 }
1724
1725 SearchCriteria::ByEpisode(episode_id) => {
1727 memory_ids = self.search_by_episode(&episode_id)?;
1728 }
1729 SearchCriteria::ByEpisodeSequence {
1730 episode_id,
1731 min_sequence,
1732 max_sequence,
1733 } => {
1734 memory_ids =
1735 self.search_by_episode_sequence(&episode_id, min_sequence, max_sequence)?;
1736 }
1737
1738 SearchCriteria::ByRobot(robot_id) => {
1740 memory_ids = self.search_by_robot(&robot_id)?;
1741 }
1742 SearchCriteria::ByMission(mission_id) => {
1743 memory_ids = self.search_by_mission(&mission_id)?;
1744 }
1745 SearchCriteria::ByLocation {
1746 lat,
1747 lon,
1748 radius_meters,
1749 } => {
1750 memory_ids = self.search_by_location(lat, lon, radius_meters)?;
1751 }
1752 SearchCriteria::ByActionType(action_type) => {
1753 memory_ids = self.search_by_action_type(&action_type)?;
1754 }
1755 SearchCriteria::ByReward { min, max } => {
1756 memory_ids = self.search_by_reward(min, max)?;
1757 }
1758
1759 SearchCriteria::Combined(criterias) => {
1761 use std::collections::HashSet;
1764 let mut result_sets: Vec<HashSet<MemoryId>> = Vec::new();
1765 for c in criterias {
1766 result_sets.push(
1767 self.search(c)?
1768 .into_iter()
1769 .map(|m| m.id)
1770 .collect::<HashSet<_>>(),
1771 );
1772 }
1773
1774 if !result_sets.is_empty() {
1775 let first_set = result_sets.remove(0);
1776 memory_ids = first_set
1777 .into_iter()
1778 .filter(|id| result_sets.iter().all(|set| set.contains(id)))
1779 .collect();
1780 }
1781 }
1782
1783 SearchCriteria::ByParent(parent_id) => {
1785 memory_ids = self.search_by_parent(&parent_id)?;
1786 }
1787 SearchCriteria::RootsOnly => {
1788 memory_ids = self.search_roots()?;
1789 }
1790 }
1791
1792 let mut memories = Vec::new();
1794 for id in memory_ids {
1795 if let Ok(memory) = self.get(&id) {
1796 if !memory.is_forgotten() {
1797 memories.push(memory);
1798 }
1799 }
1800 }
1801
1802 Ok(memories)
1803 }
1804
1805 fn search_by_date_range(
1806 &self,
1807 start: DateTime<Utc>,
1808 end: DateTime<Utc>,
1809 ) -> Result<Vec<MemoryId>> {
1810 let mut ids = Vec::new();
1811 let start_key = format!("date:{}", start.format("%Y%m%d"));
1812 let end_key = format!("date:{}~", end.format("%Y%m%d"));
1815
1816 let iter = self.db.iterator_cf(
1817 self.index_cf(),
1818 IteratorMode::From(start_key.as_bytes(), rocksdb::Direction::Forward),
1819 );
1820 for (key, _value) in iter.log_errors() {
1821 let key_str = String::from_utf8_lossy(&key);
1822 if &*key_str > end_key.as_str() {
1823 break;
1824 }
1825 if key_str.starts_with("date:") {
1827 let parts: Vec<&str> = key_str.split(':').collect();
1828 if parts.len() >= 3 {
1829 if let Ok(uuid) = uuid::Uuid::parse_str(parts[2]) {
1831 ids.push(MemoryId(uuid));
1832 }
1833 }
1834 }
1835 }
1836
1837 Ok(ids)
1838 }
1839
1840 fn search_by_type(&self, exp_type: ExperienceType) -> Result<Vec<MemoryId>> {
1841 let mut ids = Vec::new();
1842 let prefix = format!("type:{exp_type:?}:");
1843
1844 let iter = self.db.iterator_cf(
1845 self.index_cf(),
1846 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1847 );
1848 for (key, _) in iter.log_errors() {
1849 let key_str = String::from_utf8_lossy(&key);
1850 if !key_str.starts_with(&prefix) {
1851 break;
1852 }
1853 if let Some(id_str) = key_str.strip_prefix(&prefix) {
1855 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1856 ids.push(MemoryId(uuid));
1857 }
1858 }
1859 }
1860
1861 Ok(ids)
1862 }
1863
1864 fn search_by_importance(&self, min: f32, max: f32) -> Result<Vec<MemoryId>> {
1865 let mut ids = Vec::new();
1866 let min_bucket = (min * 10.0) as u32;
1867 let max_bucket = (max * 10.0) as u32;
1868
1869 for bucket in min_bucket..=max_bucket {
1870 let prefix = format!("importance:{bucket}:");
1871 let iter = self.db.iterator_cf(
1872 self.index_cf(),
1873 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1874 );
1875
1876 for (key, _) in iter.log_errors() {
1877 let key_str = String::from_utf8_lossy(&key);
1878 if !key_str.starts_with(&prefix) {
1879 break;
1880 }
1881 if let Some(id_str) = key_str.strip_prefix(&prefix) {
1883 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1884 ids.push(MemoryId(uuid));
1885 }
1886 }
1887 }
1888 }
1889
1890 Ok(ids)
1891 }
1892
1893 fn search_by_entity(&self, entity: &str) -> Result<Vec<MemoryId>> {
1894 let mut ids = Vec::new();
1895 let normalized_entity = entity.to_lowercase();
1897 let prefix = format!("entity:{normalized_entity}:");
1898
1899 let iter = self.db.iterator_cf(
1900 self.index_cf(),
1901 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1902 );
1903 for (key, _) in iter.log_errors() {
1904 let key_str = String::from_utf8_lossy(&key);
1905 if !key_str.starts_with(&prefix) {
1906 break;
1907 }
1908 if let Some(id_str) = key_str.strip_prefix(&prefix) {
1910 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1911 ids.push(MemoryId(uuid));
1912 }
1913 }
1914 }
1915
1916 Ok(ids)
1917 }
1918
1919 fn search_by_tags(&self, tags: &[String]) -> Result<Vec<MemoryId>> {
1921 use std::collections::HashSet;
1922
1923 let mut all_ids = HashSet::new();
1925
1926 for tag in tags {
1927 let normalized_tag = tag.to_lowercase();
1929 let prefix = format!("tag:{normalized_tag}:");
1930 let iter = self.db.iterator_cf(
1931 self.index_cf(),
1932 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1933 );
1934 for (key, _) in iter.log_errors() {
1935 let key_str = String::from_utf8_lossy(&key);
1936 if !key_str.starts_with(&prefix) {
1937 break;
1938 }
1939 if let Some(id_str) = key_str.strip_prefix(&prefix) {
1940 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1941 all_ids.insert(MemoryId(uuid));
1942 }
1943 }
1944 }
1945 }
1946
1947 Ok(all_ids.into_iter().collect())
1948 }
1949
1950 fn search_by_episode(&self, episode_id: &str) -> Result<Vec<MemoryId>> {
1953 let mut ids = Vec::new();
1954 let prefix = format!("episode:{episode_id}:");
1955
1956 let iter = self.db.iterator_cf(
1957 self.index_cf(),
1958 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1959 );
1960 for (key, _) in iter.log_errors() {
1961 let key_str = String::from_utf8_lossy(&key);
1962 if !key_str.starts_with(&prefix) {
1963 break;
1964 }
1965 if let Some(id_str) = key_str.strip_prefix(&prefix) {
1966 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
1967 ids.push(MemoryId(uuid));
1968 }
1969 }
1970 }
1971
1972 Ok(ids)
1973 }
1974
1975 fn search_by_episode_sequence(
1978 &self,
1979 episode_id: &str,
1980 min_sequence: Option<u32>,
1981 max_sequence: Option<u32>,
1982 ) -> Result<Vec<MemoryId>> {
1983 let mut results: Vec<(u32, MemoryId)> = Vec::new();
1984
1985 let prefix = format!("episode_seq:{episode_id}:");
1987
1988 let iter = self.db.iterator_cf(
1989 self.index_cf(),
1990 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
1991 );
1992
1993 for (key, _) in iter.log_errors() {
1994 let key_str = String::from_utf8_lossy(&key);
1995 if !key_str.starts_with(&prefix) {
1996 break;
1997 }
1998
1999 if let Some(rest) = key_str.strip_prefix(&prefix) {
2001 let parts: Vec<&str> = rest.splitn(2, ':').collect();
2002 if parts.len() == 2 {
2003 if let (Ok(seq), Ok(uuid)) =
2004 (parts[0].parse::<u32>(), uuid::Uuid::parse_str(parts[1]))
2005 {
2006 let passes_min = min_sequence.map_or(true, |min| seq >= min);
2008 let passes_max = max_sequence.map_or(true, |max| seq <= max);
2009
2010 if passes_min && passes_max {
2011 results.push((seq, MemoryId(uuid)));
2012 }
2013 }
2014 }
2015 }
2016 }
2017
2018 results.sort_by_key(|(seq, _)| *seq);
2020
2021 Ok(results.into_iter().map(|(_, id)| id).collect())
2022 }
2023
2024 fn search_by_robot(&self, robot_id: &str) -> Result<Vec<MemoryId>> {
2030 let mut ids = Vec::new();
2031 let prefix = format!("robot:{robot_id}:");
2032
2033 let iter = self.db.iterator_cf(
2034 self.index_cf(),
2035 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2036 );
2037 for (key, _) in iter.log_errors() {
2038 let key_str = String::from_utf8_lossy(&key);
2039 if !key_str.starts_with(&prefix) {
2040 break;
2041 }
2042 if let Some(id_str) = key_str.strip_prefix(&prefix) {
2043 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2044 ids.push(MemoryId(uuid));
2045 }
2046 }
2047 }
2048
2049 Ok(ids)
2050 }
2051
2052 fn search_by_mission(&self, mission_id: &str) -> Result<Vec<MemoryId>> {
2054 let mut ids = Vec::new();
2055 let prefix = format!("mission:{mission_id}:");
2056
2057 let iter = self.db.iterator_cf(
2058 self.index_cf(),
2059 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2060 );
2061 for (key, _) in iter.log_errors() {
2062 let key_str = String::from_utf8_lossy(&key);
2063 if !key_str.starts_with(&prefix) {
2064 break;
2065 }
2066 if let Some(id_str) = key_str.strip_prefix(&prefix) {
2067 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2068 ids.push(MemoryId(uuid));
2069 }
2070 }
2071 }
2072
2073 Ok(ids)
2074 }
2075
2076 fn search_by_location(
2081 &self,
2082 center_lat: f64,
2083 center_lon: f64,
2084 radius_meters: f64,
2085 ) -> Result<Vec<MemoryId>> {
2086 use super::types::{geohash_decode, geohash_search_prefixes, GeoFilter};
2087
2088 let geo_filter = GeoFilter::new(center_lat, center_lon, radius_meters);
2089 let mut ids = Vec::new();
2090
2091 let prefixes = geohash_search_prefixes(center_lat, center_lon, radius_meters);
2093
2094 for geohash_prefix in prefixes {
2096 let prefix = format!("geo:{}", geohash_prefix);
2100 let iter = self.db.iterator_cf(
2101 self.index_cf(),
2102 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2103 );
2104
2105 for (key, _value) in iter.log_errors() {
2106 let key_str = String::from_utf8_lossy(&key);
2107 if !key_str.starts_with(&prefix) {
2108 break;
2109 }
2110
2111 let parts: Vec<&str> = key_str.split(':').collect();
2113 if parts.len() >= 3 {
2114 let geohash = parts[1];
2115 let (min_lat, min_lon, max_lat, max_lon) = geohash_decode(geohash);
2117 let approx_lat = (min_lat + max_lat) / 2.0;
2118 let approx_lon = (min_lon + max_lon) / 2.0;
2119
2120 if geo_filter.contains(approx_lat, approx_lon) {
2122 if let Ok(uuid) = uuid::Uuid::parse_str(parts[2]) {
2123 ids.push(MemoryId(uuid));
2124 }
2125 }
2126 }
2127 }
2128 }
2129
2130 Ok(ids)
2131 }
2132
2133 fn search_by_action_type(&self, action_type: &str) -> Result<Vec<MemoryId>> {
2135 let mut ids = Vec::new();
2136 let prefix = format!("action:{action_type}:");
2137
2138 let iter = self.db.iterator_cf(
2139 self.index_cf(),
2140 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2141 );
2142 for (key, _) in iter.log_errors() {
2143 let key_str = String::from_utf8_lossy(&key);
2144 if !key_str.starts_with(&prefix) {
2145 break;
2146 }
2147 if let Some(id_str) = key_str.strip_prefix(&prefix) {
2148 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2149 ids.push(MemoryId(uuid));
2150 }
2151 }
2152 }
2153
2154 Ok(ids)
2155 }
2156
2157 fn search_by_reward(&self, min: f32, max: f32) -> Result<Vec<MemoryId>> {
2159 let mut ids = Vec::new();
2160
2161 let clamped_min = min.clamp(-1.0, 1.0);
2164 let clamped_max = max.clamp(-1.0, 1.0);
2165 let min_bucket = ((clamped_min + 1.0) * 10.0) as i32; let max_bucket = ((clamped_max + 1.0) * 10.0) as i32;
2167
2168 for bucket in min_bucket..=max_bucket {
2169 let prefix = format!("reward:{bucket}:");
2170 let iter = self.db.iterator_cf(
2171 self.index_cf(),
2172 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2173 );
2174
2175 for (key, _) in iter.log_errors() {
2176 let key_str = String::from_utf8_lossy(&key);
2177 if !key_str.starts_with(&prefix) {
2178 break;
2179 }
2180 if let Some(id_str) = key_str.strip_prefix(&prefix) {
2181 if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
2182 ids.push(MemoryId(uuid));
2183 }
2184 }
2185 }
2186 }
2187
2188 Ok(ids)
2189 }
2190
2191 fn search_by_parent(&self, parent_id: &MemoryId) -> Result<Vec<MemoryId>> {
2197 let mut ids = Vec::new();
2198 let prefix = format!("parent:{}:", parent_id.0);
2199
2200 let iter = self.db.iterator_cf(
2201 self.index_cf(),
2202 IteratorMode::From(prefix.as_bytes(), rocksdb::Direction::Forward),
2203 );
2204
2205 for (key, _) in iter.log_errors() {
2206 let key_str = String::from_utf8_lossy(&key);
2207 if !key_str.starts_with(&prefix) {
2208 break;
2209 }
2210 if let Some(child_id_str) = key_str.strip_prefix(&prefix) {
2212 if let Ok(uuid) = uuid::Uuid::parse_str(child_id_str) {
2213 ids.push(MemoryId(uuid));
2214 }
2215 }
2216 }
2217
2218 Ok(ids)
2219 }
2220
2221 fn search_roots(&self) -> Result<Vec<MemoryId>> {
2223 let mut roots = Vec::new();
2224
2225 let iter = self.db.iterator(IteratorMode::Start);
2227 for item in iter {
2228 if let Ok((key, value)) = item {
2229 if key.len() != 16 {
2230 continue;
2231 }
2232 if let Ok((memory, _)) = deserialize_memory(&value) {
2233 if memory.parent_id.is_none() {
2234 roots.push(memory.id);
2235 }
2236 }
2237 }
2238 }
2239
2240 Ok(roots)
2241 }
2242
2243 pub fn get_children(&self, parent_id: &MemoryId) -> Result<Vec<Memory>> {
2245 let child_ids = self.search_by_parent(parent_id)?;
2246 let mut children = Vec::new();
2247 for id in child_ids {
2248 if let Ok(memory) = self.get(&id) {
2249 children.push(memory);
2250 }
2251 }
2252 Ok(children)
2253 }
2254
2255 pub fn get_ancestors(&self, memory_id: &MemoryId) -> Result<Vec<Memory>> {
2257 let mut ancestors = Vec::new();
2258 let mut current_id = memory_id.clone();
2259
2260 for _ in 0..100 {
2262 let memory = self.get(¤t_id)?;
2263 if let Some(parent_id) = &memory.parent_id {
2264 let parent = self.get(parent_id)?;
2265 ancestors.push(parent.clone());
2266 current_id = parent_id.clone();
2267 } else {
2268 break; }
2270 }
2271
2272 Ok(ancestors)
2273 }
2274
2275 pub fn get_hierarchy_context(
2278 &self,
2279 memory_id: &MemoryId,
2280 ) -> Result<(Vec<Memory>, Memory, Vec<Memory>)> {
2281 let memory = self.get(memory_id)?;
2282 let ancestors = self.get_ancestors(memory_id)?;
2283 let children = self.get_children(memory_id)?;
2284 Ok((ancestors, memory, children))
2285 }
2286
2287 pub fn get_subtree(&self, root_id: &MemoryId, max_depth: usize) -> Result<Vec<Memory>> {
2289 let mut result = Vec::new();
2290 let mut queue = vec![(root_id.clone(), 0usize)];
2291
2292 while let Some((id, depth)) = queue.pop() {
2293 if depth > max_depth {
2294 continue;
2295 }
2296 if let Ok(memory) = self.get(&id) {
2297 result.push(memory);
2298 if depth < max_depth {
2300 let child_ids = self.search_by_parent(&id)?;
2301 for child_id in child_ids {
2302 queue.push((child_id, depth + 1));
2303 }
2304 }
2305 }
2306 }
2307
2308 Ok(result)
2309 }
2310
2311 pub fn get_all_ids(&self) -> Result<Vec<MemoryId>> {
2316 let mut ids = Vec::new();
2317 let mut read_opts = rocksdb::ReadOptions::default();
2318 read_opts.fill_cache(false);
2319 let iter = self.db.iterator_opt(IteratorMode::Start, read_opts);
2320 for item in iter {
2321 if let Ok((key, _)) = item {
2322 if key.len() == 16 {
2323 let uuid_bytes: [u8; 16] = key[..16].try_into().unwrap();
2324 ids.push(MemoryId(uuid::Uuid::from_bytes(uuid_bytes)));
2325 }
2326 }
2327 }
2328 Ok(ids)
2329 }
2330
2331 pub fn get_all(&self) -> Result<Vec<Memory>> {
2335 let mut memories = Vec::new();
2336
2337 let mut read_opts = rocksdb::ReadOptions::default();
2340 read_opts.fill_cache(false);
2341 let iter = self.db.iterator_opt(IteratorMode::Start, read_opts);
2342 for item in iter {
2343 if let Ok((key, value)) = item {
2344 if key.len() != 16 {
2346 continue;
2347 }
2348 if let Ok((memory, _)) = deserialize_memory(&value) {
2349 if !memory.is_forgotten() {
2350 memories.push(memory);
2351 }
2352 }
2353 }
2354 }
2355
2356 Ok(memories)
2357 }
2358
2359 pub fn get_uncompressed_older_than(&self, cutoff: DateTime<Utc>) -> Result<Vec<Memory>> {
2360 let mut memories = Vec::new();
2361
2362 let iter = self.db.iterator(IteratorMode::Start);
2364 for item in iter {
2365 if let Ok((key, value)) = item {
2366 if key.len() != 16 {
2368 continue;
2369 }
2370 if let Ok((memory, _)) = deserialize_memory(&value) {
2371 if !memory.compressed && !memory.is_forgotten() && memory.created_at < cutoff {
2372 memories.push(memory);
2373 }
2374 }
2375 }
2376 }
2377
2378 Ok(memories)
2379 }
2380
2381 pub fn mark_forgotten_by_age(&self, cutoff: DateTime<Utc>) -> Result<Vec<MemoryId>> {
2385 let mut batch = rocksdb::WriteBatch::default();
2386 let mut flagged_ids = Vec::new();
2387 let now = Utc::now().to_rfc3339();
2388
2389 let iter = self.db.iterator(IteratorMode::Start);
2390 for item in iter {
2391 if let Ok((key, value)) = item {
2392 if key.len() != 16 {
2393 continue;
2394 }
2395 if let Ok((mut memory, _)) = deserialize_memory(&value) {
2396 if memory.is_forgotten() {
2397 continue;
2398 }
2399 if memory.created_at < cutoff {
2400 flagged_ids.push(memory.id.clone());
2401 memory
2402 .experience
2403 .metadata
2404 .insert("forgotten".to_string(), "true".to_string());
2405 memory
2406 .experience
2407 .metadata
2408 .insert("forgotten_at".to_string(), now.clone());
2409
2410 let updated_value =
2411 bincode::serde::encode_to_vec(&memory, bincode::config::standard())?;
2412 batch.put(&key, updated_value);
2413 }
2414 }
2415 }
2416 }
2417
2418 if !flagged_ids.is_empty() {
2419 let mut write_opts = WriteOptions::default();
2420 write_opts.set_sync(true);
2421 self.db.write_opt(batch, &write_opts)?;
2422 }
2423
2424 Ok(flagged_ids)
2425 }
2426
2427 pub fn mark_forgotten_by_importance(&self, threshold: f32) -> Result<Vec<MemoryId>> {
2431 let mut batch = rocksdb::WriteBatch::default();
2432 let mut flagged_ids = Vec::new();
2433 let now = Utc::now().to_rfc3339();
2434
2435 let iter = self.db.iterator(IteratorMode::Start);
2436 for item in iter {
2437 if let Ok((key, value)) = item {
2438 if key.len() != 16 {
2439 continue;
2440 }
2441 if let Ok((mut memory, _)) = deserialize_memory(&value) {
2442 if memory.is_forgotten() {
2443 continue;
2444 }
2445 if memory.importance() < threshold {
2446 flagged_ids.push(memory.id.clone());
2447 memory
2448 .experience
2449 .metadata
2450 .insert("forgotten".to_string(), "true".to_string());
2451 memory
2452 .experience
2453 .metadata
2454 .insert("forgotten_at".to_string(), now.clone());
2455
2456 let updated_value =
2457 bincode::serde::encode_to_vec(&memory, bincode::config::standard())?;
2458 batch.put(&key, updated_value);
2459 }
2460 }
2461 }
2462 }
2463
2464 if !flagged_ids.is_empty() {
2465 let mut write_opts = WriteOptions::default();
2466 write_opts.set_sync(true);
2467 self.db.write_opt(batch, &write_opts)?;
2468 }
2469
2470 Ok(flagged_ids)
2471 }
2472
2473 pub fn remove_matching(&self, regex: ®ex::Regex) -> Result<usize> {
2475 let mut count = 0;
2476 let mut to_delete: Vec<MemoryId> = Vec::new();
2477
2478 let iter = self.db.iterator(IteratorMode::Start);
2479 for item in iter {
2480 if let Ok((key, value)) = item {
2481 if key.len() != 16 {
2483 continue;
2484 }
2485 if let Ok((memory, _)) = deserialize_memory(&value) {
2486 if regex.is_match(&memory.experience.content) {
2487 to_delete.push(memory.id);
2488 count += 1;
2489 }
2490 }
2491 }
2492 }
2493
2494 for memory_id in to_delete {
2496 if let Err(e) = self.delete(&memory_id) {
2497 tracing::warn!("Failed to delete matching memory {}: {}", memory_id.0, e);
2498 }
2499 }
2500
2501 Ok(count)
2502 }
2503
    /// Record an access to memory `id`: bump its access bookkeeping via
    /// `Memory::update_access`, then persist the updated record (which also
    /// rebuilds its indices via `update`).
    ///
    /// A missing/unreadable memory is silently ignored — access tracking is
    /// best-effort and must not fail the read path.
    pub fn update_access(&self, id: &MemoryId) -> Result<()> {
        if let Ok(memory) = self.get(id) {
            // NOTE(review): `memory` is not bound `mut`, so this call must
            // take `&self` — presumably via interior mutability on the
            // accessed fields. Confirm against the Memory type definition.
            memory.update_access();

            self.update(&memory)?;
        }
        Ok(())
    }
2515
    /// Aggregate storage statistics with a full scan of the default CF.
    ///
    /// Counts only decodable, non-forgotten primary records (16-byte uuid
    /// keys); `stats:`-prefixed rows and other non-memory keys are skipped,
    /// and corrupted records are tallied and logged but don't fail the call.
    pub fn get_stats(&self) -> Result<StorageStats> {
        let mut stats = StorageStats::default();
        // Diagnostic counters, reported via the debug log below.
        let mut raw_count = 0;
        let mut skipped_non_memory = 0;
        let mut deserialize_errors = 0;
        let stats_prefix = b"stats:";

        let iter = self.db.iterator(IteratorMode::Start);
        for item in iter {
            match item {
                Ok((key, value)) => {
                    raw_count += 1;

                    // Internal counters (e.g. retrieval count) live under
                    // `stats:` — not memory records.
                    if key.starts_with(stats_prefix) {
                        skipped_non_memory += 1;
                        continue;
                    }

                    // Primary memory records use raw 16-byte uuid keys.
                    if key.len() != 16 {
                        skipped_non_memory += 1;
                        continue;
                    }

                    match deserialize_memory(&value) {
                        Ok((memory, _)) => {
                            // Soft-deleted memories don't count toward stats.
                            if memory.is_forgotten() {
                                continue;
                            }
                            stats.total_count += 1;
                            stats.total_size_bytes += value.len();
                            if memory.compressed {
                                stats.compressed_count += 1;
                            }
                            stats.importance_sum += memory.importance();
                        }
                        Err(e) => {
                            // Corrupted record: tally and log, keep scanning.
                            deserialize_errors += 1;
                            tracing::warn!(
                                "Corrupted memory entry (key len: {}, value len: {}): {}",
                                key.len(),
                                value.len(),
                                e
                            );
                        }
                    }
                }
                Err(e) => {
                    tracing::error!("Iterator error: {}", e);
                }
            }
        }

        tracing::debug!(
            "get_stats: raw_count={}, memories={}, skipped={}, corrupted={}",
            raw_count,
            stats.total_count,
            skipped_non_memory,
            deserialize_errors
        );

        // Avoid division by zero when the store is empty.
        if stats.total_count > 0 {
            stats.average_importance = stats.importance_sum / stats.total_count as f32;
        }

        // Retrieval counter is persisted separately; treat read failure as 0.
        stats.total_retrievals = self.get_retrieval_count().unwrap_or(0);

        Ok(stats)
    }
2588
2589 pub fn get_retrieval_count(&self) -> Result<usize> {
2591 const RETRIEVAL_KEY: &[u8] = b"stats:total_retrievals";
2592 match self.db.get(RETRIEVAL_KEY)? {
2593 Some(data) => {
2594 if data.len() >= 8 {
2595 Ok(usize::from_le_bytes(data[..8].try_into().unwrap_or([0; 8])))
2596 } else {
2597 Ok(0)
2598 }
2599 }
2600 None => Ok(0),
2601 }
2602 }
2603
2604 pub fn increment_retrieval_count(&self) -> Result<usize> {
2606 const RETRIEVAL_KEY: &[u8] = b"stats:total_retrievals";
2607 let current = self.get_retrieval_count().unwrap_or(0);
2608 let new_count = current + 1;
2609 self.db.put(RETRIEVAL_KEY, new_count.to_le_bytes())?;
2610 Ok(new_count)
2611 }
2612
2613 pub fn cleanup_corrupted(&self) -> Result<usize> {
2624 let mut to_delete = Vec::new();
2625
2626 let skip_prefixes: &[&[u8]] = &[
2631 b"stats:",
2632 b"vmapping:",
2633 b"interference:",
2634 b"interference_meta:",
2635 b"_watermark:",
2636 b"facts:",
2637 b"facts_by_entity:",
2638 b"facts_by_type:",
2639 b"facts_embedding:",
2640 b"temporal_facts:",
2641 b"temporal_by_time:",
2642 b"temporal_by_entity:",
2643 b"lineage:",
2644 b"learning:",
2645 b"geo:",
2646 ];
2647
2648 let iter = self.db.iterator(IteratorMode::Start);
2649 for item in iter {
2650 if let Ok((key, value)) = item {
2651 if skip_prefixes.iter().any(|p| key.starts_with(p)) {
2653 continue;
2654 }
2655
2656 let is_valid_memory_key = key.len() == 16;
2658
2659 if !is_valid_memory_key {
2660 tracing::debug!(
2662 "Marking for deletion: invalid key length {} (expected 16)",
2663 key.len()
2664 );
2665 to_delete.push(key.to_vec());
2666 } else if deserialize_memory(&value).is_err() {
2667 tracing::debug!(
2669 "Marking for deletion: valid key but corrupted value ({} bytes)",
2670 value.len()
2671 );
2672 to_delete.push(key.to_vec());
2673 }
2674 }
2675 }
2676
2677 let count = to_delete.len();
2678 if count > 0 {
2679 tracing::info!("Cleaning up {} corrupted memory entries", count);
2680
2681 let mut write_opts = WriteOptions::default();
2682 write_opts.set_sync(self.write_mode == WriteMode::Sync);
2683
2684 for key in to_delete {
2685 if let Err(e) = self.db.delete_opt(&key, &write_opts) {
2686 tracing::warn!("Failed to delete corrupted entry: {}", e);
2687 }
2688 }
2689
2690 self.flush()?;
2692 }
2693
2694 Ok(count)
2695 }
2696
2697 pub fn migrate_legacy(&self) -> Result<(usize, usize, usize)> {
2706 let mut migrated = 0;
2707 let mut already_current = 0;
2708 let mut failed = 0;
2709 let stats_prefix = b"stats:";
2710
2711 let iter = self.db.iterator(IteratorMode::Start);
2712 let mut to_migrate = Vec::new();
2713
2714 for item in iter {
2715 if let Ok((key, value)) = item {
2716 if key.starts_with(stats_prefix) {
2718 continue;
2719 }
2720
2721 if key.len() != 16 {
2723 continue;
2724 }
2725
2726 let is_current = bincode::serde::decode_from_slice::<Memory, _>(
2728 &value,
2729 bincode::config::standard(),
2730 )
2731 .is_ok();
2732
2733 if is_current {
2734 already_current += 1;
2735 continue;
2736 }
2737
2738 match deserialize_memory(&value) {
2740 Ok((memory, _)) => {
2741 to_migrate.push((key.to_vec(), memory));
2743 }
2744 Err(_) => {
2745 failed += 1;
2746 }
2747 }
2748 }
2749 }
2750
2751 if !to_migrate.is_empty() {
2753 tracing::info!(
2754 "Migrating {} legacy memories to current format",
2755 to_migrate.len()
2756 );
2757
2758 let mut write_opts = WriteOptions::default();
2759 write_opts.set_sync(self.write_mode == WriteMode::Sync);
2760
2761 for (key, memory) in to_migrate {
2762 match bincode::serde::encode_to_vec(&memory, bincode::config::standard()) {
2763 Ok(serialized) => {
2764 if let Err(e) = self.db.put_opt(&key, &serialized, &write_opts) {
2765 tracing::warn!("Failed to migrate memory: {e}");
2766 failed += 1;
2767 } else {
2768 migrated += 1;
2769 }
2770 }
2771 Err(e) => {
2772 tracing::warn!("Failed to serialize migrated memory: {e}");
2773 failed += 1;
2774 }
2775 }
2776 }
2777
2778 self.flush()?;
2780 }
2781
2782 tracing::info!(
2783 "Migration complete: {} migrated, {} already current, {} failed",
2784 migrated,
2785 already_current,
2786 failed
2787 );
2788
2789 Ok((migrated, already_current, failed))
2790 }
2791
2792 pub fn flush(&self) -> Result<()> {
2794 use rocksdb::FlushOptions;
2795
2796 let mut flush_opts = FlushOptions::default();
2797 flush_opts.set_wait(true); self.db
2801 .flush_opt(&flush_opts)
2802 .map_err(|e| anyhow::anyhow!("Failed to flush memory storage: {e}"))?;
2803
2804 self.db
2806 .flush_cf_opt(self.index_cf(), &flush_opts)
2807 .map_err(|e| anyhow::anyhow!("Failed to flush index CF: {e}"))?;
2808
2809 Ok(())
2810 }
2811
2812 pub fn db(&self) -> Arc<DB> {
2817 self.db.clone()
2818 }
2819}
2820
/// Filter predicates for querying stored memories. Interpreted by the
/// search implementation (not in this section of the file).
#[derive(Debug, Clone)]
pub enum SearchCriteria {
    /// Memories timestamped within the `start..end` window.
    ByDate {
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    },
    /// Memories with the given experience type.
    ByType(ExperienceType),
    /// Memories whose importance falls within `[min, max]`.
    ByImportance {
        min: f32,
        max: f32,
    },
    /// Memories referencing a named entity.
    ByEntity(String),
    /// Memories matching the given tags — NOTE(review): all-vs-any
    /// matching semantics defined by the search impl; confirm there.
    ByTags(Vec<String>),

    /// Memories belonging to the given episode id.
    ByEpisode(String),
    /// Memories within an episode, optionally bounded by sequence number.
    ByEpisodeSequence {
        episode_id: String,
        /// Inclusive lower bound — TODO confirm inclusivity in search impl.
        min_sequence: Option<u32>,
        /// Inclusive upper bound — TODO confirm inclusivity in search impl.
        max_sequence: Option<u32>,
    },

    /// Robotics: memories recorded by the given robot id.
    ByRobot(String),
    /// Robotics: memories recorded during the given mission id.
    ByMission(String),
    /// Memories geotagged within `radius_meters` of a lat/lon point.
    ByLocation {
        lat: f64,
        lon: f64,
        radius_meters: f64,
    },
    /// Memories with the given action type.
    ByActionType(String),
    /// Memories whose reward value falls within `[min, max]`.
    ByReward {
        min: f32,
        max: f32,
    },

    /// Combination of sub-criteria — NOTE(review): presumably a
    /// conjunction; semantics defined by the search impl.
    Combined(Vec<SearchCriteria>),

    /// Memories whose lineage parent is the given memory id.
    ByParent(MemoryId),
    /// Memories with no lineage parent (roots of the lineage tree).
    RootsOnly,
}
2878
/// Aggregate statistics computed by `get_stats` over the memory store.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct StorageStats {
    // Number of successfully decoded memory records.
    pub total_count: usize,
    // Count of records stored compressed — populated by get_stats
    // (body outside this section); presumably based on a per-record flag.
    pub compressed_count: usize,
    // Total size of scanned values in bytes — populated by get_stats.
    pub total_size_bytes: usize,
    // importance_sum / total_count; left 0.0 when the store is empty.
    pub average_importance: f32,
    // Running sum used to derive average_importance.
    pub importance_sum: f32,
    // Persisted retrieval counter (key "stats:total_retrievals").
    // serde(default): older serialized snapshots lack this field.
    #[serde(default)]
    pub total_retrievals: usize,
}
2891
/// Embedding modality a vector was produced from. Used as the key of
/// `VectorMappingEntry::modalities`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Modality {
    /// Text embeddings (384-dim, see `dimension`).
    Text,
    /// Image embeddings (1024-dim).
    Image,
    /// Audio embeddings (1024-dim).
    Audio,
    /// Video embeddings (1024-dim).
    Video,
    /// Unified embedding space (1024-dim).
    Unified,
}
2936
2937impl Modality {
2938 pub fn dimension(&self) -> usize {
2940 match self {
2941 Modality::Text => 384, Modality::Image => 1024,
2944 Modality::Audio => 1024,
2945 Modality::Video => 1024,
2946 Modality::Unified => 1024,
2947 }
2948 }
2949
2950 pub fn as_str(&self) -> &'static str {
2952 match self {
2953 Modality::Text => "text",
2954 Modality::Image => "image",
2955 Modality::Audio => "audio",
2956 Modality::Video => "video",
2957 Modality::Unified => "unified",
2958 }
2959 }
2960}
2961
2962impl std::fmt::Display for Modality {
2963 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2964 write!(f, "{}", self.as_str())
2965 }
2966}
2967
/// Vector-index ids registered for one modality of a single memory.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ModalityVectors {
    // Ids of this memory's vectors in the external vector index.
    pub vector_ids: Vec<u32>,
    // Embedding dimension the vectors were produced at
    // (set from Modality::dimension by add_modality).
    pub dimension: usize,
    // Optional (start, end) ranges — presumably mapping vectors back to
    // chunks of the source content; TODO confirm with the chunking code.
    // Always None for entries built in this file.
    pub chunk_ranges: Option<Vec<(usize, usize)>>,
}
2979
/// Per-memory record linking a memory id to its vector-index ids,
/// grouped by modality. Persisted under the `vmapping:<uuid>` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorMappingEntry {
    // Vector ids for each modality present on this memory.
    pub modalities: HashMap<Modality, ModalityVectors>,
    // Creation time, milliseconds since the Unix epoch.
    pub created_at: i64,
    // Schema version of this record (currently always 1).
    pub version: u8,
}
2998
2999impl Default for VectorMappingEntry {
3000 fn default() -> Self {
3001 Self {
3002 modalities: HashMap::new(),
3003 created_at: chrono::Utc::now().timestamp_millis(),
3004 version: 1,
3005 }
3006 }
3007}
3008
3009impl VectorMappingEntry {
3010 pub fn with_text(vector_ids: Vec<u32>) -> Self {
3012 let mut modalities = HashMap::new();
3013 modalities.insert(
3014 Modality::Text,
3015 ModalityVectors {
3016 vector_ids,
3017 dimension: 384,
3018 chunk_ranges: None,
3019 },
3020 );
3021 Self {
3022 modalities,
3023 created_at: chrono::Utc::now().timestamp_millis(),
3024 version: 1,
3025 }
3026 }
3027
3028 pub fn text_vectors(&self) -> Option<&Vec<u32>> {
3030 self.modalities.get(&Modality::Text).map(|m| &m.vector_ids)
3031 }
3032
3033 pub fn all_vector_ids(&self) -> Vec<(Modality, u32)> {
3035 self.modalities
3036 .iter()
3037 .flat_map(|(modality, mv)| mv.vector_ids.iter().map(|id| (*modality, *id)))
3038 .collect()
3039 }
3040
3041 pub fn is_empty(&self) -> bool {
3043 self.modalities.values().all(|mv| mv.vector_ids.is_empty())
3044 }
3045
3046 pub fn add_modality(&mut self, modality: Modality, vector_ids: Vec<u32>) {
3048 self.modalities.insert(
3049 modality,
3050 ModalityVectors {
3051 dimension: modality.dimension(),
3052 vector_ids,
3053 chunk_ranges: None,
3054 },
3055 );
3056 }
3057
3058 #[allow(dead_code)]
3060 pub fn with_image(mut self, vector_ids: Vec<u32>) -> Self {
3061 self.add_modality(Modality::Image, vector_ids);
3062 self
3063 }
3064
3065 #[allow(dead_code)]
3067 pub fn with_audio(mut self, vector_ids: Vec<u32>) -> Self {
3068 self.add_modality(Modality::Audio, vector_ids);
3069 self
3070 }
3071
3072 #[allow(dead_code)]
3074 pub fn with_video(mut self, vector_ids: Vec<u32>) -> Self {
3075 self.add_modality(Modality::Video, vector_ids);
3076 self
3077 }
3078}
3079
impl MemoryStorage {
    /// Store `memory` together with its text-modality vector ids.
    /// Thin wrapper over `store_with_multimodal_vectors` with `Modality::Text`.
    pub fn store_with_vectors(&self, memory: &Memory, vector_ids: Vec<u32>) -> Result<()> {
        self.store_with_multimodal_vectors(memory, Modality::Text, vector_ids)
    }

    /// Atomically persist a memory record and its vector mapping for one
    /// modality in a single RocksDB write batch.
    ///
    /// Any existing mapping entry is loaded first so other modalities are
    /// preserved. Secondary-index maintenance is best-effort: failures
    /// are logged but do not fail the call.
    pub fn store_with_multimodal_vectors(
        &self,
        memory: &Memory,
        modality: Modality,
        vector_ids: Vec<u32>,
    ) -> Result<()> {
        let mut batch = WriteBatch::default();

        // Memory record: key is the raw 16-byte UUID.
        let memory_key = memory.id.0.as_bytes();
        let memory_value = bincode::serde::encode_to_vec(memory, bincode::config::standard())
            .context(format!("Failed to serialize memory {}", memory.id.0))?;
        batch.put(memory_key, &memory_value);

        // Vector mapping record lives under the "vmapping:" namespace.
        let mapping_key = format!("vmapping:{}", memory.id.0);

        // Merge into any pre-existing entry so other modalities survive.
        let mut mapping_entry = self.get_vector_mapping(&memory.id)?.unwrap_or_default();

        mapping_entry.add_modality(modality, vector_ids);

        let mapping_value =
            bincode::serde::encode_to_vec(&mapping_entry, bincode::config::standard())
                .context("Failed to serialize vector mapping")?;
        batch.put(mapping_key.as_bytes(), &mapping_value);

        // One atomic write covering both records.
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db
            .write_opt(batch, &write_opts)
            .context("Atomic write of memory + vector mapping failed")?;

        // Secondary indices are advisory; failure here is non-fatal.
        if let Err(e) = self.update_indices(memory) {
            tracing::warn!("Secondary index update failed (non-fatal): {}", e);
        }

        Ok(())
    }

    /// Load the vector mapping for `memory_id`, or `None` when absent.
    pub fn get_vector_mapping(&self, memory_id: &MemoryId) -> Result<Option<VectorMappingEntry>> {
        let mapping_key = format!("vmapping:{}", memory_id.0);
        match self.db.get(mapping_key.as_bytes())? {
            Some(data) => {
                let (entry, _): (VectorMappingEntry, _) =
                    bincode::serde::decode_from_slice(&data, bincode::config::standard())
                        .context("Failed to deserialize vector mapping")?;
                Ok(Some(entry))
            }
            None => Ok(None),
        }
    }

    /// Scan the whole `vmapping:` prefix and return every decodable
    /// (memory id, mapping) pair.
    ///
    /// Entries with an unparsable UUID or undecodable value are skipped
    /// silently; iterator-level errors are logged and skipped.
    pub fn get_all_vector_mappings(&self) -> Result<Vec<(MemoryId, VectorMappingEntry)>> {
        let mut mappings = Vec::new();
        let prefix = b"vmapping:";

        let iter = self
            .db
            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));

        for item in iter {
            match item {
                Ok((key, value)) => {
                    let key_str = String::from_utf8_lossy(&key);
                    // Keys are sorted; the first non-matching key ends the prefix range.
                    if !key_str.starts_with("vmapping:") {
                        break;
                    }

                    if let Some(id_str) = key_str.strip_prefix("vmapping:") {
                        if let Ok(uuid) = uuid::Uuid::parse_str(id_str) {
                            if let Ok((entry, _)) =
                                bincode::serde::decode_from_slice::<VectorMappingEntry, _>(
                                    &value,
                                    bincode::config::standard(),
                                )
                            {
                                mappings.push((MemoryId(uuid), entry));
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("Error reading vector mapping: {}", e);
                }
            }
        }

        Ok(mappings)
    }

    /// Delete only the vector mapping for a memory; the memory record
    /// itself is left untouched.
    pub fn delete_vector_mapping(&self, memory_id: &MemoryId) -> Result<()> {
        let mapping_key = format!("vmapping:{}", memory_id.0);
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db.delete_opt(mapping_key.as_bytes(), &write_opts)?;
        Ok(())
    }

    /// Replace the text-modality vector ids for a memory.
    /// Wrapper over `update_modality_vectors` with `Modality::Text`.
    pub fn update_vector_mapping(&self, memory_id: &MemoryId, vector_ids: Vec<u32>) -> Result<()> {
        self.update_modality_vectors(memory_id, Modality::Text, vector_ids)
    }

    /// Insert/replace one modality's vector ids in a memory's mapping
    /// (preserving the other modalities) and persist the entry.
    pub fn update_modality_vectors(
        &self,
        memory_id: &MemoryId,
        modality: Modality,
        vector_ids: Vec<u32>,
    ) -> Result<()> {
        let mapping_key = format!("vmapping:{}", memory_id.0);

        // Load-modify-write: start from the existing entry when present.
        let mut mapping_entry = self.get_vector_mapping(memory_id)?.unwrap_or_default();

        mapping_entry.add_modality(modality, vector_ids);

        let mapping_value =
            bincode::serde::encode_to_vec(&mapping_entry, bincode::config::standard())?;

        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db
            .put_opt(mapping_key.as_bytes(), &mapping_value, &write_opts)?;
        Ok(())
    }

    /// Atomically delete a memory record and its vector mapping.
    /// Secondary-index cleanup is best-effort (logged, non-fatal).
    pub fn delete_with_vectors(&self, id: &MemoryId) -> Result<()> {
        let mut batch = WriteBatch::default();

        // Memory record (raw 16-byte UUID key).
        batch.delete(id.0.as_bytes());

        // Associated vector mapping.
        let mapping_key = format!("vmapping:{}", id.0);
        batch.delete(mapping_key.as_bytes());

        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db.write_opt(batch, &write_opts)?;

        if let Err(e) = self.remove_from_indices(id) {
            tracing::warn!("Index cleanup failed (non-fatal): {}", e);
        }

        Ok(())
    }

    /// Count entries under the `vmapping:` prefix.
    ///
    /// NOTE(review): iterator-level errors are silently skipped here
    /// (no log_errors), which can undercount on a failing iterator.
    pub fn count_vector_mappings(&self) -> usize {
        let prefix = b"vmapping:";
        let iter = self
            .db
            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));

        let mut count = 0;
        for item in iter {
            if let Ok((key, _)) = item {
                if key.starts_with(prefix) {
                    count += 1;
                } else {
                    // Sorted keys: first non-matching key ends the range.
                    break;
                }
            }
        }
        count
    }

    /// Find memories that carry embeddings but have no non-empty
    /// text-vector mapping — candidates for (re)indexing.
    pub fn find_memories_without_mappings(&self) -> Result<Vec<MemoryId>> {
        let mut orphans = Vec::new();

        let iter = self.db.iterator(IteratorMode::Start);
        for item in iter {
            if let Ok((key, value)) = item {
                // Only raw 16-byte UUID keys are memory records.
                if key.len() != 16 {
                    continue;
                }

                if let Ok((memory, _)) = deserialize_memory(&value) {
                    // A mapping counts only if it has at least one text id.
                    let has_mapping = match self.get_vector_mapping(&memory.id) {
                        Ok(Some(entry)) => entry.text_vectors().is_some_and(|v| !v.is_empty()),
                        _ => false,
                    };

                    if !has_mapping && memory.experience.embeddings.is_some() {
                        orphans.push(memory.id);
                    }
                }
            }
        }

        Ok(orphans)
    }

    /// Collect every text-modality vector id across all mapping entries.
    pub fn get_all_text_vector_ids(&self) -> Result<Vec<u32>> {
        let mut all_ids = Vec::new();
        let mappings = self.get_all_vector_mappings()?;

        for (_, entry) in mappings {
            if let Some(text_vecs) = entry.text_vectors() {
                all_ids.extend(text_vecs.iter().copied());
            }
        }

        Ok(all_ids)
    }

    /// Number of stored vectors per modality across all mapping entries.
    pub fn get_modality_stats(&self) -> Result<HashMap<Modality, usize>> {
        let mut stats: HashMap<Modality, usize> = HashMap::new();
        let mappings = self.get_all_vector_mappings()?;

        for (_, entry) in mappings {
            for (modality, mv) in entry.modalities {
                *stats.entry(modality).or_insert(0) += mv.vector_ids.len();
            }
        }

        Ok(stats)
    }

    /// Persist one memory's interference records as JSON under
    /// `interference:<memory_id>` (whole-array overwrite).
    pub fn save_interference_records(
        &self,
        memory_id: &str,
        records: &[super::replay::InterferenceRecord],
    ) -> Result<()> {
        let key = format!("interference:{memory_id}");
        let value =
            serde_json::to_vec(records).context("Failed to serialize interference records")?;

        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db
            .put_opt(key.as_bytes(), &value, &write_opts)
            .context("Failed to persist interference records")?;

        Ok(())
    }

    /// Load all persisted interference records, keyed by memory id.
    ///
    /// Returns `(history, total_events)`, where `total_events` is the
    /// max of the freshly counted events and the persisted counter at
    /// `interference_meta:total` (used when present and exactly 8 bytes).
    /// Undeserializable entries are logged and skipped.
    pub fn load_all_interference_records(
        &self,
    ) -> Result<(
        HashMap<String, Vec<super::replay::InterferenceRecord>>,
        usize,
    )> {
        let prefix = b"interference:";
        let mut history: HashMap<String, Vec<super::replay::InterferenceRecord>> = HashMap::new();
        let mut total_events: usize = 0;

        let iter = self
            .db
            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));

        // log_errors: iterator-level failures are logged, not fatal.
        for item in iter.log_errors() {
            let (key, value) = item;
            let key_str = String::from_utf8_lossy(&key);

            // Sorted keys: first non-matching key ends the prefix range.
            if !key_str.starts_with("interference:") {
                break;
            }

            if let Some(memory_id) = key_str.strip_prefix("interference:") {
                match serde_json::from_slice::<Vec<super::replay::InterferenceRecord>>(&value) {
                    Ok(records) => {
                        total_events += records.len();
                        history.insert(memory_id.to_string(), records);
                    }
                    Err(e) => {
                        tracing::warn!(
                            key = %key_str,
                            error = %e,
                            "Failed to deserialize interference records, skipping"
                        );
                    }
                }
            }
        }

        // Prefer the persisted lifetime counter when it is larger than
        // what we just counted (records may have been pruned).
        let persisted_total = self
            .db
            .get(b"interference_meta:total")
            .ok()
            .flatten()
            .and_then(|v| {
                if v.len() == 8 {
                    // Length checked above: try_into cannot fail.
                    Some(u64::from_le_bytes(v[..8].try_into().unwrap()) as usize)
                } else {
                    None
                }
            })
            .unwrap_or(total_events);

        Ok((history, persisted_total.max(total_events)))
    }

    /// Delete the interference records stored for one memory id.
    pub fn delete_interference_records(&self, memory_id: &str) -> Result<()> {
        let key = format!("interference:{memory_id}");
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db
            .delete_opt(key.as_bytes(), &write_opts)
            .context("Failed to delete interference records")?;
        Ok(())
    }

    /// Persist the lifetime interference-event counter
    /// (`interference_meta:total`, little-endian u64).
    pub fn save_interference_event_count(&self, count: usize) -> Result<()> {
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        self.db
            .put_opt(
                b"interference_meta:total",
                &(count as u64).to_le_bytes(),
                &write_opts,
            )
            .context("Failed to persist interference event count")?;
        Ok(())
    }

    /// Read the per-user fact-extraction watermark (epoch millis,
    /// little-endian i64). Any read error or malformed value maps to None.
    pub fn get_fact_watermark(&self, user_id: &str) -> Option<i64> {
        let key = format!("_watermark:fact_extraction:{user_id}");
        match self.db.get(key.as_bytes()) {
            Ok(Some(bytes)) if bytes.len() == 8 => {
                // Length checked by the guard: try_into cannot fail.
                Some(i64::from_le_bytes(bytes[..8].try_into().unwrap()))
            }
            _ => None,
        }
    }

    /// Persist the per-user fact-extraction watermark. Best-effort:
    /// write failures are logged, not returned.
    pub fn set_fact_watermark(&self, user_id: &str, timestamp_millis: i64) {
        let key = format!("_watermark:fact_extraction:{user_id}");
        let mut write_opts = WriteOptions::default();
        write_opts.set_sync(self.write_mode == WriteMode::Sync);
        if let Err(e) =
            self.db
                .put_opt(key.as_bytes(), &timestamp_millis.to_le_bytes(), &write_opts)
        {
            tracing::warn!("Failed to persist fact extraction watermark: {e}");
        }
    }

    /// Delete every key starting with "interference" in one atomic batch.
    ///
    /// Note: the prefix has no trailing colon, so this removes both
    /// `interference:` records and the `interference_meta:` counter.
    /// Returns the number of keys deleted.
    pub fn clear_all_interference_records(&self) -> Result<usize> {
        let prefix = b"interference";
        let mut batch = WriteBatch::default();
        let mut count = 0;

        let iter = self
            .db
            .iterator(IteratorMode::From(prefix, rocksdb::Direction::Forward));

        for item in iter.log_errors() {
            let (key, _) = item;
            let key_str = String::from_utf8_lossy(&key);
            // Sorted keys: first non-matching key ends the prefix range.
            if !key_str.starts_with("interference") {
                break;
            }
            batch.delete(&key);
            count += 1;
        }

        if count > 0 {
            let mut write_opts = WriteOptions::default();
            write_opts.set_sync(self.write_mode == WriteMode::Sync);
            self.db.write_opt(batch, &write_opts)?;
        }

        Ok(count)
    }
}
3529
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Serialize;

    // Minimal stand-in for the legacy on-disk layout (id + content only),
    // used to produce bincode1 / msgpack fixtures for the fallback decoder.
    #[derive(Serialize)]
    struct LegacyMinimalFixture {
        id: MemoryId,
        content: String,
    }

    // Builds a current-format Memory for round-trip tests.
    fn sample_memory(id: MemoryId, content: &str) -> Memory {
        let now = Utc::now();
        let experience = Experience {
            experience_type: ExperienceType::Observation,
            content: content.to_string(),
            ..Default::default()
        };
        Memory::from_legacy(
            id,
            experience,
            0.5,
            0,
            now,
            now,
            false,
            MemoryTier::LongTerm,
            Vec::new(),
            1.0,
            None,
            None,
            None,
            None,
            0.0,
            None,
            None,
            1,
            Vec::new(),
            Vec::new(),
        )
    }

    // Current-format bytes must decode via the primary bincode2 path and
    // must NOT bump the legacy-fallback metric.
    #[test]
    fn test_deserialize_with_fallback_records_current_bincode2_branch() {
        let id = MemoryId(uuid::Uuid::new_v4());
        let memory = sample_memory(id.clone(), "current format memory");
        let bytes = bincode::serde::encode_to_vec(&memory, bincode::config::standard()).unwrap();

        let counter =
            crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL.with_label_values(&["bincode2_memory"]);
        let before = counter.get();

        let (decoded, is_legacy) = deserialize_with_fallback(&bytes).unwrap();
        let after = counter.get();

        assert_eq!(decoded.id, id);
        assert!(!is_legacy);
        // Current-format decode takes no fallback branch: counter unchanged.
        assert_eq!(after, before);
    }

    // A bincode1-encoded minimal fixture must decode via the legacy path
    // and bump the matching fallback-branch counter exactly once.
    #[test]
    fn test_deserialize_with_fallback_bincode1_minimal_fixture() {
        let id = MemoryId(uuid::Uuid::new_v4());
        let fixture = LegacyMinimalFixture {
            id: id.clone(),
            content: "legacy bincode1 minimal".to_string(),
        };
        let bytes = bincode1::serialize(&fixture).unwrap();

        let counter =
            crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL.with_label_values(&["bincode1_minimal"]);
        let before = counter.get();

        let (decoded, is_legacy) = deserialize_with_fallback(&bytes).unwrap();
        let after = counter.get();

        assert_eq!(decoded.id, id);
        assert!(is_legacy);
        assert_eq!(after, before + 1);
    }

    // Same as above for the msgpack legacy encoding.
    #[test]
    fn test_deserialize_with_fallback_msgpack_minimal_fixture() {
        let id = MemoryId(uuid::Uuid::new_v4());
        let fixture = LegacyMinimalFixture {
            id: id.clone(),
            content: "legacy msgpack minimal".to_string(),
        };
        let bytes = rmp_serde::to_vec(&fixture).unwrap();

        let counter =
            crate::metrics::LEGACY_FALLBACK_BRANCH_TOTAL.with_label_values(&["msgpack_minimal"]);
        let before = counter.get();

        let (decoded, is_legacy) = deserialize_with_fallback(&bytes).unwrap();
        let after = counter.get();

        assert_eq!(decoded.id, id);
        assert!(is_legacy);
        assert_eq!(after, before + 1);
    }

    // With SHODH_WRITE_MODE unset, writes default to async.
    // NOTE(review): mutates process env — racy if other tests read the
    // same variable in parallel.
    #[test]
    fn test_write_mode_default_async() {
        std::env::remove_var("SHODH_WRITE_MODE");
        let mode = WriteMode::default();
        assert_eq!(mode, WriteMode::Async);
    }

    // CRC32 must be deterministic and sensitive to the input bytes.
    #[test]
    fn test_crc32_simple() {
        let data = b"test data for CRC32";
        let crc1 = crc32_simple(data);
        let crc2 = crc32_simple(data);

        assert_eq!(crc1, crc2);
        assert_ne!(crc1, 0);

        let crc3 = crc32_simple(b"different data");
        assert_ne!(crc1, crc3);
    }

    // Empty-input checksum pins the IEEE CRC32 init/final XOR convention.
    #[test]
    fn test_crc32_empty() {
        let crc = crc32_simple(b"");
        assert_eq!(
            crc, 0,
            "IEEE CRC32 of empty input is 0 (init 0xFFFFFFFF XOR final 0xFFFFFFFF)"
        );
    }

    // Dimension table: text 384, everything else 1024.
    #[test]
    fn test_modality_dimension() {
        assert_eq!(Modality::Text.dimension(), 384);
        assert_eq!(Modality::Image.dimension(), 1024);
        assert_eq!(Modality::Audio.dimension(), 1024);
        assert_eq!(Modality::Video.dimension(), 1024);
        assert_eq!(Modality::Unified.dimension(), 1024);
    }

    // Stable lowercase names used in keys/logs.
    #[test]
    fn test_modality_as_str() {
        assert_eq!(Modality::Text.as_str(), "text");
        assert_eq!(Modality::Image.as_str(), "image");
        assert_eq!(Modality::Audio.as_str(), "audio");
        assert_eq!(Modality::Video.as_str(), "video");
    }

    // with_text seeds the mapping with text vectors only.
    #[test]
    fn test_vector_mapping_entry_with_text() {
        let entry = VectorMappingEntry::with_text(vec![1, 2, 3]);

        assert_eq!(entry.text_vectors(), Some(&vec![1, 2, 3]));
        assert!(!entry.is_empty());
    }

    // Builder chain accumulates one id per modality; all_vector_ids
    // flattens them (order not asserted — HashMap iteration).
    #[test]
    fn test_vector_mapping_entry_multimodal() {
        let entry = VectorMappingEntry::with_text(vec![1])
            .with_image(vec![2])
            .with_audio(vec![3])
            .with_video(vec![4]);

        let all = entry.all_vector_ids();
        assert_eq!(all.len(), 4);

        assert!(all.contains(&(Modality::Text, 1)));
        assert!(all.contains(&(Modality::Image, 2)));
        assert!(all.contains(&(Modality::Audio, 3)));
        assert!(all.contains(&(Modality::Video, 4)));
    }

    // A default entry has no modalities at all.
    #[test]
    fn test_vector_mapping_entry_empty() {
        let entry = VectorMappingEntry::default();

        assert!(entry.is_empty());
        assert!(entry.text_vectors().is_none());
        assert!(entry.all_vector_ids().is_empty());
    }

    // add_modality inserts ids retrievable through text_vectors.
    #[test]
    fn test_vector_mapping_entry_add_modality() {
        let mut entry = VectorMappingEntry::default();
        entry.add_modality(Modality::Text, vec![1, 2]);

        assert_eq!(entry.text_vectors(), Some(&vec![1, 2]));
    }

    // Default-derived stats are all zero.
    #[test]
    fn test_storage_stats_default() {
        let stats = StorageStats::default();

        assert_eq!(stats.total_count, 0);
        assert_eq!(stats.compressed_count, 0);
        assert_eq!(stats.total_size_bytes, 0);
        assert_eq!(stats.total_retrievals, 0);
    }

    // Smoke-test construction/pattern-matching of simple variants.
    #[test]
    fn test_search_criteria_variants() {
        let criteria1 = SearchCriteria::ByEntity("test".to_string());
        let criteria2 = SearchCriteria::ByImportance { min: 0.5, max: 1.0 };
        let criteria3 = SearchCriteria::ByType(ExperienceType::Observation);

        assert!(matches!(criteria1, SearchCriteria::ByEntity(_)));
        assert!(matches!(criteria2, SearchCriteria::ByImportance { .. }));
        assert!(matches!(criteria3, SearchCriteria::ByType(_)));
    }

    // ByDate carries its window fields through intact.
    #[test]
    fn test_search_criteria_by_date() {
        let now = Utc::now();
        let start = now - chrono::Duration::days(7);
        let criteria = SearchCriteria::ByDate { start, end: now };

        if let SearchCriteria::ByDate { start: s, end: e } = criteria {
            assert!(s < e);
        } else {
            panic!("Expected ByDate");
        }
    }

    // Combined preserves its nested criteria list.
    #[test]
    fn test_search_criteria_combined() {
        let criteria = SearchCriteria::Combined(vec![
            SearchCriteria::ByEntity("test".to_string()),
            SearchCriteria::ByImportance { min: 0.5, max: 1.0 },
        ]);

        if let SearchCriteria::Combined(inner) = criteria {
            assert_eq!(inner.len(), 2);
        } else {
            panic!("Expected Combined");
        }
    }

    // Plain-struct field access for ModalityVectors.
    #[test]
    fn test_modality_vectors_struct() {
        let mv = ModalityVectors {
            vector_ids: vec![1, 2, 3],
            dimension: 384,
            chunk_ranges: None,
        };

        assert_eq!(mv.vector_ids.len(), 3);
        assert_eq!(mv.dimension, 384);
        assert!(mv.chunk_ranges.is_none());
    }
}