use super::fs_ops::{
absolutize_user_path, from_registry_resolved_config, read_json_object, safe_join_relative,
sha256_file, validate_manifest_entry_string, validate_resolved_config, validate_revision,
};
use super::root::validate_sha256;
use super::root_verify::verify_signed_registry_root_if_needed;
use super::*;
pub struct ConfigRegistryManager {
root: PathBuf,
compiled_dir: PathBuf,
current_path: PathBuf,
active_revision: String,
active_manifest_sha256: String,
configs: BTreeMap<String, Value>,
open_options: ConfigRegistryOpenOptions,
last_reload_error: Option<LlmixError>,
last_successful_reload_at: Option<SystemTime>,
last_reload_failure_at: Option<SystemTime>,
}
impl std::fmt::Debug for ConfigRegistryManager {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("ConfigRegistryManager")
.field("root", &self.root)
.field("compiled_dir", &self.compiled_dir)
.field("current_path", &self.current_path)
.field("active_revision", &self.active_revision)
.field("active_manifest_sha256", &self.active_manifest_sha256)
.field("config_count", &self.configs.len())
.field("last_reload_error", &self.last_reload_error)
.field("last_successful_reload_at", &self.last_successful_reload_at)
.field("last_reload_failure_at", &self.last_reload_failure_at)
.finish()
}
}
impl ConfigRegistryManager {
pub fn open<P>(root: P) -> LlmixResult<Self>
where
P: AsRef<Path>,
{
Self::open_with_options(root, ConfigRegistryOpenOptions::default())
}
pub fn open_with_options<P>(
root: P,
open_options: ConfigRegistryOpenOptions,
) -> LlmixResult<Self>
where
P: AsRef<Path>,
{
let root = absolutize_user_path(root.as_ref())?;
let compiled_dir = root.join("compiled");
let current_path = root.join("current.json");
let pointer = read_current_pointer(¤t_path, &compiled_dir)?;
let configs = load_revision(
¤t_path,
&compiled_dir,
&pointer,
open_options.signed_root.as_ref(),
)?;
Ok(Self {
root,
compiled_dir,
current_path,
active_revision: pointer.revision,
active_manifest_sha256: pointer
.manifest_sha256
.expect("read_current_pointer fills manifest_sha256"),
configs,
open_options,
last_reload_error: None,
last_successful_reload_at: Some(SystemTime::now()),
last_reload_failure_at: None,
})
}
pub fn root(&self) -> &Path {
&self.root
}
pub fn active_revision(&self) -> &str {
&self.active_revision
}
pub fn active_manifest_sha256(&self) -> &str {
&self.active_manifest_sha256
}
pub fn current_path(&self) -> &Path {
&self.current_path
}
pub fn compiled_dir(&self) -> &Path {
&self.compiled_dir
}
pub fn last_reload_error(&self) -> Option<&LlmixError> {
self.last_reload_error.as_ref()
}
pub fn last_successful_reload_at(&self) -> Option<SystemTime> {
self.last_successful_reload_at
}
pub fn last_reload_failure_at(&self) -> Option<SystemTime> {
self.last_reload_failure_at
}
pub fn available_presets(&self) -> Vec<String> {
self.configs.keys().cloned().collect()
}
pub fn get_preset<S1, S2>(&mut self, module: S1, preset: S2) -> LlmixResult<Value>
where
S1: AsRef<str>,
S2: AsRef<str>,
{
let module = module.as_ref();
let preset = preset.as_ref();
validate_module(module)?;
validate_preset(preset)?;
self.refresh_if_needed()?;
let preset_id = format!("{module}/{preset}");
self.configs.get(&preset_id).cloned().ok_or_else(|| {
ConfigNotFoundError {
path: format!(
"Preset not found in active Config Registry revision {}: {preset_id}",
self.active_revision
),
}
.into()
})
}
fn refresh_if_needed(&mut self) -> LlmixResult<()> {
let current_pointer = match read_current_pointer(&self.current_path, &self.compiled_dir) {
Ok(value) => value,
Err(error) => {
return self.handle_reload_error(error);
}
};
let current_manifest_sha256 = current_pointer
.manifest_sha256
.clone()
.expect("read_current_pointer fills manifest_sha256");
if current_pointer.revision == self.active_revision
&& current_manifest_sha256 == self.active_manifest_sha256
{
return Ok(());
}
match load_revision(
&self.current_path,
&self.compiled_dir,
¤t_pointer,
self.open_options.signed_root.as_ref(),
) {
Ok(configs) => {
self.active_revision = current_pointer.revision;
self.active_manifest_sha256 = current_manifest_sha256;
self.configs = configs;
self.last_reload_error = None;
self.last_successful_reload_at = Some(SystemTime::now());
self.last_reload_failure_at = None;
Ok(())
}
Err(error) => self.handle_reload_error(error),
}
}
fn handle_reload_error(&mut self, error: LlmixError) -> LlmixResult<()> {
let fail_closed = self.open_options.signed_root.is_some();
self.last_reload_failure_at = Some(SystemTime::now());
if fail_closed {
self.last_reload_error = Some(reload_error_snapshot(&error));
return Err(error);
}
self.last_reload_error = Some(error);
Ok(())
}
}
fn reload_error_snapshot(error: &LlmixError) -> LlmixError {
match error {
LlmixError::InvalidResponseCacheConfig(message) => {
LlmixError::InvalidResponseCacheConfig(message.clone())
}
LlmixError::InvalidKeyPoolConfig(message) => {
LlmixError::InvalidKeyPoolConfig(message.clone())
}
LlmixError::InvalidAdaptiveSemaphoreConfig(message) => {
LlmixError::InvalidAdaptiveSemaphoreConfig(message.clone())
}
LlmixError::InvalidRetryPolicyConfig(message) => {
LlmixError::InvalidRetryPolicyConfig(message.clone())
}
LlmixError::InvalidFileLockConfig(message) => {
LlmixError::InvalidFileLockConfig(message.clone())
}
LlmixError::InvalidProviderKwargsConfig(message) => {
LlmixError::InvalidProviderKwargsConfig(message.clone())
}
LlmixError::Redis(message) => LlmixError::Redis(message.clone()),
LlmixError::UnknownKeyPoolKey(key) => LlmixError::UnknownKeyPoolKey(key.clone()),
LlmixError::CircuitOpen(error) => error.clone().into(),
LlmixError::KillSwitchActive(error) => error.clone().into(),
LlmixError::KeyPoolExhausted(error) => error.clone().into(),
LlmixError::ConfigNotFound(error) => error.clone().into(),
LlmixError::ConfigAccess(error) => error.clone().into(),
LlmixError::InvalidConfig(error) => error.clone().into(),
LlmixError::Security(error) => error.clone().into(),
LlmixError::AdaptiveSemaphoreClosed(error) => (*error).into(),
LlmixError::Provider(error) => error.clone().into(),
LlmixError::CanonicalJson(error) => InvalidConfigError {
message: error.to_string(),
}
.into(),
LlmixError::Io(error) => InvalidConfigError {
message: error.to_string(),
}
.into(),
}
}
fn load_revision(
current_path: &Path,
compiled_dir: &Path,
pointer: &CurrentPointer,
signed_root_options: Option<&RegistryRootVerificationOptions>,
) -> LlmixResult<BTreeMap<String, Value>> {
let revision = pointer.revision.as_str();
let manifest_sha256 = pointer
.manifest_sha256
.as_deref()
.ok_or_else(|| InvalidConfigError {
message: "Registry current pointer is missing manifest_sha256".to_string(),
})?;
validate_revision(revision)?;
let compiled_path = compiled_dir.join(revision);
match fs::metadata(&compiled_path) {
Ok(metadata) if metadata.is_dir() => {}
Ok(_) => {
return Err(InvalidConfigError {
message: format!(
"Config Registry compiled revision is not a directory: {}",
compiled_path.display()
),
}
.into())
}
Err(error) if error.kind() == ErrorKind::NotFound => {
return Err(ConfigNotFoundError {
path: compiled_path.display().to_string(),
}
.into())
}
Err(error) => return Err(error.into()),
}
let manifest_path = compiled_path.join("manifest.json");
verify_compiled_checksum(&manifest_path, manifest_sha256, revision)?;
let manifest_value = Value::Object(read_json_object(&manifest_path)?);
let manifest: RegistryManifest =
serde_json::from_value(manifest_value).map_err(|error| InvalidConfigError {
message: format!(
"Invalid Config Registry manifest {}: {error}",
manifest_path.display()
),
})?;
if manifest.revision != revision {
return Err(InvalidConfigError {
message: format!(
"Config Registry manifest revision mismatch in {}",
manifest_path.display()
),
}
.into());
}
if manifest.schema_version != MANIFEST_SCHEMA_VERSION {
return Err(InvalidConfigError {
message: format!(
"Unsupported Config Registry manifest schema version in {}",
manifest_path.display()
),
}
.into());
}
verify_signed_registry_root_if_needed(
pointer,
&manifest,
current_path,
&compiled_path,
signed_root_options,
)?;
let mut configs = BTreeMap::new();
for (preset_id, entry) in manifest.presets {
validate_manifest_entry_string(&entry.source_path, "source_path", &preset_id)?;
validate_manifest_entry_string(&entry.source_sha256, "source_sha256", &preset_id)?;
validate_manifest_entry_string(&entry.resolved_path, "resolved_path", &preset_id)?;
validate_manifest_entry_string(&entry.resolved_sha256, "resolved_sha256", &preset_id)?;
let source_path = safe_join_relative(&compiled_path, &entry.source_path)?;
let resolved_path = safe_join_relative(&compiled_path, &entry.resolved_path)?;
verify_compiled_checksum(&source_path, &entry.source_sha256, &preset_id)?;
verify_compiled_checksum(&resolved_path, &entry.resolved_sha256, &preset_id)?;
let resolved = Value::Object(read_json_object(&resolved_path)?);
validate_resolved_config(&resolved_path, &resolved)?;
configs.insert(preset_id, from_registry_resolved_config(resolved));
}
Ok(configs)
}
fn read_current_pointer(current_path: &Path, compiled_dir: &Path) -> LlmixResult<CurrentPointer> {
let pointer_value = Value::Object(read_json_object(current_path)?);
let mut pointer: CurrentPointer =
serde_json::from_value(pointer_value).map_err(|error| InvalidConfigError {
message: format!(
"Invalid Config Registry pointer {}: {error}",
current_path.display()
),
})?;
validate_revision(&pointer.revision)?;
if let Some(manifest_sha256) = pointer.manifest_sha256.as_deref() {
validate_sha256(manifest_sha256, "registry current pointer manifest")?;
} else {
pointer.manifest_sha256 = Some(sha256_file(
&compiled_dir.join(&pointer.revision).join("manifest.json"),
)?);
}
Ok(pointer)
}
fn verify_compiled_checksum(
artifact_path: &Path,
expected_sha: &str,
preset_id: &str,
) -> LlmixResult<()> {
if expected_sha.is_empty() {
return Err(InvalidConfigError {
message: format!("Config Registry manifest entry is missing checksum: {preset_id}"),
}
.into());
}
match fs::symlink_metadata(artifact_path) {
Ok(metadata) if metadata.file_type().is_file() => {}
Ok(metadata) if metadata.file_type().is_symlink() => {
return Err(InvalidConfigError {
message: format!(
"Config Registry artifact must not be a symlink: {} ({preset_id})",
artifact_path.display()
),
}
.into())
}
Ok(_) => {
return Err(InvalidConfigError {
message: format!(
"Config Registry artifact is not a file: {} ({preset_id})",
artifact_path.display()
),
}
.into())
}
Err(error) if error.kind() == ErrorKind::NotFound => {
return Err(ConfigNotFoundError {
path: artifact_path.display().to_string(),
}
.into())
}
Err(error) if error.kind() == ErrorKind::PermissionDenied => {
return Err(ConfigAccessError {
path: artifact_path.display().to_string(),
}
.into())
}
Err(error) => return Err(error.into()),
}
let actual_sha = sha256_file(artifact_path)?;
if actual_sha != expected_sha {
return Err(InvalidConfigError {
message: format!(
"Checksum mismatch for Config Registry artifact {}",
artifact_path.display()
),
}
.into());
}
Ok(())
}