use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
use std::io::BufReader;
use std::io::BufWriter;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use arrayvec::ArrayString;
use serde::Deserialize;
use serde::Serialize;
use tokio::fs;
use tracing::debug;
use tracing::info;
use url::Url;
use crate::ContentKind;
use crate::EvaluationPath;
use crate::PrimitiveValue;
use crate::Value;
use crate::backend::Input;
use crate::backend::TaskExecutionResult;
use crate::cache::hash::hash_sequence;
use crate::cache::lock::LockedFile;
use crate::config::ContentDigestMode;
use crate::http::Transferer;
use crate::v1::requirements::ContainerSource;
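/// The current version of the call cache entry format.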
const CURRENT_CACHE_VERSION: u32 = 0;
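/// The subdirectory within the default cache directory where call cache entries are stored.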
const CALL_CACHE_SUBDIR: &str = "calls";
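/// The name of the lock file used to coordinate access to the call cache directory.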
const CACHE_LOCK_FILE_NAME: &str = ".lock";
mod hash;
mod lock;
pub use hash::Hashable;
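/// Names of task inputs, requirements, and hints that are excluded when
/// calculating call cache keys and validating cache entries.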
pub struct CallCacheExclusions {
pub inputs: HashSet<String>,
pub requirements: HashSet<String>,
pub hints: HashSet<String>,
}
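/// Represents the shared state of a [`CallCache`].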
#[derive(Clone)]
struct State {
_lock: Arc<LockedFile>,
cache_dir: Arc<PathBuf>,
transferer: Arc<dyn Transferer>,
mode: ContentDigestMode,
exclusions: Arc<CallCacheExclusions>,
}
impl State {
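/// Gets the path to the cache entry file for the given key.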
fn entry_path(&self, key: &Key) -> PathBuf {
self.cache_dir.join(key.as_str())
}
}
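/// Represents cached content (e.g. stdout, stderr, or the working directory)
/// along with the digest it had when the entry was created.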
#[derive(Debug, Default, Serialize, Deserialize)]
struct Content {
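/// The location of the content.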
location: String,
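/// The hex-encoded digest of the content.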
digest: ArrayString<64>,
}
impl Content {
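/// Creates a new [`Content`] from the given evaluation path by calculating its digest.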
async fn from_evaluation_path(
transferer: &dyn Transferer,
path: EvaluationPath,
kind: ContentKind,
mode: ContentDigestMode,
) -> Result<Self> {
let digest = path.calculate_digest(transferer, kind, mode).await?;
Ok(Self {
location: path.try_into()?,
digest: digest.to_hex(),
})
}
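/// Converts the content back into an evaluation path.
///
/// Fails if the content's current digest no longer matches the cached digest.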
async fn to_evaluation_path(
&self,
transferer: &dyn Transferer,
kind: ContentKind,
mode: ContentDigestMode,
) -> Result<EvaluationPath> {
let path: EvaluationPath = self.location.parse()?;
let digest = path.calculate_digest(transferer, kind, mode).await?;
if digest.to_hex() != self.digest {
bail!(
"cached content `{location}` was modified",
location = self.location
);
}
Ok(path)
}
}
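/// Represents an entry in the call cache.
///
/// Entries are serialized as JSON files in the cache directory, named by their
/// cache key.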
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct CallCacheEntry {
version: u32,
command: ArrayString<64>,
container: Option<ContainerSource>,
default_container: Option<String>,
shell: String,
requirements: HashMap<String, ArrayString<64>>,
hints: HashMap<String, ArrayString<64>>,
inputs: HashMap<String, ArrayString<64>>,
exit: i32,
stdout: Content,
stderr: Content,
work: Content,
}
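/// Represents a calculated call cache key along with the digests used to
/// validate a matching cache entry.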
#[derive(Debug)]
pub struct Key {
key: ArrayString<64>,
command: ArrayString<64>,
default_container: Option<String>,
shell: String,
requirements: HashMap<String, ArrayString<64>>,
hints: HashMap<String, ArrayString<64>>,
backend_inputs: HashMap<String, ArrayString<64>>,
}
impl Key {
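/// Gets the key as a string slice.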
pub fn as_str(&self) -> &str {
self.key.as_str()
}
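/// Ensures the key matches the given cache entry, ignoring any excluded
/// requirements, hints, and inputs.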
fn ensure_matches(
&self,
entry: &CallCacheEntry,
exclusions: &CallCacheExclusions,
) -> Result<()> {
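// Compares two digest maps and errors if a non-excluded entry was added,
// removed, or modified.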
fn compare_maps<K, V>(
a: &HashMap<K, V>,
b: &HashMap<K, V>,
kind: &str,
excluded: &HashSet<String>,
) -> Result<()>
where
K: std::hash::Hash + fmt::Display + Eq,
V: Eq,
{
for (k, v) in a {
let key_str = k.to_string();
if excluded.contains(&key_str) {
debug!("{kind} `{k}` is excluded from cache checking, skipping");
continue;
}
match b.get(k) {
Some(ov) => {
if v != ov {
bail!("{kind} `{k}` was modified")
}
}
None => bail!("{kind} `{k}` was added"),
}
}
for k in b.keys() {
let key_str = k.to_string();
if excluded.contains(&key_str) {
debug!("{kind} `{k}` is excluded from cache checking, skipping");
continue;
}
if !a.contains_key(k) {
bail!("{kind} `{k}` was removed");
}
}
Ok(())
}
if self.command != entry.command {
bail!("the command of the task was modified");
}
if self.default_container != entry.default_container {
bail!("the default container for the task was modified");
}
if self.shell != entry.shell {
bail!("the shell used by the task was modified");
}
compare_maps(
&self.requirements,
&entry.requirements,
"task requirement",
&exclusions.requirements,
)?;
compare_maps(&self.hints, &entry.hints, "task hint", &exclusions.hints)?;
compare_maps(
&self.backend_inputs,
&entry.inputs,
"task input",
&exclusions.inputs,
)?;
Ok(())
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.key.fmt(f)
}
}
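/// Represents a request to calculate a call cache key for a task.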
#[derive(Debug, Copy, Clone)]
pub struct KeyRequest<'a> {
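/// The URI of the document containing the task.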
pub document_uri: &'a Url,
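/// The name of the task.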
pub task_name: &'a str,
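/// The inputs to the task.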
pub inputs: &'a BTreeMap<String, Value>,
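/// The command of the task.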
pub command: &'a str,
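/// The default container for the task, if any.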
pub default_container: Option<&'a str>,
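/// The shell used to execute the task's command.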
pub shell: &'a str,
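/// The requirements of the task.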
pub requirements: &'a HashMap<String, Value>,
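/// The hints of the task.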
pub hints: &'a HashMap<String, Value>,
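/// The task's inputs as known to the execution backend.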
pub backend_inputs: &'a [Input],
}
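/// Represents a cache of task execution results.
///
/// Entries are stored as JSON files in the cache directory and are validated
/// against the content digests recorded when they were created.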
#[derive(Clone)]
pub struct CallCache(State);
impl CallCache {
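/// Creates a new call cache rooted at the given directory.
///
/// If no directory is provided, the default cache directory is used; a shared
/// lock is held on the cache directory for the lifetime of the cache.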
pub async fn new(
cache_dir: Option<&Path>,
mode: ContentDigestMode,
transferer: Arc<dyn Transferer>,
exclusions: Arc<CallCacheExclusions>,
) -> Result<Self> {
let cache_dir = match cache_dir {
Some(cache_dir) => cache_dir.into(),
None => crate::config::cache_dir()?.join(CALL_CACHE_SUBDIR),
};
info!(
"using call cache directory `{cache_dir}`",
cache_dir = cache_dir.display()
);
fs::create_dir_all(&cache_dir).await.with_context(|| {
format!(
"failed to create call cache directory `{dir}`",
dir = cache_dir.display()
)
})?;
Ok(Self(State {
_lock: LockedFile::acquire_shared(&cache_dir.join(CACHE_LOCK_FILE_NAME), true)
.await?
.expect("file should exist")
.into(),
cache_dir: cache_dir.into(),
transferer,
mode,
exclusions,
}))
}
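/// Calculates the call cache key for the given request.
///
/// The key is derived from the document URI, the task name, and the
/// non-excluded task inputs; the remaining digests are recorded on the key for
/// validating a matching entry.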
pub async fn key(&self, request: KeyRequest<'_>) -> Result<Key> {
let mut hasher = blake3::Hasher::new();
request.command.hash(&mut hasher);
let command_digest = hasher.finalize().to_hex();
let requirement_digests = request
.requirements
.iter()
.map(|(k, v)| {
let mut hasher = blake3::Hasher::new();
v.hash(&mut hasher);
(k.clone(), hasher.finalize().to_hex())
})
.collect();
let hint_digests = request
.hints
.iter()
.map(|(k, v)| {
let mut hasher = blake3::Hasher::new();
v.hash(&mut hasher);
(k.clone(), hasher.finalize().to_hex())
})
.collect();
let mut backend_inputs = HashMap::with_capacity(request.backend_inputs.len());
for input in request.backend_inputs {
let digest = input
.path()
.calculate_digest(self.0.transferer.as_ref(), input.kind(), self.0.mode)
.await?;
backend_inputs.insert(input.path().to_string(), digest.to_hex());
}
let mut hasher = blake3::Hasher::new();
request.document_uri.hash(&mut hasher);
request.task_name.hash(&mut hasher);
hash_sequence(
&mut hasher,
request
.inputs
.iter()
.filter(|(k, _)| !self.0.exclusions.inputs.contains(*k))
.collect::<Vec<_>>()
.into_iter(),
);
let key = hasher.finalize().to_hex();
Ok(Key {
key,
command: command_digest,
default_container: request.default_container.map(Into::into),
shell: request.shell.into(),
requirements: requirement_digests,
hints: hint_digests,
backend_inputs,
})
}
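/// Gets the cached execution result for the given key.
///
/// Returns `Ok(None)` if there is no entry for the key; fails if an entry
/// exists but its version, content digests, or recorded task state do not
/// match.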
pub async fn get(&self, key: &Key) -> Result<Option<TaskExecutionResult>> {
let path = self.0.entry_path(key);
let file = match LockedFile::acquire_shared(&path, false).await? {
Some(file) => file,
None => return Ok(None),
};
let entry: CallCacheEntry = serde_json::from_reader(BufReader::new(file))
.with_context(|| format!("failed to deserialize call cache entry `{key}`"))?;
if entry.version != CURRENT_CACHE_VERSION {
bail!(
"cache entry `{key}` has a mismatched version: expected version is \
{CURRENT_CACHE_VERSION}, but the entry is {version}",
version = entry.version
);
}
key.ensure_matches(&entry, &self.0.exclusions)?;
let stdout = entry
.stdout
.to_evaluation_path(self.0.transferer.as_ref(), ContentKind::File, self.0.mode)
.await?;
let stderr = entry
.stderr
.to_evaluation_path(self.0.transferer.as_ref(), ContentKind::File, self.0.mode)
.await?;
let work = entry
.work
.to_evaluation_path(
self.0.transferer.as_ref(),
ContentKind::Directory,
self.0.mode,
)
.await?;
Ok(Some(TaskExecutionResult {
container: entry.container,
exit_code: entry.exit,
work_dir: work,
stdout: PrimitiveValue::new_file(String::try_from(stdout)?).into(),
stderr: PrimitiveValue::new_file(String::try_from(stderr)?).into(),
}))
}
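/// Puts a task execution result into the cache for the given key.
///
/// Returns the cache key of the written entry.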
pub async fn put(&self, key: Key, result: &TaskExecutionResult) -> Result<ArrayString<64>> {
let file = LockedFile::acquire_exclusive_truncated(&self.0.entry_path(&key)).await?;
let entry = CallCacheEntry {
version: CURRENT_CACHE_VERSION,
command: key.command,
container: result.container.clone(),
default_container: key.default_container,
shell: key.shell,
requirements: key.requirements,
hints: key.hints,
inputs: key.backend_inputs,
exit: result.exit_code,
stdout: Content::from_evaluation_path(
self.0.transferer.as_ref(),
result
.stdout
.as_file()
.expect("value should be a `File`")
.as_str()
.parse()?,
ContentKind::File,
self.0.mode,
)
.await?,
stderr: Content::from_evaluation_path(
self.0.transferer.as_ref(),
result
.stderr
.as_file()
.expect("value should be a `File`")
.as_str()
.parse()?,
ContentKind::File,
self.0.mode,
)
.await?,
work: Content::from_evaluation_path(
self.0.transferer.as_ref(),
result.work_dir.clone(),
ContentKind::Directory,
self.0.mode,
)
.await?,
};
let mut writer = BufWriter::new(file);
serde_json::to_writer(&mut writer, &entry).with_context(|| {
format!(
"failed to serialize call cache entry `{key}`",
key = key.key
)
})?;
// Flush explicitly so write errors aren't silently dropped with the `BufWriter`.
writer.flush().with_context(|| {
format!("failed to flush call cache entry `{key}`", key = key.key)
})?;
Ok(key.key)
}
}
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tempfile::tempdir;
use super::*;
use crate::GuestPath;
use crate::digest::test::DigestTransferer;
use crate::digest::test::clear_digest_cache;
struct Paths {
source: PathBuf,
input: PathBuf,
stdout: PathBuf,
stderr: PathBuf,
work_dir: PathBuf,
}
struct Task {
paths: Paths,
document_uri: Url,
inputs: BTreeMap<String, Value>,
requirements: HashMap<String, Value>,
hints: HashMap<String, Value>,
default_container: Option<String>,
backend_inputs: [Input; 1],
}
impl Task {
fn new(paths: Paths) -> Self {
let document_uri = Url::from_file_path(&paths.source).unwrap();
let input = paths.input.clone();
Self {
paths,
document_uri,
inputs: BTreeMap::from([(
"file".into(),
PrimitiveValue::new_file(input.to_str().unwrap()).into(),
)]),
requirements: HashMap::from_iter([(
"container".into(),
PrimitiveValue::new_string("ubuntu:latest").into(),
)]),
hints: HashMap::from_iter([(
"foo".into(),
PrimitiveValue::new_string("bar").into(),
)]),
default_container: None,
backend_inputs: [Input::new(
ContentKind::File,
EvaluationPath::from_local_path(input),
Some(GuestPath::new("/mnt/task/0/input")),
)],
}
}
fn key_request(&self) -> KeyRequest<'_> {
KeyRequest {
document_uri: &self.document_uri,
task_name: "test",
inputs: &self.inputs,
command: "cat /mnt/task/0/input",
default_container: self.default_container.as_deref(),
shell: "bash",
requirements: &self.requirements,
hints: &self.hints,
backend_inputs: &self.backend_inputs,
}
}
}
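/// Creates the source, input, and output files for a test task under the
/// given root directory.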
async fn prepare_task(root_dir: &Path) -> Task {
let source_dir = root_dir.join("src");
fs::create_dir_all(&source_dir).await.unwrap();
let inputs_dir = root_dir.join("inputs");
fs::create_dir_all(&inputs_dir).await.unwrap();
let outputs_dir = root_dir.join("outputs");
fs::create_dir_all(&outputs_dir).await.unwrap();
let paths = Paths {
source: source_dir.join("source.wdl"),
input: inputs_dir.join("input"),
stdout: outputs_dir.join("stdout"),
stderr: outputs_dir.join("stderr"),
work_dir: outputs_dir.join("work"),
};
fs::write(&paths.source, "").await.unwrap();
fs::write(&paths.input, "hello world!").await.unwrap();
fs::write(&paths.stdout, "hello world!").await.unwrap();
fs::write(&paths.stderr, "").await.unwrap();
fs::create_dir(&paths.work_dir).await.unwrap();
Task::new(paths)
}
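/// Calculates a key for the task, inserts an execution result into the cache,
/// and asserts the entry can be read back.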
async fn populate_cache(cache: &CallCache, task: &Task) {
let key = cache.key(task.key_request()).await.unwrap();
assert!(cache.get(&key).await.unwrap().is_none());
let result = TaskExecutionResult {
container: None,
exit_code: 0,
work_dir: EvaluationPath::from_local_path(task.paths.work_dir.clone()),
stdout: PrimitiveValue::new_file(task.paths.stdout.to_str().unwrap()).into(),
stderr: PrimitiveValue::new_file(task.paths.stderr.to_str().unwrap()).into(),
};
cache.put(key, &result).await.unwrap();
let key = cache.key(task.key_request()).await.unwrap();
let cached_result = cache
.get(&key)
.await
.unwrap()
.expect("should have cache entry");
assert_eq!(
result.exit_code, cached_result.exit_code,
"exit code mismatch"
);
assert_eq!(
result.work_dir, cached_result.work_dir,
"work directory mismatch"
);
assert_eq!(
result.stdout.as_file().unwrap(),
cached_result.stdout.as_file().unwrap(),
"stdout mismatch"
);
assert_eq!(
result.stderr.as_file().unwrap(),
cached_result.stderr.as_file().unwrap(),
"stderr mismatch"
);
}
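/// A test context holding a temporary directory, a prepared task, and a
/// populated call cache.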
struct TestContext {
_root_dir: TempDir,
task: Task,
cache: CallCache,
}
impl TestContext {
async fn new() -> Self {
let root_dir = tempdir().expect("failed to create temporary directory");
let task = prepare_task(root_dir.path()).await;
let transfer = Arc::new(DigestTransferer::new([]));
let cache = CallCache::new(
Some(&root_dir.path().join("cache")),
ContentDigestMode::Strong,
transfer,
Arc::new(CallCacheExclusions {
inputs: HashSet::new(),
requirements: HashSet::new(),
hints: HashSet::new(),
}),
)
.await
.unwrap();
populate_cache(&cache, &task).await;
Self {
_root_dir: root_dir,
task,
cache,
}
}
}
#[tokio::test]
async fn modified_command() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
command: "modified!",
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"the command of the task was modified"
);
}
#[tokio::test]
async fn modified_default_container() {
let root_dir = tempdir().expect("failed to create temporary directory");
let mut task = prepare_task(root_dir.path()).await;
task.requirements.clear();
task.default_container = Some("ubuntu:latest".into());
let transfer = Arc::new(DigestTransferer::new([]));
let cache = CallCache::new(
Some(&root_dir.path().join("cache")),
ContentDigestMode::Strong,
transfer,
Arc::new(CallCacheExclusions {
inputs: HashSet::new(),
requirements: HashSet::new(),
hints: HashSet::new(),
}),
)
.await
.unwrap();
populate_cache(&cache, &task).await;
let modified_default = Some("ubuntu:cthulhu".to_string());
let key = cache
.key(KeyRequest {
default_container: modified_default.as_deref(),
..task.key_request()
})
.await
.unwrap();
assert_eq!(
cache.get(&key).await.unwrap_err().to_string(),
"the default container for the task was modified"
);
}
#[tokio::test]
async fn modified_shell() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
shell: "modified!",
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"the shell used by the task was modified"
);
}
#[tokio::test]
async fn requirement_removed() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
requirements: &Default::default(),
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"task requirement `container` was removed"
);
}
#[tokio::test]
async fn requirement_added() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
requirements: &HashMap::from_iter([
(
"container".into(),
PrimitiveValue::new_string("ubuntu:latest").into(),
),
("memory".into(), 1000.into()),
]),
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"task requirement `memory` was added"
);
}
#[tokio::test]
async fn requirement_modified() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
requirements: &HashMap::from_iter([(
"container".into(),
PrimitiveValue::new_string("ubuntu:cthulhu").into(),
)]),
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"task requirement `container` was modified"
);
}
#[tokio::test]
async fn hint_removed() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
hints: &Default::default(),
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"task hint `foo` was removed"
);
}
#[tokio::test]
async fn hint_added() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
hints: &HashMap::from_iter([
("foo".into(), PrimitiveValue::new_string("bar").into()),
("max_memory".into(), 1000.into()),
]),
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"task hint `max_memory` was added"
);
}
#[tokio::test]
async fn hint_modified() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
hints: &HashMap::from_iter([(
"foo".into(),
PrimitiveValue::new_string("baz!").into(),
)]),
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
"task hint `foo` was modified"
);
}
#[tokio::test]
async fn backend_input_removed() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let key = ctx
.cache
.key(KeyRequest {
backend_inputs: &[],
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"task input `{}` was removed",
ctx.task.paths.input.display()
)
);
}
#[tokio::test]
async fn backend_input_added() {
let ctx = TestContext::new().await;
let request = ctx.task.key_request();
let input2 = ctx.task.paths.input.with_file_name("input2");
fs::write(&input2, "hello world!!!").await.unwrap();
let key = ctx
.cache
.key(KeyRequest {
backend_inputs: &[
ctx.task.backend_inputs[0].clone(),
Input::new(
ContentKind::File,
EvaluationPath::from_local_path(input2.clone()),
Some(GuestPath::new("/mnt/task/0/input2")),
),
],
..request
})
.await
.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!("task input `{}` was added", input2.display())
);
}
#[tokio::test]
async fn backend_input_modified() {
let ctx = TestContext::new().await;
fs::write(&ctx.task.paths.input, "modified!").await.unwrap();
clear_digest_cache();
let key = ctx.cache.key(ctx.task.key_request()).await.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"task input `{}` was modified",
ctx.task.paths.input.display()
)
);
}
#[tokio::test]
async fn stdout_modified() {
let ctx = TestContext::new().await;
fs::write(&ctx.task.paths.stdout, "modified!")
.await
.unwrap();
clear_digest_cache();
let key = ctx.cache.key(ctx.task.key_request()).await.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"cached content `{}` was modified",
ctx.task.paths.stdout.display()
)
);
}
#[tokio::test]
async fn stdout_missing() {
let ctx = TestContext::new().await;
fs::remove_file(&ctx.task.paths.stdout).await.unwrap();
clear_digest_cache();
let key = ctx.cache.key(ctx.task.key_request()).await.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"failed to read metadata of `{}`",
ctx.task.paths.stdout.display()
)
);
}
#[tokio::test]
async fn stderr_modified() {
let ctx = TestContext::new().await;
fs::write(&ctx.task.paths.stderr, "modified!")
.await
.unwrap();
clear_digest_cache();
let key = ctx.cache.key(ctx.task.key_request()).await.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"cached content `{}` was modified",
ctx.task.paths.stderr.display()
)
);
}
#[tokio::test]
async fn stderr_missing() {
let ctx = TestContext::new().await;
fs::remove_file(&ctx.task.paths.stderr).await.unwrap();
clear_digest_cache();
let key = ctx.cache.key(ctx.task.key_request()).await.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"failed to read metadata of `{}`",
ctx.task.paths.stderr.display()
)
);
}
#[tokio::test]
async fn work_dir_modified() {
let ctx = TestContext::new().await;
fs::write(&ctx.task.paths.work_dir.join("foo"), "added!")
.await
.unwrap();
clear_digest_cache();
let key = ctx.cache.key(ctx.task.key_request()).await.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"cached content `{}` was modified",
ctx.task.paths.work_dir.display()
)
);
}
#[tokio::test]
async fn work_dir_deleted() {
let ctx = TestContext::new().await;
fs::remove_dir_all(&ctx.task.paths.work_dir).await.unwrap();
clear_digest_cache();
let key = ctx.cache.key(ctx.task.key_request()).await.unwrap();
assert_eq!(
ctx.cache.get(&key).await.unwrap_err().to_string(),
format!(
"failed to read metadata of `{}`",
ctx.task.paths.work_dir.display()
)
);
}
#[tokio::test]
async fn excluded_requirement_modified() {
let ctx = TestContext::new().await;
let cache = CallCache::new(
Some(ctx.cache.0.cache_dir.as_path()),
ContentDigestMode::Strong,
Arc::new(DigestTransferer::new([])),
Arc::new(CallCacheExclusions {
inputs: HashSet::new(),
requirements: HashSet::from_iter(["memory".to_string()]),
hints: HashSet::new(),
}),
)
.await
.unwrap();
let request = ctx.task.key_request();
let key = cache
.key(KeyRequest {
requirements: &HashMap::from_iter([
(
"container".into(),
PrimitiveValue::new_string("ubuntu:latest").into(),
),
("memory".into(), 1000.into()),
]),
..request
})
.await
.unwrap();
assert!(
cache.get(&key).await.unwrap().is_some(),
"Expected a cache hit when an excluded requirement is added"
);
let key = cache
.key(KeyRequest {
requirements: &HashMap::from_iter([(
"container".into(),
PrimitiveValue::new_string("ubuntu:cthulhu").into(),
)]),
..request
})
.await
.unwrap();
assert_eq!(
cache.get(&key).await.unwrap_err().to_string(),
"task requirement `container` was modified"
);
}
#[tokio::test]
async fn excluded_hint_modified() {
let ctx = TestContext::new().await;
let cache = CallCache::new(
Some(ctx.cache.0.cache_dir.as_path()),
ContentDigestMode::Strong,
Arc::new(DigestTransferer::new([])),
Arc::new(CallCacheExclusions {
inputs: HashSet::new(),
requirements: HashSet::new(),
hints: HashSet::from_iter(["localization_optional".to_string()]),
}),
)
.await
.unwrap();
let request = ctx.task.key_request();
let key = cache
.key(KeyRequest {
hints: &HashMap::from_iter([
("foo".into(), PrimitiveValue::new_string("bar").into()),
(
"localization_optional".into(),
PrimitiveValue::new_string("true").into(),
),
]),
..request
})
.await
.unwrap();
assert!(
cache.get(&key).await.unwrap().is_some(),
"Expected a cache hit when an excluded hint is added"
);
let key = cache
.key(KeyRequest {
hints: &HashMap::from_iter([
("foo".into(), PrimitiveValue::new_string("baz").into()),
(
"localization_optional".into(),
PrimitiveValue::new_string("true").into(),
),
]),
..request
})
.await
.unwrap();
assert_eq!(
cache.get(&key).await.unwrap_err().to_string(),
"task hint `foo` was modified"
);
}
#[tokio::test]
async fn excluded_input_modified() {
let cache_root_dir = tempdir().expect("failed to create temporary directory");
let cache = CallCache::new(
Some(&cache_root_dir.path().join("cache")),
ContentDigestMode::Strong,
Arc::new(DigestTransferer::new([])),
Arc::new(CallCacheExclusions {
inputs: HashSet::from_iter(["file".to_string()]),
requirements: HashSet::new(),
hints: HashSet::new(),
}),
)
.await
.unwrap();
let root_dir = tempdir().expect("failed to create temporary directory");
let mut task = prepare_task(root_dir.path()).await;
task.inputs.insert(
"file2".into(),
PrimitiveValue::new_file(task.paths.input.clone().to_str().unwrap()).into(),
);
let request = task.key_request();
let original_key = cache.key(task.key_request()).await.unwrap();
let key = cache
.key(KeyRequest {
inputs: &BTreeMap::from([
(
"file".into(),
PrimitiveValue::new_file(task.paths.stdout.clone().to_str().unwrap())
.into(),
),
(
"file2".into(),
PrimitiveValue::new_file(task.paths.input.clone().to_str().unwrap()).into(),
),
]),
..request
})
.await
.unwrap();
assert_eq!(
original_key.key, key.key,
"Expected the cache key to be the same when an excluded input is modified"
);
let key = cache
.key(KeyRequest {
inputs: &BTreeMap::from([(
"file2".into(),
PrimitiveValue::new_file(task.paths.stdout.clone().to_str().unwrap()).into(),
)]),
..request
})
.await
.unwrap();
assert_ne!(
original_key.key, key.key,
"Expected the cache key to change when a non-excluded input is modified"
);
}
}