1use super::block::*;
5use super::edge_cases::{self, NormalizedContent, RecoveryMarker};
6use super::immortal_log::*;
7use super::indexes::*;
8use super::recovery::RecoveryManager;
9use super::retrieval::*;
10use super::tiered::*;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::path::PathBuf;
14use std::sync::{Arc, RwLock};
15
16pub struct MemoryEngineV3 {
18 log: Arc<RwLock<ImmortalLog>>,
20 storage: Arc<RwLock<TieredStorage>>,
22 temporal_index: Arc<RwLock<temporal::TemporalIndex>>,
24 semantic_index: Arc<RwLock<semantic::SemanticIndex>>,
25 causal_index: Arc<RwLock<causal::CausalIndex>>,
26 entity_index: Arc<RwLock<entity::EntityIndex>>,
27 procedural_index: Arc<RwLock<procedural::ProceduralIndex>>,
28 retrieval: Arc<SmartRetrievalEngine>,
30 session_id: String,
32 #[allow(dead_code)]
34 config: EngineConfig,
35}
36
37#[derive(Clone, Debug)]
39pub struct EngineConfig {
40 pub data_dir: PathBuf,
42 pub embedding_dim: usize,
44 pub tier_config: TierConfig,
46 pub checkpoint_interval: u64,
48}
49
50impl Default for EngineConfig {
51 fn default() -> Self {
52 Self {
53 data_dir: PathBuf::from(".agentic/memory"),
54 embedding_dim: 384,
55 tier_config: TierConfig::default(),
56 checkpoint_interval: 100,
57 }
58 }
59}
60
61impl MemoryEngineV3 {
62 pub fn open(config: EngineConfig) -> Result<Self, std::io::Error> {
64 std::fs::create_dir_all(&config.data_dir)?;
65
66 let log_path = config.data_dir.join("immortal.log");
67 let log = ImmortalLog::open(log_path)?;
68
69 let mut temporal = temporal::TemporalIndex::new();
71 let mut semantic = semantic::SemanticIndex::new(config.embedding_dim);
72 let mut causal = causal::CausalIndex::new();
73 let mut entity = entity::EntityIndex::new();
74 let mut procedural = procedural::ProceduralIndex::new();
75 let mut storage = TieredStorage::new(config.tier_config.clone());
76
77 for block in log.iter() {
78 temporal.index(&block);
79 semantic.index(&block);
80 causal.index(&block);
81 entity.index(&block);
82 procedural.index(&block);
83 storage.store(block);
84 }
85
86 Ok(Self {
87 log: Arc::new(RwLock::new(log)),
88 storage: Arc::new(RwLock::new(storage)),
89 temporal_index: Arc::new(RwLock::new(temporal)),
90 semantic_index: Arc::new(RwLock::new(semantic)),
91 causal_index: Arc::new(RwLock::new(causal)),
92 entity_index: Arc::new(RwLock::new(entity)),
93 procedural_index: Arc::new(RwLock::new(procedural)),
94 retrieval: Arc::new(SmartRetrievalEngine::new()),
95 session_id: uuid::Uuid::new_v4().to_string(),
96 config,
97 })
98 }
99
100 pub fn open_with_recovery(config: EngineConfig) -> Result<Self, std::io::Error> {
102 std::fs::create_dir_all(&config.data_dir)?;
103
104 let marker = RecoveryMarker::new(&config.data_dir);
106 if marker.needs_recovery() && !marker.recovery_completed() {
107 log::warn!("Previous recovery was interrupted — restarting recovery");
108 }
109
110 if let Ok(recovery) = RecoveryManager::new(&config.data_dir) {
112 match recovery.recover() {
113 Ok(blocks) if !blocks.is_empty() => {
114 marker.mark_in_progress();
115 log::info!("Recovering {} blocks from WAL", blocks.len());
116
117 let log_path = config.data_dir.join("immortal.log");
118 let mut log = ImmortalLog::open(log_path)?;
119
120 for block in &blocks {
121 if log.get_by_hash(&block.hash).is_none() {
123 let _ = log.append(block.block_type, block.content.clone());
125 }
126 }
127
128 marker.mark_complete();
129 }
130 _ => {}
131 }
132 }
133
134 let engine = Self::open(config)?;
136 let report = engine.verify_integrity();
137
138 if !report.verified {
139 log::warn!(
140 "Integrity issues detected: {} corrupted, {} missing blocks",
141 report.corrupted_blocks.len(),
142 report.missing_blocks.len()
143 );
144 }
147
148 Ok(engine)
149 }
150
151 pub fn rebuild_all_indexes(&self) {
153 let log = self.log.read().unwrap();
154 let blocks: Vec<Block> = log.iter().collect();
155
156 self.temporal_index
158 .write()
159 .unwrap()
160 .rebuild(blocks.iter().cloned());
161 self.semantic_index
162 .write()
163 .unwrap()
164 .rebuild(blocks.iter().cloned());
165 self.causal_index
166 .write()
167 .unwrap()
168 .rebuild(blocks.iter().cloned());
169 self.entity_index
170 .write()
171 .unwrap()
172 .rebuild(blocks.iter().cloned());
173 self.procedural_index
174 .write()
175 .unwrap()
176 .rebuild(blocks.iter().cloned());
177
178 let mut storage = self.storage.write().unwrap();
180 *storage = TieredStorage::new(self.config.tier_config.clone());
181 for block in blocks {
182 storage.store(block);
183 }
184
185 log::info!("All indexes rebuilt from log");
186 }
187
188 pub fn verify_index_consistency(&self) -> edge_cases::IndexConsistencyReport {
190 let log = self.log.read().unwrap();
191 let temporal = self.temporal_index.read().unwrap();
192 let semantic = self.semantic_index.read().unwrap();
193
194 let mut report = edge_cases::IndexConsistencyReport {
195 total_blocks: log.len(),
196 ..Default::default()
197 };
198
199 for seq in 0..log.len() {
200 if let Some(block) = log.get(seq) {
201 let in_temporal = temporal
203 .query_range(
204 block.timestamp - chrono::Duration::seconds(1),
205 block.timestamp + chrono::Duration::seconds(1),
206 )
207 .iter()
208 .any(|r| r.block_sequence == seq);
209
210 if !in_temporal {
211 report.missing_in_temporal.push(seq);
212 }
213
214 if block.extract_text().is_some() && semantic.len() < seq as usize + 1 {
216 report.missing_in_semantic.push(seq);
217 }
218 }
219 }
220
221 report.consistent = report.missing_in_temporal.is_empty()
222 && report.missing_in_semantic.is_empty()
223 && report.missing_in_entity.is_empty();
224
225 report
226 }
227
228 pub fn rebuild_indexes_if_needed(&self) -> bool {
230 let report = self.verify_index_consistency();
231 if !report.consistent {
232 log::warn!("Index inconsistency detected, rebuilding...");
233 self.rebuild_all_indexes();
234 true
235 } else {
236 false
237 }
238 }
239
240 pub fn capture_user_message(
246 &self,
247 text: &str,
248 tokens: Option<u32>,
249 ) -> Result<BlockHash, std::io::Error> {
250 let validated = match edge_cases::normalize_content(text) {
251 NormalizedContent::Empty => {
252 return Err(std::io::Error::new(
253 std::io::ErrorKind::InvalidInput,
254 "Cannot capture empty message",
255 ));
256 }
257 NormalizedContent::WhitespaceOnly => {
258 log::warn!("Captured whitespace-only user message");
259 text.to_string()
260 }
261 NormalizedContent::Valid(v) => v,
262 };
263
264 self.append_block(
265 BlockType::UserMessage,
266 BlockContent::Text {
267 text: validated,
268 role: Some("user".to_string()),
269 tokens,
270 },
271 )
272 }
273
274 pub fn capture_assistant_message(
276 &self,
277 text: &str,
278 tokens: Option<u32>,
279 ) -> Result<BlockHash, std::io::Error> {
280 let validated = match edge_cases::normalize_content(text) {
281 NormalizedContent::Empty => {
282 return Err(std::io::Error::new(
283 std::io::ErrorKind::InvalidInput,
284 "Cannot capture empty message",
285 ));
286 }
287 NormalizedContent::WhitespaceOnly => {
288 log::warn!("Captured whitespace-only assistant message");
289 text.to_string()
290 }
291 NormalizedContent::Valid(v) => v,
292 };
293
294 self.append_block(
295 BlockType::AssistantMessage,
296 BlockContent::Text {
297 text: validated,
298 role: Some("assistant".to_string()),
299 tokens,
300 },
301 )
302 }
303
304 pub fn capture_tool_call(
306 &self,
307 tool_name: &str,
308 input: serde_json::Value,
309 output: Option<serde_json::Value>,
310 duration_ms: Option<u64>,
311 success: bool,
312 ) -> Result<BlockHash, std::io::Error> {
313 self.append_block(
314 BlockType::ToolCall,
315 BlockContent::Tool {
316 tool_name: tool_name.to_string(),
317 input,
318 output,
319 duration_ms,
320 success,
321 },
322 )
323 }
324
325 pub fn capture_file_operation(
327 &self,
328 path: &str,
329 operation: FileOperation,
330 content_hash: Option<BlockHash>,
331 lines: Option<u32>,
332 diff: Option<String>,
333 ) -> Result<BlockHash, std::io::Error> {
334 self.append_block(
335 BlockType::FileOperation,
336 BlockContent::File {
337 path: path.to_string(),
338 operation,
339 content_hash,
340 lines,
341 diff,
342 },
343 )
344 }
345
346 pub fn capture_decision(
348 &self,
349 decision: &str,
350 reasoning: Option<&str>,
351 evidence_blocks: Vec<BlockHash>,
352 confidence: Option<f32>,
353 ) -> Result<BlockHash, std::io::Error> {
354 self.append_block(
355 BlockType::Decision,
356 BlockContent::Decision {
357 decision: decision.to_string(),
358 reasoning: reasoning.map(String::from),
359 evidence_blocks,
360 confidence,
361 },
362 )
363 }
364
365 pub fn capture_error(
367 &self,
368 error_type: &str,
369 message: &str,
370 resolution: Option<&str>,
371 resolved: bool,
372 ) -> Result<BlockHash, std::io::Error> {
373 self.append_block(
374 BlockType::Error,
375 BlockContent::Error {
376 error_type: error_type.to_string(),
377 message: message.to_string(),
378 resolution: resolution.map(String::from),
379 resolved,
380 },
381 )
382 }
383
384 pub fn capture_boundary(
386 &self,
387 boundary_type: BoundaryType,
388 context_tokens_before: u32,
389 context_tokens_after: u32,
390 summary: &str,
391 continuation_hint: Option<&str>,
392 ) -> Result<BlockHash, std::io::Error> {
393 self.append_block(
394 BlockType::SessionBoundary,
395 BlockContent::Boundary {
396 boundary_type,
397 context_tokens_before,
398 context_tokens_after,
399 summary: summary.to_string(),
400 continuation_hint: continuation_hint.map(String::from),
401 },
402 )
403 }
404
405 pub fn capture_checkpoint(
407 &self,
408 active_files: Vec<String>,
409 working_context: &str,
410 pending_tasks: Vec<String>,
411 ) -> Result<BlockHash, std::io::Error> {
412 self.append_block(
413 BlockType::Checkpoint,
414 BlockContent::Checkpoint {
415 active_files,
416 working_context: working_context.to_string(),
417 pending_tasks,
418 },
419 )
420 }
421
422 fn append_block(
427 &self,
428 block_type: BlockType,
429 content: BlockContent,
430 ) -> Result<BlockHash, std::io::Error> {
431 let mut log = self
433 .log
434 .write()
435 .map_err(|e| std::io::Error::other(format!("Log lock poisoned: {}", e)))?;
436
437 let block = log.append(block_type, content)?;
438 let hash = block.hash;
439
440 if let Ok(mut idx) = self.temporal_index.write() {
442 idx.index(&block);
443 }
444 if let Ok(mut idx) = self.semantic_index.write() {
445 idx.index(&block);
446 }
447 if let Ok(mut idx) = self.causal_index.write() {
448 idx.index(&block);
449 }
450 if let Ok(mut idx) = self.entity_index.write() {
451 idx.index(&block);
452 }
453 if let Ok(mut idx) = self.procedural_index.write() {
454 idx.index(&block);
455 }
456
457 if let Ok(mut s) = self.storage.write() {
459 s.store(block);
460 }
461
462 Ok(hash)
463 }
464
465 pub fn retrieve(&self, request: RetrievalRequest) -> RetrievalResult {
471 self.retrieval.retrieve(
472 request,
473 &self.log.read().unwrap(),
474 &self.storage.read().unwrap(),
475 &self.temporal_index.read().unwrap(),
476 &self.semantic_index.read().unwrap(),
477 &self.causal_index.read().unwrap(),
478 &self.entity_index.read().unwrap(),
479 &self.procedural_index.read().unwrap(),
480 )
481 }
482
483 pub fn resurrect(&self, timestamp: DateTime<Utc>) -> ResurrectionResult {
485 let log = self.log.read().unwrap();
486 let storage = self.storage.read().unwrap();
487
488 let mut blocks = Vec::new();
489 for seq in 0..log.len() {
490 if let Some(block) = storage.get(seq) {
491 if block.timestamp <= timestamp {
492 blocks.push(block);
493 }
494 }
495 }
496
497 let mut messages = Vec::new();
498 let mut files_state = std::collections::HashMap::new();
499 let mut decisions = Vec::new();
500 let mut last_checkpoint = None;
501
502 for block in &blocks {
503 match &block.content {
504 BlockContent::Text { text, role, .. } => {
505 messages.push((role.clone().unwrap_or_default(), text.clone()));
506 }
507 BlockContent::File {
508 path, operation, ..
509 } => match operation {
510 FileOperation::Create | FileOperation::Update => {
511 files_state.insert(path.clone(), true);
512 }
513 FileOperation::Delete => {
514 files_state.insert(path.clone(), false);
515 }
516 _ => {}
517 },
518 BlockContent::Decision { decision, .. } => {
519 if !decisions.contains(decision) {
520 decisions.push(decision.clone());
521 }
522 }
523 BlockContent::Checkpoint { .. } => {
524 last_checkpoint = Some(block.clone());
525 }
526 _ => {}
527 }
528 }
529
530 ResurrectionResult {
531 timestamp,
532 block_count: blocks.len(),
533 messages,
534 files_state,
535 decisions,
536 last_checkpoint,
537 }
538 }
539
540 pub fn search_temporal(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<Block> {
542 let temporal = self.temporal_index.read().unwrap();
543 let storage = self.storage.read().unwrap();
544
545 temporal
546 .query_range(start, end)
547 .into_iter()
548 .filter_map(|r| storage.get(r.block_sequence))
549 .collect()
550 }
551
552 pub fn search_semantic(&self, query: &str, limit: usize) -> Vec<Block> {
554 let semantic = self.semantic_index.read().unwrap();
555 let storage = self.storage.read().unwrap();
556
557 semantic
558 .search_by_text(query, limit)
559 .into_iter()
560 .filter_map(|r| storage.get(r.block_sequence))
561 .collect()
562 }
563
564 pub fn search_entity(&self, entity: &str) -> Vec<Block> {
566 let entity_idx = self.entity_index.read().unwrap();
567 let storage = self.storage.read().unwrap();
568
569 entity_idx
570 .query_entity(entity)
571 .into_iter()
572 .filter_map(|r| storage.get(r.block_sequence))
573 .collect()
574 }
575
576 pub fn get_decision_chain(&self, block_sequence: u64) -> Vec<Block> {
578 let causal = self.causal_index.read().unwrap();
579 let storage = self.storage.read().unwrap();
580
581 causal
582 .get_decision_chain(block_sequence)
583 .into_iter()
584 .filter_map(|r| storage.get(r.block_sequence))
585 .collect()
586 }
587
588 pub fn get_current_session(&self) -> Vec<Block> {
590 let procedural = self.procedural_index.read().unwrap();
591 let storage = self.storage.read().unwrap();
592
593 procedural
594 .get_current_session()
595 .into_iter()
596 .filter_map(|r| storage.get(r.block_sequence))
597 .collect()
598 }
599
600 pub fn verify_integrity(&self) -> IntegrityReport {
602 self.log.read().unwrap().verify_integrity()
603 }
604
605 pub fn stats(&self) -> EngineStats {
607 let log = self.log.read().unwrap();
608 let tier_stats = self.storage.read().unwrap().stats();
609
610 EngineStats {
611 total_blocks: log.len(),
612 tier_stats,
613 session_id: self.session_id.clone(),
614 }
615 }
616
617 pub fn session_resume(&self) -> SessionResumeResult {
619 let procedural = self.procedural_index.read().unwrap();
620 let storage = self.storage.read().unwrap();
621 let entity_idx = self.entity_index.read().unwrap();
622
623 let recent = procedural.get_recent_steps(50);
624 let recent_blocks: Vec<Block> = recent
625 .into_iter()
626 .filter_map(|r| storage.get(r.block_sequence))
627 .collect();
628
629 let mut messages = Vec::new();
630 let mut files_touched = Vec::new();
631 let mut decisions = Vec::new();
632 let mut errors_resolved = Vec::new();
633
634 for block in &recent_blocks {
635 match &block.content {
636 BlockContent::Text { text, role, .. } => {
637 let preview = if text.len() > 200 {
638 format!("{}...", &text[..200])
639 } else {
640 text.clone()
641 };
642 messages.push((role.clone().unwrap_or_default(), preview));
643 }
644 BlockContent::File {
645 path, operation, ..
646 } => {
647 files_touched.push((path.clone(), format!("{:?}", operation)));
648 }
649 BlockContent::Decision { decision, .. } => {
650 if !decisions.contains(decision) {
651 decisions.push(decision.clone());
652 }
653 }
654 BlockContent::Error {
655 error_type,
656 message,
657 resolution,
658 resolved,
659 } => {
660 if *resolved {
661 errors_resolved.push((
662 format!("{}: {}", error_type, message),
663 resolution.clone().unwrap_or_default(),
664 ));
665 }
666 }
667 _ => {}
668 }
669 }
670
671 let all_files = entity_idx.get_all_files();
672
673 SessionResumeResult {
674 session_id: self.session_id.clone(),
675 block_count: recent_blocks.len(),
676 recent_messages: messages,
677 files_touched,
678 decisions,
679 errors_resolved,
680 all_known_files: all_files,
681 }
682 }
683}
684
685#[derive(Debug, Clone, Serialize, Deserialize)]
687pub struct ResurrectionResult {
688 pub timestamp: DateTime<Utc>,
689 pub block_count: usize,
690 pub messages: Vec<(String, String)>,
691 pub files_state: std::collections::HashMap<String, bool>,
692 pub decisions: Vec<String>,
693 pub last_checkpoint: Option<Block>,
694}
695
696#[derive(Debug, Clone, Serialize, Deserialize)]
698pub struct SessionResumeResult {
699 pub session_id: String,
700 pub block_count: usize,
701 pub recent_messages: Vec<(String, String)>,
702 pub files_touched: Vec<(String, String)>,
703 pub decisions: Vec<String>,
704 pub errors_resolved: Vec<(String, String)>,
705 pub all_known_files: Vec<String>,
706}
707
708#[derive(Debug, Clone, Serialize, Deserialize)]
710pub struct EngineStats {
711 pub total_blocks: u64,
712 pub tier_stats: TierStats,
713 pub session_id: String,
714}