Skip to main content

agent_code_lib/memory/
consolidation.rs

1//! Memory consolidation ("dreaming").
2//!
3//! Background process that reviews memory files and consolidates
4//! them: merging duplicates, resolving contradictions, converting
5//! relative dates to absolute, pruning stale entries, and keeping
6//! the index under limits.
7//!
8//! Uses a lock file to prevent concurrent consolidation across
9//! multiple agent sessions.
10
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::{Duration, SystemTime};
14
15use tracing::{info, warn};
16
17/// Minimum hours between consolidation runs.
18const MIN_HOURS_BETWEEN_RUNS: u64 = 24;
19
20/// Lock file name within the memory directory.
21const LOCK_FILE: &str = ".consolidate-lock";
22
23/// Check if consolidation should run.
24pub fn should_consolidate(memory_dir: &Path) -> bool {
25    let lock_path = memory_dir.join(LOCK_FILE);
26
27    // If lock doesn't exist, we've never consolidated.
28    let modified = match std::fs::metadata(&lock_path)
29        .ok()
30        .and_then(|m| m.modified().ok())
31    {
32        Some(t) => t,
33        None => return true, // Never run before.
34    };
35
36    let elapsed = SystemTime::now()
37        .duration_since(modified)
38        .unwrap_or(Duration::ZERO);
39
40    elapsed.as_secs() >= MIN_HOURS_BETWEEN_RUNS * 3600
41}
42
43/// Try to acquire the consolidation lock.
44/// Returns the lock path if acquired, None if another process holds it.
45pub fn try_acquire_lock(memory_dir: &Path) -> Option<PathBuf> {
46    let lock_path = memory_dir.join(LOCK_FILE);
47
48    // Check for existing lock.
49    if lock_path.exists()
50        && let Ok(content) = std::fs::read_to_string(&lock_path)
51    {
52        let pid_str = content.trim();
53        if let Ok(pid) = pid_str.parse::<u32>() {
54            // Check if the holding process is still alive.
55            if is_process_alive(pid) {
56                // Check if lock is stale (> 1 hour).
57                if let Ok(meta) = std::fs::metadata(&lock_path)
58                    && let Ok(modified) = meta.modified()
59                {
60                    let age = SystemTime::now()
61                        .duration_since(modified)
62                        .unwrap_or(Duration::ZERO);
63                    if age.as_secs() < 3600 {
64                        return None; // Lock is fresh and holder is alive.
65                    }
66                }
67            }
68            // Process is dead or lock is stale — reclaim.
69        }
70    }
71
72    // Write our PID to the lock file.
73    let pid = std::process::id();
74    if std::fs::write(&lock_path, pid.to_string()).is_err() {
75        return None;
76    }
77
78    // Verify we actually hold the lock (race protection).
79    if let Ok(content) = std::fs::read_to_string(&lock_path)
80        && content.trim() == pid.to_string()
81    {
82        return Some(lock_path);
83    }
84
85    None // Lost the race.
86}
87
88/// Release the consolidation lock by updating its mtime to now.
89/// This marks the consolidation as complete (mtime = last consolidated time).
90pub fn release_lock(lock_path: &Path) {
91    // Rewrite the file to update mtime to now.
92    let _ = std::fs::write(lock_path, std::process::id().to_string());
93}
94
95/// Roll back the lock on failure (rewind mtime so next session retries).
96pub fn rollback_lock(lock_path: &Path) {
97    // Delete the lock file so next check sees "never consolidated".
98    let _ = std::fs::remove_file(lock_path);
99}
100
101/// Build the consolidation prompt for the dream agent.
102pub fn build_consolidation_prompt(memory_dir: &Path) -> String {
103    let mut prompt = String::from(
104        "You are a memory consolidation agent. Review and improve the memory \
105         directory. Work in four phases:\n\n\
106         Phase 1 — Orient:\n\
107         - List the memory directory contents\n\
108         - Read MEMORY.md to understand the current index\n\
109         - Skim existing files to avoid creating duplicates\n\n\
110         Phase 2 — Identify issues:\n\
111         - Find duplicate or near-duplicate memories\n\
112         - Find contradictions between memory files\n\
113         - Find memories with relative dates (convert to absolute)\n\
114         - Find memories about things derivable from code (delete these)\n\n\
115         Phase 3 — Consolidate:\n\
116         - Merge duplicates into single files\n\
117         - Delete contradicted facts at the source\n\
118         - Update vague descriptions with specific ones\n\
119         - Remove memories about code patterns, git history, or debugging\n\n\
120         Phase 4 — Prune and index:\n\
121         - Update MEMORY.md to stay under 200 lines\n\
122         - Remove pointers to deleted files\n\
123         - Shorten verbose index entries (detail belongs in topic files)\n\
124         - Resolve contradictions between index and files\n\n\
125         Be aggressive about pruning. Less memory is better than stale memory.\n",
126    );
127
128    prompt.push_str(&format!("\nMemory directory: {}\n", memory_dir.display()));
129
130    prompt
131}
132
133/// Run the full consolidation pipeline via LLM.
134pub async fn run_consolidation(
135    memory_dir: &Path,
136    lock_path: &Path,
137    llm: Arc<dyn crate::llm::provider::Provider>,
138    model: &str,
139) {
140    let prompt = build_consolidation_prompt(memory_dir);
141
142    // Build a manifest of all current memory files.
143    let manifest = super::extraction::build_memory_manifest_public(memory_dir);
144    let full_prompt = format!(
145        "{prompt}\n\n{manifest}\n\n\
146         Analyze these memory files. For each action you want to take, output a JSON \
147         line with one of these formats:\n\
148         To delete a file: {{\"action\": \"delete\", \"filename\": \"file.md\"}}\n\
149         To update a file: {{\"action\": \"update\", \"filename\": \"file.md\", \
150         \"name\": \"Name\", \"description\": \"desc\", \"type\": \"user\", \
151         \"content\": \"new content\"}}\n\
152         To update the index: {{\"action\": \"reindex\"}}\n\n\
153         Output ONLY JSON lines, nothing else. If no changes needed, output nothing."
154    );
155
156    let request = crate::llm::provider::ProviderRequest {
157        messages: vec![crate::llm::message::user_message(&full_prompt)],
158        system_prompt: "You are a memory consolidation agent. You merge, prune, and \
159                        organize memory files. Be aggressive about removing stale or \
160                        duplicate content. Output only JSON lines."
161            .to_string(),
162        tools: vec![],
163        model: model.to_string(),
164        max_tokens: 4096,
165        temperature: Some(0.0),
166        enable_caching: false,
167        tool_choice: Default::default(),
168        metadata: None,
169        // Background consolidation: not user-cancellable, passes a fresh token.
170        cancel: tokio_util::sync::CancellationToken::new(),
171    };
172
173    let result = match llm.stream(&request).await {
174        Ok(mut rx) => {
175            let mut output = String::new();
176            while let Some(event) = rx.recv().await {
177                if let crate::llm::stream::StreamEvent::TextDelta(text) = event {
178                    output.push_str(&text);
179                }
180            }
181            output
182        }
183        Err(e) => {
184            tracing::debug!("Memory consolidation skipped (API error): {e}");
185            rollback_lock(lock_path);
186            return;
187        }
188    };
189
190    // Process actions.
191    let mut actions_taken = 0;
192    for line in result.lines() {
193        let line = line.trim();
194        if line.is_empty() || !line.starts_with('{') {
195            continue;
196        }
197
198        if let Ok(entry) = serde_json::from_str::<serde_json::Value>(line) {
199            let action = entry.get("action").and_then(|v| v.as_str()).unwrap_or("");
200
201            match action {
202                "delete" => {
203                    if let Some(filename) = entry.get("filename").and_then(|v| v.as_str()) {
204                        let path = memory_dir.join(filename);
205                        if path.exists() {
206                            if let Err(e) = std::fs::remove_file(&path) {
207                                warn!("Failed to delete memory file {filename}: {e}");
208                            } else {
209                                info!("Consolidation: deleted {filename}");
210                                actions_taken += 1;
211                            }
212                        }
213                    }
214                }
215                "update" => {
216                    let filename = entry
217                        .get("filename")
218                        .and_then(|v| v.as_str())
219                        .unwrap_or("unknown.md");
220                    let name = entry
221                        .get("name")
222                        .and_then(|v| v.as_str())
223                        .unwrap_or("Unknown");
224                    let description = entry
225                        .get("description")
226                        .and_then(|v| v.as_str())
227                        .unwrap_or("");
228                    let mem_type = entry.get("type").and_then(|v| v.as_str()).unwrap_or("user");
229                    let content = entry.get("content").and_then(|v| v.as_str()).unwrap_or("");
230
231                    if !content.is_empty() {
232                        let memory_type = match mem_type {
233                            "feedback" => Some(super::types::MemoryType::Feedback),
234                            "project" => Some(super::types::MemoryType::Project),
235                            "reference" => Some(super::types::MemoryType::Reference),
236                            _ => Some(super::types::MemoryType::User),
237                        };
238
239                        let meta = super::types::MemoryMeta {
240                            name: name.to_string(),
241                            description: description.to_string(),
242                            memory_type,
243                        };
244
245                        match super::writer::write_memory(memory_dir, filename, &meta, content) {
246                            Ok(_) => {
247                                info!("Consolidation: updated {filename}");
248                                actions_taken += 1;
249                            }
250                            Err(e) => {
251                                warn!("Failed to update memory file {filename}: {e}");
252                            }
253                        }
254                    }
255                }
256                "reindex" => {
257                    // Rebuild the index from existing files.
258                    if let Err(e) = super::writer::rebuild_index(memory_dir) {
259                        warn!("Failed to rebuild memory index: {e}");
260                    } else {
261                        info!("Consolidation: reindexed MEMORY.md");
262                        actions_taken += 1;
263                    }
264                }
265                _ => {}
266            }
267        }
268    }
269
270    if actions_taken > 0 {
271        info!("Memory consolidation complete: {actions_taken} actions taken");
272    } else {
273        info!("Memory consolidation: no changes needed");
274    }
275
276    release_lock(lock_path);
277}
278
279fn is_process_alive(pid: u32) -> bool {
280    #[cfg(unix)]
281    {
282        // kill(pid, 0) checks if process exists without sending a signal.
283        unsafe { libc::kill(pid as i32, 0) == 0 }
284    }
285    #[cfg(not(unix))]
286    {
287        let _ = pid; // Suppress unused variable warning on non-Unix.
288        true // Assume alive on non-Unix.
289    }
290}