#![deny(missing_docs, unsafe_op_in_unsafe_fn, rust_2018_idioms)]
#![forbid(clippy::dbg_macro, clippy::print_stdout)]
#![allow(unsafe_code)]
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::sync::atomic::{AtomicI32, AtomicU64, AtomicU8, Ordering};
use std::time::{Duration, Instant};
#[cfg(feature = "prometheus-exporter")]
use varta_watch::exporter::IterStage;
use varta_watch::log_ratelimit::LogKind;
#[cfg(feature = "prometheus-exporter")]
use varta_watch::PromExporter;
use varta_watch::{
varta_error, varta_error_err, varta_error_pid, varta_error_rl, varta_info_pid_child,
varta_warn, varta_warn_child, varta_warn_rl, Config, ConfigError, Event, Exporter,
FileExporter, Observer, Recovery, RecoveryOutcome,
};
/// Async-signal-safe shutdown latch: `0` = keep running, non-zero = exit.
///
/// Set to `1` by `handle_shutdown` with a `Release` store; polled with
/// `Acquire` loads by the poll loop in `run` and by the `varta-watchdog`
/// thread, both of which stop on the next check after it becomes non-zero.
static SHUTDOWN: AtomicI32 = AtomicI32::new(0);
// Build-time guard: the signal handler may only touch a lock-free atomic,
// so refuse to build on targets without native 32-bit atomics.
#[cfg(not(target_has_atomic = "32"))]
compile_error!(
"varta-watch requires lock-free 32-bit atomics (target_has_atomic = \"32\") \
for the async-signal-safe SHUTDOWN latch"
);
/// Clock reading (nanoseconds, as returned by `observer_now_ns`) of the most
/// recent poll-loop tick. `0` means "not armed yet": `watchdog_expired`
/// never fires while this is `0`.
///
/// Read with `Acquire` by the watchdog thread, which aborts the process when
/// the gap to "now" exceeds the configured deadline.
/// NOTE(review): the store site is in `run`'s loop body — confirm it stamps
/// with `observer_now_ns` so both sides use the same clock.
static LAST_TICK_NS: AtomicU64 = AtomicU64::new(0);
/// Per-stage entry timestamps (nanoseconds from `observer_now_ns`), indexed
/// by `IterStage as usize`. `0` means "never entered" and is ignored by the
/// watchdog's per-stage check.
///
/// Each entry is written with `Relaxed` *before* `CURRENT_STAGE` is stored
/// with `Release`. The watchdog's `Acquire` load of `CURRENT_STAGE`
/// therefore makes the matching entry stamp visible.
#[cfg(feature = "prometheus-exporter")]
static LAST_STAGE_ENTRY_NS: [AtomicU64; 6] = [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
];
/// Index (`IterStage as u8`) of the stage the poll loop is currently in.
/// `u8::MAX` is the "no stage" sentinel, for which the watchdog skips the
/// per-stage check.
#[cfg(feature = "prometheus-exporter")]
static CURRENT_STAGE: AtomicU8 = AtomicU8::new(u8::MAX);
/// Per-stage wedge thresholds in nanoseconds, indexed like
/// `LAST_STAGE_ENTRY_NS`: 2000 ms, 2000 ms, 500 ms, 1000 ms, 2000 ms, 1000 ms.
/// If the current stage has been running longer than its entry here, the
/// watchdog thread aborts the process.
/// NOTE(review): the index -> stage mapping comes from the `IterStage`
/// discriminants in `varta_watch::exporter`; keep both tables in sync with it.
#[cfg(feature = "prometheus-exporter")]
const STAGE_ABORT_NS: [u64; 6] = [
2_000 * 1_000_000, 2_000 * 1_000_000, 500 * 1_000_000, 1_000 * 1_000_000, 2_000 * 1_000_000, 1_000 * 1_000_000, ];
/// Configured clock source, encoded with `ClockSource::as_u8`.
///
/// Stored once in `run` (before the observer is bound) and decoded with
/// `ClockSource::from_u8` by `observer_now_ns`, which also runs on the
/// watchdog thread. NOTE(review): the initial `0` decodes to whatever
/// `ClockSource::from_u8(0)` yields — confirm that is a safe default.
static CLOCK_SOURCE: AtomicU8 = AtomicU8::new(0);
/// Shutdown signal handler passed to `varta_watch::signal_install::install`.
///
/// It performs a single lock-free atomic store into `SHUTDOWN`, with no
/// allocation, locking, or I/O, so it is async-signal-safe. The poll loop and
/// the watchdog thread see the latch through their `Acquire` loads.
extern "C" fn handle_shutdown(_signum: i32) {
    const LATCHED: i32 = 1;
    SHUTDOWN.store(LATCHED, Ordering::Release);
}
/// Reads the clock chosen at startup (decoded from `CLOCK_SOURCE`) and
/// returns its current value in nanoseconds.
///
/// Returns `0` when the configured source has no `clk_id`, or when
/// `clock_gettime_raw` fails. The watchdog treats a `0` stage stamp as
/// "unset", and `watchdog_expired` never fires for a `now_ns` behind the
/// last tick.
fn observer_now_ns() -> u64 {
    let encoded = CLOCK_SOURCE.load(Ordering::Acquire);
    let source = varta_watch::clock::ClockSource::from_u8(encoded);
    match source.clk_id() {
        Some(clk_id) => varta_watch::clock::clock_gettime_raw(clk_id).unwrap_or(0),
        None => 0,
    }
}
/// Returns `true` when more than `deadline_ns` nanoseconds have passed
/// between the tick recorded at `last_ns` and `now_ns`.
///
/// - `last_ns == 0` means "no tick recorded yet" and never expires.
/// - If `now_ns` is earlier than `last_ns` (clock went backwards, or `now_ns`
///   is the `0` failure value), elapsed time counts as zero, so the result
///   is `false`.
fn watchdog_expired(now_ns: u64, last_ns: u64, deadline_ns: u64) -> bool {
    match last_ns {
        0 => false,
        armed_at => matches!(
            now_ns.checked_sub(armed_at),
            Some(elapsed) if elapsed > deadline_ns
        ),
    }
}
/// Replaces the file at `path` with `contents` via write-to-temp + rename.
///
/// The bytes are written to a sibling staging file named `<path>.<pid>.tmp`,
/// in the same directory so the rename stays on one filesystem. The handle is
/// closed, then the staging file is renamed over `path`. On POSIX filesystems
/// readers therefore see either the old file or the complete new one.
///
/// # Errors
/// Returns the first I/O error from open, write, or rename. In that case the
/// staging file is removed on a best-effort basis (removal errors are
/// ignored).
///
/// No `fsync` is issued: the swap is atomic for concurrent readers, but the
/// new contents are not guaranteed to survive a power loss.
fn write_heartbeat_atomic(path: &Path, contents: &[u8]) -> io::Result<()> {
    /// Writes `bytes` into `staging` (creating/truncating it) and then
    /// renames `staging` onto `dest`.
    fn stage_then_publish(staging: &Path, dest: &Path, bytes: &[u8]) -> io::Result<()> {
        {
            let mut file = OpenOptions::new()
                .create(true)
                .truncate(true)
                .write(true)
                .open(staging)?;
            file.write_all(bytes)?;
            // `file` is closed at the end of this scope, before the rename.
        }
        std::fs::rename(staging, dest)
    }

    let pid = std::process::id();
    let staging = {
        let mut name = path.as_os_str().to_owned();
        name.push(format!(".{pid}.tmp"));
        PathBuf::from(name)
    };

    stage_then_publish(&staging, path, contents).map_err(|err| {
        // Best-effort cleanup; the caller needs the original error.
        let _ = std::fs::remove_file(&staging);
        err
    })
}
/// Process entry point.
///
/// Builds the `Config` either from argv, or from the built-in table when
/// compiled with the `compile-time-config` feature (in which case any argv
/// at all is rejected). It then calls `run` and maps the outcome to an exit
/// code:
///
/// * `0` — `run` returned `Ok`, or help was requested (help text on stdout).
/// * `1` — `run` returned an error, which is logged via `varta_error!`.
/// * `2` — argv or the resulting configuration was rejected; the error is
///   logged and the help text is echoed to stderr.
fn main() -> ExitCode {
    #[cfg(feature = "json-log")]
    varta_watch::log::init_session_id();

    #[cfg(not(feature = "compile-time-config"))]
    let cfg_result: Result<Config, ConfigError> = {
        // `std::env::args()` panics on the first argument that is not valid
        // Unicode. Read the raw OS strings instead, so a malformed argv is a
        // usage error (exit 2, help on stderr) rather than a panic (exit 101).
        let mut args: Vec<String> = Vec::new();
        for raw in std::env::args_os().skip(1) {
            match raw.into_string() {
                Ok(arg) => args.push(arg),
                Err(bad) => {
                    varta_error!("command-line argument is not valid UTF-8: {bad:?}");
                    let _ = std::io::stderr().lock().write_all(Config::HELP.as_bytes());
                    return ExitCode::from(2);
                }
            }
        }
        Config::from_args(args)
    };

    #[cfg(feature = "compile-time-config")]
    let cfg_result: Result<Config, ConfigError> = {
        // This build rejects argv entirely. `args_os` only checks for the
        // presence of an argument, so a non-UTF-8 argument is rejected the
        // same way instead of panicking inside `std::env::args`.
        if std::env::args_os().nth(1).is_some() {
            Err(ConfigError::CompileTimeArgvForbidden)
        } else {
            Config::compile_time()
        }
    };

    match cfg_result {
        Ok(cfg) => match run(cfg) {
            Ok(()) => ExitCode::SUCCESS,
            Err(e) => {
                varta_error!("{e}");
                ExitCode::from(1)
            }
        },
        Err(ConfigError::HelpRequested) => {
            // Explicit help is a successful outcome and goes to stdout.
            let _ = std::io::stdout().lock().write_all(Config::HELP.as_bytes());
            ExitCode::SUCCESS
        }
        Err(e) => {
            varta_error!("{e}");
            let _ = std::io::stderr().lock().write_all(Config::HELP.as_bytes());
            ExitCode::from(2)
        }
    }
}
fn run(cfg: Config) -> std::io::Result<()> {
let pre_thread = varta_watch::listener::PreThreadAttestation::new()?;
unsafe {
varta_watch::signal_install::install(cfg.signal_handler_mode, handle_shutdown)?;
}
#[cfg(not(feature = "prometheus-exporter"))]
varta_watch::varta_info!("signal_handler_mode={}", cfg.signal_handler_mode.as_str());
CLOCK_SOURCE.store(cfg.clock_source.as_u8(), Ordering::Release);
let mut observer = Observer::bind(
&cfg.socket,
cfg.threshold,
cfg.socket_mode,
cfg.read_timeout,
cfg.uds_rcvbuf_bytes,
cfg.tracker_capacity,
cfg.tracker_eviction_policy,
cfg.eviction_scan_window,
cfg.max_beat_rate,
cfg.global_beat_rate,
cfg.global_beat_burst,
cfg.clock_source,
&pre_thread,
)?
.with_allow_cross_namespace(cfg.allow_cross_namespace_agents);
#[cfg(not(any(
target_os = "linux",
target_os = "macos",
target_os = "freebsd",
target_os = "dragonfly",
target_os = "netbsd",
target_os = "illumos",
target_os = "solaris",
)))]
varta_warn!(
"running on {} — per-datagram PID verification is unavailable. \
Beats are tagged socket-mode-only; recovery commands will be refused. \
The only trust boundary is --socket-mode (default 0600): any process \
under the same UID can forge frame.pid.",
std::env::consts::OS,
);
#[cfg(feature = "secure-udp")]
let secure_udp_keys = cfg.load_secure_keys()?;
#[cfg(feature = "secure-udp")]
let master_key = cfg.load_master_key()?;
#[cfg(feature = "udp-core")]
if let Some(port) = cfg.udp_port {
#[cfg(feature = "secure-udp")]
let secure_keys_configured = cfg.secure_key_file.is_some()
|| cfg.accepted_key_file.is_some()
|| cfg.master_key_file.is_some();
#[cfg(not(feature = "secure-udp"))]
let secure_keys_configured = false;
let bind_addr = cfg.udp_bind_addr.unwrap_or(if secure_keys_configured {
std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST)
} else {
std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
});
let addr = std::net::SocketAddr::new(bind_addr, port);
if secure_keys_configured && !bind_addr.is_loopback() {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"secure-UDP is bound to non-loopback {addr} \
(--i-accept-secure-udp-non-loopback). The 1-deep replay shadow \
after capacity-forced eviction is inadequate for any reachable \
network; restrict reach via firewall / private VLAN. See \
book/src/architecture/vlp-transports.md for the threat-boundary \
derivation."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"secure-UDP is bound to non-loopback {addr}. The 1-deep replay \
shadow after capacity-forced eviction is inadequate for any \
reachable network; restrict reach via firewall / private VLAN."
);
}
#[allow(unused_mut, unused_assignments)]
let mut secure_bound = false;
#[cfg(feature = "secure-udp")]
{
let has_shared_keys = secure_udp_keys.is_some();
let has_master = master_key.is_some();
if has_shared_keys || has_master {
let mut all_keys: Vec<varta_vlp::crypto::Key> = Vec::new();
if let Some((primary, accepted)) = secure_udp_keys {
all_keys.push(primary);
all_keys.extend(accepted);
}
let secure = if let Some(mk) = master_key {
varta_watch::SecureUdpListener::bind_with_master(addr, all_keys, mk).map_err(
|e| {
std::io::Error::new(
e.kind(),
format!("secure UDP bind (master key) {}: {e}", addr),
)
},
)?
} else {
varta_watch::SecureUdpListener::bind(addr, all_keys).map_err(|e| {
std::io::Error::new(e.kind(), format!("secure UDP bind {}: {e}", addr))
})?
};
let trust = if cfg.i_accept_recovery_on_secure_udp {
varta_watch::TransportTrust::Operator
} else {
varta_watch::TransportTrust::Untrusted
};
let secure = secure.with_recovery_trust(trust);
observer.add_listener(Box::new(secure));
secure_bound = true;
}
}
if !secure_bound {
if !cfg.i_accept_plaintext_udp {
#[cfg(not(feature = "compile-time-config"))]
varta_error!(
"--udp-port {addr} cannot bind: no AEAD keys are configured \
and --i-accept-plaintext-udp was not passed. Provide \
--key-file (or --master-key-file) for authenticated transport, \
or pass --i-accept-plaintext-udp to explicitly accept the \
security risk of an unauthenticated UDP listener (test/dev only)."
);
#[cfg(feature = "compile-time-config")]
varta_error!(
"UDP listener at {addr} cannot bind: no AEAD keys are configured \
and plaintext-UDP acknowledgement is not set."
);
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"plaintext UDP requires the plaintext-UDP acknowledgement (and no keys are configured)",
));
}
#[cfg(feature = "unsafe-plaintext-udp")]
{
let trust = if cfg.i_accept_recovery_on_plaintext_udp {
varta_watch::TransportTrust::Operator
} else {
varta_watch::TransportTrust::Untrusted
};
let udp = varta_watch::UdpListener::bind(addr)
.map_err(|e| std::io::Error::new(e.kind(), format!("UDP bind {}: {e}", addr)))?
.with_recovery_trust(trust);
observer.add_listener(Box::new(udp));
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"UDP on {addr} is running WITHOUT authentication \
(--i-accept-plaintext-udp). Any device with network reach to \
this port can inject heartbeats, suppress stall detection, or \
trigger false recovery commands. NOT for production / \
safety-critical use."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"UDP on {addr} is running WITHOUT authentication. \
Any device with network reach to this port can inject \
heartbeats. NOT for production / safety-critical use."
);
}
#[cfg(not(feature = "unsafe-plaintext-udp"))]
{
#[cfg(not(feature = "compile-time-config"))]
varta_error!(
"--udp-port {addr} cannot bind: this build does not include \
--features unsafe-plaintext-udp, and no AEAD keys are \
configured. Rebuild with --features secure-udp and provide \
--key-file / --master-key-file."
);
#[cfg(feature = "compile-time-config")]
varta_error!(
"UDP listener at {addr} cannot bind: plaintext UDP is not \
compiled in and no AEAD keys are configured."
);
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"plaintext UDP not compiled in; no keys configured",
));
}
}
}
#[cfg(not(feature = "udp-core"))]
if cfg.udp_port.is_some() {
#[cfg(not(feature = "compile-time-config"))]
varta_error!(
"--udp-port requires UDP support (rebuild with --features secure-udp \
for authenticated transport, or --features unsafe-plaintext-udp for \
a development/testing plaintext listener)"
);
#[cfg(feature = "compile-time-config")]
varta_error!("UDP port configured but UDP support is not compiled in");
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"UDP support not compiled in",
));
}
#[cfg(not(feature = "secure-udp"))]
if cfg.secure_key_file.is_some()
|| cfg.accepted_key_file.is_some()
|| cfg.master_key_file.is_some()
{
#[cfg(not(feature = "compile-time-config"))]
varta_error!(
"--key-file / --accepted-key-file / --master-key-file require secure \
UDP support (rebuild with --features secure-udp)"
);
#[cfg(feature = "compile-time-config")]
varta_error!(
"secure-UDP key files are configured but the secure-UDP transport \
is not compiled into this build"
);
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"secure UDP support not compiled in",
));
}
let recovery_mode = cfg.resolve_recovery_mode()?;
if cfg.recovery_inherit_env && recovery_mode.is_some() {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"--recovery-inherit-env is set: recovery child processes will inherit \
the observer's full environment. Audit the observer env for secrets \
(AWS_*, *_TOKEN, OAuth bearers, database URLs) before production. \
Prefer --recovery-env KEY=VALUE for explicit allowlisting."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery child env inheritance is enabled (compile-time config); \
recovery subprocesses inherit the observer's environment. Audit \
observer env for secrets before deployment."
);
}
let recovery_audit_sink = match cfg.recovery_audit_file.as_ref() {
Some(path) => {
let audit_cfg = varta_watch::audit::AuditConfig {
max_bytes: cfg.recovery_audit_max_bytes,
sync_every: cfg.recovery_audit_sync_every,
daemon_pid: std::process::id(),
fsync_budget: std::time::Duration::from_millis(cfg.audit_fsync_budget_ms as u64),
sync_interval: if cfg.audit_sync_interval_ms == 0 {
None
} else {
Some(std::time::Duration::from_millis(
cfg.audit_sync_interval_ms as u64,
))
},
rotation_budget: std::time::Duration::from_millis(
cfg.audit_rotation_budget_ms as u64,
),
};
let (sink, warnings) = varta_watch::audit::RecoveryAuditLog::create(path, audit_cfg)?;
if warnings.chain_disabled {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery audit chain is DISABLED (build is missing the `audit-chain` \
feature). v2 records will carry a literal `-` in the chain column and \
this build is NOT IEC 62304 Class C-conforming. Rebuild with \
--features audit-chain for tamper-evident audit records."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery audit chain is DISABLED; records will carry `-` in the \
chain column and this build is NOT IEC 62304 Class C-conforming."
);
}
if warnings.sync_relaxed {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery audit fdatasync cadence is relaxed (--recovery-audit-sync-every \
> 1). A power cut can lose up to N-1 records. The Class C-conforming \
value is 1 (every record)."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery audit fdatasync cadence is relaxed (> 1). A power cut can \
lose up to N-1 records. The Class C-conforming value is 1."
);
}
if warnings.legacy_v1 {
varta_warn!(
"recovery audit file contains a legacy v1 prefix; v2 section begins now \
with a `legacy_v1` boot record."
);
}
if warnings.corrupt_tail {
varta_warn!(
"recovery audit file had a torn tail from a prior unclean shutdown; \
truncated to the last newline before resuming."
);
}
if warnings.schema_drift {
varta_warn!(
"recovery audit file header does not match v1 or v2; appending a fresh \
v2 section with a `schema_drift` boot record."
);
}
Some(sink)
}
None => None,
};
let recovery_source = if let Some(p) = cfg.recovery_exec_file.as_ref() {
p.display().to_string()
} else {
"inline".to_string()
};
if recovery_mode.is_some() {
if cfg.i_accept_recovery_on_secure_udp {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery on secure-UDP listener is enabled \
(--secure-udp-i-accept-recovery-on-unauthenticated-transport). \
NOT for safety-critical use."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery on secure-UDP listener is enabled. \
NOT for safety-critical use."
);
}
if cfg.i_accept_recovery_on_plaintext_udp {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery on plaintext-UDP listener is enabled \
(--plaintext-udp-i-accept-recovery-on-unauthenticated-transport). \
NOT for safety-critical use."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery on plaintext-UDP listener is enabled. \
NOT for safety-critical use."
);
}
}
let mut recovery = recovery_mode.map(|mode| {
let capture_cap = if cfg.recovery_capture_stdio {
cfg.recovery_capture_bytes
} else {
0
};
Recovery::with_timeout(mode, cfg.recovery_debounce, cfg.recovery_timeout)
.with_recovery_env(cfg.recovery_env.clone())
.with_recovery_inherit_env(cfg.recovery_inherit_env)
.with_shutdown_grace(cfg.shutdown_grace)
.with_capture(capture_cap)
.with_source(recovery_source.clone())
.with_audit_sink(recovery_audit_sink)
.with_allow_cross_namespace(cfg.allow_cross_namespace_agents)
.with_reap_scratch_capacity(cfg.tracker_capacity)
.with_outstanding_capacity(cfg.tracker_capacity)
});
let mut file_export: Option<FileExporter> = match cfg.file_export.as_ref() {
Some(path) => Some(FileExporter::create(
path,
cfg.export_file_max_bytes,
cfg.export_file_sync_every,
)?),
None => None,
};
#[cfg(feature = "prometheus-exporter")]
let mut prom_export: Option<PromExporter> = match cfg.prom_addr {
Some(addr) => {
if !addr.ip().is_loopback() {
varta_warn!(
"/metrics is bound to a non-loopback address ({addr}); any host \
that can reach this port can attempt a scrape. The bearer token \
in --prom-token-file is enforced on every connection, but \
binding to 127.0.0.1 / ::1 behind a reverse proxy or \
firewall-restricted interface remains the recommended \
defense-in-depth posture."
);
}
let token = cfg.load_prom_token()?.ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"internal: --prom-addr without --prom-token-file slipped past Config validation",
)
})?;
let mut pe = PromExporter::bind_with_rate_limit(
addr,
token,
cfg.prom_rate_limit_per_sec,
cfg.prom_rate_limit_burst,
)?
.with_iteration_budget(cfg.iteration_budget)
.with_scrape_budget(cfg.scrape_budget);
pe.set_tracker_config(cfg.tracker_capacity, cfg.eviction_scan_window);
pe.set_signal_handler_mode(cfg.signal_handler_mode.as_str());
pe.set_uds_rcvbuf_bytes(observer.uds_rcvbuf_bytes());
pe.set_pid_max_current(observer.pid_max());
if let Ok(bound_addr) = pe.local_addr() {
let line = format!("{bound_addr}\n");
let _ = std::io::stdout().lock().write_all(line.as_bytes());
}
Some(pe)
}
None => None,
};
let mut sd_notify = varta_watch::notify::SdNotify::from_env();
sd_notify.ready();
const AUTO_DEADLINE_SECS: u64 = 4;
let wdt_notifier = sd_notify.take_watchdog_notifier();
let wdt_deadline: Option<Duration> = match (cfg.self_watchdog, wdt_notifier.is_some()) {
(Some(d), _) => Some(d),
(None, true) => Some(Duration::from_secs(AUTO_DEADLINE_SECS)),
(None, false) => None,
};
let mut wdt_handle: Option<std::thread::JoinHandle<()>> = None;
if let Some(deadline) = wdt_deadline {
let deadline_ns = deadline.as_nanos() as u64;
let secs = deadline.as_secs();
let tick_sleep = match wdt_notifier.as_ref() {
Some(n) => (n.half_interval() / 2)
.min(Duration::from_millis(500))
.max(Duration::from_millis(25)),
None => Duration::from_millis(500),
};
let mut wdt_notifier = wdt_notifier;
let handle = std::thread::Builder::new()
.name("varta-watchdog".into())
.spawn(move || loop {
std::thread::sleep(tick_sleep);
if SHUTDOWN.load(Ordering::Acquire) != 0 {
return;
}
let now = observer_now_ns();
let last = LAST_TICK_NS.load(Ordering::Acquire);
if watchdog_expired(now, last, deadline_ns) {
eprintln!("varta-watch poll loop wedged for >{secs}s; aborting");
std::process::abort();
}
#[cfg(feature = "prometheus-exporter")]
{
let stage_idx = CURRENT_STAGE.load(Ordering::Acquire);
if stage_idx != u8::MAX {
let stage_idx = stage_idx as usize;
if let (Some(abort_ns), Some(entry_atom)) = (
STAGE_ABORT_NS.get(stage_idx),
LAST_STAGE_ENTRY_NS.get(stage_idx),
) {
let entry_ns = entry_atom.load(Ordering::Relaxed);
if entry_ns != 0 && now.saturating_sub(entry_ns) > *abort_ns {
let stage_label = varta_watch::exporter::STAGE_LABELS
.get(stage_idx)
.copied()
.unwrap_or("unknown");
eprintln!(
"varta-watch stage '{stage_label}' wedged for >{abort_ns}ns; aborting"
);
std::process::abort();
}
}
}
}
if let Some(n) = wdt_notifier.as_mut() {
n.tick();
}
})?;
wdt_handle = Some(handle);
} else if sd_notify.watchdog_half_interval().is_some() {
varta_warn!(
"$WATCHDOG_USEC is set but no self-watchdog could be started \
(notify socket open failed). systemd watchdog integration is disabled."
);
}
let mut hw_wdt = if let Some(ref path) = cfg.hw_watchdog {
match varta_watch::hw_watchdog::HwWatchdog::open(path) {
Ok(w) => Some(w),
Err(e) => {
#[cfg(not(feature = "compile-time-config"))]
let msg = format!("--hw-watchdog {}: {e}", path.display());
#[cfg(feature = "compile-time-config")]
let msg = format!("hw_watchdog {}: {e}", path.display());
return Err(io::Error::new(e.kind(), msg));
}
}
} else {
None
};
let started = Instant::now();
let mut loop_count: u64 = 0;
#[cfg(feature = "test-hooks")]
let mut wedge_once = cfg.inject_wedge_ms;
loop {
if SHUTDOWN.load(Ordering::Acquire) != 0 {
break;
}
if let Some(deadline) = cfg.shutdown_after {
if started.elapsed() >= deadline {
break;
}
}
#[cfg(feature = "prometheus-exporter")]
let iter_start = Instant::now();
#[cfg(feature = "prometheus-exporter")]
let mut stage_start = iter_start;
#[cfg(feature = "prometheus-exporter")]
{
if let Some(a) = LAST_STAGE_ENTRY_NS.get(IterStage::DrainPending as usize) {
a.store(observer_now_ns(), Ordering::Relaxed);
}
CURRENT_STAGE.store(IterStage::DrainPending as u8, Ordering::Release);
}
while let Some(ev) = observer.poll_pending() {
if let Some(fe) = file_export.as_mut() {
if let Err(e) = fe.record(&ev) {
varta_error_rl!(LogKind::FileExportIo, "file export error: {e}");
}
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
let _ = pe.record(&ev);
}
if let Event::Stall {
pid,
origin,
pid_ns_inode,
..
} = &ev
{
if let Some(rec) = recovery.as_mut() {
let observer_ns_inode = observer.observer_pid_namespace_inode();
let cross_namespace_agent = matches!(
(observer_ns_inode, *pid_ns_inode),
(Some(a), Some(b)) if a != b
);
let outcome = rec.on_stall(*pid, *origin, cross_namespace_agent);
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_recovery_outcome(&outcome, None);
}
match outcome {
RecoveryOutcome::Spawned { child_pid } => {
varta_info_pid_child!(
*pid,
child_pid,
"recovery for pid {pid} spawned (child {child_pid})"
);
}
RecoveryOutcome::Debounced => {}
RecoveryOutcome::SpawnFailed(e) => {
varta_error_pid!(
*pid,
e,
"recovery for pid {pid} failed to spawn: {e}"
);
}
RecoveryOutcome::RefusedUnauthenticatedSource { pid } => {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery for pid {pid} REFUSED: stalled beat lifetime \
includes a non-kernel-attested transport (UDP). Pass \
--i-accept-recovery-on-unauthenticated-transport AND \
enable Recovery's allow_unauthenticated_source to \
override at your own risk."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery for pid {pid} REFUSED: stalled beat lifetime \
includes a non-kernel-attested transport (UDP)."
);
}
RecoveryOutcome::RefusedCrossNamespace { pid } => {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery for pid {pid} REFUSED: agent's PID namespace \
differs from observer's. kill(2) against this pid \
in the observer's namespace would target the wrong \
process. Pass --allow-cross-namespace-agents only when \
agents are run with --pid=host or an out-of-band PID \
translator is in place."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery for pid {pid} REFUSED: agent's PID namespace \
differs from observer's."
);
}
RecoveryOutcome::RefusedDebounceCapacity { pid } => {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery for pid {pid} REFUSED: debounce ledger at \
capacity and no slot's debounce window has elapsed. \
This is the M8 fail-closed guard against stall-burst \
attacks. Alert on \
rate(varta_recovery_refused_total{{reason=\"debounce_capacity\"}}[5m]) > 0; \
see book/src/architecture/observer-liveness.md."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery for pid {pid} REFUSED: debounce ledger \
at capacity (M8 fail-closed guard)."
);
}
RecoveryOutcome::RefusedOutstandingCapacity { pid } => {
#[cfg(not(feature = "compile-time-config"))]
varta_warn!(
"recovery for pid {pid} REFUSED: outstanding-child \
table at capacity (tracker_capacity worth of \
recoveries already in flight). Alert on \
rate(varta_recovery_refused_total{{reason=\"outstanding_capacity\"}}[5m]) > 0."
);
#[cfg(feature = "compile-time-config")]
varta_warn!(
"recovery for pid {pid} REFUSED: outstanding-child \
table at capacity."
);
}
RecoveryOutcome::RefusedSocketModeOnly { pid } => {
varta_warn!(
"recovery for pid {pid} REFUSED: observer is running \
on a platform without per-datagram kernel credential \
passing (socket-mode-only). frame.pid cannot be \
verified — spawning a recovery command against it is \
unsafe."
);
}
RecoveryOutcome::Reaped { .. }
| RecoveryOutcome::Killed { .. }
| RecoveryOutcome::ReapFailed(_) => {
unreachable!("on_stall returned a reap-only recovery outcome")
}
}
}
}
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_stage_duration(IterStage::DrainPending, stage_start.elapsed());
stage_start = Instant::now();
if let Some(a) = LAST_STAGE_ENTRY_NS.get(IterStage::Poll as usize) {
a.store(observer_now_ns(), Ordering::Relaxed);
}
CURRENT_STAGE.store(IterStage::Poll as u8, Ordering::Release);
}
let had_io = if let Some(ev) = observer.poll() {
if let Some(fe) = file_export.as_mut() {
if let Err(e) = fe.record(&ev) {
varta_error_rl!(LogKind::FileExportIo, "file export error: {e}");
}
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
let _ = pe.record(&ev);
}
if cfg.strict_namespace_check && !cfg.allow_cross_namespace_agents {
if let Event::NamespaceConflict { claimed_pid, .. } = &ev {
#[cfg(not(feature = "compile-time-config"))]
varta_error!(
"FATAL --strict-namespace-check: cross-namespace agent \
detected for claimed pid {claimed_pid}; refusing to \
continue. Re-run with --allow-cross-namespace-agents \
only if PID translation is correctly configured."
);
#[cfg(feature = "compile-time-config")]
varta_error!(
"FATAL strict namespace check: cross-namespace agent \
detected for claimed pid {claimed_pid}; refusing to \
continue."
);
return Err(io::Error::new(
io::ErrorKind::Other,
"cross-namespace agent detected under strict namespace check",
));
}
}
true
} else {
false
};
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_stage_duration(IterStage::Poll, stage_start.elapsed());
stage_start = Instant::now();
if let Some(a) = LAST_STAGE_ENTRY_NS.get(IterStage::Maintenance as usize) {
a.store(observer_now_ns(), Ordering::Relaxed);
}
CURRENT_STAGE.store(IterStage::Maintenance as u8, Ordering::Release);
}
let evicted = observer.drain_evictions();
if evicted > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_eviction(evicted);
}
}
if let Some(evicted_pid) = observer.drain_evicted_pid() {
if let Some(fe) = file_export.as_mut() {
fe.record_eviction_pid(evicted_pid, observer.now_ns());
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_evicted_pid(evicted_pid);
}
}
let capacity_exceeded = observer.drain_capacity_exceeded();
if capacity_exceeded > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_capacity_exceeded(capacity_exceeded);
}
}
let bind_dir_fsync_failed = Observer::drain_bind_dir_fsync_failures();
if bind_dir_fsync_failed > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_bind_dir_fsync_failed(bind_dir_fsync_failed);
}
}
let decrypt_failures = observer.drain_decrypt_failures();
if decrypt_failures > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_decrypt_failures(decrypt_failures);
}
}
let truncated = observer.drain_truncated();
if truncated > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_truncated(truncated);
}
}
let sender_state_full = observer.drain_sender_state_full();
if sender_state_full > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_sender_state_full(sender_state_full);
}
}
let aead_attempts = observer.drain_aead_attempts();
if aead_attempts > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_secure_aead_attempts(aead_attempts);
}
}
let per_pid_rate_limited = observer.drain_per_pid_rate_limited();
if per_pid_rate_limited > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_per_pid_rate_limited(per_pid_rate_limited);
}
}
let global_rate_limited = observer.drain_global_rate_limited();
if global_rate_limited > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_global_rate_limited(global_rate_limited);
}
}
let clock_regressions = observer.drain_clock_regressions();
if clock_regressions > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_clock_regressions(clock_regressions);
}
}
let clock_jumps_forward = observer.drain_clock_jumps_forward();
if clock_jumps_forward > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_clock_jumps_forward(clock_jumps_forward);
}
}
let nonce_wraps = observer.drain_nonce_wraps();
if nonce_wraps > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_nonce_wraps(nonce_wraps);
}
}
let eviction_scan_truncated = observer.drain_eviction_scan_truncated();
if eviction_scan_truncated > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_eviction_scan_truncated(eviction_scan_truncated);
}
}
let origin_conflicts = observer.drain_origin_conflicts();
if origin_conflicts > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_origin_conflicts(origin_conflicts);
}
}
let frame_ns_mismatches = observer.drain_cross_namespace_drops();
if frame_ns_mismatches > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_frame_namespace_mismatches(frame_ns_mismatches);
}
}
if observer.maybe_refresh_pid_max() {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.set_pid_max_current(observer.pid_max());
}
}
let pid_above_max = observer.drain_pid_above_max_drops();
if pid_above_max > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_pid_above_max_drops(pid_above_max);
}
#[cfg(not(feature = "prometheus-exporter"))]
let _ = pid_above_max;
}
let tracker_ns_conflicts = observer.drain_namespace_conflicts();
if tracker_ns_conflicts > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_tracker_namespace_conflicts(tracker_ns_conflicts);
}
}
let tracker_invariants = observer.drain_invariant_violations();
if tracker_invariants > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_tracker_invariant_violations(tracker_invariants);
}
}
let probe_exhausted = observer.drain_pid_index_probe_exhausted();
if probe_exhausted > 0 {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_tracker_pid_index_probe_exhausted(probe_exhausted);
}
}
if let Some(rec) = recovery.as_mut() {
let evictions = rec.take_last_fired_evictions();
let invariants = rec.take_last_fired_invariant_violations();
let outstanding_probe_exhausted = rec.take_outstanding_probe_exhausted();
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
if evictions > 0 {
pe.record_recovery_last_fired_evictions(evictions);
}
if invariants > 0 {
pe.record_recovery_invariant_violations(invariants);
}
if outstanding_probe_exhausted > 0 {
pe.record_recovery_outstanding_probe_exhausted(outstanding_probe_exhausted);
}
}
#[cfg(not(feature = "prometheus-exporter"))]
{
let _ = evictions;
let _ = invariants;
let _ = outstanding_probe_exhausted;
}
}
if let Some(rec) = recovery.as_mut() {
rec.flush_audit_pending(std::time::Duration::from_millis(10));
if rec.audit_rotation_pending() || rec.audit_rotation_due() {
let _ = rec.drive_audit_rotation(std::time::Duration::from_millis(
cfg.audit_rotation_budget_ms as u64,
));
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
let dropped = rec.take_audit_dropped();
if dropped > 0 {
pe.record_audit_dropped(dropped);
}
let budget_exceeded = rec.take_audit_flush_budget_exceeded();
if budget_exceeded > 0 {
pe.record_audit_flush_budget_exceeded(budget_exceeded);
}
for d in rec.take_audit_fsync_durations() {
pe.record_audit_fsync_duration(d);
}
let fsync_overrun = rec.take_audit_fsync_budget_exceeded();
if fsync_overrun > 0 {
pe.record_audit_fsync_budget_exceeded(fsync_overrun);
}
let rot_overrun = rec.take_audit_rotation_budget_exceeded();
if rot_overrun > 0 {
pe.record_audit_rotation_budget_exceeded(rot_overrun);
}
let warn_cross = rec.take_audit_ring_watermark_warn();
if warn_cross > 0 {
pe.record_audit_ring_watermark("warn", warn_cross);
}
let crit_cross = rec.take_audit_ring_watermark_critical();
if crit_cross > 0 {
pe.record_audit_ring_watermark("critical", crit_cross);
}
}
#[cfg(not(feature = "prometheus-exporter"))]
{
let _ = rec.take_audit_fsync_durations();
let _ = rec.take_audit_fsync_budget_exceeded();
let _ = rec.take_audit_rotation_budget_exceeded();
let _ = rec.take_audit_ring_watermark_warn();
let _ = rec.take_audit_ring_watermark_critical();
}
}
if let Some(rec) = recovery.as_mut() {
if let Some(err) = rec.drain_audit_err() {
varta_warn_rl!(LogKind::AuditIo, "recovery audit IO error: {err}");
}
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_stage_duration(IterStage::Maintenance, stage_start.elapsed());
stage_start = Instant::now();
if let Some(a) = LAST_STAGE_ENTRY_NS.get(IterStage::RecoveryReap as usize) {
a.store(observer_now_ns(), Ordering::Relaxed);
}
CURRENT_STAGE.store(IterStage::RecoveryReap as u8, Ordering::Release);
}
if let Some(rec) = recovery.as_mut() {
for outcome in rec.try_reap() {
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_recovery_outcome(&outcome, None);
}
match outcome {
RecoveryOutcome::Reaped { child_pid, status } if !status.success() => {
varta_warn_child!(
child_pid,
"recovery child {child_pid} exited non-zero: {status}"
);
}
RecoveryOutcome::Killed { child_pid } => {
varta_warn_child!(
child_pid,
"recovery child {child_pid} killed after timeout"
);
}
RecoveryOutcome::ReapFailed(e) => {
varta_error_err!(e, "recovery reap failed: {e}");
}
_ => {}
}
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
let truncated = rec.take_reap_truncated();
if truncated > 0 {
pe.record_recovery_reap_truncated(truncated);
}
}
#[cfg(not(feature = "prometheus-exporter"))]
{
let _ = rec.take_reap_truncated();
}
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_stage_duration(IterStage::RecoveryReap, stage_start.elapsed());
if let Some(a) = LAST_STAGE_ENTRY_NS.get(IterStage::ServePending as usize) {
a.store(observer_now_ns(), Ordering::Relaxed);
}
CURRENT_STAGE.store(IterStage::ServePending as u8, Ordering::Release);
let serve_start = Instant::now();
if let Err(e) = pe.serve_pending() {
varta_error_rl!(LogKind::PromServe, "/metrics serve error: {e}");
}
pe.record_loop_tick();
let serve_elapsed = serve_start.elapsed();
pe.record_serve_pending_duration(serve_elapsed);
pe.record_stage_duration(IterStage::ServePending, serve_elapsed);
stage_start = Instant::now(); if let Some(a) = LAST_STAGE_ENTRY_NS.get(IterStage::Housekeeping as usize) {
a.store(observer_now_ns(), Ordering::Relaxed);
}
CURRENT_STAGE.store(IterStage::Housekeeping as u8, Ordering::Release);
}
loop_count = loop_count.wrapping_add(1);
if let Some(ref hb_path) = cfg.heartbeat_file {
let ts = observer.now_ns();
let line = format!("{loop_count} {ts}\n");
if let Err(e) = write_heartbeat_atomic(hb_path, line.as_bytes()) {
varta_error_rl!(LogKind::HeartbeatIo, "heartbeat file write error: {e}");
}
}
LAST_TICK_NS.store(observer_now_ns(), Ordering::Release);
if let Some(ref mut hw) = hw_wdt {
hw.kick();
}
#[cfg(feature = "prometheus-exporter")]
if let Some(pe) = prom_export.as_mut() {
pe.record_stage_duration(IterStage::Housekeeping, stage_start.elapsed());
pe.record_iteration_duration(iter_start.elapsed());
}
#[cfg(feature = "prometheus-exporter")]
CURRENT_STAGE.store(u8::MAX, Ordering::Release);
if !had_io && !observer.has_pending_stalls() {
std::thread::sleep(Duration::from_millis(10));
}
#[cfg(feature = "test-hooks")]
if let Some(ms) = wedge_once.take() {
std::thread::sleep(Duration::from_millis(ms));
}
}
if let Some(ref hw) = hw_wdt {
hw.arm_disarm_on_drop();
}
SHUTDOWN.store(1, Ordering::Release);
if let Some(h) = wdt_handle.take() {
let _ = h.join();
}
sd_notify.stopping();
if let Some(fe) = file_export.as_mut() {
let _ = fe.flush();
}
Ok(())
}
// Unit tests for the watchdog deadline check, the atomic heartbeat writer and
// the shutdown signal handler.
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Read;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use varta_watch::signal_install::SignalHandlerMode;

    /// Serializes the tests that install process-wide signal handlers and
    /// read/write the global `SHUTDOWN` latch; the libtest harness runs tests
    /// on parallel threads by default.
    #[cfg(unix)]
    static SIGNAL_TEST_LOCK: Mutex<()> = Mutex::new(());

    #[test]
    fn watchdog_expired_returns_false_before_first_tick() {
        // last_ns == 0 models "no tick recorded yet": even an enormous `now`
        // and a tiny deadline must not report expiry.
        assert!(!watchdog_expired(u64::MAX, 0, 1));
    }

    #[test]
    fn watchdog_expired_returns_false_within_deadline() {
        let now = 1_000_000_000u64;
        let last = 999_000_000u64;
        let deadline = 5_000_000_000u64;
        assert!(!watchdog_expired(now, last, deadline));
    }

    #[test]
    fn watchdog_expired_returns_true_past_deadline() {
        let now = 10_000_000_000u64;
        let last = 1_000_000u64;
        let deadline = 5_000_000_000u64;
        assert!(watchdog_expired(now, last, deadline));
    }

    /// Per-test scratch directory, removed (best-effort) when dropped so test
    /// runs do not accumulate files under the system temp directory.
    struct TmpDir(PathBuf);

    impl TmpDir {
        /// Path of the scratch directory.
        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            // Best-effort: a leftover directory is harmless, and `mk_tmpdir`
            // wipes it before reuse anyway.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Creates a fresh scratch directory named after `tag` and this process's PID.
    ///
    /// Any stale directory with the same name (e.g. from an earlier run whose
    /// PID was reused) is removed first so every test starts from a clean slate.
    /// On Unix the mode is pinned to 0755 so the result does not depend on the
    /// process umask.
    fn mk_tmpdir(tag: &str) -> TmpDir {
        let dir = std::env::temp_dir().join(format!("varta_hb_{}_{}", tag, std::process::id()));
        let _ = fs::remove_dir_all(&dir);
        fs::create_dir_all(&dir).unwrap();
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            fs::set_permissions(&dir, fs::Permissions::from_mode(0o755)).unwrap();
        }
        TmpDir(dir)
    }

    /// Tempfile path this module assumes the heartbeat writer uses for
    /// `target`: `<target>.<pid>.tmp`.
    fn heartbeat_tmp_path(target: &std::path::Path) -> PathBuf {
        PathBuf::from(format!("{}.{}.tmp", target.display(), std::process::id()))
    }

    /// Returns `true` when `buf` is exactly one complete heartbeat line as
    /// produced by the contention test below: `"<n> <n*1000>\n"`.
    ///
    /// Checking the relation between the two fields and the trailing newline
    /// catches torn reads that still parse as two integers, e.g. `"12 12"`
    /// read from a half-written `"12 12000\n"`. An empty buffer is rejected
    /// too: with rename-based replacement the target is never observably empty.
    fn is_complete_heartbeat(buf: &str) -> bool {
        let line = match buf.strip_suffix('\n') {
            Some(line) => line,
            None => return false,
        };
        let mut fields = line.split(' ');
        match (fields.next(), fields.next(), fields.next()) {
            (Some(a), Some(b), None) => match (a.parse::<u64>(), b.parse::<u64>()) {
                (Ok(n), Ok(ts)) => n.checked_mul(1000) == Some(ts),
                _ => false,
            },
            _ => false,
        }
    }

    #[test]
    fn heartbeat_write_overwrites_existing() {
        let dir = mk_tmpdir("overwrite");
        let path = dir.path().join("hb.txt");
        write_heartbeat_atomic(&path, b"1 100\n").unwrap();
        write_heartbeat_atomic(&path, b"2 200\n").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "2 200\n");
    }

    #[test]
    fn heartbeat_write_is_atomic_under_reader_contention() {
        let dir = mk_tmpdir("atomic");
        let path = dir.path().join("hb.txt");
        write_heartbeat_atomic(&path, b"0 0\n").unwrap();

        let bad_reads: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let bad_reads_r = bad_reads.clone();
        let path_r = path.clone();
        let reader = std::thread::spawn(move || {
            let deadline = std::time::Instant::now() + Duration::from_millis(300);
            let mut buf = String::new();
            while std::time::Instant::now() < deadline {
                buf.clear();
                if let Ok(mut f) = fs::File::open(&path_r) {
                    // Only judge reads that completed; a transient read error
                    // says nothing about the writer's atomicity.
                    if f.read_to_string(&mut buf).is_ok() && !is_complete_heartbeat(&buf) {
                        bad_reads_r.lock().unwrap().push(buf.clone());
                    }
                }
                std::hint::spin_loop();
            }
        });

        let deadline = std::time::Instant::now() + Duration::from_millis(300);
        let mut n: u64 = 1;
        while std::time::Instant::now() < deadline {
            let line = format!("{n} {}\n", n * 1000);
            write_heartbeat_atomic(&path, line.as_bytes()).unwrap();
            n += 1;
        }
        reader.join().unwrap();

        let bad = bad_reads.lock().unwrap();
        assert!(
            bad.is_empty(),
            "saw {} truncated/malformed heartbeat read(s): {:?}",
            bad.len(),
            &*bad
        );
    }

    #[cfg(unix)]
    #[test]
    fn signal_handler_returns_ok_under_normal_conditions() {
        let _guard = SIGNAL_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // SAFETY (assumed contract of `signal_install::install` — confirm against
        // its docs): `handle_shutdown` only performs one atomic store, which is
        // async-signal-safe, and SIGNAL_TEST_LOCK serializes installation.
        let result = unsafe {
            varta_watch::signal_install::install(SignalHandlerMode::Direct, handle_shutdown)
        };
        assert!(result.is_ok(), "signal install failed: {:?}", result);
    }

    #[cfg(unix)]
    #[test]
    fn signal_handler_real_sigint_flips_shutdown() {
        // Upper bound on how long to wait for the handler. The signal may be
        // delivered to any thread of the (multithreaded) test process, so allow
        // generous headroom; the loop below exits as soon as the flag flips.
        const SIGNAL_WAIT: Duration = Duration::from_secs(2);
        const SIGINT: i32 = 2;

        let _guard = SIGNAL_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        SHUTDOWN.store(0, Ordering::Release);
        // SAFETY (assumed contract of `signal_install::install` — confirm against
        // its docs): `handle_shutdown` only performs one atomic store, which is
        // async-signal-safe, and SIGNAL_TEST_LOCK serializes installation.
        unsafe { varta_watch::signal_install::install(SignalHandlerMode::Direct, handle_shutdown) }
            .expect("install signal handlers");
        assert!(
            SHUTDOWN.load(Ordering::Acquire) == 0,
            "SHUTDOWN was already non-zero before signal delivery"
        );

        // Assumes `pid_t` is a 32-bit signed integer, as on Linux/macOS/BSD.
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
            fn getpid() -> i32;
        }
        // SAFETY: `getpid` has no preconditions, and `kill` targets only this
        // process with SIGINT, for which a handler was installed above.
        let rc = unsafe { kill(getpid(), SIGINT) };
        assert_eq!(
            rc,
            0,
            "kill(getpid(), SIGINT) failed: {:?}",
            io::Error::last_os_error()
        );

        let deadline = std::time::Instant::now() + SIGNAL_WAIT;
        while std::time::Instant::now() < deadline && SHUTDOWN.load(Ordering::Acquire) == 0 {
            std::thread::yield_now();
        }
        let fired = SHUTDOWN.load(Ordering::Acquire) != 0;
        // Reset the global latch before asserting so a failure here cannot leak
        // into other tests.
        SHUTDOWN.store(0, Ordering::Release);
        assert!(
            fired,
            "SHUTDOWN was not set within {:?} of SIGINT delivery — handler did not fire",
            SIGNAL_WAIT
        );
    }

    #[test]
    fn heartbeat_tempfile_cleaned_on_rename_failure() {
        let dir = mk_tmpdir("cleanup");

        // Case 1: the parent directory does not exist, so the write fails
        // before (or while) creating the tempfile; nothing may be left behind.
        let missing_parent = dir.path().join("nonexistent_subdir").join("hb.txt");
        assert!(write_heartbeat_atomic(&missing_parent, b"1 100\n").is_err());
        let tmp = heartbeat_tmp_path(&missing_parent);
        assert!(
            !tmp.exists(),
            "stale tempfile left behind: {}",
            tmp.display()
        );

        // Case 2: the parent exists, so the tempfile can be created and written,
        // but a non-empty directory occupies the target path, so the final
        // rename itself fails. This is the case that actually exercises cleanup.
        let dir_target = dir.path().join("hb.txt");
        fs::create_dir_all(&dir_target).unwrap();
        fs::write(dir_target.join("occupied"), b"x").unwrap();
        assert!(write_heartbeat_atomic(&dir_target, b"1 100\n").is_err());
        let tmp = heartbeat_tmp_path(&dir_target);
        assert!(
            !tmp.exists(),
            "stale tempfile left behind after failed rename: {}",
            tmp.display()
        );
    }
}