1use async_trait::async_trait;
18use std::io;
19use std::path::Path;
20use std::sync::Arc;
21
22use super::traits::{DirEntry, EntryType, Filesystem, Metadata};
23use crate::scheduler::{JobId, JobManager};
24
/// Virtual filesystem that exposes scheduler jobs as directories.
///
/// The root directory lists one sub-directory per job id, and every job
/// directory contains the files `stdout`, `stderr`, `status` and `command`.
/// All mutating operations are rejected.
pub struct JobFs {
    /// Shared handle to the job manager that answers every lookup.
    jobs: Arc<JobManager>,
}
36
37impl JobFs {
38 pub fn new(jobs: Arc<JobManager>) -> Self {
40 Self { jobs }
41 }
42
43 fn parse_path(path: &Path) -> Option<(Option<JobId>, Option<&str>)> {
50 let path_str = path.to_str()?;
51 let path_str = path_str.trim_start_matches('/');
52
53 if path_str.is_empty() {
54 return Some((None, None)); }
56
57 let parts: Vec<&str> = path_str.split('/').collect();
58
59 match parts.as_slice() {
60 [id_str] => {
61 let id: u64 = id_str.parse().ok()?;
63 Some((Some(JobId(id)), None))
64 }
65 [id_str, file] => {
66 let id: u64 = id_str.parse().ok()?;
68 Some((Some(JobId(id)), Some(*file)))
69 }
70 _ => None,
71 }
72 }
73}
74
75impl std::fmt::Debug for JobFs {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("JobFs").finish()
78 }
79}
80
81#[async_trait]
82impl Filesystem for JobFs {
83 async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
84 let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
85 io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
86 })?;
87
88 let job_id = job_id.ok_or_else(|| {
89 io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
90 })?;
91
92 let file = file.ok_or_else(|| {
93 io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
94 })?;
95
96 if !self.jobs.exists(job_id).await {
98 return Err(io::Error::new(
99 io::ErrorKind::NotFound,
100 format!("job {} not found", job_id),
101 ));
102 }
103
104 match file {
105 "stdout" => {
106 let content = self.jobs.read_stdout(job_id).await.unwrap_or_default();
108 Ok(content)
109 }
110 "stderr" => {
111 let content = self.jobs.read_stderr(job_id).await.unwrap_or_default();
112 Ok(content)
113 }
114 "status" => {
115 let status = self
116 .jobs
117 .get_status_string(job_id)
118 .await
119 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
120 Ok(format!("{}\n", status).into_bytes())
121 }
122 "command" => {
123 let command = self
124 .jobs
125 .get_command(job_id)
126 .await
127 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
128 Ok(format!("{}\n", command).into_bytes())
129 }
130 _ => Err(io::Error::new(
131 io::ErrorKind::NotFound,
132 format!("unknown file: {}", file),
133 )),
134 }
135 }
136
137 async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
138 Err(io::Error::new(
139 io::ErrorKind::PermissionDenied,
140 "jobfs is read-only",
141 ))
142 }
143
144 async fn list(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
145 let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
146 io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
147 })?;
148
149 if file.is_some() {
151 return Err(io::Error::new(
152 io::ErrorKind::NotADirectory,
153 "not a directory",
154 ));
155 }
156
157 match job_id {
158 None => {
159 let job_ids = self.jobs.list_ids().await;
161 let entries = job_ids
162 .into_iter()
163 .map(|id| DirEntry {
164 name: id.0.to_string(),
165 entry_type: EntryType::Directory,
166 size: 0,
167 symlink_target: None,
168 })
169 .collect();
170 Ok(entries)
171 }
172 Some(id) => {
173 if !self.jobs.exists(id).await {
175 return Err(io::Error::new(
176 io::ErrorKind::NotFound,
177 format!("job {} not found", id),
178 ));
179 }
180
181 Ok(vec![
182 DirEntry {
183 name: "stdout".to_string(),
184 entry_type: EntryType::File,
185 size: 0, symlink_target: None,
187 },
188 DirEntry {
189 name: "stderr".to_string(),
190 entry_type: EntryType::File,
191 size: 0,
192 symlink_target: None,
193 },
194 DirEntry {
195 name: "status".to_string(),
196 entry_type: EntryType::File,
197 size: 0,
198 symlink_target: None,
199 },
200 DirEntry {
201 name: "command".to_string(),
202 entry_type: EntryType::File,
203 size: 0,
204 symlink_target: None,
205 },
206 ])
207 }
208 }
209 }
210
211 async fn stat(&self, path: &Path) -> io::Result<Metadata> {
212 let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
213 io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
214 })?;
215
216 match (job_id, file) {
217 (None, None) => {
218 Ok(Metadata {
220 is_dir: true,
221 is_file: false,
222 is_symlink: false,
223 size: 0,
224 modified: None,
225 })
226 }
227 (Some(id), None) => {
228 if !self.jobs.exists(id).await {
230 return Err(io::Error::new(
231 io::ErrorKind::NotFound,
232 format!("job {} not found", id),
233 ));
234 }
235 Ok(Metadata {
236 is_dir: true,
237 is_file: false,
238 is_symlink: false,
239 size: 0,
240 modified: None,
241 })
242 }
243 (Some(id), Some(file)) => {
244 if !self.jobs.exists(id).await {
246 return Err(io::Error::new(
247 io::ErrorKind::NotFound,
248 format!("job {} not found", id),
249 ));
250 }
251
252 if !["stdout", "stderr", "status", "command"].contains(&file) {
254 return Err(io::Error::new(
255 io::ErrorKind::NotFound,
256 format!("unknown file: {}", file),
257 ));
258 }
259
260 Ok(Metadata {
261 is_dir: false,
262 is_file: true,
263 is_symlink: false,
264 size: 0, modified: None,
266 })
267 }
268 (None, Some(_)) => {
269 Err(io::Error::new(
271 io::ErrorKind::NotFound,
272 "invalid path",
273 ))
274 }
275 }
276 }
277
278 async fn mkdir(&self, _path: &Path) -> io::Result<()> {
279 Err(io::Error::new(
280 io::ErrorKind::PermissionDenied,
281 "jobfs is read-only",
282 ))
283 }
284
285 async fn remove(&self, _path: &Path) -> io::Result<()> {
286 Err(io::Error::new(
287 io::ErrorKind::PermissionDenied,
288 "jobfs is read-only",
289 ))
290 }
291
292 fn read_only(&self) -> bool {
293 true
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::interpreter::ExecResult;
    use crate::scheduler::BoundedStream;
    use tokio::sync::oneshot;

    /// Builds a `JobFs` over a manager with no registered jobs.
    fn empty_fs() -> JobFs {
        JobFs::new(Arc::new(JobManager::new()))
    }

    /// Registers a single `echo test` job whose streams already hold output
    /// and whose result has been sent on its channel.
    async fn make_job_manager_with_job() -> (Arc<JobManager>, JobId) {
        let manager = Arc::new(JobManager::new());

        let out = Arc::new(BoundedStream::new(1024));
        let err = Arc::new(BoundedStream::new(1024));
        out.write(b"hello from stdout\n").await;
        err.write(b"error message\n").await;

        let (result_tx, result_rx) = oneshot::channel();
        let id = manager
            .register_with_streams("echo test".to_string(), result_rx, out, err)
            .await;

        let _ = result_tx.send(ExecResult::success("done"));

        (manager, id)
    }

    /// Same fixture as `make_job_manager_with_job`, already wrapped in a `JobFs`.
    async fn fs_with_job() -> (JobFs, JobId) {
        let (manager, id) = make_job_manager_with_job().await;
        (JobFs::new(manager), id)
    }

    #[tokio::test]
    async fn test_list_root_empty() {
        let fs = empty_fs();

        let listing = fs.list(Path::new("")).await.unwrap();
        assert!(listing.is_empty());
    }

    #[tokio::test]
    async fn test_list_root_with_jobs() {
        let (fs, id) = fs_with_job().await;

        let listing = fs.list(Path::new("")).await.unwrap();
        assert_eq!(listing.len(), 1);

        let only = &listing[0];
        assert_eq!(only.name, id.0.to_string());
        assert_eq!(only.entry_type, EntryType::Directory);
    }

    #[tokio::test]
    async fn test_list_job_directory() {
        let (fs, id) = fs_with_job().await;

        let job_dir = id.to_string();
        let listing = fs.list(Path::new(&job_dir)).await.unwrap();

        for expected in ["stdout", "stderr", "status", "command"] {
            assert!(listing.iter().any(|entry| entry.name == expected));
        }
    }

    #[tokio::test]
    async fn test_read_stdout() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{}/stdout", id);
        let bytes = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(bytes, b"hello from stdout\n");
    }

    #[tokio::test]
    async fn test_read_stderr() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{}/stderr", id);
        let bytes = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(bytes, b"error message\n");
    }

    #[tokio::test]
    async fn test_read_status_running() {
        let manager = Arc::new(JobManager::new());

        let out = Arc::new(BoundedStream::new(1024));
        let err = Arc::new(BoundedStream::new(1024));
        // The sender is kept alive and never used, so no result is delivered.
        let (_result_tx, result_rx) = oneshot::channel();
        let id = manager
            .register_with_streams("sleep 100".to_string(), result_rx, out, err)
            .await;

        let fs = JobFs::new(manager);

        let target = format!("{}/status", id);
        let bytes = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(String::from_utf8_lossy(&bytes), "running\n");
    }

    #[tokio::test]
    async fn test_read_status_done() {
        let (manager, id) = make_job_manager_with_job().await;

        // Wait on the job before reading its status.
        manager.wait(id).await;

        let fs = JobFs::new(manager);

        let target = format!("{}/status", id);
        let bytes = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(String::from_utf8_lossy(&bytes), "done:0\n");
    }

    #[tokio::test]
    async fn test_read_command() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{}/command", id);
        let bytes = fs.read(Path::new(&target)).await.unwrap();
        assert_eq!(String::from_utf8_lossy(&bytes), "echo test\n");
    }

    #[tokio::test]
    async fn test_stat_root() {
        let fs = empty_fs();

        let info = fs.stat(Path::new("")).await.unwrap();
        assert!(info.is_dir);
    }

    #[tokio::test]
    async fn test_stat_job_dir() {
        let (fs, id) = fs_with_job().await;

        let job_dir = id.to_string();
        let info = fs.stat(Path::new(&job_dir)).await.unwrap();
        assert!(info.is_dir);
    }

    #[tokio::test]
    async fn test_stat_file() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{}/stdout", id);
        let info = fs.stat(Path::new(&target)).await.unwrap();
        assert!(info.is_file);
    }

    #[tokio::test]
    async fn test_read_only() {
        let fs = empty_fs();

        assert!(fs.read_only());

        // Every mutating operation must be refused.
        assert!(fs.write(Path::new("1/stdout"), b"data").await.is_err());
        assert!(fs.mkdir(Path::new("1")).await.is_err());
        assert!(fs.remove(Path::new("1")).await.is_err());
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let fs = empty_fs();

        let outcome = fs.read(Path::new("999/stdout")).await;
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().kind(), io::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn test_unknown_file() {
        let (fs, id) = fs_with_job().await;

        let target = format!("{}/unknown", id);
        let outcome = fs.read(Path::new(&target)).await;
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().kind(), io::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn test_read_directory_error() {
        let (fs, id) = fs_with_job().await;

        let job_dir = id.to_string();
        let outcome = fs.read(Path::new(&job_dir)).await;
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().kind(), io::ErrorKind::IsADirectory);
    }
}