Skip to main content

agentic_memory/v3/
engine.rs

1//! The V3 Memory Engine: Immortal Architecture.
2//! Integrates log + storage + indexes + retrieval into a single API.
3
4use super::block::*;
5use super::edge_cases::{self, NormalizedContent, RecoveryMarker};
6use super::immortal_log::*;
7use super::indexes::*;
8use super::recovery::RecoveryManager;
9use super::retrieval::*;
10use super::tiered::*;
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::path::PathBuf;
14use std::sync::{Arc, RwLock};
15
16/// The V3 Memory Engine
17pub struct MemoryEngineV3 {
18    /// The append-only immortal log (source of truth)
19    log: Arc<RwLock<ImmortalLog>>,
20    /// Tiered storage for fast access
21    storage: Arc<RwLock<TieredStorage>>,
22    /// The five indexes
23    temporal_index: Arc<RwLock<temporal::TemporalIndex>>,
24    semantic_index: Arc<RwLock<semantic::SemanticIndex>>,
25    causal_index: Arc<RwLock<causal::CausalIndex>>,
26    entity_index: Arc<RwLock<entity::EntityIndex>>,
27    procedural_index: Arc<RwLock<procedural::ProceduralIndex>>,
28    /// Smart retrieval engine
29    retrieval: Arc<SmartRetrievalEngine>,
30    /// Current session ID
31    session_id: String,
32    /// Configuration
33    #[allow(dead_code)]
34    config: EngineConfig,
35}
36
37/// Engine configuration
38#[derive(Clone, Debug)]
39pub struct EngineConfig {
40    /// Data directory
41    pub data_dir: PathBuf,
42    /// Embedding dimension (for semantic index)
43    pub embedding_dim: usize,
44    /// Tier configuration
45    pub tier_config: TierConfig,
46    /// Auto-checkpoint interval (blocks)
47    pub checkpoint_interval: u64,
48}
49
50impl Default for EngineConfig {
51    fn default() -> Self {
52        Self {
53            data_dir: PathBuf::from(".agentic/memory"),
54            embedding_dim: 384,
55            tier_config: TierConfig::default(),
56            checkpoint_interval: 100,
57        }
58    }
59}
60
61impl MemoryEngineV3 {
62    /// Create or open memory engine
63    pub fn open(config: EngineConfig) -> Result<Self, std::io::Error> {
64        std::fs::create_dir_all(&config.data_dir)?;
65
66        let log_path = config.data_dir.join("immortal.log");
67        let log = ImmortalLog::open(log_path)?;
68
69        // Build indexes from log
70        let mut temporal = temporal::TemporalIndex::new();
71        let mut semantic = semantic::SemanticIndex::new(config.embedding_dim);
72        let mut causal = causal::CausalIndex::new();
73        let mut entity = entity::EntityIndex::new();
74        let mut procedural = procedural::ProceduralIndex::new();
75        let mut storage = TieredStorage::new(config.tier_config.clone());
76
77        for block in log.iter() {
78            temporal.index(&block);
79            semantic.index(&block);
80            causal.index(&block);
81            entity.index(&block);
82            procedural.index(&block);
83            storage.store(block);
84        }
85
86        Ok(Self {
87            log: Arc::new(RwLock::new(log)),
88            storage: Arc::new(RwLock::new(storage)),
89            temporal_index: Arc::new(RwLock::new(temporal)),
90            semantic_index: Arc::new(RwLock::new(semantic)),
91            causal_index: Arc::new(RwLock::new(causal)),
92            entity_index: Arc::new(RwLock::new(entity)),
93            procedural_index: Arc::new(RwLock::new(procedural)),
94            retrieval: Arc::new(SmartRetrievalEngine::new()),
95            session_id: uuid::Uuid::new_v4().to_string(),
96            config,
97        })
98    }
99
100    /// Open with crash recovery: check WAL, verify integrity, rebuild indexes if needed
101    pub fn open_with_recovery(config: EngineConfig) -> Result<Self, std::io::Error> {
102        std::fs::create_dir_all(&config.data_dir)?;
103
104        // Check recovery markers
105        let marker = RecoveryMarker::new(&config.data_dir);
106        if marker.needs_recovery() && !marker.recovery_completed() {
107            log::warn!("Previous recovery was interrupted — restarting recovery");
108        }
109
110        // Try WAL recovery first
111        if let Ok(recovery) = RecoveryManager::new(&config.data_dir) {
112            match recovery.recover() {
113                Ok(blocks) if !blocks.is_empty() => {
114                    marker.mark_in_progress();
115                    log::info!("Recovering {} blocks from WAL", blocks.len());
116
117                    let log_path = config.data_dir.join("immortal.log");
118                    let mut log = ImmortalLog::open(log_path)?;
119
120                    for block in &blocks {
121                        // Check if block already in log (idempotent)
122                        if log.get_by_hash(&block.hash).is_none() {
123                            // Re-append the block content
124                            let _ = log.append(block.block_type, block.content.clone());
125                        }
126                    }
127
128                    marker.mark_complete();
129                }
130                _ => {}
131            }
132        }
133
134        // Open normally and verify integrity
135        let engine = Self::open(config)?;
136        let report = engine.verify_integrity();
137
138        if !report.verified {
139            log::warn!(
140                "Integrity issues detected: {} corrupted, {} missing blocks",
141                report.corrupted_blocks.len(),
142                report.missing_blocks.len()
143            );
144            // Indexes were already rebuilt from log during open()
145            // The log is the source of truth
146        }
147
148        Ok(engine)
149    }
150
151    /// Rebuild all indexes from the immortal log (source of truth)
152    pub fn rebuild_all_indexes(&self) {
153        let log = self.log.read().unwrap();
154        let blocks: Vec<Block> = log.iter().collect();
155
156        // Rebuild each index
157        self.temporal_index
158            .write()
159            .unwrap()
160            .rebuild(blocks.iter().cloned());
161        self.semantic_index
162            .write()
163            .unwrap()
164            .rebuild(blocks.iter().cloned());
165        self.causal_index
166            .write()
167            .unwrap()
168            .rebuild(blocks.iter().cloned());
169        self.entity_index
170            .write()
171            .unwrap()
172            .rebuild(blocks.iter().cloned());
173        self.procedural_index
174            .write()
175            .unwrap()
176            .rebuild(blocks.iter().cloned());
177
178        // Rebuild tiered storage
179        let mut storage = self.storage.write().unwrap();
180        *storage = TieredStorage::new(self.config.tier_config.clone());
181        for block in blocks {
182            storage.store(block);
183        }
184
185        log::info!("All indexes rebuilt from log");
186    }
187
188    /// Verify index consistency against the log
189    pub fn verify_index_consistency(&self) -> edge_cases::IndexConsistencyReport {
190        let log = self.log.read().unwrap();
191        let temporal = self.temporal_index.read().unwrap();
192        let semantic = self.semantic_index.read().unwrap();
193
194        let mut report = edge_cases::IndexConsistencyReport {
195            total_blocks: log.len(),
196            ..Default::default()
197        };
198
199        for seq in 0..log.len() {
200            if let Some(block) = log.get(seq) {
201                // Check temporal index
202                let in_temporal = temporal
203                    .query_range(
204                        block.timestamp - chrono::Duration::seconds(1),
205                        block.timestamp + chrono::Duration::seconds(1),
206                    )
207                    .iter()
208                    .any(|r| r.block_sequence == seq);
209
210                if !in_temporal {
211                    report.missing_in_temporal.push(seq);
212                }
213
214                // Check semantic index (only for text content)
215                if block.extract_text().is_some() && semantic.len() < seq as usize + 1 {
216                    report.missing_in_semantic.push(seq);
217                }
218            }
219        }
220
221        report.consistent = report.missing_in_temporal.is_empty()
222            && report.missing_in_semantic.is_empty()
223            && report.missing_in_entity.is_empty();
224
225        report
226    }
227
228    /// Rebuild indexes if inconsistencies are detected
229    pub fn rebuild_indexes_if_needed(&self) -> bool {
230        let report = self.verify_index_consistency();
231        if !report.consistent {
232            log::warn!("Index inconsistency detected, rebuilding...");
233            self.rebuild_all_indexes();
234            true
235        } else {
236            false
237        }
238    }
239
240    // ═══════════════════════════════════════════════════════════════════
241    // CAPTURE API
242    // ═══════════════════════════════════════════════════════════════════
243
244    /// Capture a user message (with content validation)
245    pub fn capture_user_message(
246        &self,
247        text: &str,
248        tokens: Option<u32>,
249    ) -> Result<BlockHash, std::io::Error> {
250        let validated = match edge_cases::normalize_content(text) {
251            NormalizedContent::Empty => {
252                return Err(std::io::Error::new(
253                    std::io::ErrorKind::InvalidInput,
254                    "Cannot capture empty message",
255                ));
256            }
257            NormalizedContent::WhitespaceOnly => {
258                log::warn!("Captured whitespace-only user message");
259                text.to_string()
260            }
261            NormalizedContent::Valid(v) => v,
262        };
263
264        self.append_block(
265            BlockType::UserMessage,
266            BlockContent::Text {
267                text: validated,
268                role: Some("user".to_string()),
269                tokens,
270            },
271        )
272    }
273
274    /// Capture an assistant message (with content validation)
275    pub fn capture_assistant_message(
276        &self,
277        text: &str,
278        tokens: Option<u32>,
279    ) -> Result<BlockHash, std::io::Error> {
280        let validated = match edge_cases::normalize_content(text) {
281            NormalizedContent::Empty => {
282                return Err(std::io::Error::new(
283                    std::io::ErrorKind::InvalidInput,
284                    "Cannot capture empty message",
285                ));
286            }
287            NormalizedContent::WhitespaceOnly => {
288                log::warn!("Captured whitespace-only assistant message");
289                text.to_string()
290            }
291            NormalizedContent::Valid(v) => v,
292        };
293
294        self.append_block(
295            BlockType::AssistantMessage,
296            BlockContent::Text {
297                text: validated,
298                role: Some("assistant".to_string()),
299                tokens,
300            },
301        )
302    }
303
304    /// Capture a tool call
305    pub fn capture_tool_call(
306        &self,
307        tool_name: &str,
308        input: serde_json::Value,
309        output: Option<serde_json::Value>,
310        duration_ms: Option<u64>,
311        success: bool,
312    ) -> Result<BlockHash, std::io::Error> {
313        self.append_block(
314            BlockType::ToolCall,
315            BlockContent::Tool {
316                tool_name: tool_name.to_string(),
317                input,
318                output,
319                duration_ms,
320                success,
321            },
322        )
323    }
324
325    /// Capture a file operation
326    pub fn capture_file_operation(
327        &self,
328        path: &str,
329        operation: FileOperation,
330        content_hash: Option<BlockHash>,
331        lines: Option<u32>,
332        diff: Option<String>,
333    ) -> Result<BlockHash, std::io::Error> {
334        self.append_block(
335            BlockType::FileOperation,
336            BlockContent::File {
337                path: path.to_string(),
338                operation,
339                content_hash,
340                lines,
341                diff,
342            },
343        )
344    }
345
346    /// Capture a decision
347    pub fn capture_decision(
348        &self,
349        decision: &str,
350        reasoning: Option<&str>,
351        evidence_blocks: Vec<BlockHash>,
352        confidence: Option<f32>,
353    ) -> Result<BlockHash, std::io::Error> {
354        self.append_block(
355            BlockType::Decision,
356            BlockContent::Decision {
357                decision: decision.to_string(),
358                reasoning: reasoning.map(String::from),
359                evidence_blocks,
360                confidence,
361            },
362        )
363    }
364
365    /// Capture an error
366    pub fn capture_error(
367        &self,
368        error_type: &str,
369        message: &str,
370        resolution: Option<&str>,
371        resolved: bool,
372    ) -> Result<BlockHash, std::io::Error> {
373        self.append_block(
374            BlockType::Error,
375            BlockContent::Error {
376                error_type: error_type.to_string(),
377                message: message.to_string(),
378                resolution: resolution.map(String::from),
379                resolved,
380            },
381        )
382    }
383
384    /// Capture a session boundary
385    pub fn capture_boundary(
386        &self,
387        boundary_type: BoundaryType,
388        context_tokens_before: u32,
389        context_tokens_after: u32,
390        summary: &str,
391        continuation_hint: Option<&str>,
392    ) -> Result<BlockHash, std::io::Error> {
393        self.append_block(
394            BlockType::SessionBoundary,
395            BlockContent::Boundary {
396                boundary_type,
397                context_tokens_before,
398                context_tokens_after,
399                summary: summary.to_string(),
400                continuation_hint: continuation_hint.map(String::from),
401            },
402        )
403    }
404
405    /// Capture a checkpoint
406    pub fn capture_checkpoint(
407        &self,
408        active_files: Vec<String>,
409        working_context: &str,
410        pending_tasks: Vec<String>,
411    ) -> Result<BlockHash, std::io::Error> {
412        self.append_block(
413            BlockType::Checkpoint,
414            BlockContent::Checkpoint {
415                active_files,
416                working_context: working_context.to_string(),
417                pending_tasks,
418            },
419        )
420    }
421
422    // ═══════════════════════════════════════════════════════════════════
423    // INTERNAL
424    // ═══════════════════════════════════════════════════════════════════
425
426    fn append_block(
427        &self,
428        block_type: BlockType,
429        content: BlockContent,
430    ) -> Result<BlockHash, std::io::Error> {
431        // Acquire write lock with poisoning recovery
432        let mut log = self
433            .log
434            .write()
435            .map_err(|e| std::io::Error::other(format!("Log lock poisoned: {}", e)))?;
436
437        let block = log.append(block_type, content)?;
438        let hash = block.hash;
439
440        // Update indexes (continue even if individual index fails)
441        if let Ok(mut idx) = self.temporal_index.write() {
442            idx.index(&block);
443        }
444        if let Ok(mut idx) = self.semantic_index.write() {
445            idx.index(&block);
446        }
447        if let Ok(mut idx) = self.causal_index.write() {
448            idx.index(&block);
449        }
450        if let Ok(mut idx) = self.entity_index.write() {
451            idx.index(&block);
452        }
453        if let Ok(mut idx) = self.procedural_index.write() {
454            idx.index(&block);
455        }
456
457        // Store in tiered storage
458        if let Ok(mut s) = self.storage.write() {
459            s.store(block);
460        }
461
462        Ok(hash)
463    }
464
465    // ═══════════════════════════════════════════════════════════════════
466    // RETRIEVAL API
467    // ═══════════════════════════════════════════════════════════════════
468
469    /// Smart retrieval: assemble perfect context for a query
470    pub fn retrieve(&self, request: RetrievalRequest) -> RetrievalResult {
471        self.retrieval.retrieve(
472            request,
473            &self.log.read().unwrap(),
474            &self.storage.read().unwrap(),
475            &self.temporal_index.read().unwrap(),
476            &self.semantic_index.read().unwrap(),
477            &self.causal_index.read().unwrap(),
478            &self.entity_index.read().unwrap(),
479            &self.procedural_index.read().unwrap(),
480        )
481    }
482
483    /// Resurrect: fully restore state at any timestamp
484    pub fn resurrect(&self, timestamp: DateTime<Utc>) -> ResurrectionResult {
485        let log = self.log.read().unwrap();
486        let storage = self.storage.read().unwrap();
487
488        let mut blocks = Vec::new();
489        for seq in 0..log.len() {
490            if let Some(block) = storage.get(seq) {
491                if block.timestamp <= timestamp {
492                    blocks.push(block);
493                }
494            }
495        }
496
497        let mut messages = Vec::new();
498        let mut files_state = std::collections::HashMap::new();
499        let mut decisions = Vec::new();
500        let mut last_checkpoint = None;
501
502        for block in &blocks {
503            match &block.content {
504                BlockContent::Text { text, role, .. } => {
505                    messages.push((role.clone().unwrap_or_default(), text.clone()));
506                }
507                BlockContent::File {
508                    path, operation, ..
509                } => match operation {
510                    FileOperation::Create | FileOperation::Update => {
511                        files_state.insert(path.clone(), true);
512                    }
513                    FileOperation::Delete => {
514                        files_state.insert(path.clone(), false);
515                    }
516                    _ => {}
517                },
518                BlockContent::Decision { decision, .. } => {
519                    decisions.push(decision.clone());
520                }
521                BlockContent::Checkpoint { .. } => {
522                    last_checkpoint = Some(block.clone());
523                }
524                _ => {}
525            }
526        }
527
528        ResurrectionResult {
529            timestamp,
530            block_count: blocks.len(),
531            messages,
532            files_state,
533            decisions,
534            last_checkpoint,
535        }
536    }
537
538    /// Search by time range
539    pub fn search_temporal(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<Block> {
540        let temporal = self.temporal_index.read().unwrap();
541        let storage = self.storage.read().unwrap();
542
543        temporal
544            .query_range(start, end)
545            .into_iter()
546            .filter_map(|r| storage.get(r.block_sequence))
547            .collect()
548    }
549
550    /// Search by text/meaning
551    pub fn search_semantic(&self, query: &str, limit: usize) -> Vec<Block> {
552        let semantic = self.semantic_index.read().unwrap();
553        let storage = self.storage.read().unwrap();
554
555        semantic
556            .search_by_text(query, limit)
557            .into_iter()
558            .filter_map(|r| storage.get(r.block_sequence))
559            .collect()
560    }
561
562    /// Search by entity (file, person, etc.)
563    pub fn search_entity(&self, entity: &str) -> Vec<Block> {
564        let entity_idx = self.entity_index.read().unwrap();
565        let storage = self.storage.read().unwrap();
566
567        entity_idx
568            .query_entity(entity)
569            .into_iter()
570            .filter_map(|r| storage.get(r.block_sequence))
571            .collect()
572    }
573
574    /// Get decision chain
575    pub fn get_decision_chain(&self, block_sequence: u64) -> Vec<Block> {
576        let causal = self.causal_index.read().unwrap();
577        let storage = self.storage.read().unwrap();
578
579        causal
580            .get_decision_chain(block_sequence)
581            .into_iter()
582            .filter_map(|r| storage.get(r.block_sequence))
583            .collect()
584    }
585
586    /// Get current session blocks
587    pub fn get_current_session(&self) -> Vec<Block> {
588        let procedural = self.procedural_index.read().unwrap();
589        let storage = self.storage.read().unwrap();
590
591        procedural
592            .get_current_session()
593            .into_iter()
594            .filter_map(|r| storage.get(r.block_sequence))
595            .collect()
596    }
597
598    /// Verify integrity
599    pub fn verify_integrity(&self) -> IntegrityReport {
600        self.log.read().unwrap().verify_integrity()
601    }
602
603    /// Get statistics
604    pub fn stats(&self) -> EngineStats {
605        let log = self.log.read().unwrap();
606        let tier_stats = self.storage.read().unwrap().stats();
607
608        EngineStats {
609            total_blocks: log.len(),
610            tier_stats,
611            session_id: self.session_id.clone(),
612        }
613    }
614
615    /// Session resume: get everything needed to continue
616    pub fn session_resume(&self) -> SessionResumeResult {
617        let procedural = self.procedural_index.read().unwrap();
618        let storage = self.storage.read().unwrap();
619        let entity_idx = self.entity_index.read().unwrap();
620
621        let recent = procedural.get_recent_steps(50);
622        let recent_blocks: Vec<Block> = recent
623            .into_iter()
624            .filter_map(|r| storage.get(r.block_sequence))
625            .collect();
626
627        let mut messages = Vec::new();
628        let mut files_touched = Vec::new();
629        let mut decisions = Vec::new();
630        let mut errors_resolved = Vec::new();
631
632        for block in &recent_blocks {
633            match &block.content {
634                BlockContent::Text { text, role, .. } => {
635                    let preview = if text.len() > 200 {
636                        format!("{}...", &text[..200])
637                    } else {
638                        text.clone()
639                    };
640                    messages.push((role.clone().unwrap_or_default(), preview));
641                }
642                BlockContent::File {
643                    path, operation, ..
644                } => {
645                    files_touched.push((path.clone(), format!("{:?}", operation)));
646                }
647                BlockContent::Decision { decision, .. } => {
648                    decisions.push(decision.clone());
649                }
650                BlockContent::Error {
651                    error_type,
652                    message,
653                    resolution,
654                    resolved,
655                } => {
656                    if *resolved {
657                        errors_resolved.push((
658                            format!("{}: {}", error_type, message),
659                            resolution.clone().unwrap_or_default(),
660                        ));
661                    }
662                }
663                _ => {}
664            }
665        }
666
667        let all_files = entity_idx.get_all_files();
668
669        SessionResumeResult {
670            session_id: self.session_id.clone(),
671            block_count: recent_blocks.len(),
672            recent_messages: messages,
673            files_touched,
674            decisions,
675            errors_resolved,
676            all_known_files: all_files,
677        }
678    }
679}
680
681/// Result of resurrect operation
682#[derive(Debug, Clone, Serialize, Deserialize)]
683pub struct ResurrectionResult {
684    pub timestamp: DateTime<Utc>,
685    pub block_count: usize,
686    pub messages: Vec<(String, String)>,
687    pub files_state: std::collections::HashMap<String, bool>,
688    pub decisions: Vec<String>,
689    pub last_checkpoint: Option<Block>,
690}
691
692/// Result of session resume
693#[derive(Debug, Clone, Serialize, Deserialize)]
694pub struct SessionResumeResult {
695    pub session_id: String,
696    pub block_count: usize,
697    pub recent_messages: Vec<(String, String)>,
698    pub files_touched: Vec<(String, String)>,
699    pub decisions: Vec<String>,
700    pub errors_resolved: Vec<(String, String)>,
701    pub all_known_files: Vec<String>,
702}
703
704/// Engine statistics
705#[derive(Debug, Clone, Serialize, Deserialize)]
706pub struct EngineStats {
707    pub total_blocks: u64,
708    pub tier_stats: TierStats,
709    pub session_id: String,
710}