1use crate::executor::backend::{
11 BackendError, BackendResult, CacheBackend, CacheEntry, CacheLookupResult, CacheOutput,
12 policy_allows_read, policy_allows_write,
13};
14use crate::ir::{CachePolicy, Task as IRTask};
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use std::fs;
19use std::io;
20use std::path::{Path, PathBuf};
21use thiserror::Error;
22
23#[derive(Debug, Error)]
25pub enum CacheError {
26 #[error("Cache IO error: {0}")]
28 Io(#[from] io::Error),
29
30 #[error("Failed to {operation} '{path}': {source}")]
32 IoWithContext {
33 operation: &'static str,
34 path: PathBuf,
35 source: io::Error,
36 },
37
38 #[error("Cache serialization error: {0}")]
40 Serialization(#[from] serde_json::Error),
41
42 #[error("Backend error: {0}")]
44 Backend(#[from] BackendError),
45}
46
47impl CacheError {
48 pub fn io_with_context(
50 operation: &'static str,
51 path: impl Into<PathBuf>,
52 source: io::Error,
53 ) -> Self {
54 CacheError::IoWithContext {
55 operation,
56 path: path.into(),
57 source,
58 }
59 }
60}
61
/// Outcome of a cache lookup performed by [`check_cache`].
#[derive(Debug, Clone)]
pub struct CacheResult {
    /// True when a usable cache entry was found for the digest.
    pub hit: bool,
    /// The digest string the lookup was keyed by.
    pub key: String,
    /// Path to the cache entry directory on a hit; `None` on a miss.
    pub path: Option<PathBuf>,
}
72
/// Metadata persisted as `metadata.json` inside each cache entry
/// directory; its presence is what marks an entry as existing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheMetadata {
    /// ID of the task that produced this entry.
    pub task_id: String,
    /// Content digest the entry is keyed by (e.g. `sha256:...`).
    pub digest: String,
    /// Command line the task executed.
    pub command: Vec<String>,
    /// UTC timestamp of when the entry was written.
    pub created_at: DateTime<Utc>,
    /// Wall-clock duration of the original task run, in milliseconds.
    pub duration_ms: u64,
    /// Exit code of the original task run.
    pub exit_code: i32,
}
89
/// Captured stdout/stderr of a task run; either stream may be absent.
#[derive(Debug, Clone, Default)]
pub struct TaskLogs {
    /// Captured standard output, if any was recorded.
    pub stdout: Option<String>,
    /// Captured standard error, if any was recorded.
    pub stderr: Option<String>,
}
98
/// Map a content digest to its cache directory under `cache_root`.
///
/// The first four characters of the hash give a two-level fan-out
/// (`ab/cd/<full-hash>`) so no single directory accumulates every
/// entry. Digests too short to shard land directly under the root.
fn cache_path_for_digest(cache_root: &Path, digest: &str) -> PathBuf {
    let hash = digest.strip_prefix("sha256:").unwrap_or(digest);

    // `str::get` returns None for out-of-range or non-char-boundary
    // ranges, whereas `&hash[..2]` would panic on multi-byte input.
    match (hash.get(..2), hash.get(2..4)) {
        (Some(first), Some(second)) => cache_root.join(first).join(second).join(hash),
        _ => cache_root.join(hash),
    }
}
114
115pub fn check_cache(
126 task: &IRTask,
127 digest: &str,
128 cache_root: &Path,
129 policy_override: Option<CachePolicy>,
130) -> CacheResult {
131 let effective_policy = policy_override.unwrap_or(task.cache_policy);
132
133 match effective_policy {
134 CachePolicy::Normal | CachePolicy::Readonly => {
135 let cache_path = cache_path_for_digest(cache_root, digest);
137 let metadata_path = cache_path.join("metadata.json");
138
139 if metadata_path.exists() {
140 tracing::debug!(
141 task = %task.id,
142 digest = %digest,
143 path = %cache_path.display(),
144 "Cache hit"
145 );
146 return CacheResult {
147 hit: true,
148 key: digest.to_string(),
149 path: Some(cache_path),
150 };
151 }
152
153 tracing::debug!(
154 task = %task.id,
155 digest = %digest,
156 "Cache miss"
157 );
158 }
159 CachePolicy::Writeonly | CachePolicy::Disabled => {
160 tracing::debug!(
162 task = %task.id,
163 policy = ?effective_policy,
164 "Cache lookup skipped due to policy"
165 );
166 }
167 }
168
169 CacheResult {
170 hit: false,
171 key: digest.to_string(),
172 path: None,
173 }
174}
175
176pub fn store_result(
190 task: &IRTask,
191 digest: &str,
192 cache_root: &Path,
193 logs: &TaskLogs,
194 duration_ms: u64,
195 exit_code: i32,
196 policy_override: Option<CachePolicy>,
197) -> Result<(), CacheError> {
198 let effective_policy = policy_override.unwrap_or(task.cache_policy);
199
200 match effective_policy {
201 CachePolicy::Normal | CachePolicy::Writeonly => {
202 let cache_path = cache_path_for_digest(cache_root, digest);
203 fs::create_dir_all(&cache_path)
204 .map_err(|e| CacheError::io_with_context("create directory", &cache_path, e))?;
205
206 let meta = CacheMetadata {
208 task_id: task.id.clone(),
209 digest: digest.to_string(),
210 command: task.command.clone(),
211 created_at: Utc::now(),
212 duration_ms,
213 exit_code,
214 };
215 let meta_path = cache_path.join("metadata.json");
216 let meta_json = serde_json::to_string_pretty(&meta)?;
217 fs::write(&meta_path, &meta_json)
218 .map_err(|e| CacheError::io_with_context("write", &meta_path, e))?;
219
220 let logs_dir = cache_path.join("logs");
222 fs::create_dir_all(&logs_dir)
223 .map_err(|e| CacheError::io_with_context("create directory", &logs_dir, e))?;
224
225 if let Some(stdout) = &logs.stdout {
226 let stdout_path = logs_dir.join("stdout.log");
227 fs::write(&stdout_path, stdout)
228 .map_err(|e| CacheError::io_with_context("write", &stdout_path, e))?;
229 }
230 if let Some(stderr) = &logs.stderr {
231 let stderr_path = logs_dir.join("stderr.log");
232 fs::write(&stderr_path, stderr)
233 .map_err(|e| CacheError::io_with_context("write", &stderr_path, e))?;
234 }
235
236 tracing::debug!(
237 task = %task.id,
238 digest = %digest,
239 path = %cache_path.display(),
240 "Cache entry stored"
241 );
242 }
243 CachePolicy::Readonly | CachePolicy::Disabled => {
244 tracing::debug!(
246 task = %task.id,
247 policy = ?effective_policy,
248 "Cache write skipped due to policy"
249 );
250 }
251 }
252
253 Ok(())
254}
255
256pub fn load_metadata(cache_path: &Path) -> Result<CacheMetadata, CacheError> {
261 let meta_path = cache_path.join("metadata.json");
262 let content = fs::read_to_string(&meta_path)
263 .map_err(|e| CacheError::io_with_context("read", &meta_path, e))?;
264 let meta: CacheMetadata = serde_json::from_str(&content)?;
265 Ok(meta)
266}
267
268#[must_use]
270pub fn load_logs(cache_path: &Path) -> TaskLogs {
271 let logs_dir = cache_path.join("logs");
272
273 let stdout = fs::read_to_string(logs_dir.join("stdout.log")).ok();
274 let stderr = fs::read_to_string(logs_dir.join("stderr.log")).ok();
275
276 TaskLogs { stdout, stderr }
277}
278
/// Cache backend that stores entries as a directory tree on the local
/// filesystem, laid out by [`cache_path_for_digest`].
#[derive(Debug, Clone)]
pub struct LocalCacheBackend {
    // Root directory under which all cache entries are stored.
    cache_root: PathBuf,
}
292
293impl LocalCacheBackend {
294 #[must_use]
296 pub fn new(cache_root: impl Into<PathBuf>) -> Self {
297 Self {
298 cache_root: cache_root.into(),
299 }
300 }
301
302 fn cache_path(&self, digest: &str) -> PathBuf {
304 cache_path_for_digest(&self.cache_root, digest)
305 }
306
307 fn outputs_path(&self, digest: &str) -> PathBuf {
309 self.cache_path(digest).join("outputs")
310 }
311}
312
// Local-filesystem implementation of `CacheBackend`. All methods do
// synchronous filesystem I/O under the hood; they are `async` only to
// satisfy the trait's signature.
#[async_trait]
impl CacheBackend for LocalCacheBackend {
    // Look up `digest`. Returns a hit only when the policy allows reads
    // and the entry's metadata parses; corrupt metadata is logged and
    // downgraded to a miss so a damaged entry only costs a re-run.
    async fn check(
        &self,
        task: &IRTask,
        digest: &str,
        policy: CachePolicy,
    ) -> BackendResult<CacheLookupResult> {
        if !policy_allows_read(policy) {
            tracing::debug!(
                task = %task.id,
                policy = ?policy,
                "Cache lookup skipped due to policy"
            );
            return Ok(CacheLookupResult::miss(digest));
        }

        let cache_path = self.cache_path(digest);
        let metadata_path = cache_path.join("metadata.json");

        if metadata_path.exists() {
            match load_metadata(&cache_path) {
                Ok(meta) => {
                    tracing::debug!(
                        task = %task.id,
                        digest = %digest,
                        path = %cache_path.display(),
                        "Cache hit"
                    );
                    return Ok(CacheLookupResult::hit(digest, meta.duration_ms));
                }
                Err(e) => {
                    // A damaged entry should cost a re-run, not fail the build.
                    tracing::warn!(
                        task = %task.id,
                        error = %e,
                        "Failed to load cache metadata, treating as miss"
                    );
                }
            }
        }

        tracing::debug!(
            task = %task.id,
            digest = %digest,
            "Cache miss"
        );
        Ok(CacheLookupResult::miss(digest))
    }

    // Persist metadata, logs, and declared outputs for `digest`.
    //
    // NOTE(review): metadata.json is written before logs and outputs, so
    // a concurrent reader that gates on its existence could observe an
    // incomplete entry — consider writing metadata last. Verify whether
    // concurrent store/check on the same digest is possible here.
    async fn store(
        &self,
        task: &IRTask,
        digest: &str,
        entry: &CacheEntry,
        policy: CachePolicy,
    ) -> BackendResult<()> {
        if !policy_allows_write(policy) {
            tracing::debug!(
                task = %task.id,
                policy = ?policy,
                "Cache write skipped due to policy"
            );
            return Ok(());
        }

        let cache_path = self.cache_path(digest);
        fs::create_dir_all(&cache_path)
            .map_err(|e| BackendError::io_with_context("create directory", &cache_path, e))?;

        let meta = CacheMetadata {
            task_id: task.id.clone(),
            digest: digest.to_string(),
            command: task.command.clone(),
            created_at: Utc::now(),
            duration_ms: entry.duration_ms,
            exit_code: entry.exit_code,
        };
        let meta_path = cache_path.join("metadata.json");
        let meta_json = serde_json::to_string_pretty(&meta)
            .map_err(|e| BackendError::Serialization(e.to_string()))?;
        fs::write(&meta_path, &meta_json)
            .map_err(|e| BackendError::io_with_context("write", &meta_path, e))?;

        // The logs directory is created even when no streams were
        // captured, so `load_logs` always has a well-known location.
        let logs_dir = cache_path.join("logs");
        fs::create_dir_all(&logs_dir)
            .map_err(|e| BackendError::io_with_context("create directory", &logs_dir, e))?;

        if let Some(stdout) = &entry.stdout {
            let stdout_path = logs_dir.join("stdout.log");
            fs::write(&stdout_path, stdout)
                .map_err(|e| BackendError::io_with_context("write", &stdout_path, e))?;
        }
        if let Some(stderr) = &entry.stderr {
            let stderr_path = logs_dir.join("stderr.log");
            fs::write(&stderr_path, stderr)
                .map_err(|e| BackendError::io_with_context("write", &stderr_path, e))?;
        }

        if !entry.outputs.is_empty() {
            let outputs_dir = self.outputs_path(digest);
            fs::create_dir_all(&outputs_dir)
                .map_err(|e| BackendError::io_with_context("create directory", &outputs_dir, e))?;

            for output in &entry.outputs {
                // `output.path` is treated as relative to the outputs dir;
                // intermediate directories are created as needed.
                let output_path = outputs_dir.join(&output.path);
                if let Some(parent) = output_path.parent() {
                    fs::create_dir_all(parent).map_err(|e| {
                        BackendError::io_with_context("create directory", parent, e)
                    })?;
                }
                fs::write(&output_path, &output.data)
                    .map_err(|e| BackendError::io_with_context("write", &output_path, e))?;

                // Preserve the execute bit on Unix by OR-ing 0o111 onto
                // the permissions `fs::write` produced.
                #[cfg(unix)]
                if output.is_executable {
                    use std::os::unix::fs::PermissionsExt;
                    let mut perms = fs::metadata(&output_path)
                        .map_err(|e| {
                            BackendError::io_with_context("read metadata", &output_path, e)
                        })?
                        .permissions();
                    perms.set_mode(perms.mode() | 0o111);
                    fs::set_permissions(&output_path, perms).map_err(|e| {
                        BackendError::io_with_context("set permissions", &output_path, e)
                    })?;
                }
            }
        }

        tracing::debug!(
            task = %task.id,
            digest = %digest,
            path = %cache_path.display(),
            outputs = entry.outputs.len(),
            "Cache entry stored"
        );

        Ok(())
    }

    // Copy every file under the entry's `outputs/` tree into `workspace`,
    // preserving relative paths and (on Unix) the execute bit. Returns
    // the restored outputs together with their contents.
    //
    // NOTE(review): each file's full contents are buffered in memory
    // (the returned `CacheOutput` carries the bytes) — confirm outputs
    // are expected to be small enough for that.
    async fn restore_outputs(
        &self,
        task: &IRTask,
        digest: &str,
        workspace: &Path,
    ) -> BackendResult<Vec<CacheOutput>> {
        let outputs_dir = self.outputs_path(digest);
        let mut restored = Vec::new();

        // No outputs directory simply means the task declared none.
        if !outputs_dir.exists() {
            return Ok(restored);
        }

        for entry in walkdir(&outputs_dir)
            .map_err(|e| BackendError::io_with_context("read directory", &outputs_dir, e))?
        {
            // Rebuild the task-relative path so the file lands at the
            // same location inside the workspace.
            let rel_path = entry
                .strip_prefix(&outputs_dir)
                .map_err(|e| BackendError::Io(io::Error::other(e.to_string())))?;

            let dest_path = workspace.join(rel_path);
            if let Some(parent) = dest_path.parent() {
                fs::create_dir_all(parent)
                    .map_err(|e| BackendError::io_with_context("create directory", parent, e))?;
            }

            let data =
                fs::read(&entry).map_err(|e| BackendError::io_with_context("read", &entry, e))?;
            let is_executable = is_file_executable(&entry);

            fs::write(&dest_path, &data)
                .map_err(|e| BackendError::io_with_context("write", &dest_path, e))?;

            // Re-apply the execute bit lost by `fs::write`.
            #[cfg(unix)]
            if is_executable {
                use std::os::unix::fs::PermissionsExt;
                let mut perms = fs::metadata(&dest_path)
                    .map_err(|e| BackendError::io_with_context("read metadata", &dest_path, e))?
                    .permissions();
                perms.set_mode(perms.mode() | 0o111);
                fs::set_permissions(&dest_path, perms)
                    .map_err(|e| BackendError::io_with_context("set permissions", &dest_path, e))?;
            }

            restored.push(CacheOutput {
                path: rel_path.to_string_lossy().to_string(),
                data,
                is_executable,
            });
        }

        tracing::debug!(
            task = %task.id,
            digest = %digest,
            restored = restored.len(),
            "Restored outputs from cache"
        );

        Ok(restored)
    }

    // Fetch whatever logs were stored for `digest`; missing streams are
    // `None`. Never fails: unreadable logs are simply absent.
    async fn get_logs(
        &self,
        _task: &IRTask,
        digest: &str,
    ) -> BackendResult<(Option<String>, Option<String>)> {
        let cache_path = self.cache_path(digest);
        let logs = load_logs(&cache_path);
        Ok((logs.stdout, logs.stderr))
    }

    fn name(&self) -> &'static str {
        "local"
    }

    // Health means "the cache root exists and is creatable/writable
    // enough for create_dir_all to succeed".
    async fn health_check(&self) -> BackendResult<()> {
        fs::create_dir_all(&self.cache_root)
            .map_err(|e| BackendError::io_with_context("create directory", &self.cache_root, e))?;
        Ok(())
    }
}
540
/// Recursively collect every non-directory entry beneath `dir`.
///
/// Entries are visited in whatever (platform-dependent) order
/// `read_dir` yields them; a non-existent or non-directory `dir`
/// produces an empty list.
fn walkdir(dir: &Path) -> io::Result<Vec<PathBuf>> {
    let mut files = Vec::new();
    let mut pending = vec![dir.to_path_buf()];

    // Explicit work stack instead of recursion.
    while let Some(current) = pending.pop() {
        if !current.is_dir() {
            continue;
        }
        for entry in fs::read_dir(&current)? {
            let path = entry?.path();
            if path.is_dir() {
                pending.push(path);
            } else {
                files.push(path);
            }
        }
    }
    Ok(files)
}
557
/// True when `path` has any execute bit set (owner, group, or other);
/// metadata errors are treated as "not executable".
#[cfg(unix)]
fn is_file_executable(path: &Path) -> bool {
    use std::os::unix::fs::PermissionsExt;
    match fs::metadata(path) {
        Ok(meta) => meta.permissions().mode() & 0o111 != 0,
        Err(_) => false,
    }
}
566
/// Fallback for non-Unix targets, where there is no execute bit to
/// inspect; cached outputs are always restored as non-executable.
#[cfg(not(unix))]
fn is_file_executable(_path: &Path) -> bool {
    false
}
571
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // Build a minimal IRTask with the given cache policy; everything
    // else is empty/default so only caching behavior is exercised.
    fn make_task(id: &str, policy: CachePolicy) -> IRTask {
        IRTask {
            id: id.to_string(),
            runtime: None,
            command: vec!["echo".to_string(), "hello".to_string()],
            shell: false,
            env: std::collections::HashMap::new(),
            secrets: std::collections::HashMap::new(),
            resources: None,
            concurrency_group: None,
            inputs: vec![],
            outputs: vec![],
            depends_on: vec![],
            cache_policy: policy,
            deployment: false,
            manual_approval: false,
        }
    }

    // Digest maps to the two-level ab/cd/<full-hash> fan-out layout.
    #[test]
    fn test_cache_path_structure() {
        let root = Path::new("/cache");
        let path = cache_path_for_digest(root, "sha256:abcdef123456");
        assert_eq!(path, PathBuf::from("/cache/ab/cd/abcdef123456"));
    }

    // Unknown digest in an empty cache root is a miss with no path.
    #[test]
    fn test_cache_miss() {
        let tmp = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);

        let result = check_cache(&task, "sha256:nonexistent", tmp.path(), None);
        assert!(!result.hit);
        assert!(result.path.is_none());
    }

    // Round trip: store_result then check_cache hits, and the stored
    // metadata carries the task id, exit code, and duration.
    #[test]
    fn test_cache_hit_after_store() {
        let tmp = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);
        let digest = "sha256:testdigest123456";

        let logs = TaskLogs {
            stdout: Some("output".to_string()),
            stderr: None,
        };
        store_result(&task, digest, tmp.path(), &logs, 100, 0, None).unwrap();

        let result = check_cache(&task, digest, tmp.path(), None);
        assert!(result.hit);
        assert!(result.path.is_some());

        let meta = load_metadata(result.path.as_ref().unwrap()).unwrap();
        assert_eq!(meta.task_id, "test");
        assert_eq!(meta.exit_code, 0);
        assert_eq!(meta.duration_ms, 100);
    }

    // Readonly policy: store_result is a no-op, so the lookup misses.
    #[test]
    fn test_readonly_policy_no_write() {
        let tmp = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Readonly);
        let digest = "sha256:readonly123";

        let logs = TaskLogs::default();
        store_result(&task, digest, tmp.path(), &logs, 100, 0, None).unwrap();

        let result = check_cache(&task, digest, tmp.path(), None);
        assert!(!result.hit);
    }

    // Writeonly override at lookup time: the entry exists (stored under
    // Normal) but the read is skipped, so it reports a miss.
    #[test]
    fn test_writeonly_policy_no_read() {
        let tmp = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);
        let digest = "sha256:writeonly123";

        let logs = TaskLogs::default();
        store_result(&task, digest, tmp.path(), &logs, 100, 0, None).unwrap();

        let result = check_cache(&task, digest, tmp.path(), Some(CachePolicy::Writeonly));
        assert!(!result.hit);
    }

    // Disabled policy: neither the store nor the lookup touches the cache.
    #[test]
    fn test_disabled_policy_no_cache() {
        let tmp = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Disabled);
        let digest = "sha256:disabled123";

        let logs = TaskLogs::default();
        store_result(&task, digest, tmp.path(), &logs, 100, 0, None).unwrap();

        let result = check_cache(&task, digest, tmp.path(), None);
        assert!(!result.hit);
    }

    // A Disabled override on store wins over the task's Normal policy,
    // so nothing is written and the later lookup misses.
    #[test]
    fn test_policy_override() {
        let tmp = TempDir::new().unwrap();
        let task = make_task("test", CachePolicy::Normal);
        let digest = "sha256:override123";

        let logs = TaskLogs::default();
        store_result(
            &task,
            digest,
            tmp.path(),
            &logs,
            100,
            0,
            Some(CachePolicy::Disabled),
        )
        .unwrap();

        let result = check_cache(&task, digest, tmp.path(), None);
        assert!(!result.hit);
    }
}