Skip to main content

lean_ctx/core/knowledge/
persist.rs

1use chrono::Utc;
2use std::collections::HashMap;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Mutex, OnceLock};
5
6use super::ranking::hash_project_root;
7use super::types::{ConsolidatedInsight, KnowledgeFact, ProjectKnowledge, ProjectPattern};
8use crate::core::memory_policy::MemoryPolicy;
9
10fn knowledge_dir(project_hash: &str) -> Result<PathBuf, String> {
11    Ok(crate::core::data_dir::lean_ctx_data_dir()?
12        .join("knowledge")
13        .join(project_hash))
14}
15
/// Returns the in-process mutex dedicated to `project_hash`, creating it on
/// first request.
///
/// `mutate_locked` holds this mutex across its whole load → mutate → save
/// cycle, so parallel `remember` calls inside one process (e.g. concurrent MCP
/// tool calls) are serialized instead of overwriting each other (issue #326).
/// The registry mutex itself is only held long enough to look up (or insert)
/// the per-project entry and clone its `Arc`.
fn knowledge_lock(project_hash: &str) -> Arc<Mutex<()>> {
    static REGISTRY: OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> = OnceLock::new();

    // A poisoned registry still holds a valid map, so keep using it.
    let mut per_project = REGISTRY
        .get_or_init(Default::default)
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    let slot = per_project.entry(project_hash.to_owned()).or_default();
    Arc::clone(slot)
}
32
33/// Acquires an exclusive, cross-process advisory lock for a project's
34/// knowledge store. The returned file handle holds the lock until it is
35/// dropped; the OS releases it automatically if the process exits (even on
36/// crash), so there are no stale locks. This serializes the read-modify-write
37/// cycle across *separate processes* (parallel CLI invocations, CLI + daemon +
38/// MCP server), complementing the in-process mutex (issue #326).
39fn acquire_file_lock(dir: &Path) -> Option<std::fs::File> {
40    use fs2::FileExt;
41    let lock_path = dir.join(".knowledge.lock");
42    let file = std::fs::OpenOptions::new()
43        .create(true)
44        .truncate(false)
45        .write(true)
46        .open(&lock_path)
47        .ok()?;
48    #[cfg(unix)]
49    {
50        use std::os::unix::fs::PermissionsExt;
51        let _ = std::fs::set_permissions(&lock_path, std::fs::Permissions::from_mode(0o600));
52    }
53    // Blocks until every other process holding the lock releases it. A failure
54    // here (unsupported FS, etc.) degrades to the in-process lock only.
55    file.lock_exclusive().ok()?;
56    Some(file)
57}
58
/// Atomically replaces `path` with `json`.
///
/// The JSON is written to a uniquely named temp file in `dir` (which must be
/// the directory containing `path`, so the rename stays on one filesystem),
/// flushed to stable storage, and then renamed over `path`. `rename` is
/// atomic on every supported platform (and replaces the target on Windows),
/// so readers and concurrent writers observe either the previous file or the
/// new one — never a half-written file with trailing garbage (issue #326).
///
/// # Errors
///
/// Returns the I/O error message if the temp file cannot be created, written,
/// flushed, or renamed into place. On every failure after the temp file was
/// created it is removed (best effort), so no `*.tmp.*` files accumulate.
fn write_json_atomic(dir: &Path, path: &Path, json: &str) -> Result<(), String> {
    use std::io::Write;
    use std::sync::atomic::{AtomicU64, Ordering};

    // pid + wall-clock nanos alone can collide between threads of one process
    // on coarse clocks, and `save()` is not always called under the
    // per-project mutex; the sequence number makes names unique per process.
    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);

    let unique = format!(
        "knowledge.json.tmp.{}.{}.{}",
        std::process::id(),
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| d.as_nanos()),
        TMP_SEQ.fetch_add(1, Ordering::Relaxed)
    );
    let tmp = dir.join(unique);

    let mut options = std::fs::OpenOptions::new();
    // `create_new` never reuses an existing file, so two writers can never
    // interleave bytes in the same temp file (and we never delete a file we
    // did not create).
    options.write(true).create_new(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        // Owner-only from the moment of creation, so the contents are never
        // briefly readable by other users.
        options.mode(0o600);
    }
    let mut file = options.open(&tmp).map_err(|e| e.to_string())?;

    // Flush the data to disk *before* the rename: otherwise a crash can
    // persist the rename but not the contents, leaving an empty or truncated
    // knowledge.json behind.
    let written = file
        .write_all(json.as_bytes())
        .and_then(|()| file.sync_all());
    drop(file);
    if let Err(e) = written {
        let _ = std::fs::remove_file(&tmp);
        return Err(e.to_string());
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Enforce exactly 0o600 regardless of the process umask.
        let _ = std::fs::set_permissions(&tmp, std::fs::Permissions::from_mode(0o600));
    }
    if let Err(e) = std::fs::rename(&tmp, path) {
        let _ = std::fs::remove_file(&tmp);
        return Err(e.to_string());
    }
    Ok(())
}
85
86impl ProjectKnowledge {
87    pub fn save(&self) -> Result<(), String> {
88        let dir = knowledge_dir(&self.project_hash)?;
89        std::fs::create_dir_all(&dir).map_err(|e| e.to_string())?;
90        #[cfg(unix)]
91        {
92            use std::os::unix::fs::PermissionsExt;
93            let _ = std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700));
94        }
95
96        let path = dir.join("knowledge.json");
97        let json = serde_json::to_string_pretty(self).map_err(|e| e.to_string())?;
98        write_json_atomic(&dir, &path, &json)?;
99        Ok(())
100    }
101
102    /// Runs a read-modify-write cycle under both an in-process mutex and a
103    /// cross-process file lock, then saves atomically. The knowledge is
104    /// (re)loaded *inside* the locks so the closure always operates on the
105    /// latest on-disk state; this is what prevents lost updates when several
106    /// `remember` calls run in parallel — whether as threads in one process
107    /// (parallel MCP calls) or as separate processes (parallel CLI invocations,
108    /// CLI + daemon + MCP server) — see issue #326. Returns the persisted
109    /// knowledge plus the closure's return value so the caller can build a
110    /// response from the committed state.
111    pub fn mutate_locked<T>(
112        project_root: &str,
113        f: impl FnOnce(&mut Self) -> T,
114    ) -> Result<(Self, T), String> {
115        let hash = hash_project_root(project_root);
116        let lock = knowledge_lock(&hash);
117        let _guard = lock
118            .lock()
119            .unwrap_or_else(std::sync::PoisonError::into_inner);
120
121        // Cross-process lock: create the dir up front so the lock file has a
122        // home, then block until any other process releases it. Held for the
123        // whole read-modify-write via `_file_lock`'s lifetime.
124        let _file_lock = match knowledge_dir(&hash) {
125            Ok(dir) => {
126                let _ = std::fs::create_dir_all(&dir);
127                acquire_file_lock(&dir)
128            }
129            Err(_) => None,
130        };
131
132        let mut knowledge = Self::load_or_create(project_root);
133        let out = f(&mut knowledge);
134        knowledge.save()?;
135        Ok((knowledge, out))
136    }
137
138    pub fn load(project_root: &str) -> Option<Self> {
139        let hash = hash_project_root(project_root);
140        let dir = knowledge_dir(&hash).ok()?;
141        let path = dir.join("knowledge.json");
142
143        if let Ok(content) = std::fs::read_to_string(&path) {
144            let size = content.len();
145            if size > 1_000_000 {
146                tracing::warn!(
147                    "knowledge.json is large ({:.1} MB) — recall may be slow. \
148                     Consider running ctx_knowledge(action=\"consolidate\") to compact it.",
149                    size as f64 / 1_048_576.0,
150                );
151            }
152            if let Ok(k) = serde_json::from_str::<Self>(&content) {
153                return Some(k);
154            }
155        }
156
157        let old_hash = crate::core::project_hash::hash_path_only(project_root);
158        if old_hash != hash {
159            crate::core::project_hash::migrate_if_needed(&old_hash, &hash, project_root);
160            if let Ok(content) = std::fs::read_to_string(&path) {
161                if let Ok(mut k) = serde_json::from_str::<Self>(&content) {
162                    k.project_hash = hash;
163                    let _ = k.save();
164                    return Some(k);
165                }
166            }
167        }
168
169        // Migrate stores created before path normalization (issue #325): on
170        // Windows the CLI keyed its store by a backslash path, splitting it from
171        // the forward-slash MCP store. Pull any such legacy store into the
172        // canonical (normalized) location so facts converge.
173        for legacy_hash in crate::core::project_hash::legacy_unnormalized_hashes(project_root) {
174            if legacy_hash == hash {
175                continue;
176            }
177            crate::core::project_hash::migrate_if_needed(&legacy_hash, &hash, project_root);
178            if let Ok(content) = std::fs::read_to_string(&path) {
179                if let Ok(mut k) = serde_json::from_str::<Self>(&content) {
180                    k.project_hash = hash;
181                    let _ = k.save();
182                    return Some(k);
183                }
184            }
185        }
186
187        None
188    }
189
190    pub fn load_or_create(project_root: &str) -> Self {
191        Self::load(project_root).unwrap_or_else(|| Self::new(project_root))
192    }
193
194    /// Migrates legacy knowledge that was accidentally stored under an empty project_root ("")
195    /// into the given `target_root`. Keeps a timestamped backup of the legacy file.
196    pub fn migrate_legacy_empty_root(
197        target_root: &str,
198        policy: &MemoryPolicy,
199    ) -> Result<bool, String> {
200        if target_root.trim().is_empty() {
201            return Ok(false);
202        }
203
204        let Some(legacy) = Self::load("") else {
205            return Ok(false);
206        };
207
208        if !legacy.project_root.trim().is_empty() {
209            return Ok(false);
210        }
211        if legacy.facts.is_empty() && legacy.patterns.is_empty() && legacy.history.is_empty() {
212            return Ok(false);
213        }
214
215        let mut target = Self::load_or_create(target_root);
216
217        fn fact_key(f: &KnowledgeFact) -> String {
218            format!(
219                "{}|{}|{}|{}|{}",
220                f.category, f.key, f.value, f.source_session, f.created_at
221            )
222        }
223        fn pattern_key(p: &ProjectPattern) -> String {
224            format!(
225                "{}|{}|{}|{}",
226                p.pattern_type, p.description, p.source_session, p.created_at
227            )
228        }
229        fn history_key(h: &ConsolidatedInsight) -> String {
230            format!(
231                "{}|{}|{}",
232                h.summary,
233                h.from_sessions.join(","),
234                h.timestamp
235            )
236        }
237
238        let mut seen_facts: std::collections::HashSet<String> =
239            target.facts.iter().map(fact_key).collect();
240        for f in legacy.facts {
241            if seen_facts.insert(fact_key(&f)) {
242                target.facts.push(f);
243            }
244        }
245
246        let mut seen_patterns: std::collections::HashSet<String> =
247            target.patterns.iter().map(pattern_key).collect();
248        for p in legacy.patterns {
249            if seen_patterns.insert(pattern_key(&p)) {
250                target.patterns.push(p);
251            }
252        }
253
254        let mut seen_history: std::collections::HashSet<String> =
255            target.history.iter().map(history_key).collect();
256        for h in legacy.history {
257            if seen_history.insert(history_key(&h)) {
258                target.history.push(h);
259            }
260        }
261
262        target.facts.sort_by(|a, b| {
263            b.created_at
264                .cmp(&a.created_at)
265                .then_with(|| b.confidence.total_cmp(&a.confidence))
266        });
267        if target.facts.len() > policy.knowledge.max_facts {
268            target.facts.truncate(policy.knowledge.max_facts);
269        }
270        target
271            .patterns
272            .sort_by_key(|x| std::cmp::Reverse(x.created_at));
273        if target.patterns.len() > policy.knowledge.max_patterns {
274            target.patterns.truncate(policy.knowledge.max_patterns);
275        }
276        target
277            .history
278            .sort_by_key(|x| std::cmp::Reverse(x.timestamp));
279        if target.history.len() > policy.knowledge.max_history {
280            target.history.truncate(policy.knowledge.max_history);
281        }
282
283        target.updated_at = Utc::now();
284        target.save()?;
285
286        let legacy_hash = crate::core::project_hash::hash_path_only("");
287        let legacy_dir = knowledge_dir(&legacy_hash)?;
288        let legacy_path = legacy_dir.join("knowledge.json");
289        if legacy_path.exists() {
290            let ts = Utc::now().format("%Y%m%d-%H%M%S");
291            let backup = legacy_dir.join(format!("knowledge.legacy-empty-root.{ts}.json"));
292            std::fs::rename(&legacy_path, &backup).map_err(|e| e.to_string())?;
293        }
294
295        Ok(true)
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use fs2::FileExt;

    /// True if any `write_json_atomic` temp file (`*.tmp.*`) remains in `dir`.
    fn has_temp_files(dir: &std::path::Path) -> bool {
        std::fs::read_dir(dir)
            .unwrap()
            .filter_map(Result::ok)
            .any(|e| e.file_name().to_string_lossy().contains(".tmp."))
    }

    #[test]
    fn file_lock_is_exclusive_across_handles() {
        // flock-style locks are tied to the open file description, so two
        // independent `open()`s in the same process behave like two separate
        // processes: while the first holds the exclusive lock, the second must
        // fail to acquire it. This validates the cross-process guarantee that
        // protects parallel CLI writes (issue #326).
        let dir = tempfile::tempdir().unwrap();
        let held = acquire_file_lock(dir.path()).expect("first lock must succeed");

        let second = std::fs::OpenOptions::new()
            .create(true)
            .truncate(false)
            .write(true)
            .open(dir.path().join(".knowledge.lock"))
            .unwrap();
        assert!(
            second.try_lock_exclusive().is_err(),
            "a second handle must not acquire the lock while it is held"
        );

        drop(held);
        assert!(
            second.try_lock_exclusive().is_ok(),
            "lock must be acquirable once released"
        );
    }

    #[test]
    fn write_json_atomic_leaves_valid_file_and_no_temp() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("knowledge.json");
        write_json_atomic(dir.path(), &path, "{\"ok\":true}").unwrap();
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "{\"ok\":true}");
        assert!(!has_temp_files(dir.path()), "no temp file should remain");
    }

    #[test]
    fn write_json_atomic_replaces_existing_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("knowledge.json");
        write_json_atomic(dir.path(), &path, "{\"v\":1}").unwrap();
        write_json_atomic(dir.path(), &path, "{\"v\":2}").unwrap();
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "{\"v\":2}");
        assert!(!has_temp_files(dir.path()), "no temp file should remain");
    }

    #[cfg(unix)]
    #[test]
    fn write_json_atomic_removes_temp_when_rename_fails() {
        // Renaming a regular file over an existing directory fails, which
        // exercises the cleanup path without needing fault injection.
        let dir = tempfile::tempdir().unwrap();
        let occupied = dir.path().join("occupied");
        std::fs::create_dir_all(occupied.join("inner")).unwrap();
        assert!(write_json_atomic(dir.path(), &occupied, "{}").is_err());
        assert!(
            !has_temp_files(dir.path()),
            "temp file must be removed when the rename fails"
        );
    }
}