use std::fmt::Write as _;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result, anyhow, bail};
use async_nats::jetstream;
use base64::Engine as _;
use kanade_shared::kv::OBJECT_SCRIPTS;
use sha2::{Digest, Sha256};
use tokio::io::AsyncReadExt;
use tracing::{debug, info, warn};
/// Largest script body, in bytes, that is accepted from the object store
/// (4 MiB). An object whose streamed bytes would exceed this is rejected
/// while it is being read.
pub(crate) const MAX_BODY_BYTES: usize = 4 * 1024 * 1024;
/// Cache of script bodies: files under `dir`, filled on demand from the
/// `OBJECT_SCRIPTS` JetStream object store.
#[derive(Clone)]
pub struct ScriptCache {
/// JetStream context used to open the `OBJECT_SCRIPTS` object store.
js: jetstream::Context,
/// Directory holding the cached bodies; each file is named after the
/// SHA-256 hex digest it was requested with.
dir: PathBuf,
}
impl ScriptCache {
pub fn new(js: jetstream::Context, dir: PathBuf) -> Self {
Self { js, dir }
}
pub async fn resolve(&self, key: &str, expected_sha: &str) -> Result<String> {
let path = self.cache_path(expected_sha);
match tokio::fs::read(&path).await {
Ok(bytes) => {
let actual = hex_sha256(&bytes);
if actual == expected_sha {
debug!(%key, %expected_sha, size = bytes.len(), "script_cache: hit");
return bytes_to_string(bytes, key);
}
warn!(
%key, %expected_sha, %actual,
"script_cache: file digest mismatch — refetching",
);
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
debug!(%key, %expected_sha, "script_cache: miss");
}
Err(e) => {
return Err(anyhow::Error::new(e))
.with_context(|| format!("read cache file {}", path.display()));
}
}
let body = self.fetch(key, expected_sha).await?;
self.write_atomic(&path, body.as_bytes()).await?;
info!(
%key, %expected_sha, size = body.len(),
"script_cache: populated",
);
Ok(body)
}
fn cache_path(&self, sha: &str) -> PathBuf {
self.dir.join(sha)
}
pub async fn digest_of(&self, key: &str) -> Result<String> {
let store = self
.js
.get_object_store(OBJECT_SCRIPTS)
.await
.with_context(|| format!("get_object_store {OBJECT_SCRIPTS}"))?;
let info = store
.info(key)
.await
.with_context(|| format!("object_store.info {key}"))?;
let raw = info
.digest
.as_deref()
.ok_or_else(|| anyhow!("script_object '{key}' has no broker digest"))?;
let b64 = raw.strip_prefix("SHA-256=").unwrap_or(raw);
use base64::alphabet::URL_SAFE;
use base64::engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig};
static DECODER: GeneralPurpose = GeneralPurpose::new(
&URL_SAFE,
GeneralPurposeConfig::new().with_decode_padding_mode(DecodePaddingMode::Indifferent),
);
let bytes = DECODER
.decode(b64)
.with_context(|| format!("decode digest for '{key}' (raw='{raw}')"))?;
Ok(hex_lower(&bytes))
}
async fn fetch(&self, key: &str, expected_sha: &str) -> Result<String> {
let store = self
.js
.get_object_store(OBJECT_SCRIPTS)
.await
.with_context(|| format!("get_object_store {OBJECT_SCRIPTS}"))?;
let mut obj = store
.get(key)
.await
.with_context(|| format!("object_store.get {key}"))?;
let reported = obj.info().size;
let cap_hint = reported.min(MAX_BODY_BYTES);
let mut buf: Vec<u8> = Vec::with_capacity(cap_hint);
let mut hasher = Sha256::new();
let mut chunk = [0u8; 8192];
loop {
let n = obj
.read(&mut chunk)
.await
.with_context(|| format!("read script object '{key}'"))?;
if n == 0 {
break;
}
if buf.len() + n > MAX_BODY_BYTES {
bail!(
"script_object '{key}' exceeds {MAX_BODY_BYTES} byte cap \
(broker reported size {reported})",
);
}
hasher.update(&chunk[..n]);
buf.extend_from_slice(&chunk[..n]);
}
let actual = hex_lower(&hasher.finalize());
if actual != expected_sha {
return Err(anyhow!(
"script_object '{key}' sha256 mismatch: expected={expected_sha} \
actual={actual} — operator likely re-uploaded mid-rollout, \
refusing to execute unverified bytes",
));
}
bytes_to_string(buf, key)
}
async fn write_atomic(&self, final_path: &Path, bytes: &[u8]) -> Result<()> {
write_atomic_into(&self.dir, final_path, bytes).await
}
}
/// Writes `bytes` to `final_path` by writing a uniquely named temporary file
/// next to it and renaming that file over `final_path`.
///
/// `dir` is created first (including missing parents). The temporary name is
/// `final_path` with its extension replaced by `<uuid>.tmp`, so concurrent
/// writers targeting the same `final_path` never share a temporary file.
///
/// # Errors
///
/// Fails when `dir` cannot be created, the temporary file cannot be written,
/// or the rename fails. On a write or rename failure the temporary file is
/// removed on a best-effort basis so failed attempts do not pile up in `dir`.
async fn write_atomic_into(dir: &Path, final_path: &Path, bytes: &[u8]) -> Result<()> {
    tokio::fs::create_dir_all(dir)
        .await
        .with_context(|| format!("mkdir {}", dir.display()))?;

    let suffix = uuid::Uuid::new_v4().simple().to_string();
    let tmp = final_path.with_extension(format!("{suffix}.tmp"));

    let written = tokio::fs::write(&tmp, bytes)
        .await
        .with_context(|| format!("write {}", tmp.display()));
    let outcome = match written {
        Ok(()) => tokio::fs::rename(&tmp, final_path)
            .await
            .with_context(|| format!("rename {} -> {}", tmp.display(), final_path.display())),
        Err(e) => Err(e),
    };

    if outcome.is_err() {
        // Best effort: the original failure is what the caller needs to see,
        // so a failure to clean up (e.g. the file was never created) is
        // deliberately ignored.
        let _ = tokio::fs::remove_file(&tmp).await;
    }
    outcome
}
/// Converts the raw bytes of script `key` into a `String`.
///
/// # Errors
///
/// Fails, naming `key` in the message, when `bytes` is not valid UTF-8.
fn bytes_to_string(bytes: Vec<u8>, key: &str) -> Result<String> {
    let decoded = String::from_utf8(bytes);
    decoded.with_context(|| format!("script_object '{key}' bytes are not valid UTF-8"))
}
/// Hashes `bytes` with SHA-256 and returns the digest as lowercase hex.
fn hex_sha256(bytes: &[u8]) -> String {
    let digest = Sha256::digest(bytes);
    hex_lower(&digest)
}
/// Renders `bytes` as lowercase hexadecimal, two characters per byte.
fn hex_lower(bytes: &[u8]) -> String {
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
            // Formatting into a `String` cannot fail, so the result is dropped.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `hex_sha256` reproduces the SHA-256 of `"hello world"` in lowercase hex.
    #[test]
    fn hex_lower_matches_known_vector() {
        assert_eq!(
            hex_sha256(b"hello world"),
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9",
        );
    }

    /// Bytes below `0x10` keep their leading zero.
    #[test]
    fn hex_lower_pads_each_byte_to_two_chars() {
        let rendered = hex_lower(&[0x00, 0x0a, 0xff]);
        assert_eq!(rendered, "000aff");
    }

    /// A write into a directory that does not exist yet creates it, stores the
    /// bytes, and leaves no `.tmp` file behind.
    #[tokio::test]
    async fn write_atomic_creates_dir_then_renames() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let cache_dir = scratch.path().join("nested").join("script_cache");
        let target = cache_dir.join("deadbeef");

        write_atomic_into(&cache_dir, &target, b"echo hi")
            .await
            .expect("write_atomic");

        let stored = tokio::fs::read(&target).await.expect("read back");
        assert_eq!(stored, b"echo hi");

        let mut entries = tokio::fs::read_dir(&cache_dir).await.expect("read_dir");
        loop {
            let Some(entry) = entries.next_entry().await.expect("next_entry") else {
                break;
            };
            let name = entry.file_name().to_string_lossy().into_owned();
            assert!(!name.ends_with(".tmp"), "leftover tmp file: {name}");
        }
    }

    /// A second write to the same path replaces the first one's contents.
    #[tokio::test]
    async fn write_atomic_overwrites_existing_file() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let cache_dir = scratch.path().to_path_buf();
        let target = cache_dir.join("aa");

        write_atomic_into(&cache_dir, &target, b"first")
            .await
            .expect("first");
        write_atomic_into(&cache_dir, &target, b"second")
            .await
            .expect("second");

        let stored = tokio::fs::read(&target).await.expect("read back");
        assert_eq!(stored, b"second");
    }

    /// Several tasks writing two different bodies to one path must leave
    /// exactly one complete body in the file, never an interleaving of both.
    #[tokio::test]
    async fn write_atomic_concurrent_same_sha_does_not_corrupt() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let cache_dir = std::sync::Arc::new(scratch.path().to_path_buf());
        let target = cache_dir.join("samesha");
        let n_writers = 8usize;
        let body_a = vec![b'A'; 4096];
        let body_b = vec![b'B'; 4096];

        // Spawn every writer before awaiting any of them; even-numbered tasks
        // write the 'A' body and odd-numbered tasks the 'B' body.
        let writers: Vec<_> = (0..n_writers)
            .map(|i| {
                let cache_dir = cache_dir.clone();
                let target = target.clone();
                let payload = if i.is_multiple_of(2) {
                    body_a.clone()
                } else {
                    body_b.clone()
                };
                tokio::spawn(
                    async move { write_atomic_into(&cache_dir, &target, &payload).await },
                )
            })
            .collect();

        for writer in writers {
            writer.await.expect("join").expect("write_atomic_into");
        }

        let final_bytes = tokio::fs::read(&target).await.expect("final read");
        assert_eq!(final_bytes.len(), 4096, "final length matches one body");

        let filled_with = |needle: u8| final_bytes.iter().all(|&b| b == needle);
        assert!(
            filled_with(b'A') || filled_with(b'B'),
            "final file is one of the candidate bodies, not a mix",
        );
    }
}