Skip to main content

algocline_app/pool/
registry.rs

//! Pool registry: persistent session-to-worker mapping.
//!
//! [`PoolRegistry`] tracks live pool worker processes in
//! `~/.algocline/state/pool/registry.json`.  The file survives MCP-process
//! death so a restarted MCP can rediscover live worker sockets.
//!
//! ## Crux invariant (Registry reconnect across restarts)
//!
//! `registry.json` is the **persistent source of truth**.  The in-memory
//! `PoolRegistry` value is a short-lived view — callers must reload from disk
//! after acquiring the advisory lock rather than caching across lock cycles.
12
13use std::path::Path;
14use std::path::PathBuf;
15
16use serde::{Deserialize, Serialize};
17
18use crate::pool::PoolError;
19use crate::service::lock::LockError;
20use crate::service::manifest::now_iso8601;
21
22// ─── Entry ────────────────────────────────────────────────────────────────────
23
/// A single session entry in the pool registry.
///
/// One entry describes one worker process.  The type derives
/// `Serialize`/`Deserialize`, and [`PoolRegistry`] stores a list of these
/// entries that is written to / read from `registry.json`.
///
/// # Fields
///
/// - `sid` — session ID string (UUID or similar).
/// - `pid` — OS process-ID of the worker; used for liveness checks via
///   `kill -0`.
/// - `sock` — absolute path to the Unix-domain socket owned by the worker.
/// - `version` — crate version at the time the worker was spawned; used in
///   version-handshake validation.
/// - `created_at` — ISO 8601 timestamp of worker creation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PoolSessionEntry {
    /// Session identifier.
    pub sid: String,
    /// Worker process ID.  A real POSIX pid is never zero or negative, but
    /// the `u32` type does not enforce that: a deserialized entry can still
    /// carry `0` or a value above `i32::MAX`, so consumers must validate it
    /// before handing it to a syscall.
    pub pid: u32,
    /// Absolute path of the worker's Unix-domain socket.
    pub sock: PathBuf,
    /// Crate version that spawned the worker (for handshake validation).
    pub version: String,
    /// ISO 8601 creation timestamp.
    pub created_at: String,
}
48
49impl PoolSessionEntry {
50    /// Create a new entry stamped with the current time.
51    ///
52    /// # Arguments
53    ///
54    /// * `sid` — session identifier.
55    /// * `pid` — worker process ID.
56    /// * `sock` — absolute path to the worker's UDS socket.
57    /// * `version` — crate version string (e.g. `env!("CARGO_PKG_VERSION")`).
58    ///
59    /// # Returns
60    ///
61    /// A new `PoolSessionEntry` with `created_at` set to the current UTC time.
62    pub fn new(
63        sid: impl Into<String>,
64        pid: u32,
65        sock: PathBuf,
66        version: impl Into<String>,
67    ) -> Self {
68        Self {
69            sid: sid.into(),
70            pid,
71            sock,
72            version: version.into(),
73            created_at: now_iso8601(),
74        }
75    }
76}
77
78// ─── Registry ─────────────────────────────────────────────────────────────────
79
/// In-memory view of `registry.json`.
///
/// This struct must **always** be loaded from and saved to disk within a
/// single advisory-lock region (see [`with_registry_lock`]).  Do not hold
/// a `PoolRegistry` value across lock boundaries.
///
/// The derived `Default` is the empty registry (no sessions), which is what
/// a load produces when the file does not exist yet.
///
/// ## Crux: registry.json is the persistent source of truth
///
/// MCP processes must not rely on any in-memory state to discover live
/// workers after a restart.  Every mutation path must call [`save`] before
/// dropping the lock, because the mutating methods on this type only touch
/// the in-memory list.
///
/// [`save`]: PoolRegistry::save
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct PoolRegistry {
    /// All currently-registered worker sessions, in insertion order.
    pub sessions: Vec<PoolSessionEntry>,
}
98
99impl PoolRegistry {
100    /// Load `registry.json` from disk, returning an empty registry if the
101    /// file does not exist.
102    ///
103    /// # Arguments
104    ///
105    /// * `path` — absolute path to `registry.json`.
106    ///
107    /// # Returns
108    ///
109    /// `Ok(PoolRegistry)` — either the parsed on-disk state or an empty
110    /// registry when the file is absent.
111    ///
112    /// # Errors
113    ///
114    /// Returns `PoolError::RegistryCorrupted(reason)` if the file exists but
115    /// cannot be parsed as valid JSON.  **Never** falls back to an empty
116    /// registry on parse failure — callers must handle the error explicitly
117    /// and propagate it to the MCP wire layer.
118    ///
119    /// # Concurrency
120    ///
121    /// This is a synchronous file read.  The caller must hold the advisory
122    /// `fs4::fs_std::FileExt::lock_exclusive` on `registry.lock` before
123    /// calling this method to prevent concurrent read-modify-write races
124    /// between multiple MCP processes.
125    pub fn load_or_default(path: &Path) -> Result<Self, PoolError> {
126        if !path.exists() {
127            return Ok(Self::default());
128        }
129        let content = std::fs::read_to_string(path).map_err(|e| {
130            PoolError::RegistryCorrupted(format!("failed to read {}: {e}", path.display()))
131        })?;
132        serde_json::from_str(&content).map_err(|e| {
133            PoolError::RegistryCorrupted(format!("failed to parse {}: {e}", path.display()))
134        })
135    }
136
137    /// Atomically persist the registry to `registry.json` via
138    /// `tempfile::NamedTempFile::persist` (POSIX `rename(2)`).
139    ///
140    /// # Arguments
141    ///
142    /// * `path` — absolute path to `registry.json`.
143    ///
144    /// # Returns
145    ///
146    /// `Ok(())` on success.
147    ///
148    /// # Errors
149    ///
150    /// Returns `PoolError::RegistryCorrupted` if parent-directory creation,
151    /// serialization, temp-file creation/write/fsync, or the atomic rename
152    /// fails.
153    ///
154    /// # Atomicity
155    ///
156    /// `NamedTempFile::persist` is atomic on modern Linux filesystems and
157    /// macOS.  It is **not** guaranteed atomic on all platforms.
158    ///
159    /// # Concurrency
160    ///
161    /// Callers must hold the advisory `fs4::fs_std::FileExt::lock_exclusive`
162    /// on `registry.lock` for the entire read-modify-write cycle
163    /// (load → mutate → save) to prevent last-writer-wins data loss when
164    /// multiple MCP processes write concurrently.
165    pub fn save(&self, path: &Path) -> Result<(), PoolError> {
166        let parent = path.parent().ok_or_else(|| {
167            PoolError::RegistryCorrupted(format!(
168                "registry path has no parent directory: {}",
169                path.display()
170            ))
171        })?;
172
173        std::fs::create_dir_all(parent).map_err(|e| {
174            PoolError::RegistryCorrupted(format!(
175                "failed to create registry directory {}: {e}",
176                parent.display()
177            ))
178        })?;
179
180        // Restrict the pool directory to the owning user only (0700 = drwx------).
181        // This prevents other local users from listing or accessing pool sockets and
182        // the registry file.  set_permissions failure is fatal — propagate as
183        // RegistryCorrupted per CLAUDE.md §Service 層の Error 伝播規律.
184        #[cfg(unix)]
185        {
186            use std::os::unix::fs::PermissionsExt;
187            std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700)).map_err(
188                |e| {
189                    PoolError::RegistryCorrupted(format!(
190                        "failed to set permissions on {}: {e}",
191                        parent.display()
192                    ))
193                },
194            )?;
195        }
196
197        let content = serde_json::to_string_pretty(self).map_err(|e| {
198            PoolError::RegistryCorrupted(format!("failed to serialize registry: {e}"))
199        })?;
200
201        let mut tmp = tempfile::NamedTempFile::new_in(parent).map_err(|e| {
202            PoolError::RegistryCorrupted(format!(
203                "failed to create temp file in {}: {e}",
204                parent.display()
205            ))
206        })?;
207
208        {
209            use std::io::Write;
210            tmp.write_all(content.as_bytes()).map_err(|e| {
211                PoolError::RegistryCorrupted(format!("failed to write registry temp file: {e}"))
212            })?;
213            tmp.as_file().sync_all().map_err(|e| {
214                PoolError::RegistryCorrupted(format!("failed to fsync registry temp file: {e}"))
215            })?;
216        }
217
218        tmp.persist(path).map_err(|e| {
219            PoolError::RegistryCorrupted(format!(
220                "failed to atomically replace {} with temp file: {e}",
221                path.display()
222            ))
223        })?;
224
225        // Restrict the registry file to the owning user only (0600 = -rw-------).
226        // Applied after persist so we operate on the final path, not the temp file.
227        #[cfg(unix)]
228        {
229            use std::os::unix::fs::PermissionsExt;
230            std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).map_err(
231                |e| {
232                    PoolError::RegistryCorrupted(format!(
233                        "failed to set permissions on {}: {e}",
234                        path.display()
235                    ))
236                },
237            )?;
238        }
239
240        Ok(())
241    }
242
243    /// Scan all registered sessions and remove entries whose worker process
244    /// is no longer alive, returning the surviving (live) entries.
245    ///
246    /// Liveness is tested with `kill(pid, 0)` (POSIX signal 0 — does not
247    /// send a signal, only checks whether the process exists).  An `ESRCH`
248    /// return value means the process does not exist and the entry is pruned.
249    ///
250    /// # Arguments
251    ///
252    /// None — operates on `&mut self` in place.
253    ///
254    /// # Returns
255    ///
256    /// `Ok(Vec<PoolSessionEntry>)` — the subset of sessions that survived GC
257    /// (i.e. whose worker process is still alive).
258    ///
259    /// # Errors
260    ///
261    /// Currently infallible on POSIX; the `Result` wrapper is kept for future
262    /// extension.
263    ///
264    /// # Platform support
265    ///
266    /// On non-Unix targets the liveness check is omitted and all entries are
267    /// assumed live (conservative).
268    pub fn scan_and_gc(&mut self) -> Result<Vec<PoolSessionEntry>, PoolError> {
269        let before_len = self.sessions.len();
270
271        #[cfg(unix)]
272        self.sessions.retain(|entry| {
273            // Guard u32 → i32 (pid_t) conversion: pids above i32::MAX would
274            // produce a negative value and send the signal to an unintended
275            // process group (K-52).  Treat overflow as "dead" and prune.
276            let pid_t = match i32::try_from(entry.pid) {
277                Ok(p) => p,
278                Err(_) => {
279                    tracing::warn!(
280                        pid = entry.pid,
281                        sid = %entry.sid,
282                        "pid exceeds i32::MAX, treating as dead (K-52)"
283                    );
284                    return false;
285                }
286            };
287            // SAFETY: libc::kill(pid, sig) is a thin syscall wrapper.
288            // pid > 0 sends to the specific process (never to a group).
289            // sig == 0 performs an existence check without delivering a signal.
290            // pid fits in i32, verified by try_from above.
291            // Return 0 → process exists (live).
292            // Return -1 with errno ESRCH → no such process (dead / orphan).
293            let result = unsafe { libc::kill(pid_t, 0) };
294            result == 0
295        });
296
297        // On non-Unix platforms: retain all entries (conservative fallback).
298        #[cfg(not(unix))]
299        let _ = before_len;
300
301        let _ = before_len; // suppress unused warning on non-unix
302        Ok(self.sessions.clone())
303    }
304
305    /// Add a session entry to the registry.
306    ///
307    /// # Arguments
308    ///
309    /// * `entry` — the `PoolSessionEntry` to insert.
310    ///
311    /// # Notes
312    ///
313    /// Does not persist to disk.  Call [`save`](PoolRegistry::save) after
314    /// mutating to ensure durability.
315    pub fn add(&mut self, entry: PoolSessionEntry) {
316        self.sessions.push(entry);
317    }
318
319    /// Remove the entry with the given session ID.
320    ///
321    /// # Arguments
322    ///
323    /// * `sid` — session ID to remove.
324    ///
325    /// # Returns
326    ///
327    /// `true` if an entry was found and removed, `false` if no matching entry
328    /// existed.
329    ///
330    /// # Notes
331    ///
332    /// Does not persist to disk.  Call [`save`](PoolRegistry::save) after
333    /// mutating to ensure durability.
334    pub fn remove(&mut self, sid: &str) -> bool {
335        let before = self.sessions.len();
336        self.sessions.retain(|e| e.sid != sid);
337        self.sessions.len() < before
338    }
339
340    /// Look up a session entry by ID.
341    ///
342    /// # Arguments
343    ///
344    /// * `sid` — session ID to search for.
345    ///
346    /// # Returns
347    ///
348    /// `Some(&PoolSessionEntry)` if found, `None` otherwise.
349    pub fn find(&self, sid: &str) -> Option<&PoolSessionEntry> {
350        self.sessions.iter().find(|e| e.sid == sid)
351    }
352}
353
354// ─── Advisory-lock helper ─────────────────────────────────────────────────────
355
356/// Run `f` while holding an exclusive advisory lock on `lock_path`, using the
357/// same `fs4`-backed mechanism as `service::lock::with_exclusive_lock`.
358///
359/// Callers should pass the `registry.lock` sentinel path (e.g.
360/// `app_dir.root().join("pool/registry.lock")`).
361///
362/// # Arguments
363///
364/// * `lock_path` — path to the advisory lock file (created if absent).
365/// * `f` — closure to run under the lock.
366///
367/// # Returns
368///
369/// Propagates the return value of `f`.
370///
371/// # Errors
372///
373/// Returns `PoolError::RegistryCorrupted` if the lock file cannot be created
374/// or the exclusive lock cannot be acquired.
375///
376/// # Concurrency
377///
378/// The lock is released when the underlying `File` is dropped, which occurs
379/// on all exit paths from this function including panics (RAII / drop).
380pub fn with_registry_lock<F, R>(lock_path: &Path, f: F) -> Result<R, PoolError>
381where
382    F: FnOnce() -> Result<R, PoolError>,
383{
384    crate::service::lock::with_exclusive_lock(lock_path, f)
385}
386
387/// Bridge so that `lock::with_exclusive_lock` generic `E: From<LockError>`
388/// constraint is satisfied when `E = PoolError`.
389impl From<LockError> for PoolError {
390    fn from(e: LockError) -> Self {
391        PoolError::RegistryCorrupted(e.to_string())
392    }
393}
394
395// ─── Tests ────────────────────────────────────────────────────────────────────
396
397#[cfg(test)]
398mod tests {
399    use std::sync::Arc;
400
401    use super::*;
402
403    // ── helpers ──────────────────────────────────────────────────────────────
404
405    fn make_entry(sid: &str, pid: u32) -> PoolSessionEntry {
406        PoolSessionEntry::new(
407            sid,
408            pid,
409            PathBuf::from(format!("/tmp/alc-pool/{sid}.sock")),
410            "0.30.0",
411        )
412    }
413
414    // ── T1: happy path ────────────────────────────────────────────────────────
415
416    /// T1 — load_or_default returns empty registry when the file is absent.
417    #[test]
418    fn load_default_when_absent() {
419        let dir = tempfile::tempdir().expect("tempdir");
420        let path = dir.path().join("registry.json");
421
422        let reg = PoolRegistry::load_or_default(&path).expect("load_or_default");
423        assert!(reg.sessions.is_empty(), "expected empty registry");
424    }
425
426    // ── T2: boundary / edge ───────────────────────────────────────────────────
427
428    /// T2 — scan_and_gc removes the dead-PID entry and retains the live one.
429    ///
430    /// Uses the current process PID as the "live" entry (guaranteed to exist)
431    /// and pid=999999 as the "dead" entry (virtually certain to be absent).
432    #[test]
433    fn scan_and_gc_removes_dead_pid() {
434        // SAFETY: std::process::id() returns the current PID, which is alive.
435        let live_pid = std::process::id();
436
437        let mut reg = PoolRegistry::default();
438        reg.add(make_entry("live-session", live_pid));
439        reg.add(make_entry("dead-session", 999_999));
440
441        let survivors = reg.scan_and_gc().expect("scan_and_gc");
442
443        assert_eq!(survivors.len(), 1, "expected 1 survivor");
444        assert_eq!(survivors[0].sid, "live-session");
445        assert_eq!(
446            reg.sessions.len(),
447            1,
448            "in-place mutation must prune dead entry"
449        );
450        assert!(
451            reg.find("dead-session").is_none(),
452            "dead entry must be gone"
453        );
454        assert!(reg.find("live-session").is_some(), "live entry must remain");
455    }
456
457    // ── T3: error path ────────────────────────────────────────────────────────
458
459    /// T3 — load_or_default returns PoolError::RegistryCorrupted for bad JSON.
460    ///
461    /// Verifies that parse failures are NOT silently swallowed as empty
462    /// registries — CLAUDE.md 2026-04-22 事故と同じパターンの再発防止。
463    #[test]
464    fn load_corrupted_returns_pool_error() {
465        let dir = tempfile::tempdir().expect("tempdir");
466        let path = dir.path().join("registry.json");
467
468        // Write intentionally broken JSON.
469        std::fs::write(&path, b"{ not valid json !!!").expect("write");
470
471        let result = PoolRegistry::load_or_default(&path);
472        match result {
473            Err(PoolError::RegistryCorrupted(msg)) => {
474                assert!(!msg.is_empty(), "error message must not be empty");
475            }
476            other => panic!("expected RegistryCorrupted, got {other:?}"),
477        }
478    }
479
480    // ── T4: concurrent writers (advisory lock prevents entry loss) ────────────
481
482    /// T4 — two concurrent tasks each perform 50 add→save cycles under the
483    /// advisory lock; the final registry must contain all entries.
484    ///
485    /// Uses `#[tokio::test(flavor = "multi_thread", worker_threads = 4)]` as
486    /// required by the concurrency-analysis spec.
487    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
488    async fn concurrent_writers_no_entry_loss() {
489        let dir = Arc::new(tempfile::tempdir().expect("tempdir"));
490        let reg_path = Arc::new(dir.path().join("registry.json"));
491        let lock_path = Arc::new(dir.path().join("registry.lock"));
492
493        let n_per_task: u32 = 50;
494        let n_tasks: u32 = 2;
495
496        let mut handles = Vec::new();
497        for task_id in 0..n_tasks {
498            let reg_path = Arc::clone(&reg_path);
499            let lock_path = Arc::clone(&lock_path);
500
501            let handle = tokio::task::spawn_blocking(move || {
502                for i in 0..n_per_task {
503                    let sid = format!("t{task_id}-s{i}");
504                    // SAFETY: std::process::id() is the live PID of this process.
505                    let entry = make_entry(&sid, std::process::id());
506
507                    with_registry_lock(&lock_path, || {
508                        let mut reg = PoolRegistry::load_or_default(&reg_path)?;
509                        reg.add(entry);
510                        reg.save(&reg_path)
511                    })
512                    .expect("lock + save must not fail");
513                }
514            });
515            handles.push(handle);
516        }
517
518        for h in handles {
519            h.await.expect("task did not panic");
520        }
521
522        // Final verification: load without lock (no concurrent writers left).
523        let final_reg = PoolRegistry::load_or_default(&reg_path).expect("final load_or_default");
524        let expected = (n_per_task * n_tasks) as usize;
525        assert_eq!(
526            final_reg.sessions.len(),
527            expected,
528            "all {expected} entries must be present (no last-writer-wins loss)"
529        );
530    }
531
532    // ── T5: permissions (Unix only) ───────────────────────────────────────────
533
534    /// T5 — save() restricts pool dir to 0700 and registry.json to 0600.
535    ///
536    /// Verifies that the secure-permissions requirement introduced in ST2
537    /// (item 6) is met on Unix targets.  The pool directory must be accessible
538    /// only by its owner (`drwx------`) and the registry file must be
539    /// readable/writable only by its owner (`-rw-------`).
540    #[test]
541    #[cfg(unix)]
542    fn save_sets_secure_permissions() {
543        use std::os::unix::fs::PermissionsExt;
544
545        let dir = tempfile::tempdir().expect("tempdir");
546        // Use a nested path so save() has to create the "pool" subdirectory.
547        let path = dir.path().join("pool/registry.json");
548        let reg = PoolRegistry::default();
549        reg.save(&path).expect("save");
550
551        // (T1) pool directory must be 0700
552        let parent_meta = std::fs::metadata(path.parent().expect("parent")).expect("dir metadata");
553        assert_eq!(
554            parent_meta.permissions().mode() & 0o777,
555            0o700,
556            "pool dir must be 0700 (drwx------)"
557        );
558
559        // (T2) registry.json must be 0600
560        let file_meta = std::fs::metadata(&path).expect("file metadata");
561        assert_eq!(
562            file_meta.permissions().mode() & 0o777,
563            0o600,
564            "registry.json must be 0600 (-rw-------)"
565        );
566    }
567
568    // ── T6: lock file permissions (Unix only) ─────────────────────────────────
569
570    /// T6 — with_registry_lock() restricts the lock file to 0600.
571    ///
572    /// Verifies that the advisory lock sentinel file (`registry.lock`) is
573    /// restricted to owner-only access on Unix targets.
574    #[test]
575    #[cfg(unix)]
576    fn lock_file_sets_secure_permissions() {
577        use std::os::unix::fs::PermissionsExt;
578
579        let dir = tempfile::tempdir().expect("tempdir");
580        let lock_path = dir.path().join("registry.lock");
581        let reg_path = dir.path().join("registry.json");
582
583        // Run a no-op closure so the lock file is created and chmod'd.
584        with_registry_lock(&lock_path, || {
585            let reg = PoolRegistry::default();
586            reg.save(&reg_path)
587        })
588        .expect("with_registry_lock");
589
590        let lock_meta = std::fs::metadata(&lock_path).expect("lock metadata");
591        assert_eq!(
592            lock_meta.permissions().mode() & 0o777,
593            0o600,
594            "registry.lock must be 0600 (-rw-------)"
595        );
596    }
597}