catenary-mcp 1.6.1

A high-performance multiplexing bridge between MCP (Model Context Protocol) and LSP (Language Server Protocol). Enables LLMs to access IDE-grade code intelligence across multiple languages simultaneously with smart routing and UTF-8 accuracy.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (C) 2026 Mark Wells <contact@markwells.dev>

//! Path validation for LSP-aware operations.
//!
//! Workspace-root checks here are **not** a security layer — access control
//! is delegated to the host CLI's permission system (hooks, permission
//! dialogs). Instead, these checks gate LSP queries: if a file is outside
//! the roots we told the language server about, asking it for diagnostics
//! or symbols is pointless. Config file protection is the one true security
//! invariant — Catenary must not allow agents to modify its own config.

use anyhow::{Result, anyhow};
use std::path::{Path, PathBuf};
use tracing::debug;

/// Checks whether file paths fall within the workspace roots known to LSP
/// servers, and protects Catenary configuration files from modification.
///
/// The root check is an LSP-awareness gate, not a security boundary —
/// host CLI permission layers handle access control.
pub struct PathValidator {
    /// Canonical workspace root paths.
    roots: Vec<PathBuf>,
    /// Canonical paths of Catenary config files that must not be written.
    protected_configs: Vec<PathBuf>,
}

impl PathValidator {
    /// Creates a new `PathValidator` from workspace roots.
    ///
    /// Automatically discovers Catenary config file paths to protect:
    /// - `~/.config/catenary/config.toml` (user config)
    /// - `.catenary.toml` files found by searching upward from each root
    pub fn new(roots: Vec<PathBuf>) -> Self {
        let protected_configs = Self::discover_config_paths(&roots);
        debug!(
            "PathValidator initialized with {} root(s), {} protected config(s)",
            roots.len(),
            protected_configs.len()
        );
        Self {
            roots,
            protected_configs,
        }
    }

    /// Returns the current workspace roots.
    #[must_use]
    pub fn roots(&self) -> &[PathBuf] {
        &self.roots
    }

    /// Updates the workspace roots and re-discovers protected config paths.
    pub fn update_roots(&mut self, roots: Vec<PathBuf>) {
        self.protected_configs = Self::discover_config_paths(&roots);
        debug!(
            "PathValidator updated: {} root(s), {} protected config(s)",
            roots.len(),
            self.protected_configs.len()
        );
        self.roots = roots;
    }

    /// Validates that a path is within LSP-known workspace roots.
    ///
    /// Canonicalizes the path (resolving symlinks) and checks that the
    /// canonical path is within at least one workspace root. This is not
    /// a security check — it prevents wasted LSP round-trips for files
    /// the language server has no knowledge of.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The path does not exist or cannot be canonicalized.
    /// - The canonical path is outside all workspace roots.
    pub fn validate_read(&self, path: &Path) -> Result<PathBuf> {
        let canonical = path
            .canonicalize()
            .map_err(|e| anyhow!("Path does not exist: {}: {e}", path.display()))?;

        if !self.is_within_roots(&canonical) {
            return Err(anyhow!(
                "Path is outside workspace roots: {}",
                path.display()
            ));
        }

        Ok(canonical)
    }

    /// Validates a path for write-side operations.
    ///
    /// Performs the same LSP-awareness root check as [`Self::validate_read`], plus:
    /// - Rejects Catenary configuration files (`.catenary.toml`,
    ///   `~/.config/catenary/config.toml`) — the one true security invariant.
    ///
    /// For new files that don't exist yet, validates the parent directory
    /// is within workspace roots instead.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The path (or its parent for new files) is outside workspace roots.
    /// - The path resolves to a Catenary configuration file.
    pub fn validate_write(&self, path: &Path) -> Result<PathBuf> {
        // For new files, the path itself won't exist yet. Check parent instead.
        let canonical = if path.exists() {
            let canonical = path
                .canonicalize()
                .map_err(|e| anyhow!("Cannot resolve path: {}: {e}", path.display()))?;

            if !self.is_within_roots(&canonical) {
                return Err(anyhow!(
                    "Path is outside workspace roots: {}",
                    path.display()
                ));
            }

            canonical
        } else {
            // New file: validate parent directory exists and is within roots
            let parent = path
                .parent()
                .ok_or_else(|| anyhow!("Cannot determine parent directory: {}", path.display()))?;

            // The parent might also not exist yet (create_dir_all will handle it),
            // so walk up to find the first existing ancestor.
            let existing_ancestor = Self::find_existing_ancestor(parent)?;
            let canonical_ancestor = existing_ancestor.canonicalize().map_err(|e| {
                anyhow!(
                    "Cannot resolve ancestor path: {}: {e}",
                    existing_ancestor.display()
                )
            })?;

            if !self.is_within_roots(&canonical_ancestor) {
                return Err(anyhow!(
                    "Path is outside workspace roots: {}",
                    path.display()
                ));
            }

            // Return the intended path (not canonical, since it doesn't exist yet).
            // Build from the canonical ancestor + remaining components.
            let remaining = path.strip_prefix(&existing_ancestor).unwrap_or_else(|_| {
                // If strip_prefix fails, use just the filename
                path.file_name().map_or_else(|| Path::new(""), Path::new)
            });
            canonical_ancestor.join(remaining)
        };

        if self.is_config_file(&canonical) {
            return Err(anyhow!(
                "Cannot modify Catenary configuration file: {}",
                path.display()
            ));
        }

        Ok(canonical)
    }

    /// Checks if a canonical path is within any LSP-known workspace root.
    fn is_within_roots(&self, canonical: &Path) -> bool {
        self.roots.iter().any(|root| canonical.starts_with(root))
    }

    /// Checks if a canonical path matches any protected config file.
    fn is_config_file(&self, canonical: &Path) -> bool {
        self.protected_configs
            .iter()
            .any(|config| canonical == config)
    }

    /// Discovers Catenary config file paths to protect.
    fn discover_config_paths(roots: &[PathBuf]) -> Vec<PathBuf> {
        let mut paths = Vec::new();

        // User config: ~/.config/catenary/config.toml
        if let Some(config_dir) = dirs::config_dir() {
            let user_config = config_dir.join("catenary").join("config.toml");
            if let Ok(canonical) = user_config.canonicalize() {
                paths.push(canonical);
            }
        }

        // Project-local config: .catenary.toml (search upward from each root)
        for root in roots {
            let mut current = Some(root.as_path());
            while let Some(dir) = current {
                let config_path = dir.join(".catenary.toml");
                if let Ok(canonical) = config_path.canonicalize() {
                    if !paths.contains(&canonical) {
                        paths.push(canonical);
                    }
                    break;
                }
                current = dir.parent();
            }
        }

        paths
    }

    /// Walks up the directory tree to find the first existing ancestor.
    fn find_existing_ancestor(path: &Path) -> Result<PathBuf> {
        let mut current = path;
        loop {
            if current.exists() {
                return Ok(current.to_path_buf());
            }
            current = current
                .parent()
                .ok_or_else(|| anyhow!("No existing ancestor found for: {}", path.display()))?;
        }
    }
}

#[cfg(test)]
#[allow(
    clippy::expect_used,
    reason = "tests use expect for readable assertions"
)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    fn setup_workspace() -> Result<(TempDir, PathValidator)> {
        let dir = TempDir::new().map_err(|e| anyhow!("{e}"))?;
        let root = dir.path().canonicalize()?;

        // Create some test files
        fs::write(root.join("test.rs"), "fn main() {}")?;
        fs::create_dir_all(root.join("src"))?;
        fs::write(root.join("src/lib.rs"), "// lib")?;

        let validator = PathValidator::new(vec![root]);
        Ok((dir, validator))
    }

    #[test]
    fn test_read_within_root_succeeds() -> Result<()> {
        let (dir, validator) = setup_workspace()?;
        let result = validator.validate_read(&dir.path().join("test.rs"));
        assert!(result.is_ok());
        Ok(())
    }

    #[test]
    fn test_read_subdirectory_succeeds() -> Result<()> {
        let (dir, validator) = setup_workspace()?;
        let result = validator.validate_read(&dir.path().join("src/lib.rs"));
        assert!(result.is_ok());
        Ok(())
    }

    #[test]
    fn test_read_outside_root_fails() -> Result<()> {
        let (_dir, validator) = setup_workspace()?;
        let result = validator.validate_read(Path::new("/etc/hostname"));
        assert!(result.is_err());
        let err = result.expect_err("expected error").to_string();
        assert!(
            err.contains("outside workspace roots"),
            "Error should mention workspace roots: {err}"
        );
        Ok(())
    }

    #[test]
    fn test_read_nonexistent_fails() -> Result<()> {
        let (dir, validator) = setup_workspace()?;
        let result = validator.validate_read(&dir.path().join("nonexistent.rs"));
        assert!(result.is_err());
        let err = result.expect_err("expected error").to_string();
        assert!(
            err.contains("does not exist"),
            "Error should mention file not existing: {err}"
        );
        Ok(())
    }

    #[test]
    fn test_read_path_traversal_outside_root_fails() -> Result<()> {
        let (_dir, validator) = setup_workspace()?;
        // Even with ../ that technically resolves to something that exists,
        // if it's outside the root, it should fail.
        let result = validator.validate_read(Path::new("/tmp/../etc/hostname"));
        assert!(result.is_err());
        Ok(())
    }

    #[test]
    fn test_write_within_root_succeeds() -> Result<()> {
        let (dir, validator) = setup_workspace()?;
        let result = validator.validate_write(&dir.path().join("test.rs"));
        assert!(result.is_ok());
        Ok(())
    }

    #[test]
    fn test_write_outside_root_fails() -> Result<()> {
        let (_dir, validator) = setup_workspace()?;
        let result = validator.validate_write(Path::new("/tmp/outside.rs"));
        assert!(result.is_err());
        Ok(())
    }

    #[test]
    fn test_write_new_file_within_root_succeeds() -> Result<()> {
        let (dir, validator) = setup_workspace()?;
        let result = validator.validate_write(&dir.path().join("new_file.rs"));
        assert!(result.is_ok());
        Ok(())
    }

    #[test]
    fn test_write_new_file_in_new_subdir_within_root() -> Result<()> {
        let (dir, validator) = setup_workspace()?;
        let result = validator.validate_write(&dir.path().join("new_dir/new_file.rs"));
        assert!(result.is_ok());
        Ok(())
    }

    #[test]
    fn test_write_config_file_rejected() -> Result<()> {
        let (dir, _) = setup_workspace()?;
        // Create a .catenary.toml in the root
        let config_path = dir.path().join(".catenary.toml");
        fs::write(&config_path, "idle_timeout = 300")?;

        // Recreate validator to pick up the config
        let root = dir.path().canonicalize()?;
        let validator = PathValidator::new(vec![root]);

        let result = validator.validate_write(&config_path);
        assert!(result.is_err());
        let err = result.expect_err("expected error").to_string();
        assert!(
            err.contains("configuration file"),
            "Error should mention config file: {err}"
        );
        Ok(())
    }

    #[test]
    fn test_read_config_file_allowed() -> Result<()> {
        let (dir, _) = setup_workspace()?;
        let config_path = dir.path().join(".catenary.toml");
        fs::write(&config_path, "idle_timeout = 300")?;

        let root = dir.path().canonicalize()?;
        let validator = PathValidator::new(vec![root]);

        // Reading config files is fine
        let result = validator.validate_read(&config_path);
        assert!(result.is_ok());
        Ok(())
    }

    #[test]
    fn test_multiple_roots() -> Result<()> {
        let dir1 = TempDir::new()?;
        let dir2 = TempDir::new()?;
        let root1 = dir1.path().canonicalize()?;
        let root2 = dir2.path().canonicalize()?;

        fs::write(root1.join("a.rs"), "// a")?;
        fs::write(root2.join("b.rs"), "// b")?;

        let validator = PathValidator::new(vec![root1, root2]);

        assert!(validator.validate_read(&dir1.path().join("a.rs")).is_ok());
        assert!(validator.validate_read(&dir2.path().join("b.rs")).is_ok());
        Ok(())
    }

    #[test]
    fn test_update_roots() -> Result<()> {
        let dir1 = TempDir::new()?;
        let dir2 = TempDir::new()?;
        let root1 = dir1.path().canonicalize()?;
        let root2 = dir2.path().canonicalize()?;

        fs::write(root1.join("a.rs"), "// a")?;
        fs::write(root2.join("b.rs"), "// b")?;

        let mut validator = PathValidator::new(vec![root1]);

        // b.rs is outside current roots
        assert!(validator.validate_read(&dir2.path().join("b.rs")).is_err());

        // Update roots to include dir2
        validator.update_roots(vec![dir1.path().canonicalize()?, root2]);

        // Now b.rs is within roots
        assert!(validator.validate_read(&dir2.path().join("b.rs")).is_ok());
        Ok(())
    }

    #[cfg(unix)]
    #[test]
    fn test_symlink_within_root_succeeds() -> Result<()> {
        use std::os::unix::fs as unix_fs;

        let (dir, validator) = setup_workspace()?;
        let root = dir.path().canonicalize()?;

        // Create a symlink within the workspace
        let link_path = root.join("link.rs");
        unix_fs::symlink(root.join("test.rs"), &link_path)?;

        let result = validator.validate_read(&link_path);
        assert!(result.is_ok());
        Ok(())
    }

    #[cfg(unix)]
    #[test]
    fn test_symlink_outside_root_fails() -> Result<()> {
        use std::os::unix::fs as unix_fs;

        let (dir, validator) = setup_workspace()?;
        let root = dir.path().canonicalize()?;

        // Create a file outside the workspace
        let outside_dir = TempDir::new()?;
        let outside_file = outside_dir.path().join("secret.txt");
        fs::write(&outside_file, "secret")?;

        // Create a symlink inside workspace pointing outside
        let link_path = root.join("sneaky_link.txt");
        unix_fs::symlink(&outside_file, &link_path)?;

        // canonicalize() will resolve the symlink to the outside path
        let result = validator.validate_read(&link_path);
        assert!(result.is_err());
        Ok(())
    }
}