use std::collections::HashMap;
use std::fs;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
use super::request_body::parse_json_body_allow_empty;
use super::transport::{json_error, json_response};
use crate::application::CreateKvInput;
use crate::json::{parse_json, to_string as json_to_string, Map, Value as JsonValue};
use crate::storage::schema::Value;
use crate::RedDBServer;
// KV collection queried for model registry entries and AI configuration keys.
const RED_CONFIG_COLLECTION: &str = "red_config";
// Key prefix for a model's registry entry; the model name is appended to it.
const AI_MODEL_KEY_PREFIX: &str = "red.config.ai.models.";
// Config key that, when set to non-empty text, overrides the cache root directory.
const AI_LOCAL_CACHE_DIR_KEY: &str = "red.config.ai.local.cache_dir";
// Config key used as the artifact source when a pull request omits 'fixture_dir'.
const AI_LOCAL_FIXTURE_DIR_KEY: &str = "red.config.ai.local.fixture_dir";
// Directory name of the default cache root (created next to the database file,
// or under the system temp dir when no database path is available).
const CACHE_DIR_NAME: &str = "ai_models_cache";
// Sub-directory of the cache root where an install is assembled before promotion.
const STAGING_DIR_NAME: &str = ".staging";
// Sub-directory of the cache root where a replaced or dropped cache is parked
// before it is deleted.
const PURGE_DIR_NAME: &str = ".purge";
// File name of the per-model manifest written into the model's cache directory.
const MANIFEST_FILE: &str = "manifest.json";
// Request-body fields that make a pull fail with 400: credentials must not be
// sent in plaintext on the pull request.
const PULL_REJECTED_PLAINTEXT_FIELDS: &[&str] = &[
"api_key",
"apikey",
"api_token",
"token",
"auth_token",
"bearer_token",
"password",
"secret",
"hf_token",
"huggingface_token",
"huggingface_api_key",
];
// Status stamped on the registry entry after its cache has been dropped.
const STATUS_REGISTERED: &str = "registered";
// Status reported when the cache directory matches its manifest.
const STATUS_INSTALLED: &str = "installed";
// Status reported when the model has no cache directory.
const STATUS_MISSING: &str = "missing";
// Status reported when the cache directory exists but fails manifest checks.
const STATUS_UNHEALTHY: &str = "unhealthy";
impl RedDBServer {
/// Handles a pull request for the registered local AI model `name`.
///
/// Steps, in order:
/// 1. Validate the path segment and parse the (possibly empty) JSON body.
/// 2. Reject any body that carries a plaintext credential field.
/// 3. Load the registry entry (404 when absent, 500 when unreadable).
/// 4. Resolve the credential alias (request body first, then the entry) so a
///    bad alias fails early; the resolved secret itself is not used further here.
/// 5. Resolve the fixture directory and install its files into the model cache
///    while holding the per-model lock.
/// 6. Stamp the registry entry as installed and return the manifest.
///
/// Returns a JSON response; every failure is reported as a JSON error.
pub(crate) fn handle_ai_model_pull(&self, name: &str, body: Vec<u8>) -> HandlerResp {
    let name = match validate_path_name(name) {
        Ok(value) => value,
        Err(resp) => return resp,
    };
    let payload = match parse_json_body_allow_empty(&body) {
        Ok(p) => p,
        Err(resp) => return resp,
    };
    for field in PULL_REJECTED_PLAINTEXT_FIELDS {
        if payload.get(field).is_some() {
            return json_error(
                400,
                format!(
                    "field '{field}' is rejected: pull must not accept plaintext credentials. \
                     Store the secret in the vault at 'red.secret.ai.huggingface.{{alias}}' and \
                     reference it via the model's 'credential_alias' or pass 'credential_alias' \
                     on the pull request."
                ),
            );
        }
    }
    let entry = match self.read_model_entry(&name) {
        Ok(Some(entry)) => entry,
        Ok(None) => {
            return json_error(404, format!("local AI model '{name}' is not registered"));
        }
        Err(err) => return json_error(500, err),
    };
    // A non-empty alias on the request wins over the one stored on the entry.
    let credential_alias = payload
        .get("credential_alias")
        .and_then(JsonValue::as_str)
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .or_else(|| {
            entry
                .get("credential_alias")
                .and_then(JsonValue::as_str)
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
        });
    let _resolved_credential =
        match self.resolve_pull_credential(&entry, credential_alias.as_deref()) {
            Ok(value) => value,
            Err((status, message)) => return json_error(status, message),
        };
    let fixture_dir = match resolve_fixture_dir(&payload, |k| self.read_config_text(k)) {
        Ok(p) => p,
        Err(err) => return json_error(400, err),
    };
    if !fixture_dir.is_dir() {
        return json_error(
            400,
            format!(
                "fixture_dir '{}' does not exist or is not a directory",
                fixture_dir.display()
            ),
        );
    }
    let cache_root = match self.cache_root() {
        Ok(p) => p,
        Err(err) => return json_error(500, err),
    };
    let model_dir = cache_root.join(&name);
    let model_lock_key = lock_key(&cache_root, &name);
    let lock = acquire_model_lock(&model_lock_key);
    // The mutex protects no data (`()`), so a poisoned lock carries no broken
    // state. Recover the guard instead of panicking on every later request for
    // this model after one handler panicked.
    let _guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    let manifest = match install_artifacts(&entry, &cache_root, &model_dir, &fixture_dir) {
        Ok(m) => m,
        Err(err) => return json_error(500, err),
    };
    if let Err(err) = self.stamp_installed(&name, &model_dir, &manifest) {
        return json_error(500, err);
    }
    let mut object = Map::new();
    object.insert("ok".to_string(), JsonValue::Bool(true));
    object.insert("name".to_string(), JsonValue::String(name));
    object.insert(
        "status".to_string(),
        JsonValue::String(STATUS_INSTALLED.into()),
    );
    object.insert(
        "cache_dir".to_string(),
        JsonValue::String(model_dir.display().to_string()),
    );
    object.insert("manifest".to_string(), manifest_to_json(&manifest));
    json_response(200, JsonValue::Object(object))
}
/// Reports the on-disk cache state of the registered model `name`.
///
/// Responds 404 when the model is not registered and 500 when the registry
/// cannot be read or the entry is corrupted (matching the pull handler, rather
/// than reporting those failures as "not registered"). Otherwise responds 200
/// with the inspected status, the cache directory, the manifest and detail when
/// available, and the measured footprint in bytes.
pub(crate) fn handle_ai_model_cache_status(&self, name: &str) -> HandlerResp {
    let name = match validate_path_name(name) {
        Ok(v) => v,
        Err(resp) => return resp,
    };
    match self.read_model_entry(&name) {
        Ok(Some(_)) => {}
        Ok(None) => {
            return json_error(404, format!("local AI model '{name}' is not registered"));
        }
        Err(err) => return json_error(500, err),
    }
    let cache_root = match self.cache_root() {
        Ok(p) => p,
        Err(err) => return json_error(500, err),
    };
    let model_dir = cache_root.join(&name);
    let report = inspect_cache(&model_dir);
    let mut object = Map::new();
    object.insert("ok".to_string(), JsonValue::Bool(true));
    object.insert("name".to_string(), JsonValue::String(name));
    object.insert(
        "status".to_string(),
        JsonValue::String(report.status.to_string()),
    );
    object.insert(
        "cache_dir".to_string(),
        JsonValue::String(model_dir.display().to_string()),
    );
    if let Some(manifest) = report.manifest {
        object.insert("manifest".to_string(), manifest_to_json(&manifest));
    }
    if let Some(detail) = report.detail {
        object.insert("detail".to_string(), JsonValue::String(detail));
    }
    object.insert(
        "footprint_bytes".to_string(),
        JsonValue::Number(report.footprint_bytes as f64),
    );
    json_response(200, JsonValue::Object(object))
}
/// Removes the on-disk cache of the registered model `name` and resets its
/// registry entry to the "registered" status.
///
/// Responds 404 when the model is not registered and 500 when the registry
/// cannot be read, the entry is corrupted, the cache cannot be moved aside, or
/// the entry cannot be re-stamped. On success the `removed` flag tells whether
/// a cache directory actually existed.
pub(crate) fn handle_ai_model_cache_drop(&self, name: &str) -> HandlerResp {
    let name = match validate_path_name(name) {
        Ok(v) => v,
        Err(resp) => return resp,
    };
    match self.read_model_entry(&name) {
        Ok(Some(_)) => {}
        Ok(None) => {
            return json_error(404, format!("local AI model '{name}' is not registered"));
        }
        Err(err) => return json_error(500, err),
    }
    let cache_root = match self.cache_root() {
        Ok(p) => p,
        Err(err) => return json_error(500, err),
    };
    let model_dir = cache_root.join(&name);
    let model_lock_key = lock_key(&cache_root, &name);
    let lock = acquire_model_lock(&model_lock_key);
    // The mutex protects no data (`()`), so a poisoned lock carries no broken
    // state; recover the guard instead of panicking on every later request.
    let _guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    let removed = match drop_cache(&cache_root, &model_dir) {
        Ok(value) => value,
        Err(err) => return json_error(500, err),
    };
    if let Err(err) = self.stamp_registered(&name) {
        return json_error(500, err);
    }
    let mut object = Map::new();
    object.insert("ok".to_string(), JsonValue::Bool(true));
    object.insert("name".to_string(), JsonValue::String(name));
    object.insert("removed".to_string(), JsonValue::Bool(removed));
    object.insert(
        "status".to_string(),
        JsonValue::String(STATUS_REGISTERED.into()),
    );
    json_response(200, JsonValue::Object(object))
}
/// Resolves the HuggingFace secret referenced by `credential_alias`.
///
/// Returns `Ok(None)` when no alias was supplied and `Ok(Some(secret))` when the
/// alias resolves to a non-blank secret. A blank secret or a resolution failure
/// yields a `(400, message)` pair. Keys starting with `red.secret.` are read
/// from the vault; every other key is read as text from the config collection.
/// `_entry` is currently unused.
fn resolve_pull_credential(
    &self,
    _entry: &JsonValue,
    credential_alias: Option<&str>,
) -> Result<Option<String>, (u16, String)> {
    let alias = match credential_alias {
        Some(alias) => alias,
        None => return Ok(None),
    };
    let resolved = crate::ai::resolve_api_key(
        &crate::ai::AiProvider::HuggingFace,
        Some(alias),
        |kv_key| {
            if kv_key.starts_with("red.secret.") {
                Ok(self.runtime().vault_kv_get(kv_key))
            } else {
                match self
                    .entity_use_cases()
                    .get_kv(RED_CONFIG_COLLECTION, kv_key)
                {
                    Ok(Some((Value::Text(secret), _))) => Ok(Some(secret.to_string())),
                    Ok(_) => Ok(None),
                    Err(err) => Err(crate::RedDBError::Query(format!(
                        "failed to read AI credential store: {err}"
                    ))),
                }
            }
        },
    );
    match resolved {
        Err(err) => Err((
            400,
            format!(
                "failed to resolve HuggingFace credentials for alias '{alias}': {err}. \
                 Store the token at 'red.secret.ai.huggingface.{alias}' in the vault."
            ),
        )),
        Ok(secret) if secret.trim().is_empty() => Err((
            400,
            format!(
                "credential_alias '{alias}' resolved to an empty secret; store the \
                 HuggingFace token at 'red.secret.ai.huggingface.{alias}' before pulling"
            ),
        )),
        Ok(secret) => Ok(Some(secret)),
    }
}
/// Loads and parses the registry entry stored for model `name`.
///
/// Returns `Ok(None)` when the key is absent or holds a non-text value, and an
/// error message when the registry read fails or the stored text is not valid
/// JSON.
fn read_model_entry(&self, name: &str) -> Result<Option<JsonValue>, String> {
    let registry_key = format!("{AI_MODEL_KEY_PREFIX}{name}");
    match self
        .entity_use_cases()
        .get_kv(RED_CONFIG_COLLECTION, &registry_key)
    {
        Err(err) => Err(format!("failed to read model registry: {err}")),
        Ok(Some((Value::Text(raw), _))) => parse_json(&raw)
            .map(|document| Some(JsonValue::from(document)))
            .map_err(|err| format!("model entry for '{name}' is corrupted: {err}")),
        Ok(_) => Ok(None),
    }
}
/// Reads a text config value from the config collection.
///
/// Yields the trimmed text, or `None` when the key is missing, not text, blank
/// after trimming, or the lookup itself fails.
fn read_config_text(&self, key: &str) -> Option<String> {
    match self.entity_use_cases().get_kv(RED_CONFIG_COLLECTION, key) {
        Ok(Some((Value::Text(raw), _))) => Some(raw.trim())
            .filter(|text| !text.is_empty())
            .map(str::to_string),
        _ => None,
    }
}
fn cache_root(&self) -> Result<PathBuf, String> {
if let Some(override_path) = self.read_config_text(AI_LOCAL_CACHE_DIR_KEY) {
let p = PathBuf::from(override_path);
ensure_dir(&p)?;
return Ok(p);
}
let db = self.runtime().db();
let store = db.store();
let db_path = store.db_path();
let base = match db_path {
Some(p) => match p.parent() {
Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
_ => std::env::temp_dir(),
},
None => std::env::temp_dir(),
};
let root = base.join(CACHE_DIR_NAME);
ensure_dir(&root)?;
Ok(root)
}
/// Marks the registry entry for `name` as installed, recording the cache
/// directory, the install timestamp and the total cache size from `manifest`.
fn stamp_installed(
    &self,
    name: &str,
    cache_dir: &Path,
    manifest: &Manifest,
) -> Result<(), String> {
    let cache_dir_text = cache_dir.display().to_string();
    let installed_at = manifest.installed_at_unix_ms as f64;
    let cache_size = manifest.total_size_bytes as f64;
    self.rewrite_model_entry(name, move |fields| {
        // Applied in this order so the key order of the entry is unchanged.
        let updates = [
            ("status", JsonValue::String(STATUS_INSTALLED.into())),
            ("cache_dir", JsonValue::String(cache_dir_text)),
            ("installed_at_unix_ms", JsonValue::Number(installed_at)),
            ("cache_size_bytes", JsonValue::Number(cache_size)),
        ];
        for (field, value) in updates {
            fields.insert(field.to_string(), value);
        }
    })
}
/// Resets the registry entry for `name` to the "registered" status and strips
/// the fields that only describe an installed cache.
fn stamp_registered(&self, name: &str) -> Result<(), String> {
    self.rewrite_model_entry(name, |fields| {
        fields.insert(
            "status".to_string(),
            JsonValue::String(STATUS_REGISTERED.into()),
        );
        for stale in ["cache_dir", "installed_at_unix_ms", "cache_size_bytes"] {
            fields.remove(stale);
        }
    })
}
/// Reads the registry entry for `name`, applies `edit` to its JSON object, and
/// writes the result back.
///
/// `updated_at_unix_ms` is refreshed before `edit` runs, so `edit` may still
/// override it. The write is a delete followed by a create; if the create
/// fails, the original text is written back on a best-effort basis so the model
/// does not silently disappear from the registry.
///
/// NOTE(review): the entry is re-created with empty metadata, as before; any
/// metadata stored alongside the original value is not carried over.
///
/// # Errors
/// Returns a message when the entry is missing or not text, is not a JSON
/// object, cannot be re-encoded, or cannot be persisted.
fn rewrite_model_entry<F: FnOnce(&mut Map<String, JsonValue>)>(
    &self,
    name: &str,
    edit: F,
) -> Result<(), String> {
    let key = format!("{AI_MODEL_KEY_PREFIX}{name}");
    let raw = match self.entity_use_cases().get_kv(RED_CONFIG_COLLECTION, &key) {
        Ok(Some((Value::Text(s), _))) => s.to_string(),
        Ok(_) => return Err(format!("local AI model '{name}' is not registered")),
        Err(err) => return Err(format!("failed to read model registry: {err}")),
    };
    let parsed = parse_json(&raw)
        .map_err(|err| format!("model entry for '{name}' is corrupted: {err}"))?;
    let mut value = JsonValue::from(parsed);
    let JsonValue::Object(ref mut object) = value else {
        return Err(format!("model entry for '{name}' is not an object"));
    };
    object.insert(
        "updated_at_unix_ms".to_string(),
        JsonValue::Number(now_unix_ms() as f64),
    );
    edit(object);
    let encoded = json_to_string(&value)
        .map_err(|err| format!("failed to re-encode model entry: {err}"))?;
    let _ = self
        .entity_use_cases()
        .delete_kv(RED_CONFIG_COLLECTION, &key);
    let outcome = self
        .entity_use_cases()
        .create_kv(CreateKvInput {
            collection: RED_CONFIG_COLLECTION.to_string(),
            key: key.clone(),
            value: Value::text(encoded),
            metadata: Vec::new(),
        })
        .map(|_| ())
        .map_err(|err| format!("failed to persist model update: {err}"));
    if outcome.is_err() {
        // The old entry was already deleted above; try to put it back so a
        // failed update does not unregister the model. Best effort only: the
        // original persistence error is what gets reported.
        let _ = self.entity_use_cases().create_kv(CreateKvInput {
            collection: RED_CONFIG_COLLECTION.to_string(),
            key,
            value: Value::text(raw),
            metadata: Vec::new(),
        });
    }
    outcome
}
}
/// Response type returned by the handlers in this file (alias of the
/// transport layer's `HttpResponse`).
type HandlerResp = crate::server::transport::HttpResponse;
/// Validates the model-name path segment.
///
/// The name is trimmed first. An empty result, or any character other than an
/// ASCII letter, ASCII digit, `_` or `-`, is rejected with a 400 response.
/// Returns the trimmed name on success.
fn validate_path_name(name: &str) -> Result<String, HandlerResp> {
    let candidate = name.trim();
    if candidate.is_empty() {
        return Err(json_error(400, "model name path segment cannot be empty"));
    }
    let is_allowed = |c: char| matches!(c, '_' | '-') || c.is_ascii_alphanumeric();
    match candidate.chars().find(|c| !is_allowed(*c)) {
        Some(_) => Err(json_error(
            400,
            format!("model name '{candidate}' contains unsupported characters"),
        )),
        None => Ok(candidate.to_string()),
    }
}
/// Picks the directory that artifacts are installed from.
///
/// A string `fixture_dir` in the request body wins (blank after trimming is an
/// error). Otherwise the value returned by `config_lookup` for the fixture-dir
/// config key is used. With neither available an explanatory error is returned.
fn resolve_fixture_dir<F>(payload: &JsonValue, config_lookup: F) -> Result<PathBuf, String>
where
    F: FnOnce(&str) -> Option<String>,
{
    let requested = payload
        .get("fixture_dir")
        .and_then(JsonValue::as_str)
        .map(str::trim);
    match requested {
        Some("") => Err("'fixture_dir' cannot be empty".to_string()),
        Some(dir) => Ok(PathBuf::from(dir)),
        None => config_lookup(AI_LOCAL_FIXTURE_DIR_KEY)
            .map(PathBuf::from)
            .ok_or_else(|| {
                format!(
                    "no artifact source configured: provide 'fixture_dir' in the request body or set '{AI_LOCAL_FIXTURE_DIR_KEY}'; live HuggingFace pull is not implemented in this slice"
                )
            }),
    }
}
/// Makes sure `path` is a directory, creating it and any missing parents.
///
/// Fails when `path` exists but is not a directory, or when creation fails.
fn ensure_dir(path: &Path) -> Result<(), String> {
    if !path.exists() {
        return fs::create_dir_all(path)
            .map_err(|err| format!("failed to create directory '{}': {err}", path.display()));
    }
    if path.is_dir() {
        Ok(())
    } else {
        Err(format!(
            "cache path '{}' exists but is not a directory",
            path.display()
        ))
    }
}
/// One installed artifact file as recorded in a model manifest.
#[derive(Debug, Clone)]
struct ManifestFile {
/// Path of the file relative to the model's cache directory.
path: String,
/// Hex-encoded SHA-256 digest of the file contents.
sha256_hex: String,
/// Size of the file in bytes.
size_bytes: u64,
}
/// Description of an installed model cache, persisted as `manifest.json`.
#[derive(Debug, Clone)]
struct Manifest {
/// Model name copied from the registry entry.
name: String,
/// `source` field copied from the registry entry.
source: String,
/// `revision` field copied from the registry entry.
revision: String,
/// `task` field copied from the registry entry.
task: String,
/// `engine` field copied from the registry entry.
engine: String,
/// `dimensions` field copied from the registry entry.
dimensions: u32,
/// Install time in milliseconds since the Unix epoch.
installed_at_unix_ms: u64,
/// Sum of the sizes of all files listed in `files`.
total_size_bytes: u64,
/// Every artifact file that was installed.
files: Vec<ManifestFile>,
}
/// Installs the files under `fixture_dir` as the cache of one model and returns
/// the manifest that was written.
///
/// The files are first copied and hashed into a unique staging directory under
/// the cache root. Only once the manifest has been written is the staged tree
/// renamed to `model_dir`. An existing `model_dir` is moved into the purge
/// directory first and moved back if the promotion fails.
///
/// Scratch directory names are derived from the final component of `model_dir`
/// rather than from the entry's free-form `name` field, so a name containing
/// path separators or `..` cannot escape the staging or purge roots.
///
/// # Errors
/// Returns a message describing the first failing step. The staging directory
/// is removed on every failure path.
fn install_artifacts(
    entry: &JsonValue,
    cache_root: &Path,
    model_dir: &Path,
    fixture_dir: &Path,
) -> Result<Manifest, String> {
    let staging_root = cache_root.join(STAGING_DIR_NAME);
    ensure_dir(&staging_root)?;
    let unique = unique_suffix();
    let label = model_dir
        .file_name()
        .and_then(|s| s.to_str())
        .unwrap_or("model");
    let scratch_name = format!("{label}-{unique}");
    let staging_dir = staging_root.join(&scratch_name);
    if staging_dir.exists() {
        let _ = fs::remove_dir_all(&staging_dir);
    }
    fs::create_dir_all(&staging_dir).map_err(|err| {
        format!(
            "failed to create staging dir '{}': {err}",
            staging_dir.display()
        )
    })?;
    let purge_root = cache_root.join(PURGE_DIR_NAME);
    let outcome = stage_artifacts(entry, fixture_dir, &staging_dir).and_then(|manifest| {
        promote_staging(&staging_dir, model_dir, &purge_root, &scratch_name)?;
        Ok(manifest)
    });
    if outcome.is_err() {
        // After a successful promotion the staging dir no longer exists, so
        // this only ever removes a tree that was never promoted.
        let _ = fs::remove_dir_all(&staging_dir);
    }
    outcome
}

/// Copies and hashes every fixture file into `staging_dir`, then writes the
/// manifest there (via a temporary file that is renamed into place).
fn stage_artifacts(
    entry: &JsonValue,
    fixture_dir: &Path,
    staging_dir: &Path,
) -> Result<Manifest, String> {
    let mut sources = collect_files_relative(fixture_dir)?;
    // Sorted so the manifest lists files in a stable order.
    sources.sort_by(|a, b| a.relative.cmp(&b.relative));
    if sources.is_empty() {
        return Err(format!(
            "fixture_dir '{}' contains no files to install",
            fixture_dir.display()
        ));
    }
    let mut files = Vec::with_capacity(sources.len());
    let mut total: u64 = 0;
    for source in sources {
        let src = source.absolute;
        let dst = staging_dir.join(&source.relative);
        if let Some(parent) = dst.parent() {
            fs::create_dir_all(parent)
                .map_err(|err| format!("failed to create dir '{}': {err}", parent.display()))?;
        }
        fs::copy(&src, &dst)
            .map_err(|err| format!("failed to copy '{}': {err}", src.display()))?;
        // Hash the staged copy, so the manifest describes what is in the cache.
        let (sha, size) = sha256_file(&dst)
            .map_err(|err| format!("failed to hash '{}': {err}", dst.display()))?;
        total = total.saturating_add(size);
        files.push(ManifestFile {
            path: source.relative,
            sha256_hex: sha,
            size_bytes: size,
        });
    }
    let text_field = |key: &str, default: &str| -> String {
        entry
            .get(key)
            .and_then(JsonValue::as_str)
            .unwrap_or(default)
            .to_string()
    };
    let manifest = Manifest {
        name: text_field("name", ""),
        source: text_field("source", ""),
        revision: text_field("revision", ""),
        task: text_field("task", "embedding"),
        engine: text_field("engine", "candle"),
        // A value that does not fit in u32 is treated like a missing one (0)
        // instead of being silently truncated.
        dimensions: entry
            .get("dimensions")
            .and_then(JsonValue::as_u64)
            .and_then(|raw| u32::try_from(raw).ok())
            .unwrap_or(0),
        installed_at_unix_ms: now_unix_ms(),
        total_size_bytes: total,
        files,
    };
    let manifest_json = manifest_to_json(&manifest);
    let manifest_text = json_to_string(&manifest_json)
        .map_err(|err| format!("failed to encode manifest: {err}"))?;
    let manifest_tmp = staging_dir.join(format!("{}.tmp", MANIFEST_FILE));
    fs::write(&manifest_tmp, manifest_text.as_bytes())
        .map_err(|err| format!("failed to write manifest tmp: {err}"))?;
    fs::rename(&manifest_tmp, staging_dir.join(MANIFEST_FILE))
        .map_err(|err| format!("failed to finalize manifest: {err}"))?;
    Ok(manifest)
}

/// Swaps `staging_dir` into place as `model_dir`, parking any existing cache in
/// the purge directory and restoring it if the swap fails.
fn promote_staging(
    staging_dir: &Path,
    model_dir: &Path,
    purge_root: &Path,
    purge_name: &str,
) -> Result<(), String> {
    ensure_dir(purge_root)?;
    let purge_dir = purge_root.join(purge_name);
    if model_dir.exists() {
        fs::rename(model_dir, &purge_dir).map_err(|err| {
            format!(
                "failed to move existing cache aside ('{}' → '{}'): {err}",
                model_dir.display(),
                purge_dir.display()
            )
        })?;
    }
    if let Err(err) = fs::rename(staging_dir, model_dir) {
        if purge_dir.exists() {
            // Put the previous cache back so the model stays usable.
            let _ = fs::rename(&purge_dir, model_dir);
        }
        return Err(format!(
            "failed to promote staging dir '{}' → '{}': {err}",
            staging_dir.display(),
            model_dir.display()
        ));
    }
    if purge_dir.exists() {
        // Best effort: a leftover purge dir does not affect the new install.
        let _ = fs::remove_dir_all(&purge_dir);
    }
    Ok(())
}
/// A file discovered while walking a fixture directory.
#[derive(Debug)]
struct CollectedFile {
/// Path of the file as returned by the directory entry.
absolute: PathBuf,
/// Path relative to the walked root, with components joined by `/`.
relative: String,
}
/// Walks `root` and lists every regular file beneath it.
///
/// Entries whose name starts with `.` or equals the manifest file name are
/// skipped at every depth (skipped directories are not descended into).
/// Entries that are neither a directory nor a regular file are ignored. The
/// result order is unspecified.
fn collect_files_relative(root: &Path) -> Result<Vec<CollectedFile>, String> {
    let mut found = Vec::new();
    let mut pending: Vec<(PathBuf, String)> = vec![(root.to_path_buf(), String::new())];
    while let Some((current, rel_prefix)) = pending.pop() {
        let listing = fs::read_dir(&current).map_err(|err| {
            format!("failed to read fixture dir '{}': {err}", current.display())
        })?;
        for item in listing {
            let item = item.map_err(|err| {
                format!("fixture dir entry error in '{}': {err}", current.display())
            })?;
            let kind = item
                .file_type()
                .map_err(|err| format!("fixture file type error: {err}"))?;
            let file_name = item.file_name().to_string_lossy().to_string();
            let skipped = file_name.starts_with('.') || file_name == MANIFEST_FILE;
            if skipped {
                continue;
            }
            let relative = match rel_prefix.as_str() {
                "" => file_name,
                parent => format!("{parent}/{file_name}"),
            };
            if kind.is_dir() {
                pending.push((item.path(), relative));
            } else if kind.is_file() {
                found.push(CollectedFile {
                    absolute: item.path(),
                    relative,
                });
            }
        }
    }
    Ok(found)
}
/// Removes a model's cache directory.
///
/// Returns `Ok(false)` when `model_dir` does not exist. Otherwise the directory
/// is first renamed into the purge directory under a unique name and then
/// deleted; a failed delete of the parked copy is ignored and `Ok(true)` is
/// still returned.
fn drop_cache(cache_root: &Path, model_dir: &Path) -> Result<bool, String> {
    if !model_dir.exists() {
        return Ok(false);
    }
    let purge_root = cache_root.join(PURGE_DIR_NAME);
    ensure_dir(&purge_root)?;
    let unique = unique_suffix();
    let label = match model_dir.file_name().and_then(|s| s.to_str()) {
        Some(label) => label,
        None => "model",
    };
    let parked = purge_root.join(format!("{label}-{unique}"));
    match fs::rename(model_dir, &parked) {
        Ok(()) => {
            let _ = fs::remove_dir_all(&parked);
            Ok(true)
        }
        Err(err) => Err(format!(
            "failed to move cache aside ('{}' → '{}'): {err}",
            model_dir.display(),
            parked.display()
        )),
    }
}
/// Result of inspecting a model's cache directory.
#[derive(Debug)]
struct CacheReport {
/// One of the `STATUS_*` constants (missing, unhealthy or installed).
status: &'static str,
/// The parsed manifest, when it could be read and decoded.
manifest: Option<Manifest>,
/// Explanation of why the cache was judged unhealthy, if it was.
detail: Option<String>,
/// Measured size in bytes: the manifest-listed files when healthy, the whole
/// directory tree when unhealthy, and 0 when the cache is missing.
footprint_bytes: u64,
}
fn inspect_cache(model_dir: &Path) -> CacheReport {
if !model_dir.exists() {
return CacheReport {
status: STATUS_MISSING,
manifest: None,
detail: None,
footprint_bytes: 0,
};
}
let manifest_path = model_dir.join(MANIFEST_FILE);
let manifest_text = match fs::read_to_string(&manifest_path) {
Ok(s) => s,
Err(err) => {
return CacheReport {
status: STATUS_UNHEALTHY,
manifest: None,
detail: Some(format!("manifest unreadable: {err}")),
footprint_bytes: directory_footprint(model_dir),
};
}
};
let parsed = match parse_json(&manifest_text).map(JsonValue::from) {
Ok(v) => v,
Err(err) => {
return CacheReport {
status: STATUS_UNHEALTHY,
manifest: None,
detail: Some(format!("manifest is not valid JSON: {err}")),
footprint_bytes: directory_footprint(model_dir),
};
}
};
let manifest = match manifest_from_json(&parsed) {
Ok(m) => m,
Err(err) => {
return CacheReport {
status: STATUS_UNHEALTHY,
manifest: None,
detail: Some(format!("manifest schema invalid: {err}")),
footprint_bytes: directory_footprint(model_dir),
};
}
};
let mut footprint: u64 = 0;
for entry in &manifest.files {
let path = model_dir.join(&entry.path);
let metadata = match fs::metadata(&path) {
Ok(m) => m,
Err(err) => {
return CacheReport {
status: STATUS_UNHEALTHY,
manifest: Some(manifest.clone()),
detail: Some(format!("missing artifact file '{}': {err}", entry.path)),
footprint_bytes: directory_footprint(model_dir),
};
}
};
if metadata.len() != entry.size_bytes {
return CacheReport {
status: STATUS_UNHEALTHY,
manifest: Some(manifest.clone()),
detail: Some(format!(
"size mismatch for '{}': manifest={} actual={}",
entry.path,
entry.size_bytes,
metadata.len()
)),
footprint_bytes: directory_footprint(model_dir),
};
}
footprint = footprint.saturating_add(metadata.len());
}
CacheReport {
status: STATUS_INSTALLED,
manifest: Some(manifest),
detail: None,
footprint_bytes: footprint,
}
}
/// Sums the sizes of all regular files beneath `path`.
///
/// Directories that cannot be listed and entries whose metadata cannot be read
/// are skipped, so the result is a best-effort figure (0 for a missing path).
fn directory_footprint(path: &Path) -> u64 {
    let mut bytes: u64 = 0;
    let mut pending = vec![path.to_path_buf()];
    while let Some(current) = pending.pop() {
        let listing = match fs::read_dir(&current) {
            Ok(listing) => listing,
            Err(_) => continue,
        };
        for item in listing.flatten() {
            match item.metadata() {
                Ok(meta) if meta.is_dir() => pending.push(item.path()),
                Ok(meta) if meta.is_file() => bytes = bytes.saturating_add(meta.len()),
                _ => {}
            }
        }
    }
    bytes
}
/// Encodes a manifest as a JSON object.
///
/// Keys are inserted in a fixed order: the descriptive fields, the timestamps
/// and sizes, and finally the `files` array with one object per file.
fn manifest_to_json(manifest: &Manifest) -> JsonValue {
    let text = |s: &str| JsonValue::String(s.to_string());
    let file_entries: Vec<JsonValue> = manifest
        .files
        .iter()
        .map(|file| {
            let mut fields = Map::new();
            fields.insert("path".to_string(), text(&file.path));
            fields.insert("sha256".to_string(), text(&file.sha256_hex));
            fields.insert(
                "size_bytes".to_string(),
                JsonValue::Number(file.size_bytes as f64),
            );
            JsonValue::Object(fields)
        })
        .collect();
    let top_level = [
        ("name", text(&manifest.name)),
        ("source", text(&manifest.source)),
        ("revision", text(&manifest.revision)),
        ("task", text(&manifest.task)),
        ("engine", text(&manifest.engine)),
        ("dimensions", JsonValue::Number(manifest.dimensions as f64)),
        (
            "installed_at_unix_ms",
            JsonValue::Number(manifest.installed_at_unix_ms as f64),
        ),
        (
            "total_size_bytes",
            JsonValue::Number(manifest.total_size_bytes as f64),
        ),
        ("files", JsonValue::Array(file_entries)),
    ];
    let mut object = Map::new();
    for (key, value) in top_level {
        object.insert(key.to_string(), value);
    }
    JsonValue::Object(object)
}
/// Decodes a manifest from its JSON representation.
///
/// Every field is required. `dimensions` must fit in a `u32`: an out-of-range
/// value is rejected instead of being silently truncated.
///
/// # Errors
/// Returns a message naming the first field that is missing, has the wrong
/// type, or is out of range.
fn manifest_from_json(value: &JsonValue) -> Result<Manifest, String> {
    let object = value
        .as_object()
        .ok_or_else(|| "manifest is not an object".to_string())?;
    let required_str = |k: &str| -> Result<String, String> {
        object
            .get(k)
            .and_then(JsonValue::as_str)
            .map(str::to_string)
            .ok_or_else(|| format!("manifest field '{k}' missing or not a string"))
    };
    let name = required_str("name")?;
    let source = required_str("source")?;
    let revision = required_str("revision")?;
    let task = required_str("task")?;
    let engine = required_str("engine")?;
    let raw_dimensions = object
        .get("dimensions")
        .and_then(JsonValue::as_u64)
        .ok_or_else(|| "manifest field 'dimensions' missing or not a number".to_string())?;
    // Checked narrowing: a plain `as u32` would wrap large values around.
    let dimensions = u32::try_from(raw_dimensions)
        .map_err(|_| format!("manifest field 'dimensions' out of range: {raw_dimensions}"))?;
    let installed_at_unix_ms = object
        .get("installed_at_unix_ms")
        .and_then(JsonValue::as_u64)
        .ok_or_else(|| "manifest field 'installed_at_unix_ms' missing".to_string())?;
    let total_size_bytes = object
        .get("total_size_bytes")
        .and_then(JsonValue::as_u64)
        .ok_or_else(|| "manifest field 'total_size_bytes' missing".to_string())?;
    let files_raw = object
        .get("files")
        .and_then(JsonValue::as_array)
        .ok_or_else(|| "manifest field 'files' must be an array".to_string())?;
    let mut files = Vec::with_capacity(files_raw.len());
    for (idx, raw) in files_raw.iter().enumerate() {
        let entry = raw
            .as_object()
            .ok_or_else(|| format!("manifest files[{idx}] is not an object"))?;
        let path = entry
            .get("path")
            .and_then(JsonValue::as_str)
            .map(str::to_string)
            .ok_or_else(|| format!("manifest files[{idx}].path missing"))?;
        let sha256_hex = entry
            .get("sha256")
            .and_then(JsonValue::as_str)
            .map(str::to_string)
            .ok_or_else(|| format!("manifest files[{idx}].sha256 missing"))?;
        let size_bytes = entry
            .get("size_bytes")
            .and_then(JsonValue::as_u64)
            .ok_or_else(|| format!("manifest files[{idx}].size_bytes missing"))?;
        files.push(ManifestFile {
            path,
            sha256_hex,
            size_bytes,
        });
    }
    Ok(Manifest {
        name,
        source,
        revision,
        task,
        engine,
        dimensions,
        installed_at_unix_ms,
        total_size_bytes,
        files,
    })
}
/// Streams the file at `path` through SHA-256.
///
/// Returns the lowercase hex digest together with the number of bytes read.
/// Reads interrupted by a signal (`ErrorKind::Interrupted`) are retried rather
/// than reported as failures.
///
/// # Errors
/// Returns the underlying I/O error when the file cannot be opened or read.
fn sha256_file(path: &Path) -> std::io::Result<(String, u64)> {
    use std::fmt::Write as _;

    let mut file = fs::File::open(path)?;
    let mut hasher = crate::crypto::sha256::Sha256::new();
    let mut buf = [0u8; 64 * 1024];
    let mut size: u64 = 0;
    loop {
        let n = match file.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => n,
            // Non-fatal by contract of `Read::read`; try the read again.
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        hasher.update(&buf[..n]);
        size += n as u64;
    }
    let digest = hasher.finalize();
    let mut hex = String::with_capacity(digest.len() * 2);
    for byte in digest.iter() {
        // Writing into a String cannot fail.
        let _ = write!(hex, "{byte:02x}");
    }
    Ok((hex, size))
}
/// Current wall-clock time in milliseconds since the Unix epoch, or 0 when the
/// system clock reads earlier than the epoch.
fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Builds a suffix of the form `<pid>-<nanos since epoch>-<sequence>`.
///
/// The sequence number comes from a process-wide counter, so two calls in the
/// same process never return the same value.
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    format!("{pid}-{nanos}-{seq}")
}
/// Builds the lock-table key for a model: `<cache root>::<name>`.
fn lock_key(cache_root: &Path, name: &str) -> String {
    let root = cache_root.display();
    format!("{root}::{name}")
}
/// Returns the process-wide table of per-model locks, creating the empty table
/// on first use.
fn model_lock_table() -> &'static Mutex<HashMap<String, Arc<Mutex<()>>>> {
    type LockTable = Mutex<HashMap<String, Arc<Mutex<()>>>>;
    static LOCKS: OnceLock<LockTable> = OnceLock::new();
    LOCKS.get_or_init(LockTable::default)
}
/// Fetches the mutex registered under `key`, inserting a new one when the key
/// has not been seen before. Callers lock the returned mutex themselves.
///
/// # Panics
/// Panics if the lock table mutex is poisoned.
fn acquire_model_lock(key: &str) -> Arc<Mutex<()>> {
    let mut locks = model_lock_table().lock().expect("lock table poisoned");
    if let Some(existing) = locks.get(key) {
        return Arc::clone(existing);
    }
    let fresh = Arc::new(Mutex::new(()));
    locks.insert(key.to_string(), Arc::clone(&fresh));
    fresh
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Creates a fresh, uniquely named scratch directory under the system temp
    /// directory. Each test removes its directory once it has its results.
    fn tempdir(label: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let p = std::env::temp_dir().join(format!(
            "reddb_cache_test_{label}_{}_{}",
            std::process::id(),
            nanos
        ));
        let _ = fs::remove_dir_all(&p);
        fs::create_dir_all(&p).unwrap();
        p
    }

    #[test]
    fn sha256_file_hashes_known_payload() {
        let dir = tempdir("sha");
        let path = dir.join("a.bin");
        {
            let mut f = fs::File::create(&path).unwrap();
            f.write_all(b"hello world").unwrap();
        }
        let hashed = sha256_file(&path);
        // Clean up before asserting so a failing assertion leaves nothing behind.
        let _ = fs::remove_dir_all(&dir);
        let (hex, size) = hashed.unwrap();
        assert_eq!(size, 11);
        assert_eq!(
            hex,
            "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        );
    }

    #[test]
    fn collect_files_relative_skips_dotfiles_and_manifest() {
        let dir = tempdir("collect");
        fs::write(dir.join("a.txt"), b"a").unwrap();
        fs::write(dir.join(".hidden"), b"h").unwrap();
        fs::write(dir.join(MANIFEST_FILE), b"m").unwrap();
        fs::create_dir(dir.join("sub")).unwrap();
        fs::write(dir.join("sub").join("b.txt"), b"b").unwrap();
        let collected = collect_files_relative(&dir);
        // Clean up before asserting so a failing assertion leaves nothing behind.
        let _ = fs::remove_dir_all(&dir);
        let mut files = collected.unwrap();
        files.sort_by(|a, b| a.relative.cmp(&b.relative));
        let names: Vec<_> = files.iter().map(|f| f.relative.clone()).collect();
        assert_eq!(names, vec!["a.txt".to_string(), "sub/b.txt".to_string()]);
    }
}