1use async_trait::async_trait;
18use std::io;
19use std::path::Path;
20use std::sync::Arc;
21
22use super::traits::{DirEntry, DirEntryKind, Filesystem};
23use crate::scheduler::{JobId, JobManager};
24
25pub struct JobFs {
34 jobs: Arc<JobManager>,
35}
36
37impl JobFs {
38 pub fn new(jobs: Arc<JobManager>) -> Self {
40 Self { jobs }
41 }
42
43 fn parse_path(path: &Path) -> Option<(Option<JobId>, Option<&str>)> {
50 let path_str = path.to_str()?;
51 let path_str = path_str.trim_start_matches('/');
52
53 if path_str.is_empty() {
54 return Some((None, None)); }
56
57 let parts: Vec<&str> = path_str.split('/').collect();
58
59 match parts.as_slice() {
60 [id_str] => {
61 let id: u64 = id_str.parse().ok()?;
63 Some((Some(JobId(id)), None))
64 }
65 [id_str, file] => {
66 let id: u64 = id_str.parse().ok()?;
68 Some((Some(JobId(id)), Some(*file)))
69 }
70 _ => None,
71 }
72 }
73}
74
75impl std::fmt::Debug for JobFs {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("JobFs").finish()
78 }
79}
80
81#[async_trait]
82impl Filesystem for JobFs {
83 async fn read(&self, path: &Path) -> io::Result<Vec<u8>> {
84 let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
85 io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
86 })?;
87
88 let job_id = job_id.ok_or_else(|| {
89 io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
90 })?;
91
92 let file = file.ok_or_else(|| {
93 io::Error::new(io::ErrorKind::IsADirectory, "cannot read directory")
94 })?;
95
96 if !self.jobs.exists(job_id).await {
98 return Err(io::Error::new(
99 io::ErrorKind::NotFound,
100 format!("job {} not found", job_id),
101 ));
102 }
103
104 match file {
105 "stdout" => {
106 let content = self.jobs.read_stdout(job_id).await.unwrap_or_default();
108 Ok(content)
109 }
110 "stderr" => {
111 let content = self.jobs.read_stderr(job_id).await.unwrap_or_default();
112 Ok(content)
113 }
114 "status" => {
115 let status = self
116 .jobs
117 .get_status_string(job_id)
118 .await
119 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
120 Ok(format!("{}\n", status).into_bytes())
121 }
122 "command" => {
123 let command = self
124 .jobs
125 .get_command(job_id)
126 .await
127 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "job not found"))?;
128 Ok(format!("{}\n", command).into_bytes())
129 }
130 _ => Err(io::Error::new(
131 io::ErrorKind::NotFound,
132 format!("unknown file: {}", file),
133 )),
134 }
135 }
136
137 async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
138 Err(io::Error::new(
139 io::ErrorKind::PermissionDenied,
140 "jobfs is read-only",
141 ))
142 }
143
144 async fn list(&self, path: &Path) -> io::Result<Vec<DirEntry>> {
145 let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
146 io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
147 })?;
148
149 if file.is_some() {
151 return Err(io::Error::new(
152 io::ErrorKind::NotADirectory,
153 "not a directory",
154 ));
155 }
156
157 match job_id {
158 None => {
159 let job_ids = self.jobs.list_ids().await;
161 let entries = job_ids
162 .into_iter()
163 .map(|id| DirEntry {
164 name: id.0.to_string(),
165 kind: DirEntryKind::Directory,
166 modified: None,
167 permissions: None,
168 size: 0,
169 symlink_target: None,
170 })
171 .collect();
172 Ok(entries)
173 }
174 Some(id) => {
175 if !self.jobs.exists(id).await {
177 return Err(io::Error::new(
178 io::ErrorKind::NotFound,
179 format!("job {} not found", id),
180 ));
181 }
182
183 Ok(vec![
184 DirEntry {
185 name: "stdout".to_string(),
186 kind: DirEntryKind::File,
187 modified: None,
188 permissions: None,
189 size: 0, symlink_target: None,
191 },
192 DirEntry {
193 name: "stderr".to_string(),
194 kind: DirEntryKind::File,
195 modified: None,
196 permissions: None,
197 size: 0,
198 symlink_target: None,
199 },
200 DirEntry {
201 name: "status".to_string(),
202 kind: DirEntryKind::File,
203 modified: None,
204 permissions: None,
205 size: 0,
206 symlink_target: None,
207 },
208 DirEntry {
209 name: "command".to_string(),
210 kind: DirEntryKind::File,
211 modified: None,
212 permissions: None,
213 size: 0,
214 symlink_target: None,
215 },
216 ])
217 }
218 }
219 }
220
221 async fn stat(&self, path: &Path) -> io::Result<DirEntry> {
222 let (job_id, file) = Self::parse_path(path).ok_or_else(|| {
223 io::Error::new(io::ErrorKind::InvalidInput, "invalid job path")
224 })?;
225
226 let name = path
227 .file_name()
228 .map(|n| n.to_string_lossy().into_owned())
229 .unwrap_or_else(|| "/".to_string());
230
231 match (job_id, file) {
232 (None, None) => {
233 Ok(DirEntry::directory(name))
235 }
236 (Some(id), None) => {
237 if !self.jobs.exists(id).await {
239 return Err(io::Error::new(
240 io::ErrorKind::NotFound,
241 format!("job {} not found", id),
242 ));
243 }
244 Ok(DirEntry::directory(name))
245 }
246 (Some(id), Some(file)) => {
247 if !self.jobs.exists(id).await {
249 return Err(io::Error::new(
250 io::ErrorKind::NotFound,
251 format!("job {} not found", id),
252 ));
253 }
254
255 if !["stdout", "stderr", "status", "command"].contains(&file) {
257 return Err(io::Error::new(
258 io::ErrorKind::NotFound,
259 format!("unknown file: {}", file),
260 ));
261 }
262
263 Ok(DirEntry::file(name, 0))
264 }
265 (None, Some(_)) => {
266 Err(io::Error::new(
268 io::ErrorKind::NotFound,
269 "invalid path",
270 ))
271 }
272 }
273 }
274
275 async fn mkdir(&self, _path: &Path) -> io::Result<()> {
276 Err(io::Error::new(
277 io::ErrorKind::PermissionDenied,
278 "jobfs is read-only",
279 ))
280 }
281
282 async fn remove(&self, _path: &Path) -> io::Result<()> {
283 Err(io::Error::new(
284 io::ErrorKind::PermissionDenied,
285 "jobfs is read-only",
286 ))
287 }
288
289 fn read_only(&self) -> bool {
290 true
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::interpreter::ExecResult;
    use crate::scheduler::BoundedStream;
    use tokio::sync::oneshot;

    /// Registers one job (`echo test`) whose stdout/stderr streams already hold
    /// data and whose result has been sent, returning the manager and the job id.
    async fn make_job_manager_with_job() -> (Arc<JobManager>, JobId) {
        let out_stream = Arc::new(BoundedStream::new(1024));
        let err_stream = Arc::new(BoundedStream::new(1024));
        out_stream.write(b"hello from stdout\n").await;
        err_stream.write(b"error message\n").await;

        let manager = Arc::new(JobManager::new());
        let (tx, rx) = oneshot::channel();
        let id = manager
            .register_with_streams("echo test".to_string(), rx, out_stream, err_stream)
            .await;

        // The send result is deliberately ignored.
        let _ = tx.send(ExecResult::success("done"));

        (manager, id)
    }

    /// A filesystem backed by a manager that has no jobs.
    fn empty_fs() -> JobFs {
        JobFs::new(Arc::new(JobManager::new()))
    }

    /// A filesystem backed by the fixture from `make_job_manager_with_job`.
    async fn fs_with_job() -> (JobFs, JobId) {
        let (manager, id) = make_job_manager_with_job().await;
        (JobFs::new(manager), id)
    }

    /// Reads `<id>/<file>` through the filesystem.
    async fn read_job_file(fs: &JobFs, id: JobId, file: &str) -> io::Result<Vec<u8>> {
        let path = format!("{}/{}", id, file);
        fs.read(Path::new(&path)).await
    }

    #[tokio::test]
    async fn test_list_root_empty() {
        let listing = empty_fs().list(Path::new("")).await.unwrap();
        assert!(listing.is_empty());
    }

    #[tokio::test]
    async fn test_list_root_with_jobs() {
        let (fs, id) = fs_with_job().await;

        let listing = fs.list(Path::new("")).await.unwrap();
        assert_eq!(listing.len(), 1);

        let only = &listing[0];
        assert_eq!(only.name, id.0.to_string());
        assert_eq!(only.kind, DirEntryKind::Directory);
    }

    #[tokio::test]
    async fn test_list_job_directory() {
        let (fs, id) = fs_with_job().await;

        let dir = id.to_string();
        let listing = fs.list(Path::new(&dir)).await.unwrap();

        let names: Vec<&str> = listing.iter().map(|entry| entry.name.as_str()).collect();
        for expected in ["stdout", "stderr", "status", "command"] {
            assert!(names.contains(&expected));
        }
    }

    #[tokio::test]
    async fn test_read_stdout() {
        let (fs, id) = fs_with_job().await;

        let bytes = read_job_file(&fs, id, "stdout").await.unwrap();
        assert_eq!(bytes, b"hello from stdout\n");
    }

    #[tokio::test]
    async fn test_read_stderr() {
        let (fs, id) = fs_with_job().await;

        let bytes = read_job_file(&fs, id, "stderr").await.unwrap();
        assert_eq!(bytes, b"error message\n");
    }

    #[tokio::test]
    async fn test_read_status_running() {
        let manager = Arc::new(JobManager::new());

        // `_tx` is bound rather than discarded so the sender lives until the
        // end of the test and no result is ever delivered for this job.
        let (_tx, rx) = oneshot::channel();
        let id = manager
            .register_with_streams(
                "sleep 100".to_string(),
                rx,
                Arc::new(BoundedStream::new(1024)),
                Arc::new(BoundedStream::new(1024)),
            )
            .await;

        let fs = JobFs::new(manager);

        let bytes = read_job_file(&fs, id, "status").await.unwrap();
        assert_eq!(bytes, b"running\n");
    }

    #[tokio::test]
    async fn test_read_status_done() {
        let (manager, id) = make_job_manager_with_job().await;

        // Wait on the job before inspecting its status.
        manager.wait(id).await;

        let fs = JobFs::new(manager);

        let bytes = read_job_file(&fs, id, "status").await.unwrap();
        assert_eq!(bytes, b"done:0\n");
    }

    #[tokio::test]
    async fn test_read_command() {
        let (fs, id) = fs_with_job().await;

        let bytes = read_job_file(&fs, id, "command").await.unwrap();
        assert_eq!(bytes, b"echo test\n");
    }

    #[tokio::test]
    async fn test_stat_root() {
        let root = empty_fs().stat(Path::new("")).await.unwrap();
        assert_eq!(root.kind, DirEntryKind::Directory);
    }

    #[tokio::test]
    async fn test_stat_job_dir() {
        let (fs, id) = fs_with_job().await;

        let dir = id.to_string();
        let meta = fs.stat(Path::new(&dir)).await.unwrap();
        assert_eq!(meta.kind, DirEntryKind::Directory);
    }

    #[tokio::test]
    async fn test_stat_file() {
        let (fs, id) = fs_with_job().await;

        let file = format!("{}/stdout", id);
        let meta = fs.stat(Path::new(&file)).await.unwrap();
        assert_eq!(meta.kind, DirEntryKind::File);
    }

    #[tokio::test]
    async fn test_read_only() {
        let fs = empty_fs();

        assert!(fs.read_only());

        // Every mutating operation must be rejected.
        assert!(fs.write(Path::new("1/stdout"), b"data").await.is_err());
        assert!(fs.mkdir(Path::new("1")).await.is_err());
        assert!(fs.remove(Path::new("1")).await.is_err());
    }

    #[tokio::test]
    async fn test_nonexistent_job() {
        let fs = empty_fs();

        let err = fs.read(Path::new("999/stdout")).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn test_unknown_file() {
        let (fs, id) = fs_with_job().await;

        let err = read_job_file(&fs, id, "unknown").await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn test_read_directory_error() {
        let (fs, id) = fs_with_job().await;

        let dir = id.to_string();
        let err = fs.read(Path::new(&dir)).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::IsADirectory);
    }
}