Skip to main content

kaish_kernel/vfs/
jobfs.rs

//! JobFs — Virtual filesystem for job observability.
//!
//! Provides `/proc`-like access to background job state:
//!
//! ```text
//! /v/jobs/
//! └── {job_id}/
//!     ├── stdout   ← live output stream (ring buffer snapshot)
//!     ├── stderr   ← live error stream
//!     ├── status   ← "running" | "done:0" | "failed:N"
//!     └── command  ← the original command string
//! ```
//!
//! This is a read-only, synthesized filesystem. Content is generated from
//! the JobManager on each read.
16
17use async_trait::async_trait;
18use std::io;
19use std::path::Path;
20use std::sync::Arc;
21
22use super::traits::{DirEntry, EntryType, Filesystem, Metadata};
23use crate::scheduler::{JobId, JobManager};
24
25/// Virtual filesystem providing job observability.
26///
27/// Mounted at `/v/jobs`, this filesystem synthesizes content from the JobManager:
28/// - List root to see all job IDs as directories
29/// - Read `{id}/stdout` for live stdout output
30/// - Read `{id}/stderr` for live stderr output
31/// - Read `{id}/status` for job status ("running", "done:0", "failed:N")
32/// - Read `{id}/command` for the original command string
33pub struct JobFs {
34    jobs: Arc<JobManager>,
35}
36
37impl JobFs {
38    /// Create a new JobFs backed by the given JobManager.
39    pub fn new(jobs: Arc<JobManager>) -> Self {
40        Self { jobs }
41    }
42
43    /// Parse a path into job ID and file name.
44    ///
45    /// Expected formats:
46    /// - "" or "/" → root (list jobs)
47    /// - "{id}" → job directory
48    /// - "{id}/{file}" → specific file (stdout, stderr, status, command)
49    fn parse_path(path: &Path) -> Option<(Option<JobId>, Option<&str>)> {
50        let path_str = path.to_str()?;
51        let path_str = path_str.trim_start_matches('/');
52
53        if path_str.is_empty() {
54            return Some((None, None)); // Root
55        }
56
57        let parts: Vec<&str> = path_str.split('/').collect();
58
59        match parts.as_slice() {
60            [id_str] => {
61                // Just job ID
62                let id: u64 = id_str.parse().ok()?;
63                Some((Some(JobId(id)), None))
64            }
65            [id_str, file] => {
66                // Job ID and file
67                let id: u64 = id_str.parse().ok()?;
68                Some((Some(JobId(id)), Some(*file)))
69            }
70            _ => None,
71        }
72    }
73}
74
75impl std::fmt::Debug for JobFs {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("JobFs").finish()
78    }
79}
80
81#[async_trait]
82impl Filesystem for JobFs {
83    async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
84        let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
85            io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
86        })?;
87
88        let job_id = job_id.ok_or_else(|| {
89            io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
90        })?;
91
92        let file = file.ok_or_else(|| {
93            io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
94        })?;
95
96        // Check job exists
97        if !self.jobs.exists(job_id).await {
98            return Err(io::Error::new(
99                io::ErrorKind::NotFound,
100                format!("job {} not found", job_id),
101            ));
102        }
103
104        match file {
105            "stdout" => {
106                // Return stream content, or empty if no stream attached
107                let content = self.jobs.read_stdout(job_id).await.unwrap_or_default();
108                Ok(content)
109            }
110            "stderr" => {
111                let content = self.jobs.read_stderr(job_id).await.unwrap_or_default();
112                Ok(content)
113            }
114            "status" => {
115                let status = self
116                    .jobs
117                    .get_status_string(job_id)
118                    .await
119                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
120                Ok(format!("{}\n", status).into_bytes())
121            }
122            "command" => {
123                let command = self
124                    .jobs
125                    .get_command(job_id)
126                    .await
127                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
128                Ok(format!("{}\n", command).into_bytes())
129            }
130            _ => Err(io::Error::new(
131                io::ErrorKind::NotFound,
132                format!("unknown file: {}", file),
133            )),
134        }
135    }
136
137    async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
138        Err(io::Error::new(
139            io::ErrorKind::PermissionDenied,
140            "jobfs is read-only",
141        ))
142    }
143
144    async fn list(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
145        let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
146            io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
147        })?;
148
149        // Can't list a file
150        if file.is_some() {
151            return Err(io::Error::new(
152                io::ErrorKind::NotADirectory,
153                "not a directory",
154            ));
155        }
156
157        match job_id {
158            None => {
159                // List root: all job IDs as directories
160                let job_ids = self.jobs.list_ids().await;
161                let entries = job_ids
162                    .into_iter()
163                    .map(|id| DirEntry {
164                        name: id.0.to_string(),
165                        entry_type: EntryType::Directory,
166                        size: 0,
167                        symlink_target: None,
168                    })
169                    .collect();
170                Ok(entries)
171            }
172            Some(id) => {
173                // List job directory: stdout, stderr, status, command
174                if !self.jobs.exists(id).await {
175                    return Err(io::Error::new(
176                        io::ErrorKind::NotFound,
177                        format!("job {} not found", id),
178                    ));
179                }
180
181                Ok(vec![
182                    DirEntry {
183                        name: "stdout".to_string(),
184                        entry_type: EntryType::File,
185                        size: 0, // Dynamic content
186                        symlink_target: None,
187                    },
188                    DirEntry {
189                        name: "stderr".to_string(),
190                        entry_type: EntryType::File,
191                        size: 0,
192                        symlink_target: None,
193                    },
194                    DirEntry {
195                        name: "status".to_string(),
196                        entry_type: EntryType::File,
197                        size: 0,
198                        symlink_target: None,
199                    },
200                    DirEntry {
201                        name: "command".to_string(),
202                        entry_type: EntryType::File,
203                        size: 0,
204                        symlink_target: None,
205                    },
206                ])
207            }
208        }
209    }
210
211    async fn stat(&self, path: &Path) -> io::Result<Metadata> {
212        let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
213            io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
214        })?;
215
216        match (job_id, file) {
217            (None, None) => {
218                // Root directory
219                Ok(Metadata {
220                    is_dir: true,
221                    is_file: false,
222                    is_symlink: false,
223                    size: 0,
224                    modified: None,
225                })
226            }
227            (Some(id), None) => {
228                // Job directory
229                if !self.jobs.exists(id).await {
230                    return Err(io::Error::new(
231                        io::ErrorKind::NotFound,
232                        format!("job {} not found", id),
233                    ));
234                }
235                Ok(Metadata {
236                    is_dir: true,
237                    is_file: false,
238                    is_symlink: false,
239                    size: 0,
240                    modified: None,
241                })
242            }
243            (Some(id), Some(file)) => {
244                // File inside job directory
245                if !self.jobs.exists(id).await {
246                    return Err(io::Error::new(
247                        io::ErrorKind::NotFound,
248                        format!("job {} not found", id),
249                    ));
250                }
251
252                // Validate file name
253                if !["stdout", "stderr", "status", "command"].contains(&file) {
254                    return Err(io::Error::new(
255                        io::ErrorKind::NotFound,
256                        format!("unknown file: {}", file),
257                    ));
258                }
259
260                Ok(Metadata {
261                    is_dir: false,
262                    is_file: true,
263                    is_symlink: false,
264                    size: 0, // Dynamic content
265                    modified: None,
266                })
267            }
268            (None, Some(_)) => {
269                // Invalid: file at root level
270                Err(io::Error::new(
271                    io::ErrorKind::NotFound,
272                    "invalid path",
273                ))
274            }
275        }
276    }
277
278    async fn mkdir(&self, _path: &Path) -> io::Result<()> {
279        Err(io::Error::new(
280            io::ErrorKind::PermissionDenied,
281            "jobfs is read-only",
282        ))
283    }
284
285    async fn remove(&self, _path: &Path) -> io::Result<()> {
286        Err(io::Error::new(
287            io::ErrorKind::PermissionDenied,
288            "jobfs is read-only",
289        ))
290    }
291
292    fn read_only(&self) -> bool {
293        true
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::interpreter::ExecResult;
    use crate::scheduler::BoundedStream;
    use tokio::sync::oneshot;

    /// Register one job ("echo test") with pre-filled stdout/stderr streams
    /// and send its successful result, returning the manager and the job ID.
    async fn make_job_manager_with_job() -> (Arc<JobManager>, JobId) {
        let manager = Arc::new(JobManager::new());

        // Create streams
        let stdout = Arc::new(BoundedStream::new(1024));
        let stderr = Arc::new(BoundedStream::new(1024));

        // Write some output
        stdout.write(b"hello from stdout\n").await;
        stderr.write(b"error message\n").await;

        // Create channel for job completion
        let (tx, rx) = oneshot::channel();

        // Register job
        let id = manager
            .register_with_streams("echo test".to_string(), rx, stdout, stderr)
            .await;

        // Send result (job completes)
        let _ = tx.send(ExecResult::success("done"));

        (manager, id)
    }

    /// Extract the error kind of a failed result without requiring the
    /// success type to implement `Debug`.
    fn error_kind<T>(result: io::Result<T>) -> Option<io::ErrorKind> {
        result.err().map(|e| e.kind())
    }

    #[tokio::test]
    async fn test_list_root_empty() {
        let manager = Arc::new(JobManager::new());
        let fs = JobFs::new(manager);

        let entries = fs.list(Path::new("")).await.unwrap();
        assert!(entries.is_empty());
    }

    #[tokio::test]
    async fn test_list_root_with_jobs() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let entries = fs.list(Path::new("")).await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].name, id.0.to_string());
        assert_eq!(entries[0].entry_type, EntryType::Directory);
    }

    #[tokio::test]
    async fn test_list_job_directory() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = id.to_string();
        let entries = fs.list(Path::new(&path)).await.unwrap();

        let names: Vec<&str> = entries.iter().map(|e| e.name.as_str()).collect();
        assert_eq!(names.len(), 4);
        for expected in ["stdout", "stderr", "status", "command"] {
            assert!(names.contains(&expected), "missing entry: {}", expected);
        }
        assert!(entries.iter().all(|e| e.entry_type == EntryType::File));
    }

    #[tokio::test]
    async fn test_list_file_is_not_a_directory() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = format!("{}/stdout", id);
        let result = fs.list(Path::new(&path)).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::NotADirectory));
    }

    #[tokio::test]
    async fn test_read_stdout() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = format!("{}/stdout", id);
        let data = fs.read(Path::new(&path)).await.unwrap();
        assert_eq!(data, b"hello from stdout\n");
    }

    #[tokio::test]
    async fn test_read_stderr() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = format!("{}/stderr", id);
        let data = fs.read(Path::new(&path)).await.unwrap();
        assert_eq!(data, b"error message\n");
    }

    #[tokio::test]
    async fn test_read_status_running() {
        let manager = Arc::new(JobManager::new());

        // Create a job that won't complete: the sender is kept alive and
        // never used for the duration of the test.
        let stdout = Arc::new(BoundedStream::new(1024));
        let stderr = Arc::new(BoundedStream::new(1024));
        let (_tx, rx) = oneshot::channel();
        let id = manager
            .register_with_streams("sleep 100".to_string(), rx, stdout, stderr)
            .await;

        let fs = JobFs::new(manager);

        let path = format!("{}/status", id);
        let data = fs.read(Path::new(&path)).await.unwrap();
        assert_eq!(String::from_utf8_lossy(&data), "running\n");
    }

    #[tokio::test]
    async fn test_read_status_done() {
        let (manager, id) = make_job_manager_with_job().await;

        // Wait for job to complete
        manager.wait(id).await;

        let fs = JobFs::new(manager);

        let path = format!("{}/status", id);
        let data = fs.read(Path::new(&path)).await.unwrap();
        assert_eq!(String::from_utf8_lossy(&data), "done:0\n");
    }

    #[tokio::test]
    async fn test_read_command() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = format!("{}/command", id);
        let data = fs.read(Path::new(&path)).await.unwrap();
        assert_eq!(String::from_utf8_lossy(&data), "echo test\n");
    }

    #[tokio::test]
    async fn test_stat_root() {
        let manager = Arc::new(JobManager::new());
        let fs = JobFs::new(manager);

        let meta = fs.stat(Path::new("")).await.unwrap();
        assert!(meta.is_dir);
        assert!(!meta.is_file);
    }

    #[tokio::test]
    async fn test_stat_job_dir() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = id.to_string();
        let meta = fs.stat(Path::new(&path)).await.unwrap();
        assert!(meta.is_dir);
        assert!(!meta.is_file);
    }

    #[tokio::test]
    async fn test_stat_file() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = format!("{}/stdout", id);
        let meta = fs.stat(Path::new(&path)).await.unwrap();
        assert!(meta.is_file);
        assert!(!meta.is_dir);
    }

    #[tokio::test]
    async fn test_stat_unknown_file() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = format!("{}/unknown", id);
        let result = fs.stat(Path::new(&path)).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::NotFound));
    }

    #[tokio::test]
    async fn test_read_only() {
        let manager = Arc::new(JobManager::new());
        let fs = JobFs::new(manager);

        assert!(fs.read_only());

        let write_result = fs.write(Path::new("1/stdout"), b"data").await;
        assert_eq!(error_kind(write_result), Some(io::ErrorKind::PermissionDenied));

        let mkdir_result = fs.mkdir(Path::new("1")).await;
        assert_eq!(error_kind(mkdir_result), Some(io::ErrorKind::PermissionDenied));

        let remove_result = fs.remove(Path::new("1")).await;
        assert_eq!(error_kind(remove_result), Some(io::ErrorKind::PermissionDenied));
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let manager = Arc::new(JobManager::new());
        let fs = JobFs::new(manager);

        let result = fs.read(Path::new("999/stdout")).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::NotFound));

        let result = fs.stat(Path::new("999")).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::NotFound));

        let result = fs.list(Path::new("999")).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::NotFound));
    }

    #[tokio::test]
    async fn test_unknown_file() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = format!("{}/unknown", id);
        let result = fs.read(Path::new(&path)).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::NotFound));
    }

    #[tokio::test]
    async fn test_read_directory_error() {
        let (manager, id) = make_job_manager_with_job().await;
        let fs = JobFs::new(manager);

        let path = id.to_string();
        let result = fs.read(Path::new(&path)).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::IsADirectory));
    }

    #[tokio::test]
    async fn test_read_root_error() {
        let manager = Arc::new(JobManager::new());
        let fs = JobFs::new(manager);

        let result = fs.read(Path::new("")).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::IsADirectory));
    }

    #[tokio::test]
    async fn test_malformed_paths() {
        let manager = Arc::new(JobManager::new());
        let fs = JobFs::new(manager);

        // Non-numeric job ID
        let result = fs.read(Path::new("not-a-job/stdout")).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::InvalidInput));

        // Deeper than `{id}/{file}`
        let result = fs.stat(Path::new("1/stdout/extra")).await;
        assert_eq!(error_kind(result), Some(io::ErrorKind::InvalidInput));
    }
}