Skip to main content

matrixcode_core/memory/
storage.rs

1//! Memory storage with file locking.
2
3use anyhow::Result;
4use chrono::Utc;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8use super::config::MemoryConfig;
9use super::types::{AutoMemory, MemoryEntry};
10
11// ============================================================================
12// File Lock
13// ============================================================================
14
15/// File lock for preventing concurrent access to memory storage.
16pub struct MemoryFileLock {
17    /// Path to the lock file.
18    lock_path: PathBuf,
19    /// Whether we currently hold the lock.
20    locked: bool,
21}
22
23impl MemoryFileLock {
24    /// Create a new file lock for the given directory.
25    pub fn new(base_dir: &Path) -> Self {
26        Self {
27            lock_path: base_dir.join("memory.lock"),
28            locked: false,
29        }
30    }
31
32    /// Acquire the lock (blocking with timeout).
33    /// Returns Ok(true) if lock acquired, Err if timeout.
34    pub fn acquire(&mut self, timeout_ms: u64) -> Result<()> {
35        if self.locked {
36            return Ok(());
37        }
38
39        let start = std::time::Instant::now();
40
41        while start.elapsed().as_millis() < timeout_ms as u128 {
42            match fs::File::create_new(&self.lock_path) {
43                Ok(_) => {
44                    let lock_info = format!("{}:{}", std::process::id(), Utc::now().to_rfc3339());
45                    fs::write(&self.lock_path, lock_info)?;
46                    self.locked = true;
47                    return Ok(());
48                }
49                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
50                    if self.is_stale_lock()? {
51                        self.remove_stale_lock()?;
52                    }
53                    std::thread::sleep(std::time::Duration::from_millis(50));
54                }
55                Err(e) => {
56                    return Err(e.into());
57                }
58            }
59        }
60
61        // Timeout - return error instead of Ok(false)
62        anyhow::bail!("Failed to acquire memory lock after {}ms timeout", timeout_ms)
63    }
64
65    /// Check if the existing lock is stale (either old or process is dead).
66    fn is_stale_lock(&self) -> Result<bool> {
67        if !self.lock_path.exists() {
68            return Ok(false);
69        }
70
71        // First check if the lock owner process is still running
72        if let Ok(content) = fs::read_to_string(&self.lock_path)
73            && let Some(pid_str) = content.split(':').next()
74            && let Ok(pid) = pid_str.parse::<u32>()
75            && !self.is_process_running(pid)
76        {
77            // Process is dead, lock is stale
78            return Ok(true);
79        }
80
81        // Then check lock age as fallback
82        let metadata = fs::metadata(&self.lock_path)?;
83        let modified = metadata.modified()?;
84        let age = std::time::SystemTime::now()
85            .duration_since(modified)
86            .unwrap_or(std::time::Duration::ZERO);
87
88        Ok(age > std::time::Duration::from_secs(60))
89    }
90
91    /// Check if a process with the given PID is still running.
92    fn is_process_running(&self, pid: u32) -> bool {
93        #[cfg(unix)]
94        {
95            // On Unix, check if process exists by checking /proc
96            if std::path::Path::new("/proc").exists() {
97                std::path::Path::new(&format!("/proc/{}", pid)).exists()
98            } else {
99                // Fallback: assume process is running if we can't check
100                true
101            }
102        }
103        #[cfg(windows)]
104        {
105            // On Windows, use tasklist command to check if process exists
106            use std::process::Command;
107            let output = Command::new("tasklist")
108                .args(["/FI", &format!("PID eq {}", pid), "/NH"])
109                .output();
110
111            match output {
112                Ok(out) => {
113                    let stdout = String::from_utf8_lossy(&out.stdout);
114                    // tasklist returns "INFO: No tasks are running..." if process not found
115                    // or returns a line with the PID if running
116                    stdout.contains(&pid.to_string()) && !stdout.contains("No tasks")
117                }
118                Err(_) => {
119                    // If tasklist fails, assume process might be running (safer)
120                    true
121                }
122            }
123        }
124        #[cfg(not(any(unix, windows)))]
125        {
126            let _ = pid;
127            true
128        }
129    }
130
131    /// Remove stale lock file with atomic retry.
132    fn remove_stale_lock(&self) -> Result<()> {
133        // Use atomic rename to avoid race condition
134        // Create a temp deletion marker and rename it over the lock
135        let temp_path = self.lock_path.with_extension("lock.del");
136        if self.lock_path.exists() {
137            // Try atomic rename first
138            if fs::rename(&self.lock_path, &temp_path).is_ok() {
139                fs::remove_file(&temp_path)?;
140            } else {
141                // Fallback to direct removal if rename fails
142                fs::remove_file(&self.lock_path)?;
143            }
144        }
145        Ok(())
146    }
147
148    /// Release the lock.
149    pub fn release(&mut self) -> Result<()> {
150        if self.locked {
151            fs::remove_file(&self.lock_path)?;
152            self.locked = false;
153        }
154        Ok(())
155    }
156}
157
158impl Drop for MemoryFileLock {
159    fn drop(&mut self) {
160        let _ = self.release();
161    }
162}
163
164// ============================================================================
165// Memory Storage
166// ============================================================================
167
168/// Storage for memory files (global and project-level) with file locking.
169pub struct MemoryStorage {
170    /// Base directory for global memory (~/.matrix).
171    base_dir: PathBuf,
172    /// Project root directory (optional).
173    project_root: Option<PathBuf>,
174    /// File lock for preventing concurrent writes.
175    lock: MemoryFileLock,
176}
177
178impl MemoryStorage {
179    /// Create a new memory storage.
180    pub fn new(project_root: Option<&Path>) -> Result<Self> {
181        let base_dir = Self::get_base_dir()?;
182        let lock = MemoryFileLock::new(&base_dir);
183        Ok(Self {
184            base_dir,
185            project_root: project_root.map(|p| p.to_path_buf()),
186            lock,
187        })
188    }
189
190    /// Create a new storage with explicit lock timeout.
191    pub fn with_lock_timeout(project_root: Option<&Path>, timeout_ms: u64) -> Result<Self> {
192        let mut storage = Self::new(project_root)?;
193        storage.lock.acquire(timeout_ms)?;
194        Ok(storage)
195    }
196
197    /// Get the base directory for memory storage.
198    fn get_base_dir() -> Result<PathBuf> {
199        let home = std::env::var_os("HOME")
200            .or_else(|| std::env::var_os("USERPROFILE"))
201            .ok_or_else(|| anyhow::anyhow!("HOME or USERPROFILE not set"))?;
202        let mut p = PathBuf::from(home);
203        p.push(".matrix");
204        Ok(p)
205    }
206
207    /// Path to global memory file.
208    pub fn global_memory_path(&self) -> PathBuf {
209        self.base_dir.join("memory.json")
210    }
211
212    /// Path to project memory file.
213    pub fn project_memory_path(&self) -> Option<PathBuf> {
214        self.project_root
215            .as_ref()
216            .map(|p| p.join(".matrix/memory.json"))
217    }
218
219    /// Path to config file.
220    pub fn config_path(&self) -> PathBuf {
221        self.base_dir.join("memory_config.json")
222    }
223
224    /// Ensure directories exist.
225    fn ensure_dirs(&self) -> Result<()> {
226        fs::create_dir_all(&self.base_dir)?;
227        if let Some(root) = &self.project_root {
228            let memory_dir = root.join(".matrix");
229            fs::create_dir_all(memory_dir)?;
230        }
231        Ok(())
232    }
233
234    /// Acquire lock before write operations.
235    fn acquire_lock(&mut self) -> Result<()> {
236        self.lock.acquire(5000)?;
237        Ok(())
238    }
239
240    /// Release lock after write operations.
241    fn release_lock(&mut self) -> Result<()> {
242        self.lock.release()?;
243        Ok(())
244    }
245
246    /// Load global memory.
247    pub fn load_global(&self) -> Result<AutoMemory> {
248        let path = self.global_memory_path();
249        if !path.exists() {
250            return Ok(AutoMemory::new());
251        }
252        let data = fs::read_to_string(&path)?;
253        let memory: AutoMemory = serde_json::from_str(&data)?;
254        Ok(memory)
255    }
256
257    /// Load project memory.
258    pub fn load_project(&self) -> Result<Option<AutoMemory>> {
259        let path = self.project_memory_path();
260        match path {
261            Some(p) if p.exists() => {
262                let data = fs::read_to_string(&p)?;
263                let memory: AutoMemory = serde_json::from_str(&data)?;
264                Ok(Some(memory))
265            }
266            _ => Ok(None),
267        }
268    }
269
270    /// Load combined memory (global + project).
271    pub fn load_combined(&self) -> Result<AutoMemory> {
272        let mut combined = self.load_global()?;
273
274        if let Some(project) = self.load_project()? {
275            for entry in project.entries {
276                let mut tagged_entry = entry;
277                if !tagged_entry.tags.contains(&"project".to_string()) {
278                    tagged_entry.tags.push("project".to_string());
279                }
280                combined.entries.push(tagged_entry);
281            }
282            combined.prune();
283        }
284
285        Ok(combined)
286    }
287
288    /// Save global memory (with file lock).
289    pub fn save_global(&mut self, memory: &AutoMemory) -> Result<()> {
290        self.acquire_lock()?;
291        self.ensure_dirs()?;
292
293        let path = self.global_memory_path();
294        let json = serde_json::to_string_pretty(memory)?;
295
296        let tmp = path.with_extension("json.tmp");
297        fs::write(&tmp, json)?;
298        fs::rename(&tmp, &path)?;
299
300        self.release_lock()?;
301        Ok(())
302    }
303
304    /// Save project memory (with file lock).
305    pub fn save_project(&mut self, memory: &AutoMemory) -> Result<()> {
306        self.acquire_lock()?;
307        self.ensure_dirs()?;
308
309        let path = self
310            .project_memory_path()
311            .ok_or_else(|| anyhow::anyhow!("no project root"))?;
312        let json = serde_json::to_string_pretty(memory)?;
313
314        let tmp = path.with_extension("json.tmp");
315        fs::write(&tmp, json)?;
316        fs::rename(&tmp, &path)?;
317
318        self.release_lock()?;
319        Ok(())
320    }
321
322    /// Save config to separate file.
323    pub fn save_config(&mut self, config: &MemoryConfig) -> Result<()> {
324        self.ensure_dirs()?;
325        let path = self.config_path();
326        let json = serde_json::to_string_pretty(config)?;
327        fs::write(&path, json)?;
328        Ok(())
329    }
330
331    /// Load config from file.
332    pub fn load_config(&self) -> Result<MemoryConfig> {
333        let path = self.config_path();
334        if !path.exists() {
335            return Ok(MemoryConfig::default());
336        }
337        let data = fs::read_to_string(&path)?;
338        let config: MemoryConfig = serde_json::from_str(&data)?;
339        Ok(config)
340    }
341
342    /// Add entry to appropriate storage.
343    pub fn add_entry(&mut self, entry: MemoryEntry, is_project_specific: bool) -> Result<()> {
344        self.acquire_lock()?;
345
346        if is_project_specific {
347            let mut project = self.load_project()?.unwrap_or_else(AutoMemory::new);
348            project.add(entry);
349            self.save_project_locked(&project)?;
350        } else {
351            let mut global = self.load_global()?;
352            global.add(entry);
353            self.save_global_locked(&global)?;
354        }
355
356        self.release_lock()?;
357        Ok(())
358    }
359
360    /// Remove entry from storage by ID.
361    pub fn remove_entry(&mut self, id: &str, is_project_specific: bool) -> Result<bool> {
362        self.acquire_lock()?;
363
364        let removed = if is_project_specific {
365            if let Some(mut project) = self.load_project()? {
366                let removed = project.remove(id);
367                if removed {
368                    self.save_project_locked(&project)?;
369                }
370                removed
371            } else {
372                false
373            }
374        } else {
375            let mut global = self.load_global()?;
376            let removed = global.remove(id);
377            if removed {
378                self.save_global_locked(&global)?;
379            }
380            removed
381        };
382
383        self.release_lock()?;
384        Ok(removed)
385    }
386
387    /// Internal save methods (assumed already locked).
388    fn save_global_locked(&self, memory: &AutoMemory) -> Result<()> {
389        let path = self.global_memory_path();
390        let json = serde_json::to_string_pretty(memory)?;
391        let tmp = path.with_extension("json.tmp");
392        fs::write(&tmp, json)?;
393        fs::rename(&tmp, &path)?;
394        Ok(())
395    }
396
397    fn save_project_locked(&self, memory: &AutoMemory) -> Result<()> {
398        let path = self
399            .project_memory_path()
400            .ok_or_else(|| anyhow::anyhow!("no project root"))?;
401        let json = serde_json::to_string_pretty(memory)?;
402        let tmp = path.with_extension("json.tmp");
403        fs::write(&tmp, json)?;
404        fs::rename(&tmp, &path)?;
405        Ok(())
406    }
407}