Skip to main content

matrixcode_core/memory/
storage.rs

1//! Memory storage with file locking.
2
3use anyhow::Result;
4use chrono::Utc;
5use std::fs;
6use std::path::{Path, PathBuf};
7
8use super::config::MemoryConfig;
9use super::entry::MemoryEntry;
10use super::manager::AutoMemory;
11use crate::constants::MATRIX_DIR;
12
13// ============================================================================
14// File Lock
15// ============================================================================
16
/// Lock-file based mutex that keeps concurrent writers out of the memory storage.
///
/// The lock is represented by a `memory.lock` file: whoever manages to create
/// it owns the lock until the file is removed again.
#[derive(Debug)]
pub struct MemoryFileLock {
    /// Location of the lock file on disk.
    lock_path: PathBuf,
    /// `true` while this instance believes it owns the lock file.
    locked: bool,
}
24
25impl MemoryFileLock {
26    /// Create a new file lock for the given directory.
27    pub fn new(base_dir: &Path) -> Self {
28        Self {
29            lock_path: base_dir.join("memory.lock"),
30            locked: false,
31        }
32    }
33
34    /// Acquire the lock (blocking with timeout).
35    /// Returns Ok(true) if lock acquired, Err if timeout.
36    pub fn acquire(&mut self, timeout_ms: u64) -> Result<()> {
37        if self.locked {
38            return Ok(());
39        }
40
41        let start = std::time::Instant::now();
42
43        while start.elapsed().as_millis() < timeout_ms as u128 {
44            match fs::File::create_new(&self.lock_path) {
45                Ok(_) => {
46                    let lock_info = format!("{}:{}", std::process::id(), Utc::now().to_rfc3339());
47                    fs::write(&self.lock_path, lock_info)?;
48                    self.locked = true;
49                    return Ok(());
50                }
51                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
52                    if self.is_stale_lock()? {
53                        self.remove_stale_lock()?;
54                    }
55                    std::thread::sleep(std::time::Duration::from_millis(50));
56                }
57                Err(e) => {
58                    return Err(e.into());
59                }
60            }
61        }
62
63        // Timeout - return error instead of Ok(false)
64        anyhow::bail!(
65            "Failed to acquire memory lock after {}ms timeout",
66            timeout_ms
67        )
68    }
69
70    /// Check if the existing lock is stale (either old or process is dead).
71    fn is_stale_lock(&self) -> Result<bool> {
72        if !self.lock_path.exists() {
73            return Ok(false);
74        }
75
76        // First check if the lock owner process is still running
77        if let Ok(content) = fs::read_to_string(&self.lock_path)
78            && let Some(pid_str) = content.split(':').next()
79            && let Ok(pid) = pid_str.parse::<u32>()
80            && !self.is_process_running(pid)
81        {
82            // Process is dead, lock is stale
83            return Ok(true);
84        }
85
86        // Then check lock age as fallback
87        let metadata = fs::metadata(&self.lock_path)?;
88        let modified = metadata.modified()?;
89        let age = std::time::SystemTime::now()
90            .duration_since(modified)
91            .unwrap_or(std::time::Duration::ZERO);
92
93        Ok(age > std::time::Duration::from_secs(60))
94    }
95
96    /// Check if a process with the given PID is still running.
97    fn is_process_running(&self, pid: u32) -> bool {
98        #[cfg(unix)]
99        {
100            // On Unix, check if process exists by checking /proc
101            if std::path::Path::new("/proc").exists() {
102                std::path::Path::new(&format!("/proc/{}", pid)).exists()
103            } else {
104                // Fallback: assume process is running if we can't check
105                true
106            }
107        }
108        #[cfg(windows)]
109        {
110            // On Windows, use tasklist command to check if process exists
111            use std::process::Command;
112            let output = Command::new("tasklist")
113                .args(["/FI", &format!("PID eq {}", pid), "/NH"])
114                .output();
115
116            match output {
117                Ok(out) => {
118                    let stdout = String::from_utf8_lossy(&out.stdout);
119                    // tasklist returns "INFO: No tasks are running..." if process not found
120                    // or returns a line with the PID if running
121                    stdout.contains(&pid.to_string()) && !stdout.contains("No tasks")
122                }
123                Err(_) => {
124                    // If tasklist fails, assume process might be running (safer)
125                    true
126                }
127            }
128        }
129        #[cfg(not(any(unix, windows)))]
130        {
131            let _ = pid;
132            true
133        }
134    }
135
136    /// Remove stale lock file with atomic retry.
137    fn remove_stale_lock(&self) -> Result<()> {
138        // Use atomic rename to avoid race condition
139        // Create a temp deletion marker and rename it over the lock
140        let temp_path = self.lock_path.with_extension("lock.del");
141        if self.lock_path.exists() {
142            // Try atomic rename first
143            if fs::rename(&self.lock_path, &temp_path).is_ok() {
144                fs::remove_file(&temp_path)?;
145            } else {
146                // Fallback to direct removal if rename fails
147                fs::remove_file(&self.lock_path)?;
148            }
149        }
150        Ok(())
151    }
152
153    /// Release the lock.
154    pub fn release(&mut self) -> Result<()> {
155        if self.locked {
156            fs::remove_file(&self.lock_path)?;
157            self.locked = false;
158        }
159        Ok(())
160    }
161}
162
163impl Drop for MemoryFileLock {
164    fn drop(&mut self) {
165        let _ = self.release();
166    }
167}
168
169// ============================================================================
170// Memory Storage
171// ============================================================================
172
173/// Storage for memory files (global and project-level) with file locking.
174pub struct MemoryStorage {
175    /// Base directory for global memory (~/.matrix).
176    base_dir: PathBuf,
177    /// Project root directory (optional).
178    project_root: Option<PathBuf>,
179    /// File lock for preventing concurrent writes.
180    lock: MemoryFileLock,
181}
182
183impl MemoryStorage {
184    /// Create a new memory storage.
185    pub fn new(project_root: Option<&Path>) -> Result<Self> {
186        let base_dir = Self::get_base_dir()?;
187        let lock = MemoryFileLock::new(&base_dir);
188        Ok(Self {
189            base_dir,
190            project_root: project_root.map(|p| p.to_path_buf()),
191            lock,
192        })
193    }
194
195    /// Create a new storage with explicit lock timeout.
196    pub fn with_lock_timeout(project_root: Option<&Path>, timeout_ms: u64) -> Result<Self> {
197        let mut storage = Self::new(project_root)?;
198        storage.lock.acquire(timeout_ms)?;
199        Ok(storage)
200    }
201
202    /// Get the base directory for memory storage.
203    fn get_base_dir() -> Result<PathBuf> {
204        let home = std::env::var_os("HOME")
205            .or_else(|| std::env::var_os("USERPROFILE"))
206            .ok_or_else(|| anyhow::anyhow!("HOME or USERPROFILE not set"))?;
207        let mut p = PathBuf::from(home);
208        p.push(MATRIX_DIR);
209        Ok(p)
210    }
211
212    /// Path to global memory file.
213    pub fn global_memory_path(&self) -> PathBuf {
214        self.base_dir.join("memory.json")
215    }
216
217    /// Path to project memory file.
218    pub fn project_memory_path(&self) -> Option<PathBuf> {
219        self.project_root
220            .as_ref()
221            .map(|p| p.join(".matrix/memory.json"))
222    }
223
224    /// Path to config file.
225    pub fn config_path(&self) -> PathBuf {
226        self.base_dir.join("memory_config.json")
227    }
228
229    /// Ensure directories exist.
230    fn ensure_dirs(&self) -> Result<()> {
231        fs::create_dir_all(&self.base_dir)?;
232        if let Some(root) = &self.project_root {
233            let memory_dir = root.join(MATRIX_DIR);
234            fs::create_dir_all(memory_dir)?;
235        }
236        Ok(())
237    }
238
239    /// Acquire lock before write operations.
240    fn acquire_lock(&mut self) -> Result<()> {
241        self.lock.acquire(5000)?;
242        Ok(())
243    }
244
245    /// Release lock after write operations.
246    fn release_lock(&mut self) -> Result<()> {
247        self.lock.release()?;
248        Ok(())
249    }
250
251    /// Load global memory.
252    pub fn load_global(&self) -> Result<AutoMemory> {
253        let path = self.global_memory_path();
254        if !path.exists() {
255            return Ok(AutoMemory::new());
256        }
257        let data = fs::read_to_string(&path)?;
258        let memory: AutoMemory = serde_json::from_str(&data)?;
259        Ok(memory)
260    }
261
262    /// Load project memory.
263    pub fn load_project(&self) -> Result<Option<AutoMemory>> {
264        let path = self.project_memory_path();
265        match path {
266            Some(p) if p.exists() => {
267                let data = fs::read_to_string(&p)?;
268                let memory: AutoMemory = serde_json::from_str(&data)?;
269                Ok(Some(memory))
270            }
271            _ => Ok(None),
272        }
273    }
274
275    /// Load combined memory (global + project).
276    pub fn load_combined(&self) -> Result<AutoMemory> {
277        let mut combined = self.load_global()?;
278
279        if let Some(project) = self.load_project()? {
280            for entry in project.entries {
281                let mut tagged_entry = entry;
282                if !tagged_entry.tags.contains(&"project".to_string()) {
283                    tagged_entry.tags.push("project".to_string());
284                }
285                combined.entries.push(tagged_entry);
286            }
287            combined.prune();
288        }
289
290        Ok(combined)
291    }
292
293    /// Save global memory (with file lock).
294    pub fn save_global(&mut self, memory: &AutoMemory) -> Result<()> {
295        self.acquire_lock()?;
296        self.ensure_dirs()?;
297
298        let path = self.global_memory_path();
299        let json = serde_json::to_string_pretty(memory)?;
300
301        let tmp = path.with_extension("json.tmp");
302        fs::write(&tmp, json)?;
303        fs::rename(&tmp, &path)?;
304
305        self.release_lock()?;
306        Ok(())
307    }
308
309    /// Save project memory (with file lock).
310    pub fn save_project(&mut self, memory: &AutoMemory) -> Result<()> {
311        self.acquire_lock()?;
312        self.ensure_dirs()?;
313
314        let path = self
315            .project_memory_path()
316            .ok_or_else(|| anyhow::anyhow!("no project root"))?;
317        let json = serde_json::to_string_pretty(memory)?;
318
319        let tmp = path.with_extension("json.tmp");
320        fs::write(&tmp, json)?;
321        fs::rename(&tmp, &path)?;
322
323        self.release_lock()?;
324        Ok(())
325    }
326
327    /// Save config to separate file.
328    pub fn save_config(&mut self, config: &MemoryConfig) -> Result<()> {
329        self.ensure_dirs()?;
330        let path = self.config_path();
331        let json = serde_json::to_string_pretty(config)?;
332        fs::write(&path, json)?;
333        Ok(())
334    }
335
336    /// Load config from file.
337    pub fn load_config(&self) -> Result<MemoryConfig> {
338        let path = self.config_path();
339        if !path.exists() {
340            return Ok(MemoryConfig::default());
341        }
342        let data = fs::read_to_string(&path)?;
343        let config: MemoryConfig = serde_json::from_str(&data)?;
344        Ok(config)
345    }
346
347    /// Add entry to appropriate storage.
348    pub fn add_entry(&mut self, entry: MemoryEntry, is_project_specific: bool) -> Result<()> {
349        self.acquire_lock()?;
350
351        if is_project_specific {
352            let mut project = self.load_project()?.unwrap_or_else(AutoMemory::new);
353            project.add(entry);
354            self.save_project_locked(&project)?;
355        } else {
356            let mut global = self.load_global()?;
357            global.add(entry);
358            self.save_global_locked(&global)?;
359        }
360
361        self.release_lock()?;
362        Ok(())
363    }
364
365    /// Remove entry from storage by ID.
366    pub fn remove_entry(&mut self, id: &str, is_project_specific: bool) -> Result<bool> {
367        self.acquire_lock()?;
368
369        let removed = if is_project_specific {
370            if let Some(mut project) = self.load_project()? {
371                let removed = project.remove(id);
372                if removed {
373                    self.save_project_locked(&project)?;
374                }
375                removed
376            } else {
377                false
378            }
379        } else {
380            let mut global = self.load_global()?;
381            let removed = global.remove(id);
382            if removed {
383                self.save_global_locked(&global)?;
384            }
385            removed
386        };
387
388        self.release_lock()?;
389        Ok(removed)
390    }
391
392    /// Internal save methods (assumed already locked).
393    fn save_global_locked(&self, memory: &AutoMemory) -> Result<()> {
394        let path = self.global_memory_path();
395        let json = serde_json::to_string_pretty(memory)?;
396        let tmp = path.with_extension("json.tmp");
397        fs::write(&tmp, json)?;
398        fs::rename(&tmp, &path)?;
399        Ok(())
400    }
401
402    fn save_project_locked(&self, memory: &AutoMemory) -> Result<()> {
403        let path = self
404            .project_memory_path()
405            .ok_or_else(|| anyhow::anyhow!("no project root"))?;
406        let json = serde_json::to_string_pretty(memory)?;
407        let tmp = path.with_extension("json.tmp");
408        fs::write(&tmp, json)?;
409        fs::rename(&tmp, &path)?;
410        Ok(())
411    }
412}