use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use anyhow::{Context, Result};
use notify_debouncer_full::notify::RecursiveMode;
use notify_debouncer_full::{
DebounceEventResult, Debouncer, RecommendedCache, new_debouncer, notify::RecommendedWatcher,
};
use greentic_deploy_spec::Environment;
use greentic_deployer::environment::{EnvironmentStore, LocalFsStore};
use greentic_runner_host::runtime_refs::RuntimeRefResolver;
use greentic_types::EnvId;
/// File name of the environment document inside the watched env directory.
/// The watcher treats a change to this file as a trigger for an activation rebuild.
const ENVIRONMENT_FILE: &str = "environment.json";
use crate::operator_log;
use crate::revision_boot::{self, RuntimeConfigActivation};
use crate::revision_serve::{Activation, RevisionServer};
use crate::runtime_config::{self, LoadedRuntimeConfig};
use crate::secrets_gate::DynSecretsManager;
/// Default debounce interval (250 ms).
/// Presumably what callers pass as `debounce` to `spawn_runtime_config_watcher` — verify at call sites.
pub(crate) const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(250);
/// Guard that owns the runtime-config watcher.
///
/// Dropping the handle releases the debouncer and then joins the reload
/// worker thread, so the handle must be kept alive for as long as reloads
/// should be processed. `#[must_use]` makes an accidentally discarded handle
/// a compiler warning.
#[must_use = "dropping the handle tears down the runtime-config watcher and joins its worker"]
pub(crate) struct WatcherHandle {
    /// The filesystem debouncer; `Option` so `Drop` can release it before
    /// joining the worker.
    debouncer: Option<Debouncer<RecommendedWatcher, RecommendedCache>>,
    /// Join handle of the reload worker thread; `Option` so `Drop` can take
    /// ownership of it for `join`.
    worker: Option<JoinHandle<()>>,
}
impl Drop for WatcherHandle {
    /// Tears the watcher down: releases the debouncer, then waits for the
    /// reload worker thread and logs a warning if that thread panicked.
    fn drop(&mut self) {
        // Overwriting with `None` drops the debouncer (if still present)
        // before the join below. Presumably this also releases the sender
        // captured by the debouncer callback so the worker loop can finish —
        // confirm against notify-debouncer-full's drop behaviour.
        self.debouncer = None;

        let Some(worker_thread) = self.worker.take() else {
            return;
        };
        if let Err(err) = worker_thread.join() {
            operator_log::warn(
                module_path!(),
                format!("runtime-config watcher worker panicked: {err:?}"),
            );
        }
    }
}
/// Starts a debounced, non-recursive watch on `env_dir` and a worker thread
/// (named `revision-reload`) that reacts to the debounced event batches.
///
/// * `debounce` – debounce interval handed to the debouncer.
/// * `drain_window` – forwarded to `RevisionServer::reload` on each reload.
/// * `server` – server whose activation is swapped on a successful rebuild.
/// * `rebuild` – called when the runtime-config or environment file changed;
///   `Ok(Some(_))` triggers a reload, `Ok(None)` means nothing to do.
/// * `post_reload` – called with the new activation after each reload.
/// * `snapshot_reload` – called when the runtime-refs file changed.
///
/// # Errors
/// Fails if the debouncer cannot be created, the directory cannot be
/// watched, or the worker thread cannot be spawned.
pub(crate) fn spawn_runtime_config_watcher<R, P, S>(
    env_dir: PathBuf,
    debounce: Duration,
    drain_window: Duration,
    server: Arc<RevisionServer>,
    rebuild: R,
    post_reload: P,
    snapshot_reload: S,
) -> Result<WatcherHandle>
where
    R: FnMut() -> Result<Option<Activation>> + Send + 'static,
    P: FnMut(&Activation) + Send + 'static,
    S: FnMut() + Send + 'static,
{
    // Files whose change requires a full activation rebuild.
    let runtime_config_path = env_dir.join(runtime_config::RUNTIME_CONFIG_FILE);
    let environment_path = env_dir.join(ENVIRONMENT_FILE);
    let activation_targets = vec![runtime_config_path, environment_path];
    // Files whose change only requires the snapshot callback.
    let snapshot_targets = vec![env_dir.join(crate::runtime_refs_store::RUNTIME_FILE)];

    // The debouncer callback only forwards batches; all work happens on the
    // worker thread. A failed send is deliberately ignored.
    let (event_tx, event_rx) = mpsc::channel::<DebounceEventResult>();
    let forward_batch = move |batch: DebounceEventResult| {
        let _ = event_tx.send(batch);
    };
    let mut debouncer = new_debouncer(debounce, None, forward_batch)
        .context("creating runtime-config debouncer")?;
    debouncer
        .watch(&env_dir, RecursiveMode::NonRecursive)
        .with_context(|| format!("watching env directory {}", env_dir.display()))?;

    let worker_body = move || {
        reload_worker(
            event_rx,
            activation_targets,
            snapshot_targets,
            drain_window,
            server,
            rebuild,
            post_reload,
            snapshot_reload,
        );
    };
    let worker = thread::Builder::new()
        .name(String::from("revision-reload"))
        .spawn(worker_body)
        .context("spawning runtime-config reload worker")?;

    Ok(WatcherHandle {
        debouncer: Some(debouncer),
        worker: Some(worker),
    })
}
/// Worker loop: consumes debounced event batches until the channel closes and
/// dispatches them to the snapshot and/or activation reload paths.
///
/// For each batch:
/// * notify errors are logged as warnings and the batch is skipped;
/// * if any event path equals a snapshot target, `snapshot_reload` runs;
/// * if any event path equals an activation target, `rebuild` runs and, on
///   `Ok(Some(_))`, the server is reloaded and `post_reload` is invoked with
///   the new activation. `Ok(None)` is a no-op; `Err` is logged and the loop
///   keeps running.
///
/// NOTE(review): paths are matched by exact `PathBuf` equality, which assumes
/// the watcher reports paths with the same prefix as the `env_dir` passed at
/// spawn time — confirm for symlinked or canonicalised directories.
// The worker needs the channel, both target lists, the drain window, the
// server and three callbacks; bundling them would only add indirection.
#[allow(clippy::too_many_arguments)]
fn reload_worker<R, P, S>(
    rx: mpsc::Receiver<DebounceEventResult>,
    activation_targets: Vec<PathBuf>,
    snapshot_targets: Vec<PathBuf>,
    drain_window: Duration,
    server: Arc<RevisionServer>,
    mut rebuild: R,
    mut post_reload: P,
    mut snapshot_reload: S,
) where
    R: FnMut() -> Result<Option<Activation>> + Send + 'static,
    P: FnMut(&Activation) + Send + 'static,
    S: FnMut() + Send + 'static,
{
    // `recv` fails once every sender is gone, which ends the worker.
    while let Ok(batch) = rx.recv() {
        let events = match batch {
            Err(errors) => {
                for err in errors {
                    operator_log::warn(
                        module_path!(),
                        format!("runtime-config watcher notify error: {err}"),
                    );
                }
                continue;
            }
            Ok(events) => events,
        };

        // True when any path of any event in this batch is one of `targets`.
        let any_path_in = |targets: &[PathBuf]| -> bool {
            events
                .iter()
                .flat_map(|ev| ev.event.paths.iter())
                .any(|path| targets.contains(path))
        };
        let snapshot_changed = any_path_in(&snapshot_targets);
        let activation_changed = any_path_in(&activation_targets);

        if snapshot_changed {
            snapshot_reload();
        }
        if !activation_changed {
            // Either nothing relevant changed or only the snapshot did.
            continue;
        }

        let activation = match rebuild() {
            Ok(Some(activation)) => activation,
            // The rebuild callback reported nothing to activate.
            Ok(None) => continue,
            Err(err) => {
                operator_log::error(
                    module_path!(),
                    format!("runtime-config reload failed: {err:#}"),
                );
                continue;
            }
        };

        // Keep a copy for `post_reload`; `reload` takes the activation by value.
        let live = activation.clone();
        let report = server.reload(activation, drain_window);
        operator_log::info(
            module_path!(),
            format!(
                "runtime-config reloaded: {} → {} deployment(s), {} → {} revision(s)",
                report.prev_deployments,
                report.new_deployments,
                report.prev_revisions,
                report.new_revisions,
            ),
        );
        post_reload(&live);
    }
}
/// The inputs used by the most recent successful activation.
/// `rebuild_once` compares freshly loaded inputs against this value and
/// skips re-activation when both fields are equal.
#[derive(Debug, PartialEq, Eq)]
struct LastReloadInputs {
/// Runtime config as loaded (or the empty fallback) at the last activation.
rc: LoadedRuntimeConfig,
/// Environment document loaded from the store at the last activation.
env: Environment,
}
/// Builds the standard `rebuild` callback for the watcher.
///
/// The returned closure owns all of its arguments plus the inputs of the
/// last successful activation (initially none), and delegates every call to
/// `rebuild_once`. It yields `Ok(None)` when the inputs are unchanged since
/// the last successful activation.
pub(crate) fn default_rebuild(
    store_root: PathBuf,
    env_id: String,
    secrets: DynSecretsManager,
    runtime_ref_resolver: Arc<dyn RuntimeRefResolver>,
    activation_rt: tokio::runtime::Handle,
) -> impl FnMut() -> Result<Option<Activation>> + Send + 'static {
    // Dedup state carried across invocations of the closure.
    let mut previous_inputs = None::<LastReloadInputs>;
    move || {
        rebuild_once(
            store_root.as_path(),
            env_id.as_str(),
            &secrets,
            &runtime_ref_resolver,
            &activation_rt,
            &mut previous_inputs,
        )
    }
}
fn rebuild_once(
store_root: &Path,
env_id: &str,
secrets: &DynSecretsManager,
runtime_ref_resolver: &Arc<dyn RuntimeRefResolver>,
activation_rt: &tokio::runtime::Handle,
last: &mut Option<LastReloadInputs>,
) -> Result<Option<Activation>> {
let rc = runtime_config::load_in(store_root, env_id)?.unwrap_or_else(|| LoadedRuntimeConfig {
env_id: env_id.to_string(),
revisions: Vec::new(),
});
let store = LocalFsStore::new(store_root.to_path_buf());
let env_typed =
EnvId::new(env_id).with_context(|| format!("invalid environment id `{env_id}`"))?;
let environment = EnvironmentStore::load(&store, &env_typed)
.with_context(|| format!("loading environment `{env_id}` for reload"))?;
if let Some(prev) = last.as_ref()
&& prev.rc == rc
&& prev.env == environment
{
return Ok(None);
}
let RuntimeConfigActivation { host, routing } =
activation_rt.block_on(revision_boot::activate_runtime_config(
store_root,
&rc,
Arc::clone(secrets),
&environment,
Arc::clone(runtime_ref_resolver),
))?;
*last = Some(LastReloadInputs {
rc,
env: environment,
});
Ok(Some(Activation {
host: Arc::new(host),
routing: Arc::new(routing),
}))
}
// Tests for the watcher wiring (which file changes trigger which callback)
// and for the equality used by the reload dedup. The watcher tests use the
// real filesystem and real time, so they rely on sleeps / receive timeouts
// that are several times longer than the configured debounce interval.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc as std_mpsc;
/// Rebuild stub that increments `counter` on every call and reports that
/// there is nothing to activate (`Ok(None)`).
fn counting_rebuild(
counter: Arc<AtomicUsize>,
) -> impl FnMut() -> Result<Option<Activation>> + Send + 'static {
move || {
counter.fetch_add(1, Ordering::SeqCst);
Ok(None)
}
}
/// Rebuild stub that signals each call on `tx` (send errors are ignored)
/// and reports that there is nothing to activate (`Ok(None)`).
fn channel_rebuild(
tx: std_mpsc::Sender<()>,
) -> impl FnMut() -> Result<Option<Activation>> + Send + 'static {
move || {
let _ = tx.send(());
Ok(None)
}
}
/// Creates a fresh temporary directory to act as the env directory.
fn fresh_env_dir() -> tempfile::TempDir {
tempfile::tempdir().expect("create temp env dir")
}
/// Writes `body` to the runtime-config file inside `env_dir`.
fn write_runtime_config(env_dir: &Path, body: &str) {
std::fs::write(env_dir.join(runtime_config::RUNTIME_CONFIG_FILE), body)
.expect("write runtime-config");
}
/// Removes the runtime-config file from `env_dir`.
fn delete_runtime_config(env_dir: &Path) {
std::fs::remove_file(env_dir.join(runtime_config::RUNTIME_CONFIG_FILE))
.expect("delete runtime-config");
}
/// Writes `body` to `environment.json` inside `env_dir`.
fn write_environment_json(env_dir: &Path, body: &str) {
std::fs::write(env_dir.join(ENVIRONMENT_FILE), body).expect("write environment.json");
}
/// Starts a `RevisionServer` on `127.0.0.1:0` with an activation built from
/// empty pack, route-descriptor and override collections. The stub rebuilds
/// used in these tests always return `Ok(None)` or `Err`, so the watcher
/// never calls `reload` on this server here.
fn placeholder_server() -> Arc<RevisionServer> {
use crate::deployment_routes::{DeploymentRouteTable, RevisionIngressRouting};
use crate::endpoint_admit::EndpointAdmit;
use crate::http_routes::HttpRouteTable;
use crate::revision_dispatcher::{RevisionDispatcher, RevisionDispatcherConfig};
use crate::revision_serve::{RevisionServeConfig, RevisionServer};
use greentic_runner_host::{HostBuilder, HostConfig, TenantBindings};
use std::net::SocketAddr;
let host = Arc::new(
HostBuilder::new()
.with_config(HostConfig::from_gtbind(TenantBindings {
tenant: "watcher-test".to_string(),
packs: Vec::new(),
env_passthrough: Vec::new(),
}))
.build()
.expect("build placeholder host"),
);
let dispatcher = Arc::new(RevisionDispatcher::new(RevisionDispatcherConfig::new(
"watcher-test",
[0u8; 32],
)));
let routing = Arc::new(RevisionIngressRouting {
dispatcher,
http_routes: HttpRouteTable::from_descriptors(Vec::new()),
deployment_routes: DeploymentRouteTable::default(),
endpoint_admit: Arc::new(EndpointAdmit::default()),
deployment_config_overrides: Arc::default(),
});
let activation = Arc::new(Activation { host, routing });
let bind: SocketAddr = "127.0.0.1:0".parse().unwrap();
Arc::new(
RevisionServer::start(RevisionServeConfig {
bind_addr: bind,
activation,
})
.expect("placeholder server"),
)
}
/// A write to the runtime-refs file must call `snapshot_reload` and must
/// not call `rebuild`.
#[test]
fn watcher_dispatches_runtime_json_to_snapshot_only() {
let env = fresh_env_dir();
let rebuild_counter = Arc::new(AtomicUsize::new(0));
let snapshot_counter = Arc::new(AtomicUsize::new(0));
let snapshot_counter_for_closure = Arc::clone(&snapshot_counter);
// `_handle` keeps the watcher alive until the end of the test.
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(50),
Duration::ZERO,
placeholder_server(),
counting_rebuild(Arc::clone(&rebuild_counter)),
|_: &Activation| {},
move || {
snapshot_counter_for_closure.fetch_add(1, Ordering::SeqCst);
},
)
.expect("spawn watcher");
std::fs::write(
env.path().join(crate::runtime_refs_store::RUNTIME_FILE),
br#"{"schema":"x"}"#,
)
.expect("write runtime.json");
// Wait 400 ms (8x the 50 ms debounce) for the batch to be delivered.
std::thread::sleep(Duration::from_millis(400));
assert!(
snapshot_counter.load(Ordering::SeqCst) >= 1,
"runtime.json write must invoke snapshot_reload"
);
assert_eq!(
rebuild_counter.load(Ordering::SeqCst),
0,
"runtime.json write must NOT invoke rebuild"
);
}
/// Creating the runtime-config file must trigger `rebuild`.
#[test]
fn watcher_fires_on_runtime_config_create() {
let env = fresh_env_dir();
let (tx, rx) = std_mpsc::channel();
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(50),
Duration::ZERO,
placeholder_server(),
channel_rebuild(tx),
|_: &Activation| {},
|| {},
)
.expect("spawn watcher");
write_runtime_config(
env.path(),
r#"{"schema":"x","env_id":"local","revisions":[]}"#,
);
// Blocks until the rebuild stub signals, or fails after 3 s.
rx.recv_timeout(Duration::from_secs(3))
.expect("watcher must fire after the runtime-config is created");
}
/// Deleting an existing runtime-config file must trigger `rebuild`.
#[test]
fn watcher_fires_on_runtime_config_delete() {
let env = fresh_env_dir();
// The file exists before the watcher starts.
write_runtime_config(env.path(), r#"{"a":1}"#);
let (tx, rx) = std_mpsc::channel();
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(50),
Duration::ZERO,
placeholder_server(),
channel_rebuild(tx),
|_: &Activation| {},
|| {},
)
.expect("spawn watcher");
// Best-effort drain: swallow at most one rebuild signal arriving within
// 500 ms, presumably so a late event from the initial write is not
// mistaken for the delete below. The result is intentionally ignored.
let _ = rx.recv_timeout(Duration::from_millis(500));
delete_runtime_config(env.path());
rx.recv_timeout(Duration::from_secs(3))
.expect("watcher must fire after the runtime-config is deleted");
}
/// Five back-to-back writes within one debounce window must result in one
/// (at most two) rebuild calls.
#[test]
fn watcher_coalesces_burst_writes_into_one_rebuild() {
let env = fresh_env_dir();
let counter = Arc::new(AtomicUsize::new(0));
// Longer debounce (200 ms) than the other tests so the burst fits inside it.
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(200),
Duration::ZERO,
placeholder_server(),
counting_rebuild(Arc::clone(&counter)),
|_: &Activation| {},
|| {},
)
.expect("spawn watcher");
for i in 0..5 {
write_runtime_config(env.path(), &format!(r#"{{"i":{i}}}"#));
}
std::thread::sleep(Duration::from_millis(800));
let observed = counter.load(Ordering::SeqCst);
// Accept 1 or 2: the assertion tolerates the burst straddling a debounce boundary.
assert!(
(1..=2).contains(&observed),
"burst of 5 writes must coalesce to ~1 rebuild (saw {observed})"
);
}
/// Creating `environment.json` must trigger `rebuild`.
#[test]
fn watcher_fires_on_environment_json_create() {
let env = fresh_env_dir();
let (tx, rx) = std_mpsc::channel();
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(50),
Duration::ZERO,
placeholder_server(),
channel_rebuild(tx),
|_: &Activation| {},
|| {},
)
.expect("spawn watcher");
write_environment_json(env.path(), r#"{"schema":"x"}"#);
rx.recv_timeout(Duration::from_secs(3))
.expect("watcher must fire after environment.json is created");
}
/// A file whose name merely starts with `environment.json` (a timestamped
/// `.bak` backup) must not trigger `rebuild`.
#[test]
fn watcher_ignores_environment_json_backup_files() {
let env = fresh_env_dir();
let counter = Arc::new(AtomicUsize::new(0));
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(50),
Duration::ZERO,
placeholder_server(),
counting_rebuild(Arc::clone(&counter)),
|_: &Activation| {},
|| {},
)
.expect("spawn watcher");
std::fs::write(
env.path().join("environment.json.1234567890.bak"),
b"old-content",
)
.expect("write backup decoy");
// Give the watcher ample time to (incorrectly) fire before asserting it did not.
std::thread::sleep(Duration::from_millis(400));
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"watcher must ignore environment.json.<ts>.bak backups"
);
}
/// A write to an unrelated file in the env directory must not trigger `rebuild`.
#[test]
fn watcher_ignores_other_files_in_env_dir() {
let env = fresh_env_dir();
let counter = Arc::new(AtomicUsize::new(0));
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(50),
Duration::ZERO,
placeholder_server(),
counting_rebuild(Arc::clone(&counter)),
|_: &Activation| {},
|| {},
)
.expect("spawn watcher");
std::fs::write(env.path().join("revision-signing.key"), b"not-the-target")
.expect("write decoy");
std::thread::sleep(Duration::from_millis(400));
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"watcher must ignore writes to files other than runtime-config.json"
);
}
/// Runtime config for env `local` with no revisions.
fn empty_loaded_rc() -> LoadedRuntimeConfig {
LoadedRuntimeConfig {
env_id: "local".to_string(),
revisions: Vec::new(),
}
}
/// Builds an `Environment` for env `local` whose only non-empty/non-default
/// content is the given `endpoints` list.
fn env_with_endpoints(endpoints: Vec<greentic_deploy_spec::MessagingEndpoint>) -> Environment {
use greentic_deploy_spec::{EnvironmentHostConfig, SchemaVersion};
use greentic_types::EnvId;
let env_id = EnvId::try_from("local").unwrap();
Environment {
schema: SchemaVersion::new(SchemaVersion::ENVIRONMENT_V1),
environment_id: env_id.clone(),
name: "local".to_string(),
host_config: EnvironmentHostConfig {
env_id,
region: None,
tenant_org_id: None,
listen_addr: None,
public_base_url: None,
},
packs: Vec::new(),
messaging_endpoints: endpoints,
extensions: Vec::new(),
credentials_ref: None,
bundles: Vec::new(),
revisions: Vec::new(),
traffic_splits: Vec::new(),
revocation: Default::default(),
retention: Default::default(),
health: Default::default(),
}
}
/// Builds a messaging endpoint for env `local` linked to the given bundle ids.
/// Each call generates a new endpoint id and takes a fresh `now` timestamp.
fn make_endpoint(linked_bundles: &[&str]) -> greentic_deploy_spec::MessagingEndpoint {
use greentic_deploy_spec::{
BundleId, MessagingEndpoint, MessagingEndpointId, SchemaVersion,
};
use greentic_types::EnvId;
let now = chrono::Utc::now();
MessagingEndpoint {
schema: SchemaVersion::new(SchemaVersion::MESSAGING_ENDPOINT_V1),
env_id: EnvId::try_from("local").unwrap(),
endpoint_id: MessagingEndpointId::new(),
provider_id: "teams-legal".to_string(),
provider_type: "teams".to_string(),
display_name: "Legal".to_string(),
secret_refs: Vec::new(),
webhook_secret_ref: None,
linked_bundles: linked_bundles.iter().map(|b| BundleId::new(*b)).collect(),
welcome_flow: None,
generation: 1,
created_at: now,
updated_at: now,
updated_by: "test".to_string(),
}
}
/// Two dedup inputs with the same runtime config but different messaging
/// endpoints must compare unequal, so the dedup does not suppress the reload.
#[test]
fn last_reload_inputs_inequality_on_messaging_endpoint_change() {
let rc = empty_loaded_rc();
// NOTE(review): the two endpoints come from separate `make_endpoint`
// calls, so they may also differ in `endpoint_id` and timestamps, not
// only in `linked_bundles` — confirm the test isolates the intended field.
let env_with = env_with_endpoints(vec![make_endpoint(&["legal-bundle"])]);
let env_without = env_with_endpoints(vec![make_endpoint(&[])]);
let a = LastReloadInputs {
rc: rc.clone(),
env: env_with,
};
let b = LastReloadInputs {
rc,
env: env_without,
};
assert_ne!(
a, b,
"messaging endpoint linked_bundles change must defeat dedup"
);
}
/// A rebuild error must not stop the worker: a later change must call
/// `rebuild` again.
#[test]
fn watcher_keeps_running_when_rebuild_returns_error() {
let env = fresh_env_dir();
let counter = Arc::new(AtomicUsize::new(0));
let counter_for_closure = Arc::clone(&counter);
let _handle = spawn_runtime_config_watcher(
env.path().to_path_buf(),
Duration::from_millis(50),
Duration::ZERO,
placeholder_server(),
// Fails on the first call only; later calls report nothing to activate.
move || {
let n = counter_for_closure.fetch_add(1, Ordering::SeqCst) + 1;
if n == 1 {
Err(anyhow::anyhow!("synthetic rebuild error"))
} else {
Ok(None)
}
},
|_: &Activation| {},
|| {},
)
.expect("spawn watcher");
write_runtime_config(env.path(), r#"{"first":true}"#);
// Sleep past the 50 ms debounce so the two writes land in separate batches.
std::thread::sleep(Duration::from_millis(250));
write_runtime_config(env.path(), r#"{"second":true}"#);
std::thread::sleep(Duration::from_millis(500));
let observed = counter.load(Ordering::SeqCst);
assert!(
observed >= 2,
"watcher must keep running after a rebuild error (rebuild fired {observed} times)"
);
}
}