Skip to main content

kaish_kernel/vfs/
jobfs.rs

1//! JobFs — Virtual filesystem for job observability.
2//!
3//! Provides `/proc`-like access to background job state:
4//!
5//! ```text
6//! /v/jobs/
7//! └── {job_id}/
8//!     ├── stdout   ← live output stream (ring buffer snapshot)
9//!     ├── stderr   ← live error stream
10//!     ├── status   ← "running" | "done:0" | "failed:N"
11//!     └── command  ← the original command string
12//! ```
13//!
14//! This is a read-only, synthesized filesystem. Content is generated from
15//! the JobManager on each read.
16
17use async_trait::async_trait;
18use std::io;
19use std::path::Path;
20use std::sync::Arc;
21
22use super::traits::{DirEntry, DirEntryKind, Filesystem};
23use crate::scheduler::{JobId, JobManager};
24
25/// Virtual filesystem providing job observability.
26///
27/// Mounted at `/v/jobs`, this filesystem synthesizes content from the JobManager:
28/// - List root to see all job IDs as directories
29/// - Read `{id}/stdout` for live stdout output
30/// - Read `{id}/stderr` for live stderr output
31/// - Read `{id}/status` for job status ("running", "done:0", "failed:N")
32/// - Read `{id}/command` for the original command string
33pub struct JobFs {
34    jobs: Arc<JobManager>,
35}
36
37impl JobFs {
38    /// Create a new JobFs backed by the given JobManager.
39    pub fn new(jobs: Arc<JobManager>) -> Self {
40        Self { jobs }
41    }
42
43    /// Parse a path into job ID and file name.
44    ///
45    /// Expected formats:
46    /// - "" or "/" → root (list jobs)
47    /// - "{id}" → job directory
48    /// - "{id}/{file}" → specific file (stdout, stderr, status, command)
49    fn parse_path(path: &Path) -> Option<(Option<JobId>, Option<&str>)> {
50        let path_str = path.to_str()?;
51        let path_str = path_str.trim_start_matches('/');
52
53        if path_str.is_empty() {
54            return Some((None, None)); // Root
55        }
56
57        let parts: Vec<&str> = path_str.split('/').collect();
58
59        match parts.as_slice() {
60            [id_str] => {
61                // Just job ID
62                let id: u64 = id_str.parse().ok()?;
63                Some((Some(JobId(id)), None))
64            }
65            [id_str, file] => {
66                // Job ID and file
67                let id: u64 = id_str.parse().ok()?;
68                Some((Some(JobId(id)), Some(*file)))
69            }
70            _ => None,
71        }
72    }
73}
74
75impl std::fmt::Debug for JobFs {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("JobFs").finish()
78    }
79}
80
81#[async_trait]
82impl Filesystem for JobFs {
83    async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
84        let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
85            io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
86        })?;
87
88        let job_id = job_id.ok_or_else(|| {
89            io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
90        })?;
91
92        let file = file.ok_or_else(|| {
93            io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
94        })?;
95
96        // Check job exists
97        if !self.jobs.exists(job_id).await {
98            return Err(io::Error::new(
99                io::ErrorKind::NotFound,
100                format!("job {} not found", job_id),
101            ));
102        }
103
104        match file {
105            "stdout" => {
106                // Return stream content, or empty if no stream attached
107                let content = self.jobs.read_stdout(job_id).await.unwrap_or_default();
108                Ok(content)
109            }
110            "stderr" => {
111                let content = self.jobs.read_stderr(job_id).await.unwrap_or_default();
112                Ok(content)
113            }
114            "status" => {
115                let status = self
116                    .jobs
117                    .get_status_string(job_id)
118                    .await
119                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
120                Ok(format!("{}\n", status).into_bytes())
121            }
122            "command" => {
123                let command = self
124                    .jobs
125                    .get_command(job_id)
126                    .await
127                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
128                Ok(format!("{}\n", command).into_bytes())
129            }
130            _ => Err(io::Error::new(
131                io::ErrorKind::NotFound,
132                format!("unknown file: {}", file),
133            )),
134        }
135    }
136
137    async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
138        Err(io::Error::new(
139            io::ErrorKind::PermissionDenied,
140            "jobfs is read-only",
141        ))
142    }
143
144    async fn list(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
145        let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
146            io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
147        })?;
148
149        // Can't list a file
150        if file.is_some() {
151            return Err(io::Error::new(
152                io::ErrorKind::NotADirectory,
153                "not a directory",
154            ));
155        }
156
157        match job_id {
158            None => {
159                // List root: all job IDs as directories
160                let job_ids = self.jobs.list_ids().await;
161                let entries = job_ids
162                    .into_iter()
163                    .map(|id| DirEntry {
164                        name: id.0.to_string(),
165                        kind: DirEntryKind::Directory,
166                        modified: None,
167                        permissions: None,
168                        size: 0,
169                        symlink_target: None,
170                    })
171                    .collect();
172                Ok(entries)
173            }
174            Some(id) => {
175                // List job directory: stdout, stderr, status, command
176                if !self.jobs.exists(id).await {
177                    return Err(io::Error::new(
178                        io::ErrorKind::NotFound,
179                        format!("job {} not found", id),
180                    ));
181                }
182
183                Ok(vec![
184                    DirEntry {
185                        name: "stdout".to_string(),
186                        kind: DirEntryKind::File,
187                        modified: None,
188                        permissions: None,
189                        size: 0, // Dynamic content
190                        symlink_target: None,
191                    },
192                    DirEntry {
193                        name: "stderr".to_string(),
194                        kind: DirEntryKind::File,
195                        modified: None,
196                        permissions: None,
197                        size: 0,
198                        symlink_target: None,
199                    },
200                    DirEntry {
201                        name: "status".to_string(),
202                        kind: DirEntryKind::File,
203                        modified: None,
204                        permissions: None,
205                        size: 0,
206                        symlink_target: None,
207                    },
208                    DirEntry {
209                        name: "command".to_string(),
210                        kind: DirEntryKind::File,
211                        modified: None,
212                        permissions: None,
213                        size: 0,
214                        symlink_target: None,
215                    },
216                ])
217            }
218        }
219    }
220
221    async fn stat(&self, path: &Path) -> io::Result<DirEntry> {
222        let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
223            io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
224        })?;
225
226        let name = path
227            .file_name()
228            .map(|n| n.to_string_lossy().into_owned())
229            .unwrap_or_else(|| "/".to_string());
230
231        match (job_id, file) {
232            (None, None) => {
233                // Root directory
234                Ok(DirEntry::directory(name))
235            }
236            (Some(id), None) => {
237                // Job directory
238                if !self.jobs.exists(id).await {
239                    return Err(io::Error::new(
240                        io::ErrorKind::NotFound,
241                        format!("job {} not found", id),
242                    ));
243                }
244                Ok(DirEntry::directory(name))
245            }
246            (Some(id), Some(file)) => {
247                // File inside job directory
248                if !self.jobs.exists(id).await {
249                    return Err(io::Error::new(
250                        io::ErrorKind::NotFound,
251                        format!("job {} not found", id),
252                    ));
253                }
254
255                // Validate file name
256                if !["stdout", "stderr", "status", "command"].contains(&file) {
257                    return Err(io::Error::new(
258                        io::ErrorKind::NotFound,
259                        format!("unknown file: {}", file),
260                    ));
261                }
262
263                Ok(DirEntry::file(name, 0))
264            }
265            (None, Some(_)) => {
266                // Invalid: file at root level
267                Err(io::Error::new(
268                    io::ErrorKind::NotFound,
269                    "invalid path",
270                ))
271            }
272        }
273    }
274
275    async fn mkdir(&self, _path: &Path) -> io::Result<()> {
276        Err(io::Error::new(
277            io::ErrorKind::PermissionDenied,
278            "jobfs is read-only",
279        ))
280    }
281
282    async fn remove(&self, _path: &Path) -> io::Result<()> {
283        Err(io::Error::new(
284            io::ErrorKind::PermissionDenied,
285            "jobfs is read-only",
286        ))
287    }
288
289    fn read_only(&self) -> bool {
290        true
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::interpreter::ExecResult;
    use crate::scheduler::BoundedStream;
    use tokio::sync::oneshot;

    /// Builds a manager holding one job (`echo test`) whose streams already
    /// contain output and whose completion result has already been sent.
    async fn make_job_manager_with_job() -> (Arc<JobManager>, JobId) {
        let stdout = Arc::new(BoundedStream::new(1024));
        stdout.write(b"hello from stdout\n").await;

        let stderr = Arc::new(BoundedStream::new(1024));
        stderr.write(b"error message\n").await;

        let manager = Arc::new(JobManager::new());
        let (done_tx, done_rx) = oneshot::channel();
        let id = manager
            .register_with_streams("echo test".to_string(), done_rx, stdout, stderr)
            .await;

        // Deliver the result so the job can complete.
        let _ = done_tx.send(ExecResult::success("done"));

        (manager, id)
    }

    /// A JobFs over a manager with no jobs.
    fn empty_fs() -> JobFs {
        JobFs::new(Arc::new(JobManager::new()))
    }

    /// A JobFs over the single-job manager, plus that job's ID.
    async fn fs_with_job() -> (JobFs, JobId) {
        let (manager, id) = make_job_manager_with_job().await;
        (JobFs::new(manager), id)
    }

    #[tokio::test]
    async fn test_list_root_empty() {
        let fs = empty_fs();

        let entries = fs.list(Path::new("")).await.unwrap();
        assert!(entries.is_empty());
    }

    #[tokio::test]
    async fn test_list_root_with_jobs() {
        let (fs, id) = fs_with_job().await;

        let entries = fs.list(Path::new("")).await.unwrap();
        assert_eq!(entries.len(), 1);

        let only = &entries[0];
        assert_eq!(only.name, id.0.to_string());
        assert_eq!(only.kind, DirEntryKind::Directory);
    }

    #[tokio::test]
    async fn test_list_job_directory() {
        let (fs, id) = fs_with_job().await;

        let dir = format!("{id}");
        let entries = fs.list(Path::new(&dir)).await.unwrap();

        for expected in ["stdout", "stderr", "status", "command"] {
            assert!(entries.iter().any(|entry| entry.name == expected));
        }
    }

    #[tokio::test]
    async fn test_read_stdout() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{id}/stdout");
        let data = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(data, b"hello from stdout\n");
    }

    #[tokio::test]
    async fn test_read_stderr() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{id}/stderr");
        let data = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(data, b"error message\n");
    }

    #[tokio::test]
    async fn test_read_status_running() {
        let manager = Arc::new(JobManager::new());

        // The sender is kept alive but never used, so the job never completes.
        let (_keep_alive, done_rx) = oneshot::channel();
        let id = manager
            .register_with_streams(
                "sleep 100".to_string(),
                done_rx,
                Arc::new(BoundedStream::new(1024)),
                Arc::new(BoundedStream::new(1024)),
            )
            .await;

        let fs = JobFs::new(manager);

        let target = format!("{id}/status");
        let data = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(data, b"running\n");
    }

    #[tokio::test]
    async fn test_read_status_done() {
        let (manager, id) = make_job_manager_with_job().await;

        // Let the job finish before looking at its status.
        manager.wait(id).await;

        let fs = JobFs::new(manager);

        let target = format!("{id}/status");
        let data = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(data, b"done:0\n");
    }

    #[tokio::test]
    async fn test_read_command() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{id}/command");
        let data = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(data, b"echo test\n");
    }

    #[tokio::test]
    async fn test_stat_root() {
        let fs = empty_fs();

        let entry = fs.stat(Path::new("")).await.unwrap();
        assert_eq!(entry.kind, DirEntryKind::Directory);
    }

    #[tokio::test]
    async fn test_stat_job_dir() {
        let (fs, id) = fs_with_job().await;

        let dir = format!("{id}");
        let entry = fs.stat(Path::new(&dir)).await.unwrap();
        assert_eq!(entry.kind, DirEntryKind::Directory);
    }

    #[tokio::test]
    async fn test_stat_file() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{id}/stdout");
        let entry = fs.stat(Path::new(&target)).await.unwrap();
        assert_eq!(entry.kind, DirEntryKind::File);
    }

    #[tokio::test]
    async fn test_read_only() {
        let fs = empty_fs();

        assert!(fs.read_only());

        // Every mutating operation must be refused.
        assert!(fs.write(Path::new("1/stdout"), b"data").await.is_err());
        assert!(fs.mkdir(Path::new("1")).await.is_err());
        assert!(fs.remove(Path::new("1")).await.is_err());
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let fs = empty_fs();

        let err = fs
            .read(Path::new("999/stdout"))
            .await
            .expect_err("reading from an unknown job must fail");
        assert_eq!(err.kind(), io::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn test_unknown_file() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{id}/unknown");
        let err = fs
            .read(Path::new(&target))
            .await
            .expect_err("reading an unknown file must fail");
        assert_eq!(err.kind(), io::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn test_read_directory_error() {
        let (fs, id) = fs_with_job().await;

        let dir = format!("{id}");
        let err = fs
            .read(Path::new(&dir))
            .await
            .expect_err("reading a job directory must fail");
        assert_eq!(err.kind(), io::ErrorKind::IsADirectory);
    }
}