1use super::block::*;
5use super::edge_cases::{self, NormalizedContent, RecoveryMarker};
6use super::immortal_log::*;
7use super::indexes::*;
8use super::recovery::RecoveryManager;
9use super::retrieval::*;
10use super::tiered::*;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::path::PathBuf;
14use std::sync::{Arc, RwLock};
15
/// Memory engine that combines a block log, tiered block storage and five
/// indexes (temporal, semantic, causal, entity, procedural).
///
/// Each mutable component lives behind its own `Arc<RwLock<_>>`; methods on the
/// engine take `&self` and lock the components they need.
pub struct MemoryEngineV3 {
    /// Block log. New blocks are appended here before any index is updated.
    log: Arc<RwLock<ImmortalLog>>,
    /// Tiered storage that serves blocks by sequence number.
    storage: Arc<RwLock<TieredStorage>>,
    /// Index queried by timestamp range.
    temporal_index: Arc<RwLock<temporal::TemporalIndex>>,
    /// Index queried by free text; constructed with `EngineConfig::embedding_dim`.
    semantic_index: Arc<RwLock<semantic::SemanticIndex>>,
    /// Index used to look up decision chains.
    causal_index: Arc<RwLock<causal::CausalIndex>>,
    /// Index queried by entity name; also lists all known files.
    entity_index: Arc<RwLock<entity::EntityIndex>>,
    /// Index that tracks the current session and recent steps.
    procedural_index: Arc<RwLock<procedural::ProceduralIndex>>,
    /// Retrieval engine that `retrieve` delegates to.
    retrieval: Arc<SmartRetrievalEngine>,
    /// Random v4 UUID generated each time the engine is opened.
    session_id: String,
    // NOTE(review): `config` is read by `rebuild_all_indexes`, so this allow
    // may no longer be necessary — confirm and remove if so.
    #[allow(dead_code)]
    /// Configuration the engine was opened with.
    config: EngineConfig,
}
36
/// Settings used when opening a [`MemoryEngineV3`].
#[derive(Clone, Debug)]
pub struct EngineConfig {
    /// Directory that holds the engine's files, including `immortal.log`.
    /// It is created on open if it does not exist.
    pub data_dir: PathBuf,
    /// Dimension handed to the semantic index constructor.
    pub embedding_dim: usize,
    /// Configuration handed to the tiered storage constructor.
    pub tier_config: TierConfig,
    /// Checkpoint interval. NOTE(review): not read anywhere in this file;
    /// unit and consumer need to be confirmed elsewhere.
    pub checkpoint_interval: u64,
}
49
50impl Default for EngineConfig {
51 fn default() -> Self {
52 Self {
53 data_dir: PathBuf::from(".agentic/memory"),
54 embedding_dim: 384,
55 tier_config: TierConfig::default(),
56 checkpoint_interval: 100,
57 }
58 }
59}
60
61impl MemoryEngineV3 {
62 pub fn open(config: EngineConfig) -> Result<Self, std::io::Error> {
64 std::fs::create_dir_all(&config.data_dir)?;
65
66 let log_path = config.data_dir.join("immortal.log");
67 let log = ImmortalLog::open(log_path)?;
68
69 let mut temporal = temporal::TemporalIndex::new();
71 let mut semantic = semantic::SemanticIndex::new(config.embedding_dim);
72 let mut causal = causal::CausalIndex::new();
73 let mut entity = entity::EntityIndex::new();
74 let mut procedural = procedural::ProceduralIndex::new();
75 let mut storage = TieredStorage::new(config.tier_config.clone());
76
77 for block in log.iter() {
78 temporal.index(&block);
79 semantic.index(&block);
80 causal.index(&block);
81 entity.index(&block);
82 procedural.index(&block);
83 storage.store(block);
84 }
85
86 Ok(Self {
87 log: Arc::new(RwLock::new(log)),
88 storage: Arc::new(RwLock::new(storage)),
89 temporal_index: Arc::new(RwLock::new(temporal)),
90 semantic_index: Arc::new(RwLock::new(semantic)),
91 causal_index: Arc::new(RwLock::new(causal)),
92 entity_index: Arc::new(RwLock::new(entity)),
93 procedural_index: Arc::new(RwLock::new(procedural)),
94 retrieval: Arc::new(SmartRetrievalEngine::new()),
95 session_id: uuid::Uuid::new_v4().to_string(),
96 config,
97 })
98 }
99
100 pub fn open_with_recovery(config: EngineConfig) -> Result<Self, std::io::Error> {
102 std::fs::create_dir_all(&config.data_dir)?;
103
104 let marker = RecoveryMarker::new(&config.data_dir);
106 if marker.needs_recovery() && !marker.recovery_completed() {
107 log::warn!("Previous recovery was interrupted — restarting recovery");
108 }
109
110 if let Ok(recovery) = RecoveryManager::new(&config.data_dir) {
112 match recovery.recover() {
113 Ok(blocks) if !blocks.is_empty() => {
114 marker.mark_in_progress();
115 log::info!("Recovering {} blocks from WAL", blocks.len());
116
117 let log_path = config.data_dir.join("immortal.log");
118 let mut log = ImmortalLog::open(log_path)?;
119
120 for block in &blocks {
121 if log.get_by_hash(&block.hash).is_none() {
123 let _ = log.append(block.block_type, block.content.clone());
125 }
126 }
127
128 marker.mark_complete();
129 }
130 _ => {}
131 }
132 }
133
134 let engine = Self::open(config)?;
136 let report = engine.verify_integrity();
137
138 if !report.verified {
139 log::warn!(
140 "Integrity issues detected: {} corrupted, {} missing blocks",
141 report.corrupted_blocks.len(),
142 report.missing_blocks.len()
143 );
144 }
147
148 Ok(engine)
149 }
150
151 pub fn rebuild_all_indexes(&self) {
153 let log = self.log.read().unwrap();
154 let blocks: Vec<Block> = log.iter().collect();
155
156 self.temporal_index
158 .write()
159 .unwrap()
160 .rebuild(blocks.iter().cloned());
161 self.semantic_index
162 .write()
163 .unwrap()
164 .rebuild(blocks.iter().cloned());
165 self.causal_index
166 .write()
167 .unwrap()
168 .rebuild(blocks.iter().cloned());
169 self.entity_index
170 .write()
171 .unwrap()
172 .rebuild(blocks.iter().cloned());
173 self.procedural_index
174 .write()
175 .unwrap()
176 .rebuild(blocks.iter().cloned());
177
178 let mut storage = self.storage.write().unwrap();
180 *storage = TieredStorage::new(self.config.tier_config.clone());
181 for block in blocks {
182 storage.store(block);
183 }
184
185 log::info!("All indexes rebuilt from log");
186 }
187
188 pub fn verify_index_consistency(&self) -> edge_cases::IndexConsistencyReport {
190 let log = self.log.read().unwrap();
191 let temporal = self.temporal_index.read().unwrap();
192 let semantic = self.semantic_index.read().unwrap();
193
194 let mut report = edge_cases::IndexConsistencyReport {
195 total_blocks: log.len(),
196 ..Default::default()
197 };
198
199 for seq in 0..log.len() {
200 if let Some(block) = log.get(seq) {
201 let in_temporal = temporal
203 .query_range(
204 block.timestamp - chrono::Duration::seconds(1),
205 block.timestamp + chrono::Duration::seconds(1),
206 )
207 .iter()
208 .any(|r| r.block_sequence == seq);
209
210 if !in_temporal {
211 report.missing_in_temporal.push(seq);
212 }
213
214 if block.extract_text().is_some() && semantic.len() < seq as usize + 1 {
216 report.missing_in_semantic.push(seq);
217 }
218 }
219 }
220
221 report.consistent = report.missing_in_temporal.is_empty()
222 && report.missing_in_semantic.is_empty()
223 && report.missing_in_entity.is_empty();
224
225 report
226 }
227
228 pub fn rebuild_indexes_if_needed(&self) -> bool {
230 let report = self.verify_index_consistency();
231 if !report.consistent {
232 log::warn!("Index inconsistency detected, rebuilding...");
233 self.rebuild_all_indexes();
234 true
235 } else {
236 false
237 }
238 }
239
240 pub fn capture_user_message(
246 &self,
247 text: &str,
248 tokens: Option<u32>,
249 ) -> Result<BlockHash, std::io::Error> {
250 let validated = match edge_cases::normalize_content(text) {
251 NormalizedContent::Empty => {
252 return Err(std::io::Error::new(
253 std::io::ErrorKind::InvalidInput,
254 "Cannot capture empty message",
255 ));
256 }
257 NormalizedContent::WhitespaceOnly => {
258 log::warn!("Captured whitespace-only user message");
259 text.to_string()
260 }
261 NormalizedContent::Valid(v) => v,
262 };
263
264 self.append_block(
265 BlockType::UserMessage,
266 BlockContent::Text {
267 text: validated,
268 role: Some("user".to_string()),
269 tokens,
270 },
271 )
272 }
273
274 pub fn capture_assistant_message(
276 &self,
277 text: &str,
278 tokens: Option<u32>,
279 ) -> Result<BlockHash, std::io::Error> {
280 let validated = match edge_cases::normalize_content(text) {
281 NormalizedContent::Empty => {
282 return Err(std::io::Error::new(
283 std::io::ErrorKind::InvalidInput,
284 "Cannot capture empty message",
285 ));
286 }
287 NormalizedContent::WhitespaceOnly => {
288 log::warn!("Captured whitespace-only assistant message");
289 text.to_string()
290 }
291 NormalizedContent::Valid(v) => v,
292 };
293
294 self.append_block(
295 BlockType::AssistantMessage,
296 BlockContent::Text {
297 text: validated,
298 role: Some("assistant".to_string()),
299 tokens,
300 },
301 )
302 }
303
304 pub fn capture_tool_call(
306 &self,
307 tool_name: &str,
308 input: serde_json::Value,
309 output: Option<serde_json::Value>,
310 duration_ms: Option<u64>,
311 success: bool,
312 ) -> Result<BlockHash, std::io::Error> {
313 self.append_block(
314 BlockType::ToolCall,
315 BlockContent::Tool {
316 tool_name: tool_name.to_string(),
317 input,
318 output,
319 duration_ms,
320 success,
321 },
322 )
323 }
324
325 pub fn capture_file_operation(
327 &self,
328 path: &str,
329 operation: FileOperation,
330 content_hash: Option<BlockHash>,
331 lines: Option<u32>,
332 diff: Option<String>,
333 ) -> Result<BlockHash, std::io::Error> {
334 self.append_block(
335 BlockType::FileOperation,
336 BlockContent::File {
337 path: path.to_string(),
338 operation,
339 content_hash,
340 lines,
341 diff,
342 },
343 )
344 }
345
346 pub fn capture_decision(
348 &self,
349 decision: &str,
350 reasoning: Option<&str>,
351 evidence_blocks: Vec<BlockHash>,
352 confidence: Option<f32>,
353 ) -> Result<BlockHash, std::io::Error> {
354 self.append_block(
355 BlockType::Decision,
356 BlockContent::Decision {
357 decision: decision.to_string(),
358 reasoning: reasoning.map(String::from),
359 evidence_blocks,
360 confidence,
361 },
362 )
363 }
364
365 pub fn capture_error(
367 &self,
368 error_type: &str,
369 message: &str,
370 resolution: Option<&str>,
371 resolved: bool,
372 ) -> Result<BlockHash, std::io::Error> {
373 self.append_block(
374 BlockType::Error,
375 BlockContent::Error {
376 error_type: error_type.to_string(),
377 message: message.to_string(),
378 resolution: resolution.map(String::from),
379 resolved,
380 },
381 )
382 }
383
384 pub fn capture_boundary(
386 &self,
387 boundary_type: BoundaryType,
388 context_tokens_before: u32,
389 context_tokens_after: u32,
390 summary: &str,
391 continuation_hint: Option<&str>,
392 ) -> Result<BlockHash, std::io::Error> {
393 self.append_block(
394 BlockType::SessionBoundary,
395 BlockContent::Boundary {
396 boundary_type,
397 context_tokens_before,
398 context_tokens_after,
399 summary: summary.to_string(),
400 continuation_hint: continuation_hint.map(String::from),
401 },
402 )
403 }
404
405 pub fn capture_checkpoint(
407 &self,
408 active_files: Vec<String>,
409 working_context: &str,
410 pending_tasks: Vec<String>,
411 ) -> Result<BlockHash, std::io::Error> {
412 self.append_block(
413 BlockType::Checkpoint,
414 BlockContent::Checkpoint {
415 active_files,
416 working_context: working_context.to_string(),
417 pending_tasks,
418 },
419 )
420 }
421
422 fn append_block(
427 &self,
428 block_type: BlockType,
429 content: BlockContent,
430 ) -> Result<BlockHash, std::io::Error> {
431 let mut log = self
433 .log
434 .write()
435 .map_err(|e| std::io::Error::other(format!("Log lock poisoned: {}", e)))?;
436
437 let block = log.append(block_type, content)?;
438 let hash = block.hash;
439
440 if let Ok(mut idx) = self.temporal_index.write() {
442 idx.index(&block);
443 }
444 if let Ok(mut idx) = self.semantic_index.write() {
445 idx.index(&block);
446 }
447 if let Ok(mut idx) = self.causal_index.write() {
448 idx.index(&block);
449 }
450 if let Ok(mut idx) = self.entity_index.write() {
451 idx.index(&block);
452 }
453 if let Ok(mut idx) = self.procedural_index.write() {
454 idx.index(&block);
455 }
456
457 if let Ok(mut s) = self.storage.write() {
459 s.store(block);
460 }
461
462 Ok(hash)
463 }
464
465 pub fn retrieve(&self, request: RetrievalRequest) -> RetrievalResult {
471 self.retrieval.retrieve(
472 request,
473 &self.log.read().unwrap(),
474 &self.storage.read().unwrap(),
475 &self.temporal_index.read().unwrap(),
476 &self.semantic_index.read().unwrap(),
477 &self.causal_index.read().unwrap(),
478 &self.entity_index.read().unwrap(),
479 &self.procedural_index.read().unwrap(),
480 )
481 }
482
483 pub fn resurrect(&self, timestamp: DateTime<Utc>) -> ResurrectionResult {
485 let log = self.log.read().unwrap();
486 let storage = self.storage.read().unwrap();
487
488 let mut blocks = Vec::new();
489 for seq in 0..log.len() {
490 if let Some(block) = storage.get(seq) {
491 if block.timestamp <= timestamp {
492 blocks.push(block);
493 }
494 }
495 }
496
497 let mut messages = Vec::new();
498 let mut files_state = std::collections::HashMap::new();
499 let mut decisions = Vec::new();
500 let mut last_checkpoint = None;
501
502 for block in &blocks {
503 match &block.content {
504 BlockContent::Text { text, role, .. } => {
505 messages.push((role.clone().unwrap_or_default(), text.clone()));
506 }
507 BlockContent::File {
508 path, operation, ..
509 } => match operation {
510 FileOperation::Create | FileOperation::Update => {
511 files_state.insert(path.clone(), true);
512 }
513 FileOperation::Delete => {
514 files_state.insert(path.clone(), false);
515 }
516 _ => {}
517 },
518 BlockContent::Decision { decision, .. } => {
519 decisions.push(decision.clone());
520 }
521 BlockContent::Checkpoint { .. } => {
522 last_checkpoint = Some(block.clone());
523 }
524 _ => {}
525 }
526 }
527
528 ResurrectionResult {
529 timestamp,
530 block_count: blocks.len(),
531 messages,
532 files_state,
533 decisions,
534 last_checkpoint,
535 }
536 }
537
538 pub fn search_temporal(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<Block> {
540 let temporal = self.temporal_index.read().unwrap();
541 let storage = self.storage.read().unwrap();
542
543 temporal
544 .query_range(start, end)
545 .into_iter()
546 .filter_map(|r| storage.get(r.block_sequence))
547 .collect()
548 }
549
550 pub fn search_semantic(&self, query: &str, limit: usize) -> Vec<Block> {
552 let semantic = self.semantic_index.read().unwrap();
553 let storage = self.storage.read().unwrap();
554
555 semantic
556 .search_by_text(query, limit)
557 .into_iter()
558 .filter_map(|r| storage.get(r.block_sequence))
559 .collect()
560 }
561
562 pub fn search_entity(&self, entity: &str) -> Vec<Block> {
564 let entity_idx = self.entity_index.read().unwrap();
565 let storage = self.storage.read().unwrap();
566
567 entity_idx
568 .query_entity(entity)
569 .into_iter()
570 .filter_map(|r| storage.get(r.block_sequence))
571 .collect()
572 }
573
574 pub fn get_decision_chain(&self, block_sequence: u64) -> Vec<Block> {
576 let causal = self.causal_index.read().unwrap();
577 let storage = self.storage.read().unwrap();
578
579 causal
580 .get_decision_chain(block_sequence)
581 .into_iter()
582 .filter_map(|r| storage.get(r.block_sequence))
583 .collect()
584 }
585
586 pub fn get_current_session(&self) -> Vec<Block> {
588 let procedural = self.procedural_index.read().unwrap();
589 let storage = self.storage.read().unwrap();
590
591 procedural
592 .get_current_session()
593 .into_iter()
594 .filter_map(|r| storage.get(r.block_sequence))
595 .collect()
596 }
597
598 pub fn verify_integrity(&self) -> IntegrityReport {
600 self.log.read().unwrap().verify_integrity()
601 }
602
603 pub fn stats(&self) -> EngineStats {
605 let log = self.log.read().unwrap();
606 let tier_stats = self.storage.read().unwrap().stats();
607
608 EngineStats {
609 total_blocks: log.len(),
610 tier_stats,
611 session_id: self.session_id.clone(),
612 }
613 }
614
615 pub fn session_resume(&self) -> SessionResumeResult {
617 let procedural = self.procedural_index.read().unwrap();
618 let storage = self.storage.read().unwrap();
619 let entity_idx = self.entity_index.read().unwrap();
620
621 let recent = procedural.get_recent_steps(50);
622 let recent_blocks: Vec<Block> = recent
623 .into_iter()
624 .filter_map(|r| storage.get(r.block_sequence))
625 .collect();
626
627 let mut messages = Vec::new();
628 let mut files_touched = Vec::new();
629 let mut decisions = Vec::new();
630 let mut errors_resolved = Vec::new();
631
632 for block in &recent_blocks {
633 match &block.content {
634 BlockContent::Text { text, role, .. } => {
635 let preview = if text.len() > 200 {
636 format!("{}...", &text[..200])
637 } else {
638 text.clone()
639 };
640 messages.push((role.clone().unwrap_or_default(), preview));
641 }
642 BlockContent::File {
643 path, operation, ..
644 } => {
645 files_touched.push((path.clone(), format!("{:?}", operation)));
646 }
647 BlockContent::Decision { decision, .. } => {
648 decisions.push(decision.clone());
649 }
650 BlockContent::Error {
651 error_type,
652 message,
653 resolution,
654 resolved,
655 } => {
656 if *resolved {
657 errors_resolved.push((
658 format!("{}: {}", error_type, message),
659 resolution.clone().unwrap_or_default(),
660 ));
661 }
662 }
663 _ => {}
664 }
665 }
666
667 let all_files = entity_idx.get_all_files();
668
669 SessionResumeResult {
670 session_id: self.session_id.clone(),
671 block_count: recent_blocks.len(),
672 recent_messages: messages,
673 files_touched,
674 decisions,
675 errors_resolved,
676 all_known_files: all_files,
677 }
678 }
679}
680
/// State summary produced by `MemoryEngineV3::resurrect` for a point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResurrectionResult {
    /// The cutoff timestamp that was requested.
    pub timestamp: DateTime<Utc>,
    /// Number of blocks with a timestamp at or before the cutoff.
    pub block_count: usize,
    /// `(role, text)` pairs from text blocks; the role is empty when the block had none.
    pub messages: Vec<(String, String)>,
    /// Per file path: `true` after a create or update, `false` after a delete
    /// (the latest such block before the cutoff wins).
    pub files_state: std::collections::HashMap<String, bool>,
    /// Decision texts from decision blocks.
    pub decisions: Vec<String>,
    /// The last checkpoint block found before the cutoff, if any.
    pub last_checkpoint: Option<Block>,
}
691
/// Summary of recent activity produced by `MemoryEngineV3::session_resume`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionResumeResult {
    /// Session id of the engine that produced this summary.
    pub session_id: String,
    /// Number of recent blocks that were examined.
    pub block_count: usize,
    /// `(role, preview)` pairs from text blocks; long texts are shortened to
    /// roughly 200 bytes followed by `...`.
    pub recent_messages: Vec<(String, String)>,
    /// `(path, operation)` pairs from file blocks, with the operation in its `Debug` form.
    pub files_touched: Vec<(String, String)>,
    /// Decision texts from decision blocks.
    pub decisions: Vec<String>,
    /// `("type: message", resolution)` pairs for errors flagged as resolved;
    /// the resolution is empty when none was recorded.
    pub errors_resolved: Vec<(String, String)>,
    /// All files reported by the entity index.
    pub all_known_files: Vec<String>,
}
703
/// Snapshot of engine statistics returned by `MemoryEngineV3::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineStats {
    /// Number of blocks in the log.
    pub total_blocks: u64,
    /// Statistics reported by tiered storage.
    pub tier_stats: TierStats,
    /// Session id of the engine that produced these statistics.
    pub session_id: String,
}