use std::path::{Path, PathBuf};
use greentic_deploy_spec::{
CapabilitySlot, EnvId, Environment, EnvironmentRuntime, MessagingEndpoint, MessagingEndpointId,
RuntimeConfig, SchemaVersion, SpecError,
};
use serde_json::Value;
use thiserror::Error;
use super::atomic_write::{AtomicWriteError, atomic_write_json, copy_to_backup};
use super::file_lock::{EnvFlock, LockError};
/// Errors produced by the environment store in this module.
#[derive(Debug, Error)]
pub enum StoreError {
/// No `environment.json` was found for the requested environment id.
#[error("environment `{0}` not found")]
NotFound(EnvId),
/// The id carried inside a value differs from the id used to address it.
/// `file` is the id the caller addressed (path or lock scope); `value` is the id found in the value.
#[error("environment_id mismatch: file is `{file}`, value is `{value}`")]
EnvIdMismatch { file: EnvId, value: EnvId },
/// The id was rejected by `safe_env_segment`, which also refuses ids containing `:` or NUL.
#[error(
"environment id `{0}` is not safe as a path segment (rejects \"\", \".\", \"..\", and ids containing path separators)"
)]
UnsafeEnvId(EnvId),
/// Spec-level failure; converted from `SpecError` and also built directly for schema-version mismatches.
#[error("spec validation failed: {0}")]
Spec(#[from] SpecError),
/// Failure reported by the file-lock helper; displayed transparently.
#[error(transparent)]
Lock(#[from] LockError),
/// Failure reported by the atomic-write / backup helpers; displayed transparently.
#[error(transparent)]
AtomicWrite(#[from] AtomicWriteError),
/// Filesystem error raised while reading, listing or removing `path`.
#[error("io error on {path}: {source}")]
Io {
// Path the failing operation was applied to.
path: PathBuf,
#[source]
source: std::io::Error,
},
/// `path` was read successfully but its bytes did not deserialize as the expected JSON type.
#[error("json error on {path}: {source}")]
Json {
// File whose contents failed to parse.
path: PathBuf,
#[source]
source: serde_json::Error,
},
}
/// Returns the id as a string slice after checking it can serve as a single directory name.
///
/// # Errors
/// [`StoreError::UnsafeEnvId`] when the id is empty, equals `.` or `..`, or contains
/// `/`, `\`, `:` or a NUL character.
fn safe_env_segment(env_id: &EnvId) -> Result<&str, StoreError> {
    let segment = env_id.as_str();
    let is_reserved = matches!(segment, "" | "." | "..");
    let has_forbidden_char = segment
        .chars()
        .any(|c| matches!(c, '/' | '\\' | ':' | '\0'));
    if is_reserved || has_forbidden_char {
        Err(StoreError::UnsafeEnvId(env_id.clone()))
    } else {
        Ok(segment)
    }
}
/// Storage interface for environments, their runtime state and per-slot pack answers.
///
/// Implementors must be `Send + Sync`. `LocalFsStore` is the implementation in this file;
/// the per-method notes below describe what that implementation does.
pub trait EnvironmentStore: Send + Sync {
/// Returns the ids of the stored environments (`LocalFsStore` sorts them by their string form).
fn list(&self) -> Result<Vec<EnvId>, StoreError>;
/// Reports whether an environment is stored under `env_id`.
fn exists(&self, env_id: &EnvId) -> Result<bool, StoreError>;
/// Loads the environment stored under `env_id` (`LocalFsStore` fails with `NotFound` when it is absent).
fn load(&self, env_id: &EnvId) -> Result<Environment, StoreError>;
/// Persists `env`; the destination is derived from the id carried inside `env`.
fn save(&self, env: &Environment) -> Result<(), StoreError>;
/// Loads the runtime state for `env_id`; `LocalFsStore` returns `Ok(None)` when none is stored.
fn load_runtime(&self, env_id: &EnvId) -> Result<Option<EnvironmentRuntime>, StoreError>;
/// Persists `runtime`; the destination is derived from the id carried inside `runtime`.
fn save_runtime(&self, runtime: &EnvironmentRuntime) -> Result<(), StoreError>;
/// Loads the answers JSON stored for `slot`; `LocalFsStore` returns `Ok(None)` when none is stored.
fn load_pack_answers(
&self,
env_id: &EnvId,
slot: CapabilitySlot,
) -> Result<Option<Value>, StoreError>;
/// Stores `answers` as the answers JSON for `slot` in environment `env_id`.
fn save_pack_answers(
&self,
env_id: &EnvId,
slot: CapabilitySlot,
answers: &Value,
) -> Result<(), StoreError>;
/// Removes the answers stored for `slot`; in `LocalFsStore` a missing file is not an error.
fn delete_pack_answers(&self, env_id: &EnvId, slot: CapabilitySlot) -> Result<(), StoreError>;
}
/// Filesystem-backed `EnvironmentStore`: each environment is resolved to `<root>/<env id>/`.
#[derive(Debug, Clone)]
pub struct LocalFsStore {
// Directory under which the per-environment directories are resolved.
root: PathBuf,
}
/// Construction, on-disk layout helpers and JSON reading for [`LocalFsStore`].
///
/// Every path helper goes through [`LocalFsStore::env_dir`], so each one fails with
/// [`StoreError::UnsafeEnvId`] when the id cannot be used as a directory name.
impl LocalFsStore {
    /// Creates a store rooted at `root`. Only the path is recorded; the filesystem is not touched.
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root = root.into();
        Self { root }
    }

    /// Returns `<home>/.greentic/environments`, or `None` when `dirs_home` yields no home directory.
    pub fn default_root() -> Option<PathBuf> {
        let home = dirs_home()?;
        Some(home.join(".greentic").join("environments"))
    }

    /// Root directory this store was created with.
    pub fn root(&self) -> &Path {
        self.root.as_path()
    }

    /// `<root>/<env id>` — the directory holding everything that belongs to one environment.
    pub(crate) fn env_dir(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        let segment = safe_env_segment(env_id)?;
        Ok(self.root.join(segment))
    }

    /// `<env dir>/.lock` — the file handed to `EnvFlock::acquire`.
    fn lock_path(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.env_dir(env_id).map(|dir| dir.join(".lock"))
    }

    /// `<env dir>/environment.json`.
    fn environment_path(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.env_dir(env_id).map(|dir| dir.join("environment.json"))
    }

    /// `<env dir>/runtime.json`.
    fn runtime_path(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.env_dir(env_id).map(|dir| dir.join("runtime.json"))
    }

    /// `<env dir>/runtime-config.json`.
    fn runtime_config_path(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.env_dir(env_id)
            .map(|dir| dir.join("runtime-config.json"))
    }

    /// `<env dir>/env-packs/<slot>/answers.json`.
    fn pack_answers_path(
        &self,
        env_id: &EnvId,
        slot: CapabilitySlot,
    ) -> Result<PathBuf, StoreError> {
        let mut path = self.env_dir(env_id)?;
        path.push("env-packs");
        path.push(slot.as_str());
        path.push("answers.json");
        Ok(path)
    }

    /// `<env dir>/messaging`.
    fn messaging_dir(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.env_dir(env_id).map(|dir| dir.join("messaging"))
    }

    /// `<env dir>/messaging/index.json`.
    fn messaging_index_path(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.messaging_dir(env_id).map(|dir| dir.join("index.json"))
    }

    /// `<env dir>/messaging/<endpoint id>.json`, using the id's `Display` form as the file stem.
    fn messaging_endpoint_path(
        &self,
        env_id: &EnvId,
        endpoint_id: &MessagingEndpointId,
    ) -> Result<PathBuf, StoreError> {
        let dir = self.messaging_dir(env_id)?;
        let file_name = format!("{endpoint_id}.json");
        Ok(dir.join(file_name))
    }

    /// `<env dir>/backups/messaging`.
    fn messaging_backups_dir(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.backups_dir(env_id).map(|dir| dir.join("messaging"))
    }

    /// `<env dir>/backups`.
    fn backups_dir(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.env_dir(env_id).map(|dir| dir.join("backups"))
    }

    /// `<env dir>/backups/env-packs/<slot>`.
    fn pack_backups_dir(
        &self,
        env_id: &EnvId,
        slot: CapabilitySlot,
    ) -> Result<PathBuf, StoreError> {
        let mut path = self.backups_dir(env_id)?;
        path.push("env-packs");
        path.push(slot.as_str());
        Ok(path)
    }

    /// Reads `path` in full and deserializes it as JSON into `T`.
    ///
    /// # Errors
    /// [`StoreError::Io`] when the file cannot be read, [`StoreError::Json`] when its bytes do
    /// not parse as `T`; both carry `path`.
    fn read_json<T: serde::de::DeserializeOwned>(&self, path: &Path) -> Result<T, StoreError> {
        let raw = match std::fs::read(path) {
            Ok(raw) => raw,
            Err(source) => {
                return Err(StoreError::Io {
                    path: path.to_path_buf(),
                    source,
                });
            }
        };
        match serde_json::from_slice(&raw) {
            Ok(value) => Ok(value),
            Err(source) => Err(StoreError::Json {
                path: path.to_path_buf(),
                source,
            }),
        }
    }
}
/// Filesystem implementation of [`EnvironmentStore`].
///
/// Reads take no lock. Every write acquires `EnvFlock` on the environment's `.lock` path and
/// keeps the guard alive until the method returns, then delegates to the `*_locked` primitive.
impl EnvironmentStore for LocalFsStore {
    /// Lists environments found directly under the root, sorted by id string.
    ///
    /// A directory entry is reported only when it is a directory whose UTF-8 name parses as an
    /// `EnvId`, passes `safe_env_segment`, and contains an `environment.json` that deserializes
    /// with a matching `environment_id`. Entries failing any of these are skipped silently.
    /// A missing root yields an empty list; errors from enumerating the root are returned.
    fn list(&self) -> Result<Vec<EnvId>, StoreError> {
        let root = &self.root;
        if !root.exists() {
            return Ok(Vec::new());
        }
        let io_err = |source: std::io::Error| StoreError::Io {
            path: root.clone(),
            source,
        };

        let mut ids = Vec::new();
        for entry in std::fs::read_dir(root).map_err(&io_err)? {
            let dir = entry.map_err(&io_err)?.path();
            if !dir.is_dir() {
                continue;
            }
            let parsed = dir
                .file_name()
                .and_then(|name| name.to_str())
                .map(|name| EnvId::try_from(name));
            let id = match parsed {
                Some(Ok(id)) => id,
                _ => continue,
            };
            if safe_env_segment(&id).is_err() {
                continue;
            }
            let manifest = dir.join("environment.json");
            if !manifest.exists() {
                continue;
            }
            // Unreadable or mismatching manifests are ignored rather than failing the listing.
            match self.read_json::<Environment>(&manifest) {
                Ok(env) if env.environment_id == id => ids.push(id),
                _ => {}
            }
        }
        ids.sort_by(|lhs, rhs| Ord::cmp(lhs.as_str(), rhs.as_str()));
        Ok(ids)
    }

    /// True when `environment.json` exists for `env_id`.
    fn exists(&self, env_id: &EnvId) -> Result<bool, StoreError> {
        let manifest = self.environment_path(env_id)?;
        Ok(manifest.exists())
    }

    /// Reads `environment.json`, checks that its id matches `env_id`, then validates it.
    ///
    /// # Errors
    /// `NotFound` when the file is absent, `EnvIdMismatch` when the stored id differs,
    /// `Spec` when validation fails, plus `Io`/`Json` from reading.
    fn load(&self, env_id: &EnvId) -> Result<Environment, StoreError> {
        let manifest = self.environment_path(env_id)?;
        if !manifest.exists() {
            return Err(StoreError::NotFound(env_id.clone()));
        }
        let env = self.read_json::<Environment>(&manifest)?;
        if env.environment_id == *env_id {
            env.validate()?;
            Ok(env)
        } else {
            Err(StoreError::EnvIdMismatch {
                file: env_id.clone(),
                value: env.environment_id,
            })
        }
    }

    /// Validates `env` before taking the lock, then writes it via `save_locked`.
    fn save(&self, env: &Environment) -> Result<(), StoreError> {
        env.validate()?;
        let lock_file = self.lock_path(&env.environment_id)?;
        let _guard = EnvFlock::acquire(&lock_file)?;
        self.save_locked(env)
    }

    /// Reads `runtime.json`; `Ok(None)` when the file does not exist.
    ///
    /// # Errors
    /// `EnvIdMismatch` when the stored id differs from `env_id`, `Spec(SchemaMismatch)` when the
    /// stored schema is not `ENVIRONMENT_RUNTIME_V1`, plus `Io`/`Json` from reading.
    fn load_runtime(&self, env_id: &EnvId) -> Result<Option<EnvironmentRuntime>, StoreError> {
        let runtime_file = self.runtime_path(env_id)?;
        if !runtime_file.exists() {
            return Ok(None);
        }
        let runtime = self.read_json::<EnvironmentRuntime>(&runtime_file)?;
        if runtime.environment_id != *env_id {
            return Err(StoreError::EnvIdMismatch {
                file: env_id.clone(),
                value: runtime.environment_id,
            });
        }
        let schema = runtime.schema.as_str();
        if schema != SchemaVersion::ENVIRONMENT_RUNTIME_V1 {
            let mismatch = SpecError::SchemaMismatch {
                expected: SchemaVersion::ENVIRONMENT_RUNTIME_V1,
                actual: schema.to_string(),
            };
            return Err(StoreError::Spec(mismatch));
        }
        Ok(Some(runtime))
    }

    /// Takes the environment lock and writes `runtime` via `save_runtime_locked`.
    fn save_runtime(&self, runtime: &EnvironmentRuntime) -> Result<(), StoreError> {
        let lock_file = self.lock_path(&runtime.environment_id)?;
        let _guard = EnvFlock::acquire(&lock_file)?;
        self.save_runtime_locked(runtime)
    }

    /// Reads the slot's `answers.json`; `Ok(None)` when the file does not exist.
    fn load_pack_answers(
        &self,
        env_id: &EnvId,
        slot: CapabilitySlot,
    ) -> Result<Option<Value>, StoreError> {
        let answers_file = self.pack_answers_path(env_id, slot)?;
        if answers_file.exists() {
            self.read_json::<Value>(&answers_file).map(Some)
        } else {
            Ok(None)
        }
    }

    /// Takes the environment lock and writes the slot's answers via `save_pack_answers_locked`.
    fn save_pack_answers(
        &self,
        env_id: &EnvId,
        slot: CapabilitySlot,
        answers: &Value,
    ) -> Result<(), StoreError> {
        let lock_file = self.lock_path(env_id)?;
        let _guard = EnvFlock::acquire(&lock_file)?;
        self.save_pack_answers_locked(env_id, slot, answers)
    }

    /// Takes the environment lock and removes the slot's answers via `delete_pack_answers_locked`.
    fn delete_pack_answers(&self, env_id: &EnvId, slot: CapabilitySlot) -> Result<(), StoreError> {
        let lock_file = self.lock_path(env_id)?;
        let _guard = EnvFlock::acquire(&lock_file)?;
        self.delete_pack_answers_locked(env_id, slot)
    }
}
/// Write primitives and locking entry points for [`LocalFsStore`].
///
/// The `*_locked` methods never acquire the environment lock themselves; their callers are
/// expected to hold it (see the trait impl and [`LocalFsStore::transact`]). Each write calls
/// `copy_to_backup` on the target before replacing or removing it.
// NOTE(review): `copy_to_backup` is also called when the target may not exist yet, so it
// presumably tolerates a missing source — confirm in `atomic_write`.
impl LocalFsStore {
    /// Writes `environment.json` after checking the schema tag and validating `env`.
    ///
    /// # Errors
    /// `Spec(SchemaMismatch)` when the schema is not `ENVIRONMENT_V1`, `Spec` when validation
    /// fails, otherwise whatever the backup / write helpers report.
    fn save_locked(&self, env: &Environment) -> Result<(), StoreError> {
        let schema = env.schema.as_str();
        if schema != SchemaVersion::ENVIRONMENT_V1 {
            let mismatch = SpecError::SchemaMismatch {
                expected: SchemaVersion::ENVIRONMENT_V1,
                actual: schema.to_string(),
            };
            return Err(StoreError::Spec(mismatch));
        }
        env.validate()?;

        let env_id = &env.environment_id;
        let target = self.environment_path(env_id)?;
        let backups = self.backups_dir(env_id)?;
        copy_to_backup(&target, &backups)?;
        atomic_write_json(&target, env)?;
        Ok(())
    }

    /// Writes `runtime.json` after checking that the schema tag is `ENVIRONMENT_RUNTIME_V1`.
    fn save_runtime_locked(&self, runtime: &EnvironmentRuntime) -> Result<(), StoreError> {
        let schema = runtime.schema.as_str();
        if schema != SchemaVersion::ENVIRONMENT_RUNTIME_V1 {
            let mismatch = SpecError::SchemaMismatch {
                expected: SchemaVersion::ENVIRONMENT_RUNTIME_V1,
                actual: schema.to_string(),
            };
            return Err(StoreError::Spec(mismatch));
        }

        let env_id = &runtime.environment_id;
        let target = self.runtime_path(env_id)?;
        let backups = self.backups_dir(env_id)?;
        copy_to_backup(&target, &backups)?;
        atomic_write_json(&target, runtime)?;
        Ok(())
    }

    /// Writes `runtime-config.json` for `cfg.env_id`, skipping the backup and write when the
    /// file already deserializes to a value equal to `cfg`.
    fn save_runtime_config_locked(&self, cfg: &RuntimeConfig) -> Result<(), StoreError> {
        let env_id = &cfg.env_id;
        let target = self.runtime_config_path(env_id)?;

        // A missing or unreadable file simply counts as "changed".
        let unchanged = matches!(
            self.read_json::<RuntimeConfig>(&target),
            Ok(current) if current == *cfg
        );
        if unchanged {
            return Ok(());
        }

        let backups = self.backups_dir(env_id)?;
        copy_to_backup(&target, &backups)?;
        atomic_write_json(&target, cfg)?;
        Ok(())
    }

    /// Backs up and removes `runtime-config.json`; does nothing when the file is absent.
    fn delete_runtime_config_locked(&self, env_id: &EnvId) -> Result<(), StoreError> {
        let target = self.runtime_config_path(env_id)?;
        if !target.exists() {
            return Ok(());
        }
        let backups = self.backups_dir(env_id)?;
        copy_to_backup(&target, &backups)?;
        match std::fs::remove_file(&target) {
            Ok(()) => Ok(()),
            Err(source) => Err(StoreError::Io {
                path: target,
                source,
            }),
        }
    }

    /// Brings the `messaging/` directory in line with `env.messaging_endpoints`.
    ///
    /// Steps, in order:
    /// 1. collect existing `<ulid>.json` files (anything else, including `index.json`, is ignored);
    /// 2. write one file per endpoint, skipping files that already hold an equal value;
    /// 3. back up and delete collected files whose id is no longer among the endpoints;
    /// 4. rewrite `index.json` from `materialize_messaging_index`, or back up and remove it
    ///    when the index is empty; an unchanged index is left untouched.
    fn refresh_messaging_locked(&self, env: &Environment) -> Result<(), StoreError> {
        let env_id = &env.environment_id;
        let dir = self.messaging_dir(env_id)?;
        // Pure path computation on the same id as `dir`, so it cannot fail once `dir` succeeded.
        let backups = self.messaging_backups_dir(env_id)?;

        // Step 1: endpoint files currently on disk, keyed by the id parsed from the file stem.
        let mut on_disk: Vec<(MessagingEndpointId, PathBuf)> = Vec::new();
        if dir.exists() {
            let scan_err = |source: std::io::Error| StoreError::Io {
                path: dir.clone(),
                source,
            };
            for entry in std::fs::read_dir(&dir).map_err(&scan_err)? {
                let path = entry.map_err(&scan_err)?.path();
                let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
                    continue;
                };
                let is_json = path.extension().and_then(|e| e.to_str()) == Some("json");
                if !is_json || stem == "index" {
                    continue;
                }
                if let Ok(ulid) = stem.parse::<ulid::Ulid>() {
                    on_disk.push((MessagingEndpointId(ulid), path));
                }
            }
        }

        // Step 2: write changed endpoints and remember every id that should remain.
        let mut keep: std::collections::HashSet<MessagingEndpointId> =
            std::collections::HashSet::with_capacity(env.messaging_endpoints.len());
        for endpoint in &env.messaging_endpoints {
            keep.insert(endpoint.endpoint_id);
            let target = self.messaging_endpoint_path(env_id, &endpoint.endpoint_id)?;
            let unchanged = matches!(
                self.read_json::<MessagingEndpoint>(&target),
                Ok(current) if current == *endpoint
            );
            if !unchanged {
                copy_to_backup(&target, &backups)?;
                atomic_write_json(&target, endpoint)?;
            }
        }

        // Step 3: drop files for endpoints that are no longer part of the environment.
        for (id, stale) in on_disk {
            if keep.contains(&id) {
                continue;
            }
            copy_to_backup(&stale, &backups)?;
            if let Err(source) = std::fs::remove_file(&stale) {
                return Err(StoreError::Io {
                    path: stale,
                    source,
                });
            }
        }

        // Step 4: the index file.
        let index_path = self.messaging_index_path(env_id)?;
        let index = super::messaging::materialize_messaging_index(env);
        if index.is_empty() {
            if !index_path.exists() {
                return Ok(());
            }
            copy_to_backup(&index_path, &backups)?;
            return match std::fs::remove_file(&index_path) {
                Ok(()) => Ok(()),
                Err(source) => Err(StoreError::Io {
                    path: index_path,
                    source,
                }),
            };
        }

        let index_unchanged = matches!(
            self.read_json::<Vec<super::messaging::MessagingEndpointIndexEntry>>(&index_path),
            Ok(current) if current == index
        );
        if index_unchanged {
            return Ok(());
        }
        copy_to_backup(&index_path, &backups)?;
        atomic_write_json(&index_path, &index)?;
        Ok(())
    }

    /// Backs up the slot's current `answers.json` into the slot's backup directory, then
    /// writes `answers` in its place.
    fn save_pack_answers_locked(
        &self,
        env_id: &EnvId,
        slot: CapabilitySlot,
        answers: &Value,
    ) -> Result<(), StoreError> {
        let target = self.pack_answers_path(env_id, slot)?;
        let backups = self.pack_backups_dir(env_id, slot)?;
        copy_to_backup(&target, &backups)?;
        atomic_write_json(&target, answers)?;
        Ok(())
    }

    /// Backs up and removes the slot's `answers.json`; does nothing when the file is absent.
    fn delete_pack_answers_locked(
        &self,
        env_id: &EnvId,
        slot: CapabilitySlot,
    ) -> Result<(), StoreError> {
        let target = self.pack_answers_path(env_id, slot)?;
        if !target.exists() {
            return Ok(());
        }
        let backups = self.pack_backups_dir(env_id, slot)?;
        copy_to_backup(&target, &backups)?;
        match std::fs::remove_file(&target) {
            Ok(()) => Ok(()),
            Err(source) => Err(StoreError::Io {
                path: target,
                source,
            }),
        }
    }

    /// Public accessor for the environment's `.lock` path.
    pub fn env_lock_path(&self, env_id: &EnvId) -> Result<PathBuf, StoreError> {
        self.lock_path(env_id)
    }

    /// Runs `f` with a [`Locked`] handle while the environment lock for `env_id` is held.
    ///
    /// The guard returned by `EnvFlock::acquire` lives until this function returns, so every
    /// call `f` makes through the handle happens under that single acquisition. Path and lock
    /// failures are converted into `E` through `From<StoreError>`; otherwise the result of `f`
    /// is returned as is.
    pub fn transact<F, R, E>(&self, env_id: &EnvId, f: F) -> Result<R, E>
    where
        F: FnOnce(&Locked<'_>) -> Result<R, E>,
        E: From<StoreError>,
    {
        let lock_file = self.lock_path(env_id)?;
        let _guard = EnvFlock::acquire(&lock_file).map_err(StoreError::Lock)?;
        let handle = Locked {
            store: self,
            env_id: env_id.clone(),
        };
        f(&handle)
    }
}
/// Handle passed to the closure given to `LocalFsStore::transact`.
///
/// It is built after `transact` has acquired the environment lock, and its write methods call
/// the store's `*_locked` primitives, which do not take the lock again.
#[derive(Debug)]
pub struct Locked<'a> {
// Store the transaction operates on.
store: &'a LocalFsStore,
// Environment id this handle is scoped to.
env_id: EnvId,
}
/// Operations available inside [`LocalFsStore::transact`].
///
/// Every method acts on the environment the handle is scoped to. Methods that receive a value
/// carrying its own environment id reject it with [`StoreError::EnvIdMismatch`] when that id
/// differs, so nothing is written under a directory whose lock is not held.
impl<'a> Locked<'a> {
    /// Fails with `EnvIdMismatch` unless `actual` equals the id this handle is scoped to.
    fn ensure_env_matches(&self, actual: &EnvId) -> Result<(), StoreError> {
        if *actual != self.env_id {
            return Err(StoreError::EnvIdMismatch {
                file: self.env_id.clone(),
                value: actual.clone(),
            });
        }
        Ok(())
    }

    /// Id of the environment this handle is scoped to.
    pub fn env_id(&self) -> &EnvId {
        &self.env_id
    }

    /// Loads the environment via the store's `load`.
    pub fn load(&self) -> Result<Environment, StoreError> {
        self.store.load(&self.env_id)
    }

    /// Writes `env` without re-acquiring the lock.
    ///
    /// # Errors
    /// `EnvIdMismatch` when `env` belongs to another environment, otherwise the errors of
    /// `save_locked`.
    pub fn save(&self, env: &Environment) -> Result<(), StoreError> {
        self.ensure_env_matches(&env.environment_id)?;
        self.store.save_locked(env)
    }

    /// Loads the runtime state via the store's `load_runtime` (`Ok(None)` when none is stored).
    pub fn load_runtime(&self) -> Result<Option<EnvironmentRuntime>, StoreError> {
        self.store.load_runtime(&self.env_id)
    }

    /// Writes `runtime` without re-acquiring the lock.
    ///
    /// # Errors
    /// `EnvIdMismatch` when `runtime` belongs to another environment, otherwise the errors of
    /// `save_runtime_locked`.
    pub fn save_runtime(&self, runtime: &EnvironmentRuntime) -> Result<(), StoreError> {
        self.ensure_env_matches(&runtime.environment_id)?;
        self.store.save_runtime_locked(runtime)
    }

    /// Loads the answers stored for `slot` (`Ok(None)` when none are stored).
    pub fn load_pack_answers(&self, slot: CapabilitySlot) -> Result<Option<Value>, StoreError> {
        self.store.load_pack_answers(&self.env_id, slot)
    }

    /// Writes the answers for `slot` without re-acquiring the lock.
    pub fn save_pack_answers(
        &self,
        slot: CapabilitySlot,
        answers: &Value,
    ) -> Result<(), StoreError> {
        self.store
            .save_pack_answers_locked(&self.env_id, slot, answers)
    }

    /// Removes the answers for `slot` without re-acquiring the lock.
    pub fn delete_pack_answers(&self, slot: CapabilitySlot) -> Result<(), StoreError> {
        self.store.delete_pack_answers_locked(&self.env_id, slot)
    }

    /// Re-derives `runtime-config.json` from `env`: the file is removed when the materialized
    /// config has no revisions and written otherwise.
    ///
    /// # Errors
    /// `EnvIdMismatch` when `env` belongs to another environment. Without this check the write
    /// branch would target the directory named by the config's own id, which is not covered
    /// by the lock this handle represents, while the delete branch would act on this handle's id.
    pub fn refresh_runtime_config(&self, env: &Environment) -> Result<(), StoreError> {
        self.ensure_env_matches(&env.environment_id)?;
        let cfg = super::runtime_config::materialize_runtime_config(env);
        if cfg.revisions.is_empty() {
            self.store.delete_runtime_config_locked(&self.env_id)
        } else {
            self.store.save_runtime_config_locked(&cfg)
        }
    }

    /// Re-derives the `messaging/` projection from `env`.
    ///
    /// # Errors
    /// `EnvIdMismatch` when `env` belongs to another environment, otherwise the errors of
    /// `refresh_messaging_locked`.
    pub fn refresh_messaging_projection(&self, env: &Environment) -> Result<(), StoreError> {
        self.ensure_env_matches(&env.environment_id)?;
        self.store.refresh_messaging_locked(env)
    }
}
/// Home directory taken from the `HOME` environment variable.
///
/// Returns `None` when the variable is unset or empty. An empty value would otherwise become an
/// empty path, and anything joined onto it would be resolved relative to the current directory.
#[cfg(unix)]
pub(crate) fn dirs_home() -> Option<PathBuf> {
    std::env::var_os("HOME")
        .filter(|home| !home.is_empty())
        .map(PathBuf::from)
}

/// Home directory taken from the `USERPROFILE` environment variable.
///
/// Returns `None` when the variable is unset or empty. An empty value would otherwise become an
/// empty path, and anything joined onto it would be resolved relative to the current directory.
#[cfg(windows)]
pub(crate) fn dirs_home() -> Option<PathBuf> {
    std::env::var_os("USERPROFILE")
        .filter(|home| !home.is_empty())
        .map(PathBuf::from)
}

/// No home-directory lookup is implemented for targets that are neither Unix nor Windows.
#[cfg(not(any(unix, windows)))]
pub(crate) fn dirs_home() -> Option<PathBuf> {
    None
}