algocline_app/pool/registry.rs
1//! Pool registry: persistent session-to-worker mapping.
2//!
3//! [`PoolRegistry`] tracks live pool worker processes in
4//! `~/.algocline/state/pool/registry.json`. The file survives MCP-process
5//! death so a restarted MCP can rediscover live worker sockets.
6//!
7//! ## Crux invariant (Registry reconnect across restarts)
8//!
9//! `registry.json` is the **persistent source of truth**. The in-memory
10//! `PoolRegistry` value is a short-lived view — callers must reload from disk
11//! after acquiring the advisory lock rather than caching across lock cycles.
12
13use std::path::Path;
14use std::path::PathBuf;
15
16use serde::{Deserialize, Serialize};
17
18use crate::pool::PoolError;
19use crate::service::lock::LockError;
20use crate::service::manifest::now_iso8601;
21
22// ─── Entry ────────────────────────────────────────────────────────────────────
23
24/// A single session entry in the pool registry.
25///
26/// # Fields
27///
28/// - `sid` — session ID string (UUID or similar).
29/// - `pid` — OS process-ID of the worker; used for liveness checks via
30/// `kill -0`.
31/// - `sock` — absolute path to the Unix-domain socket owned by the worker.
32/// - `version` — crate version at the time the worker was spawned; used in
33/// version-handshake validation.
34/// - `created_at` — ISO 8601 timestamp of worker creation.
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36pub struct PoolSessionEntry {
37 /// Session identifier.
38 pub sid: String,
39 /// Worker process ID (`u32` — never zero or negative on POSIX).
40 pub pid: u32,
41 /// Absolute path of the worker's Unix-domain socket.
42 pub sock: PathBuf,
43 /// Crate version that spawned the worker (for handshake validation).
44 pub version: String,
45 /// ISO 8601 creation timestamp.
46 pub created_at: String,
47}
48
49impl PoolSessionEntry {
50 /// Create a new entry stamped with the current time.
51 ///
52 /// # Arguments
53 ///
54 /// * `sid` — session identifier.
55 /// * `pid` — worker process ID.
56 /// * `sock` — absolute path to the worker's UDS socket.
57 /// * `version` — crate version string (e.g. `env!("CARGO_PKG_VERSION")`).
58 ///
59 /// # Returns
60 ///
61 /// A new `PoolSessionEntry` with `created_at` set to the current UTC time.
62 pub fn new(
63 sid: impl Into<String>,
64 pid: u32,
65 sock: PathBuf,
66 version: impl Into<String>,
67 ) -> Self {
68 Self {
69 sid: sid.into(),
70 pid,
71 sock,
72 version: version.into(),
73 created_at: now_iso8601(),
74 }
75 }
76}
77
78// ─── Registry ─────────────────────────────────────────────────────────────────
79
80/// In-memory view of `registry.json`.
81///
82/// This struct must **always** be loaded from and saved to disk within a
83/// single advisory-lock region (see [`with_registry_lock`]). Do not hold
84/// a `PoolRegistry` value across lock boundaries.
85///
86/// ## Crux: registry.json is the persistent source of truth
87///
88/// MCP processes must not rely on any in-memory state to discover live
89/// workers after a restart. Every mutation path must call [`save`] before
90/// dropping the lock.
91///
92/// [`save`]: PoolRegistry::save
93#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
94pub struct PoolRegistry {
95 /// All currently-registered worker sessions.
96 pub sessions: Vec<PoolSessionEntry>,
97}
98
99impl PoolRegistry {
100 /// Load `registry.json` from disk, returning an empty registry if the
101 /// file does not exist.
102 ///
103 /// # Arguments
104 ///
105 /// * `path` — absolute path to `registry.json`.
106 ///
107 /// # Returns
108 ///
109 /// `Ok(PoolRegistry)` — either the parsed on-disk state or an empty
110 /// registry when the file is absent.
111 ///
112 /// # Errors
113 ///
114 /// Returns `PoolError::RegistryCorrupted(reason)` if the file exists but
115 /// cannot be parsed as valid JSON. **Never** falls back to an empty
116 /// registry on parse failure — callers must handle the error explicitly
117 /// and propagate it to the MCP wire layer.
118 ///
119 /// # Concurrency
120 ///
121 /// This is a synchronous file read. The caller must hold the advisory
122 /// `fs4::fs_std::FileExt::lock_exclusive` on `registry.lock` before
123 /// calling this method to prevent concurrent read-modify-write races
124 /// between multiple MCP processes.
125 pub fn load_or_default(path: &Path) -> Result<Self, PoolError> {
126 if !path.exists() {
127 return Ok(Self::default());
128 }
129 let content = std::fs::read_to_string(path).map_err(|e| {
130 PoolError::RegistryCorrupted(format!("failed to read {}: {e}", path.display()))
131 })?;
132 serde_json::from_str(&content).map_err(|e| {
133 PoolError::RegistryCorrupted(format!("failed to parse {}: {e}", path.display()))
134 })
135 }
136
137 /// Atomically persist the registry to `registry.json` via
138 /// `tempfile::NamedTempFile::persist` (POSIX `rename(2)`).
139 ///
140 /// # Arguments
141 ///
142 /// * `path` — absolute path to `registry.json`.
143 ///
144 /// # Returns
145 ///
146 /// `Ok(())` on success.
147 ///
148 /// # Errors
149 ///
150 /// Returns `PoolError::RegistryCorrupted` if parent-directory creation,
151 /// serialization, temp-file creation/write/fsync, or the atomic rename
152 /// fails.
153 ///
154 /// # Atomicity
155 ///
156 /// `NamedTempFile::persist` is atomic on modern Linux filesystems and
157 /// macOS. It is **not** guaranteed atomic on all platforms.
158 ///
159 /// # Concurrency
160 ///
161 /// Callers must hold the advisory `fs4::fs_std::FileExt::lock_exclusive`
162 /// on `registry.lock` for the entire read-modify-write cycle
163 /// (load → mutate → save) to prevent last-writer-wins data loss when
164 /// multiple MCP processes write concurrently.
165 pub fn save(&self, path: &Path) -> Result<(), PoolError> {
166 let parent = path.parent().ok_or_else(|| {
167 PoolError::RegistryCorrupted(format!(
168 "registry path has no parent directory: {}",
169 path.display()
170 ))
171 })?;
172
173 std::fs::create_dir_all(parent).map_err(|e| {
174 PoolError::RegistryCorrupted(format!(
175 "failed to create registry directory {}: {e}",
176 parent.display()
177 ))
178 })?;
179
180 // Restrict the pool directory to the owning user only (0700 = drwx------).
181 // This prevents other local users from listing or accessing pool sockets and
182 // the registry file. set_permissions failure is fatal — propagate as
183 // RegistryCorrupted per CLAUDE.md §Service 層の Error 伝播規律.
184 #[cfg(unix)]
185 {
186 use std::os::unix::fs::PermissionsExt;
187 std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700)).map_err(
188 |e| {
189 PoolError::RegistryCorrupted(format!(
190 "failed to set permissions on {}: {e}",
191 parent.display()
192 ))
193 },
194 )?;
195 }
196
197 let content = serde_json::to_string_pretty(self).map_err(|e| {
198 PoolError::RegistryCorrupted(format!("failed to serialize registry: {e}"))
199 })?;
200
201 let mut tmp = tempfile::NamedTempFile::new_in(parent).map_err(|e| {
202 PoolError::RegistryCorrupted(format!(
203 "failed to create temp file in {}: {e}",
204 parent.display()
205 ))
206 })?;
207
208 {
209 use std::io::Write;
210 tmp.write_all(content.as_bytes()).map_err(|e| {
211 PoolError::RegistryCorrupted(format!("failed to write registry temp file: {e}"))
212 })?;
213 tmp.as_file().sync_all().map_err(|e| {
214 PoolError::RegistryCorrupted(format!("failed to fsync registry temp file: {e}"))
215 })?;
216 }
217
218 tmp.persist(path).map_err(|e| {
219 PoolError::RegistryCorrupted(format!(
220 "failed to atomically replace {} with temp file: {e}",
221 path.display()
222 ))
223 })?;
224
225 // Restrict the registry file to the owning user only (0600 = -rw-------).
226 // Applied after persist so we operate on the final path, not the temp file.
227 #[cfg(unix)]
228 {
229 use std::os::unix::fs::PermissionsExt;
230 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600)).map_err(
231 |e| {
232 PoolError::RegistryCorrupted(format!(
233 "failed to set permissions on {}: {e}",
234 path.display()
235 ))
236 },
237 )?;
238 }
239
240 Ok(())
241 }
242
243 /// Scan all registered sessions and remove entries whose worker process
244 /// is no longer alive, returning the surviving (live) entries.
245 ///
246 /// Liveness is tested with `kill(pid, 0)` (POSIX signal 0 — does not
247 /// send a signal, only checks whether the process exists). An `ESRCH`
248 /// return value means the process does not exist and the entry is pruned.
249 ///
250 /// # Arguments
251 ///
252 /// None — operates on `&mut self` in place.
253 ///
254 /// # Returns
255 ///
256 /// `Ok(Vec<PoolSessionEntry>)` — the subset of sessions that survived GC
257 /// (i.e. whose worker process is still alive).
258 ///
259 /// # Errors
260 ///
261 /// Currently infallible on POSIX; the `Result` wrapper is kept for future
262 /// extension.
263 ///
264 /// # Platform support
265 ///
266 /// On non-Unix targets the liveness check is omitted and all entries are
267 /// assumed live (conservative).
268 pub fn scan_and_gc(&mut self) -> Result<Vec<PoolSessionEntry>, PoolError> {
269 let before_len = self.sessions.len();
270
271 #[cfg(unix)]
272 self.sessions.retain(|entry| {
273 // Guard u32 → i32 (pid_t) conversion: pids above i32::MAX would
274 // produce a negative value and send the signal to an unintended
275 // process group (K-52). Treat overflow as "dead" and prune.
276 let pid_t = match i32::try_from(entry.pid) {
277 Ok(p) => p,
278 Err(_) => {
279 tracing::warn!(
280 pid = entry.pid,
281 sid = %entry.sid,
282 "pid exceeds i32::MAX, treating as dead (K-52)"
283 );
284 return false;
285 }
286 };
287 // SAFETY: libc::kill(pid, sig) is a thin syscall wrapper.
288 // pid > 0 sends to the specific process (never to a group).
289 // sig == 0 performs an existence check without delivering a signal.
290 // pid fits in i32, verified by try_from above.
291 // Return 0 → process exists (live).
292 // Return -1 with errno ESRCH → no such process (dead / orphan).
293 let result = unsafe { libc::kill(pid_t, 0) };
294 result == 0
295 });
296
297 // On non-Unix platforms: retain all entries (conservative fallback).
298 #[cfg(not(unix))]
299 let _ = before_len;
300
301 let _ = before_len; // suppress unused warning on non-unix
302 Ok(self.sessions.clone())
303 }
304
305 /// Add a session entry to the registry.
306 ///
307 /// # Arguments
308 ///
309 /// * `entry` — the `PoolSessionEntry` to insert.
310 ///
311 /// # Notes
312 ///
313 /// Does not persist to disk. Call [`save`](PoolRegistry::save) after
314 /// mutating to ensure durability.
315 pub fn add(&mut self, entry: PoolSessionEntry) {
316 self.sessions.push(entry);
317 }
318
319 /// Remove the entry with the given session ID.
320 ///
321 /// # Arguments
322 ///
323 /// * `sid` — session ID to remove.
324 ///
325 /// # Returns
326 ///
327 /// `true` if an entry was found and removed, `false` if no matching entry
328 /// existed.
329 ///
330 /// # Notes
331 ///
332 /// Does not persist to disk. Call [`save`](PoolRegistry::save) after
333 /// mutating to ensure durability.
334 pub fn remove(&mut self, sid: &str) -> bool {
335 let before = self.sessions.len();
336 self.sessions.retain(|e| e.sid != sid);
337 self.sessions.len() < before
338 }
339
340 /// Look up a session entry by ID.
341 ///
342 /// # Arguments
343 ///
344 /// * `sid` — session ID to search for.
345 ///
346 /// # Returns
347 ///
348 /// `Some(&PoolSessionEntry)` if found, `None` otherwise.
349 pub fn find(&self, sid: &str) -> Option<&PoolSessionEntry> {
350 self.sessions.iter().find(|e| e.sid == sid)
351 }
352}
353
354// ─── Advisory-lock helper ─────────────────────────────────────────────────────
355
356/// Run `f` while holding an exclusive advisory lock on `lock_path`, using the
357/// same `fs4`-backed mechanism as `service::lock::with_exclusive_lock`.
358///
359/// Callers should pass the `registry.lock` sentinel path (e.g.
360/// `app_dir.root().join("pool/registry.lock")`).
361///
362/// # Arguments
363///
364/// * `lock_path` — path to the advisory lock file (created if absent).
365/// * `f` — closure to run under the lock.
366///
367/// # Returns
368///
369/// Propagates the return value of `f`.
370///
371/// # Errors
372///
373/// Returns `PoolError::RegistryCorrupted` if the lock file cannot be created
374/// or the exclusive lock cannot be acquired.
375///
376/// # Concurrency
377///
378/// The lock is released when the underlying `File` is dropped, which occurs
379/// on all exit paths from this function including panics (RAII / drop).
380pub fn with_registry_lock<F, R>(lock_path: &Path, f: F) -> Result<R, PoolError>
381where
382 F: FnOnce() -> Result<R, PoolError>,
383{
384 crate::service::lock::with_exclusive_lock(lock_path, f)
385}
386
387/// Bridge so that `lock::with_exclusive_lock` generic `E: From<LockError>`
388/// constraint is satisfied when `E = PoolError`.
389impl From<LockError> for PoolError {
390 fn from(e: LockError) -> Self {
391 PoolError::RegistryCorrupted(e.to_string())
392 }
393}
394
// ─── Tests ────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    // ── helpers ──────────────────────────────────────────────────────────────

    /// A PID that can never belong to a running process.
    ///
    /// It fits in `pid_t`, so `scan_and_gc` really probes it with `kill`. It
    /// is also far above every mainstream kernel's PID limit (Linux caps at
    /// 2^22, macOS/BSD at 99999). As a result `kill(pid, 0)` deterministically
    /// fails with `ESRCH`, unlike a mid-range value such as 999_999, which can
    /// be live on Linux.
    const DEAD_PID: u32 = i32::MAX as u32;

    fn make_entry(sid: &str, pid: u32) -> PoolSessionEntry {
        PoolSessionEntry::new(
            sid,
            pid,
            PathBuf::from(format!("/tmp/alc-pool/{sid}.sock")),
            "0.30.0",
        )
    }

    // ── T1: happy path ────────────────────────────────────────────────────────

    /// T1 — load_or_default returns empty registry when the file is absent.
    #[test]
    fn load_default_when_absent() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("registry.json");

        let reg = PoolRegistry::load_or_default(&path).expect("load_or_default");
        assert!(reg.sessions.is_empty(), "expected empty registry");
    }

    // ── T2: boundary / edge ───────────────────────────────────────────────────

    /// T2 — scan_and_gc removes the dead-PID entry and retains the live one.
    ///
    /// The current process PID serves as the "live" entry (it exists by
    /// definition) and [`DEAD_PID`] as the "dead" one. Unix-only: on other
    /// targets scan_and_gc deliberately keeps every entry.
    #[test]
    #[cfg(unix)]
    fn scan_and_gc_removes_dead_pid() {
        // The current process is alive for the duration of the test.
        let live_pid = std::process::id();

        let mut reg = PoolRegistry::default();
        reg.add(make_entry("live-session", live_pid));
        reg.add(make_entry("dead-session", DEAD_PID));

        let survivors = reg.scan_and_gc().expect("scan_and_gc");

        assert_eq!(survivors.len(), 1, "expected 1 survivor");
        assert_eq!(survivors[0].sid, "live-session");
        assert_eq!(
            reg.sessions.len(),
            1,
            "in-place mutation must prune dead entry"
        );
        assert!(
            reg.find("dead-session").is_none(),
            "dead entry must be gone"
        );
        assert!(reg.find("live-session").is_some(), "live entry must remain");
    }

    // ── T3: error path ────────────────────────────────────────────────────────

    /// T3 — load_or_default returns PoolError::RegistryCorrupted for bad JSON.
    ///
    /// Verifies that parse failures are NOT silently swallowed as empty
    /// registries. This guards against a recurrence of the same failure
    /// pattern as the 2026-04-22 incident noted in CLAUDE.md.
    #[test]
    fn load_corrupted_returns_pool_error() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("registry.json");

        // Write intentionally broken JSON.
        std::fs::write(&path, b"{ not valid json !!!").expect("write");

        let result = PoolRegistry::load_or_default(&path);
        match result {
            Err(PoolError::RegistryCorrupted(msg)) => {
                assert!(!msg.is_empty(), "error message must not be empty");
            }
            other => panic!("expected RegistryCorrupted, got {other:?}"),
        }
    }

    // ── T4: concurrent writers (advisory lock prevents entry loss) ────────────

    /// T4 — two concurrent tasks each perform 50 add→save cycles under the
    /// advisory lock; the final registry must contain all entries.
    ///
    /// Uses `#[tokio::test(flavor = "multi_thread", worker_threads = 4)]` as
    /// required by the concurrency-analysis spec.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_writers_no_entry_loss() {
        // `dir` outlives every task: all handles are awaited below.
        let dir = tempfile::tempdir().expect("tempdir");
        let reg_path = Arc::new(dir.path().join("registry.json"));
        let lock_path = Arc::new(dir.path().join("registry.lock"));

        let n_per_task: u32 = 50;
        let n_tasks: u32 = 2;

        let mut handles = Vec::new();
        for task_id in 0..n_tasks {
            let reg_path = Arc::clone(&reg_path);
            let lock_path = Arc::clone(&lock_path);

            let handle = tokio::task::spawn_blocking(move || {
                for i in 0..n_per_task {
                    let sid = format!("t{task_id}-s{i}");
                    // Use this (live) process's PID so entries look realistic.
                    let entry = make_entry(&sid, std::process::id());

                    with_registry_lock(&lock_path, || {
                        let mut reg = PoolRegistry::load_or_default(&reg_path)?;
                        reg.add(entry);
                        reg.save(&reg_path)
                    })
                    .expect("lock + save must not fail");
                }
            });
            handles.push(handle);
        }

        for h in handles {
            h.await.expect("task did not panic");
        }

        // Final verification: load without lock (no concurrent writers left).
        let final_reg = PoolRegistry::load_or_default(&reg_path).expect("final load_or_default");
        let expected = (n_per_task * n_tasks) as usize;
        assert_eq!(
            final_reg.sessions.len(),
            expected,
            "all {expected} entries must be present (no last-writer-wins loss)"
        );
    }

    // ── T5: permissions (Unix only) ───────────────────────────────────────────

    /// T5 — save() restricts pool dir to 0700 and registry.json to 0600.
    ///
    /// Verifies the secure-permissions requirement introduced in ST2
    /// (item 6) on Unix targets. The pool directory must be accessible only
    /// by its owner (`drwx------`). The registry file must be
    /// readable/writable only by its owner (`-rw-------`).
    #[test]
    #[cfg(unix)]
    fn save_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().expect("tempdir");
        // Use a nested path so save() has to create the "pool" subdirectory.
        let path = dir.path().join("pool/registry.json");
        let reg = PoolRegistry::default();
        reg.save(&path).expect("save");

        // (a) pool directory must be 0700
        let parent_meta = std::fs::metadata(path.parent().expect("parent")).expect("dir metadata");
        assert_eq!(
            parent_meta.permissions().mode() & 0o777,
            0o700,
            "pool dir must be 0700 (drwx------)"
        );

        // (b) registry.json must be 0600
        let file_meta = std::fs::metadata(&path).expect("file metadata");
        assert_eq!(
            file_meta.permissions().mode() & 0o777,
            0o600,
            "registry.json must be 0600 (-rw-------)"
        );
    }

    // ── T6: lock file permissions (Unix only) ─────────────────────────────────

    /// T6 — with_registry_lock() restricts the lock file to 0600.
    ///
    /// Verifies that the advisory lock sentinel file (`registry.lock`) is
    /// restricted to owner-only access on Unix targets.
    #[test]
    #[cfg(unix)]
    fn lock_file_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().expect("tempdir");
        let lock_path = dir.path().join("registry.lock");
        let reg_path = dir.path().join("registry.json");

        // Run a minimal closure so the lock file gets created and chmod'd.
        with_registry_lock(&lock_path, || {
            let reg = PoolRegistry::default();
            reg.save(&reg_path)
        })
        .expect("with_registry_lock");

        let lock_meta = std::fs::metadata(&lock_path).expect("lock metadata");
        assert_eq!(
            lock_meta.permissions().mode() & 0o777,
            0o600,
            "registry.lock must be 0600 (-rw-------)"
        );
    }
}