use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, anyhow, bail};
use greentic_deploy_spec::{
BundleDeploymentStatus, BundleId, DeploymentId, Environment, PackListLock, RevisionId,
};
use greentic_deployer::path_safety::normalize_under_root;
use greentic_runner_host::runtime::{RevisionPackRef, TenantRuntime};
use greentic_runner_host::storage::{
new_session_store, new_state_store, session_host_from, state_host_from,
};
use greentic_runner_host::{HostBuilder, HostConfig, RunnerHost, TenantBindings};
use crate::deployment_routes::{DeploymentRouteTable, RevisionIngressRouting};
use crate::http_routes::{
HttpRouteDescriptor, HttpRouteTable, RevisionScope, discover_revision_http_routes,
};
use crate::revision_dispatcher::{RevisionDispatcher, RevisionDispatcherConfig, parse_ulid};
use crate::runtime_config::{LoadedRuntimeConfig, env_dir_in};
use crate::secrets_gate::DynSecretsManager;
const SIGNING_KEY_FILE: &str = "revision-signing.key";
pub(crate) struct RuntimeConfigActivation {
pub(crate) host: RunnerHost,
pub(crate) routing: RevisionIngressRouting,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct DeploymentMeta {
tenant: String,
customer_id: String,
status: BundleDeploymentStatus,
bundle_id: String,
}
pub(crate) async fn activate_runtime_config(
store_root: &Path,
rc: &LoadedRuntimeConfig,
secrets: DynSecretsManager,
env: &Environment,
) -> anyhow::Result<RuntimeConfigActivation> {
let env_dir = env_dir_in(store_root, &rc.env_id)?;
if env.environment_id.as_str() != rc.env_id {
bail!(
"environment `{}` was loaded for activation of runtime-config naming env `{}`",
env.environment_id,
rc.env_id
);
}
let deployments = deployment_index(env);
let mut tenants: HashSet<&str> = HashSet::new();
for block in &rc.revisions {
let meta = deployments.get(block.deployment_id.as_str()).ok_or_else(|| {
anyhow!(
"runtime-config references deployment `{}` that is not present in environment `{}`",
block.deployment_id,
rc.env_id
)
})?;
if meta.status != BundleDeploymentStatus::Active {
bail!(
"runtime-config references deployment `{}` whose status is {:?}, not Active — \
refusing to activate a stale runtime-config",
block.deployment_id,
meta.status
);
}
if meta.bundle_id != block.bundle_id {
bail!(
"runtime-config block for deployment `{}` pins bundle `{}`, but that deployment \
is bound to bundle `{}`",
block.deployment_id,
block.bundle_id,
meta.bundle_id
);
}
tenants.insert(meta.tenant.as_str());
}
let mut builder = HostBuilder::new().with_secrets_manager(secrets);
let effective_tenants: Vec<String> = if tenants.is_empty() {
vec![rc.env_id.clone()]
} else {
tenants.iter().map(|t| (*t).to_string()).collect()
};
for tenant in effective_tenants {
builder = builder.with_config(HostConfig::from_gtbind(TenantBindings {
tenant,
packs: Vec::new(),
env_passthrough: Vec::new(),
}));
}
let host = builder
.build()
.context("building embedded runner host for revision activation")?;
let mut scoped_routes: Vec<HttpRouteDescriptor> = Vec::new();
let configs = host.tenant_configs();
for block in &rc.revisions {
let meta = deployments
.get(block.deployment_id.as_str())
.expect("deployment validated in the loop above");
let deployment_id = DeploymentId(parse_ulid(&block.deployment_id, "deployment_id")?);
let revision_id = RevisionId(parse_ulid(&block.revision_id, "revision_id")?);
let bundle_id = BundleId::new(&block.bundle_id);
let config = configs
.get(&meta.tenant)
.cloned()
.expect("host config registered for tenant in the loop above");
let pack_refs = read_revision_pack_refs(&env_dir, &revision_id, &block.pack_list_refs)
.with_context(|| {
format!("reading pinned packs for revision `{}`", block.revision_id)
})?;
let scope = RevisionScope {
deployment_id,
bundle_id: bundle_id.clone(),
revision_id,
};
let pack_paths: Vec<PathBuf> = pack_refs.iter().map(|r| r.path.clone()).collect();
scoped_routes.extend(discover_revision_http_routes(&pack_paths, &scope));
let session_store = new_session_store();
let session_host = session_host_from(Arc::clone(&session_store));
let state_store = new_state_store();
let state_host = state_host_from(Arc::clone(&state_store));
let runtime = TenantRuntime::load_revision(
&pack_refs,
config,
None,
host.wasi_policy(),
session_host,
session_store,
state_store,
state_host,
host.secrets_manager(),
deployment_id,
bundle_id.clone(),
revision_id,
Some(meta.customer_id.clone()),
)
.await
.with_context(|| format!("loading revision `{}`", block.revision_id))?;
host.active_packs()
.insert_revision(&meta.tenant, deployment_id, bundle_id, revision_id, runtime)
.with_context(|| {
format!(
"inserting revision `{}` into active packs",
block.revision_id
)
})?;
}
let signing_key = load_or_create_signing_key(&env_dir)?;
let dispatcher = RevisionDispatcher::from_runtime_config(
RevisionDispatcherConfig::new(&rc.env_id, signing_key),
rc,
)
.context("building revision dispatcher")?;
let routing = RevisionIngressRouting {
dispatcher: Arc::new(dispatcher),
http_routes: HttpRouteTable::from_descriptors(scoped_routes),
deployment_routes: DeploymentRouteTable::from_environment(env),
};
Ok(RuntimeConfigActivation { host, routing })
}
fn deployment_index(env: &Environment) -> HashMap<String, DeploymentMeta> {
env.bundles
.iter()
.map(|dep| {
(
dep.deployment_id.to_string(),
DeploymentMeta {
tenant: dep.route_binding.tenant_selector.tenant.clone(),
customer_id: dep.customer_id.as_str().to_string(),
status: dep.status,
bundle_id: dep.bundle_id.as_str().to_string(),
},
)
})
.collect()
}
fn read_revision_pack_refs(
env_dir: &Path,
expected: &RevisionId,
lock_paths: &[PathBuf],
) -> anyhow::Result<Vec<RevisionPackRef>> {
let mut refs = Vec::new();
for lock_path in lock_paths {
let bytes = std::fs::read(lock_path)
.with_context(|| format!("reading pack-list.lock `{}`", lock_path.display()))?;
let lock: PackListLock = serde_json::from_slice(&bytes)
.with_context(|| format!("parsing pack-list.lock `{}`", lock_path.display()))?;
if &lock.revision_id != expected {
bail!(
"pack-list.lock `{}` pins revision `{}` but is referenced by revision `{}`",
lock_path.display(),
lock.revision_id,
expected
);
}
for pack in lock.packs {
let abs = normalize_under_root(env_dir, &pack.path).with_context(|| {
format!(
"resolving pinned pack `{}` from `{}`",
pack.path.display(),
lock_path.display()
)
})?;
if !abs.is_file() {
bail!(
"pinned pack `{}` (from `{}`) is not a file",
abs.display(),
lock_path.display()
);
}
refs.push(RevisionPackRef {
path: abs,
digest: pack.digest,
});
}
}
if refs.is_empty() {
bail!("revision `{expected}` resolved to no pinned packs");
}
Ok(refs)
}
fn load_or_create_signing_key(env_dir: &Path) -> anyhow::Result<[u8; 32]> {
let path = env_dir.join(SIGNING_KEY_FILE);
match std::fs::read(&path) {
Ok(bytes) => key_from_bytes(&bytes, &path),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => create_signing_key(&path),
Err(e) => Err(anyhow::Error::new(e)
.context(format!("reading revision signing key `{}`", path.display()))),
}
}
fn create_signing_key(path: &Path) -> anyhow::Result<[u8; 32]> {
use rand::Rng;
let mut key = [0u8; 32];
rand::rng().fill_bytes(&mut key);
let mut opts = std::fs::OpenOptions::new();
opts.write(true).create_new(true);
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
opts.mode(0o600);
}
match opts.open(path) {
Ok(mut file) => {
use std::io::Write;
file.write_all(&key)
.with_context(|| format!("writing revision signing key `{}`", path.display()))?;
let _ = file.sync_all();
Ok(key)
}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
let bytes = std::fs::read(path)
.with_context(|| format!("reading revision signing key `{}`", path.display()))?;
key_from_bytes(&bytes, path)
}
Err(e) => Err(anyhow::Error::new(e).context(format!(
"creating revision signing key `{}`",
path.display()
))),
}
}
fn key_from_bytes(bytes: &[u8], path: &Path) -> anyhow::Result<[u8; 32]> {
bytes.try_into().map_err(|_| {
anyhow!(
"revision signing key `{}` is {} bytes, expected 32 (corrupt — delete to regenerate)",
path.display(),
bytes.len()
)
})
}
#[cfg(test)]
mod tests {
use super::*;
use greentic_deploy_spec::{
BundleDeployment, BundleDeploymentStatus, CustomerId, EnvironmentHostConfig, LockedPack,
PackId, PartyId, RevenueShareEntry, RouteBinding, SchemaVersion, TenantSelector,
};
use greentic_types::EnvId;
use tempfile::tempdir;
const ENV_ID: &str = "local";
fn env_id() -> EnvId {
EnvId::try_from(ENV_ID).unwrap()
}
fn make_deployment(
deployment_id: DeploymentId,
tenant: &str,
customer: &str,
bundle: &str,
status: BundleDeploymentStatus,
) -> BundleDeployment {
BundleDeployment {
schema: SchemaVersion::new(SchemaVersion::BUNDLE_DEPLOYMENT_V1),
deployment_id,
env_id: env_id(),
bundle_id: BundleId::new(bundle),
customer_id: CustomerId::new(customer),
status,
current_revisions: Vec::new(),
route_binding: RouteBinding {
hosts: Vec::new(),
path_prefixes: Vec::new(),
tenant_selector: TenantSelector {
tenant: tenant.to_string(),
team: "default".to_string(),
},
},
revenue_share: vec![RevenueShareEntry {
party_id: PartyId::new("greentic"),
basis_points: 10_000,
}],
revenue_policy_ref: PathBuf::from("revenue.json"),
usage: None,
created_at: chrono::Utc::now(),
authorization_ref: PathBuf::from("auth.json"),
}
}
fn make_env(bundles: Vec<BundleDeployment>) -> Environment {
Environment {
schema: SchemaVersion::new(SchemaVersion::ENVIRONMENT_V1),
environment_id: env_id(),
name: ENV_ID.to_string(),
host_config: EnvironmentHostConfig {
env_id: env_id(),
region: None,
tenant_org_id: None,
listen_addr: None,
},
packs: Vec::new(),
credentials_ref: None,
bundles,
revisions: Vec::new(),
traffic_splits: Vec::new(),
revocation: Default::default(),
retention: Default::default(),
health: Default::default(),
}
}
fn write_lock(env_dir: &Path, rel: &str, lock: &PackListLock) -> PathBuf {
let abs = env_dir.join(rel);
std::fs::create_dir_all(abs.parent().unwrap()).unwrap();
std::fs::write(&abs, serde_json::to_vec(lock).unwrap()).unwrap();
abs
}
#[test]
fn deployment_index_maps_id_to_tenant_and_customer() {
let dep = make_deployment(
DeploymentId::new(),
"acme",
"cust-1",
"fast2flow",
BundleDeploymentStatus::Active,
);
let key = dep.deployment_id.to_string();
let env = make_env(vec![dep]);
let index = deployment_index(&env);
let meta = index.get(&key).expect("deployment present");
assert_eq!(meta.tenant, "acme");
assert_eq!(meta.customer_id, "cust-1");
assert_eq!(meta.bundle_id, "fast2flow");
assert_eq!(meta.status, BundleDeploymentStatus::Active);
}
#[test]
fn read_pack_refs_resolves_paths_and_binds_digests() {
let dir = tempdir().unwrap();
let env_dir = dir.path();
let rev = RevisionId::new();
let pack_rel = "revisions/r/bundle/packs/alpha/dist/alpha.gtpack";
let pack_abs = env_dir.join(pack_rel);
std::fs::create_dir_all(pack_abs.parent().unwrap()).unwrap();
std::fs::write(&pack_abs, b"PK\x03\x04").unwrap();
let lock = PackListLock {
schema: SchemaVersion::new(SchemaVersion::PACK_LIST_LOCK_V1),
revision_id: rev,
packs: vec![LockedPack {
pack_id: PackId::new("alpha"),
path: PathBuf::from(pack_rel),
digest: "sha256:deadbeef".to_string(),
}],
};
let lock_path = write_lock(env_dir, "revisions/r/pack-list.lock", &lock);
let refs = read_revision_pack_refs(env_dir, &rev, &[lock_path]).unwrap();
assert_eq!(refs.len(), 1);
assert_eq!(refs[0].path, pack_abs);
assert_eq!(refs[0].digest, "sha256:deadbeef");
}
#[test]
fn read_pack_refs_rejects_revision_id_mismatch() {
let dir = tempdir().unwrap();
let env_dir = dir.path();
let lock_rev = RevisionId::new();
let lock = PackListLock {
schema: SchemaVersion::new(SchemaVersion::PACK_LIST_LOCK_V1),
revision_id: lock_rev,
packs: vec![LockedPack {
pack_id: PackId::new("alpha"),
path: PathBuf::from("packs/alpha.gtpack"),
digest: "sha256:00".to_string(),
}],
};
let lock_path = write_lock(env_dir, "pack-list.lock", &lock);
let other = RevisionId::new();
let err = read_revision_pack_refs(env_dir, &other, &[lock_path]).unwrap_err();
assert!(
err.to_string().contains("pins revision"),
"unexpected error: {err}"
);
}
#[test]
fn read_pack_refs_rejects_pack_path_that_is_not_a_regular_file() {
let dir = tempdir().unwrap();
let env_dir = dir.path();
let rev = RevisionId::new();
std::fs::create_dir_all(env_dir.join("packs/ghost.gtpack")).unwrap();
let lock = PackListLock {
schema: SchemaVersion::new(SchemaVersion::PACK_LIST_LOCK_V1),
revision_id: rev,
packs: vec![LockedPack {
pack_id: PackId::new("ghost"),
path: PathBuf::from("packs/ghost.gtpack"),
digest: "sha256:00".to_string(),
}],
};
let lock_path = write_lock(env_dir, "pack-list.lock", &lock);
let err = read_revision_pack_refs(env_dir, &rev, &[lock_path]).unwrap_err();
assert!(err.to_string().contains("is not a file"), "got: {err}");
}
#[test]
fn read_pack_refs_rejects_path_escaping_env_dir() {
let dir = tempdir().unwrap();
let env_dir = dir.path().join("env");
std::fs::create_dir_all(&env_dir).unwrap();
std::fs::write(dir.path().join("outside.gtpack"), b"PK").unwrap();
let rev = RevisionId::new();
let lock = PackListLock {
schema: SchemaVersion::new(SchemaVersion::PACK_LIST_LOCK_V1),
revision_id: rev,
packs: vec![LockedPack {
pack_id: PackId::new("escape"),
path: PathBuf::from("../outside.gtpack"),
digest: "sha256:00".to_string(),
}],
};
let lock_path = write_lock(&env_dir, "pack-list.lock", &lock);
assert!(read_revision_pack_refs(&env_dir, &rev, &[lock_path]).is_err());
}
#[test]
fn signing_key_is_stable_across_loads() {
let dir = tempdir().unwrap();
let first = load_or_create_signing_key(dir.path()).unwrap();
let second = load_or_create_signing_key(dir.path()).unwrap();
assert_eq!(first, second, "key must be stable across boots");
assert!(first.iter().any(|&b| b != 0), "key must not be all zeroes");
}
#[test]
fn signing_key_rejects_wrong_length() {
let dir = tempdir().unwrap();
std::fs::write(dir.path().join(SIGNING_KEY_FILE), b"too short").unwrap();
let err = load_or_create_signing_key(dir.path()).unwrap_err();
assert!(err.to_string().contains("expected 32"), "got: {err}");
}
#[cfg(unix)]
#[test]
fn signing_key_file_is_owner_only() {
use std::os::unix::fs::PermissionsExt;
let dir = tempdir().unwrap();
load_or_create_signing_key(dir.path()).unwrap();
let mode = std::fs::metadata(dir.path().join(SIGNING_KEY_FILE))
.unwrap()
.permissions()
.mode();
assert_eq!(mode & 0o777, 0o600);
}
fn seed_env_dir(store_root: &Path) -> PathBuf {
let env_dir = store_root.join(ENV_ID);
std::fs::create_dir_all(&env_dir).unwrap();
env_dir
}
fn single_revision_rc(deployment_id: &DeploymentId) -> LoadedRuntimeConfig {
LoadedRuntimeConfig {
env_id: ENV_ID.to_string(),
revisions: vec![crate::runtime_config::ResolvedRevisionBlock {
deployment_id: deployment_id.to_string(),
revision_id: RevisionId::new().to_string(),
bundle_id: "fast2flow".to_string(),
pack_list_refs: Vec::new(),
pack_config_refs: Vec::new(),
weight_bps: 10_000,
}],
}
}
fn block_on<F: std::future::Future>(f: F) -> F::Output {
tokio::runtime::Runtime::new().unwrap().block_on(f)
}
fn dummy_secrets() -> DynSecretsManager {
std::sync::Arc::new(greentic_secrets_lib::env::EnvSecretsManager)
}
#[test]
fn activate_errors_when_env_id_does_not_match_runtime_config() {
let dir = tempdir().unwrap();
let rc = single_revision_rc(&DeploymentId::new());
let mut mismatched = make_env(Vec::new());
mismatched.environment_id = EnvId::try_from("not-local").unwrap();
mismatched.host_config.env_id = EnvId::try_from("not-local").unwrap();
let err = match block_on(activate_runtime_config(
dir.path(),
&rc,
dummy_secrets(),
&mismatched,
)) {
Ok(_) => panic!("expected activation to fail"),
Err(e) => e,
};
assert!(
err.to_string().contains("was loaded for activation"),
"got: {err:#}"
);
}
#[test]
fn activate_errors_when_deployment_not_in_environment() {
let dir = tempdir().unwrap();
seed_env_dir(dir.path());
let env = make_env(Vec::new());
let rc = single_revision_rc(&DeploymentId::new());
let err = match block_on(activate_runtime_config(
dir.path(),
&rc,
dummy_secrets(),
&env,
)) {
Ok(_) => panic!("expected activation to fail"),
Err(e) => e,
};
assert!(
err.to_string().contains("not present in environment"),
"got: {err:#}"
);
}
#[test]
fn activate_errors_when_deployment_is_not_active() {
let dir = tempdir().unwrap();
seed_env_dir(dir.path());
let dep_id = DeploymentId::new();
let env = make_env(vec![make_deployment(
dep_id,
"acme",
"cust",
"fast2flow",
BundleDeploymentStatus::Paused,
)]);
let rc = single_revision_rc(&dep_id);
let err = match block_on(activate_runtime_config(
dir.path(),
&rc,
dummy_secrets(),
&env,
)) {
Ok(_) => panic!("expected activation to fail"),
Err(e) => e,
};
assert!(err.to_string().contains("not Active"), "got: {err:#}");
}
#[test]
fn activate_errors_when_block_bundle_mismatches_deployment() {
let dir = tempdir().unwrap();
seed_env_dir(dir.path());
let dep_id = DeploymentId::new();
let env = make_env(vec![make_deployment(
dep_id,
"acme",
"cust",
"other-bundle",
BundleDeploymentStatus::Active,
)]);
let rc = single_revision_rc(&dep_id);
let err = match block_on(activate_runtime_config(
dir.path(),
&rc,
dummy_secrets(),
&env,
)) {
Ok(_) => panic!("expected activation to fail"),
Err(e) => e,
};
assert!(err.to_string().contains("bound to bundle"), "got: {err:#}");
}
#[test]
fn activate_builds_host_with_provided_secrets_then_loads_packs() {
let dir = tempdir().unwrap();
seed_env_dir(dir.path());
let dep_id = DeploymentId::new();
let env = make_env(vec![make_deployment(
dep_id,
"acme",
"cust",
"fast2flow",
BundleDeploymentStatus::Active,
)]);
let rc = single_revision_rc(&dep_id);
let err = match block_on(activate_runtime_config(
dir.path(),
&rc,
dummy_secrets(),
&env,
)) {
Ok(_) => panic!("expected activation to fail at pack reading"),
Err(e) => e,
};
let chain = format!("{err:#}");
assert!(
chain.contains("no pinned packs"),
"expected to reach pack reading (host built with provided secrets), got: {chain}"
);
}
#[test]
fn activate_empty_runtime_config_yields_zero_routing() {
let dir = tempdir().unwrap();
seed_env_dir(dir.path());
let env = make_env(Vec::new());
let rc = LoadedRuntimeConfig {
env_id: ENV_ID.to_string(),
revisions: Vec::new(),
};
let activation = block_on(activate_runtime_config(
dir.path(),
&rc,
dummy_secrets(),
&env,
))
.expect("empty rc activates");
assert_eq!(activation.routing.dispatcher.deployment_count(), 0);
assert_eq!(activation.routing.dispatcher.revision_count(), 0);
assert_eq!(activation.routing.deployment_routes.len(), 0);
assert!(activation.host.tenant_configs().contains_key(ENV_ID));
}
}