pub mod auth;
pub mod dedup;
pub mod handlers;
pub mod reconciler;
pub mod watcher;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use axum::{extract::DefaultBodyLimit, middleware, routing::get, routing::post, Router};
use secrecy::SecretString;
use tokio::net::TcpListener;
use crate::cloud::{CloudState, CredentialProvider};
use crate::config::Config;
use crate::core::logging::tamper_log::{TamperLogger, TamperLoggerHandle};
use crate::logging::EventLogger;
use crate::privacy::PrivacyFilter;
use crate::update;
/// Adapts a [`crate::auth::CredentialStore`] to the [`CredentialProvider`]
/// trait expected by the cloud worker, so retrieval failures degrade to
/// `None` instead of propagating an error into the worker loop.
struct CredentialStoreAdapter {
// Shared handle to the underlying credential store; cloned into the
// cloud worker task at startup.
store: Arc<dyn crate::auth::CredentialStore>,
}
impl CredentialProvider for CredentialStoreAdapter {
/// Fetch the stored API key.
///
/// Any retrieval failure is logged at debug level and collapsed to
/// `None`, letting the cloud worker treat "no credential" uniformly.
fn retrieve(&self) -> Option<SecretString> {
self.store
.retrieve()
.map_err(|e| {
tracing::debug!(
code = e.code,
"credential provider: retrieve failed — returning None to worker"
);
})
.ok()
}
}
/// Shared state handed (via `Arc`) to every request handler and
/// background task spawned by the daemon.
pub struct AppState {
/// Effective daemon configuration, frozen at startup.
pub config: Arc<Config>,
/// Bearer token required by the authenticated routes.
pub token: String,
/// Deduplication store for incoming hook events.
pub dedup: dedup::DedupStore,
/// Asynchronous event logger (drained on shutdown via its handle).
pub event_logger: EventLogger,
/// Redacts sensitive content before events are logged/forwarded.
pub privacy_filter: PrivacyFilter,
/// Count of events processed, reported at shutdown.
pub event_counter: AtomicU64,
/// One-shot sender used by the `/shutdown` endpoint; taken exactly once.
pub shutdown_tx: tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
/// Server start time, used to compute uptime at shutdown.
pub started_at: std::time::Instant,
/// Latest newer version detected by the background update check, if any.
pub available_update: Mutex<Option<String>>,
/// Channel into the cloud worker; `None` when cloud forwarding is disabled.
pub cloud_tx: Option<tokio::sync::mpsc::Sender<crate::cloud::CloudEvent>>,
/// Shared status of the cloud worker; `None` when forwarding is disabled.
pub cloud_state: Option<CloudState>,
// Host addresses detected once at startup (None when detection failed).
pub local_ipv4: Option<std::net::Ipv4Addr>,
pub local_ipv6: Option<std::net::Ipv6Addr>,
pub public_ipv4: Option<std::net::Ipv4Addr>,
pub public_ipv6: Option<std::net::Ipv6Addr>,
/// Tamper-evidence logger shared with the reconciler sinks.
pub tamper_logger: Option<TamperLogger>,
}
impl AppState {
/// Record the latest available version reported by the update check.
///
/// A poisoned lock is silently skipped rather than panicking — the
/// update banner is best-effort information.
pub fn set_available_update(&self, version: String) {
match self.available_update.lock() {
Ok(mut slot) => *slot = Some(version),
Err(_) => {}
}
}
/// Return the most recently recorded available version, if any.
/// Yields `None` when no update was recorded or the lock is poisoned.
pub fn get_available_update(&self) -> Option<String> {
match self.available_update.lock() {
Ok(slot) => slot.clone(),
Err(_) => None,
}
}
}
/// Bind a TCP listener per the configuration and run the daemon on it.
///
/// Binds loopback-only unless `OPENLATCH_BIND_ALL` is set to `true`/`1`.
/// Writes the bound port to the `daemon.port` file (best-effort) before
/// serving. Returns `(uptime_secs, events_processed)` on clean shutdown.
pub async fn start_server(
config: Config,
token: String,
credential_store: Option<Arc<dyn crate::auth::CredentialStore>>,
) -> anyhow::Result<(u64, u64)> {
// Opt-in escape hatch for exposing the daemon beyond localhost.
let bind_all = matches!(
std::env::var("OPENLATCH_BIND_ALL").as_deref(),
Ok("true") | Ok("1")
);
let bind_host = if bind_all { "0.0.0.0" } else { "127.0.0.1" };
let bind_addr = format!("{}:{}", bind_host, config.port);
let listener = TcpListener::bind(&bind_addr).await?;
tracing::info!(
port = config.port,
addr = %bind_addr,
"daemon listening"
);
// Port-file write is advisory; a failure must not abort startup.
if let Err(e) = crate::config::write_port_file(config.port) {
tracing::warn!(error = %e, "failed to write daemon.port file");
}
serve_with_listener(listener, config, token, credential_store).await
}
/// Run the daemon on an already-bound listener (used by tests and callers
/// that need to pick the port themselves, e.g. by binding port 0).
///
/// Unlike [`start_server`], this performs no binding and does not write
/// the `daemon.port` file. Returns `(uptime_secs, events_processed)` on
/// clean shutdown.
pub async fn start_server_with_listener(
listener: TcpListener,
config: Config,
token: String,
credential_store: Option<Arc<dyn crate::auth::CredentialStore>>,
) -> anyhow::Result<(u64, u64)> {
serve_with_listener(listener, config, token, credential_store).await
}
async fn serve_with_listener(
listener: TcpListener,
config: Config,
token: String,
credential_store: Option<Arc<dyn crate::auth::CredentialStore>>,
) -> anyhow::Result<(u64, u64)> {
let log_dir = config.log_dir.clone();
let (event_logger, logger_handle) = EventLogger::new(log_dir.clone());
let privacy_filter = PrivacyFilter::new(&config.extra_patterns);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let (cloud_tx, cloud_state_opt) = if config.cloud.enabled {
if let Some(store) = credential_store {
let (tx, rx) = tokio::sync::mpsc::channel(config.cloud.channel_size);
let cloud_state = CloudState::new();
let cloud_config = crate::cloud::CloudConfig {
api_url: config.cloud.api_url.clone(),
timeout_connect_ms: config.cloud.timeout_connect_ms,
timeout_total_ms: config.cloud.timeout_total_ms,
retry_count: config.cloud.retry_count,
retry_delay_ms: config.cloud.retry_delay_ms,
channel_size: config.cloud.channel_size,
rate_limit_default_secs: 30,
credential_poll_interval_ms: config.cloud.credential_poll_interval_ms,
};
let openlatch_dir = crate::config::openlatch_dir();
let provider: Arc<dyn CredentialProvider> = Arc::new(CredentialStoreAdapter { store });
let worker_state = cloud_state.clone();
tokio::spawn(crate::cloud::worker::run_cloud_worker(
rx,
provider,
cloud_config,
worker_state,
openlatch_dir,
));
tracing::info!(
api_url = %config.cloud.api_url,
channel_size = config.cloud.channel_size,
"cloud forwarding worker started"
);
(Some(tx), Some(cloud_state))
} else {
tracing::info!("cloud forwarding enabled in config but no credential store provided — cloud forwarding disabled");
(None, None)
}
} else {
(None, None)
};
let startup_started = std::time::Instant::now();
let host_ips = crate::net::HostIps::detect().await;
tracing::info!(
local_ipv4 = host_ips
.local_ipv4
.map(|a| a.to_string())
.as_deref()
.unwrap_or("none"),
local_ipv6 = host_ips
.local_ipv6
.map(|a| a.to_string())
.as_deref()
.unwrap_or("none"),
public_ipv4 = host_ips
.public_ipv4
.map(|a| a.to_string())
.as_deref()
.unwrap_or("none"),
public_ipv6 = host_ips
.public_ipv6
.map(|a| a.to_string())
.as_deref()
.unwrap_or("none"),
"host ips detected"
);
let openlatch_dir_for_tamper = crate::config::openlatch_dir();
let (tamper_logger, _tamper_logger_handle): (TamperLogger, TamperLoggerHandle) =
TamperLogger::new(openlatch_dir_for_tamper);
let state = Arc::new(AppState {
config: Arc::new(config.clone()),
token,
dedup: dedup::DedupStore::new(),
event_logger,
privacy_filter,
event_counter: AtomicU64::new(0),
shutdown_tx: tokio::sync::Mutex::new(Some(shutdown_tx)),
started_at: std::time::Instant::now(),
available_update: Mutex::new(None),
cloud_tx,
cloud_state: cloud_state_opt,
local_ipv4: host_ips.local_ipv4,
local_ipv6: host_ips.local_ipv6,
public_ipv4: host_ips.public_ipv4,
public_ipv6: host_ips.public_ipv6,
tamper_logger: Some(tamper_logger),
});
crate::telemetry::capture_global(crate::telemetry::Event::daemon_started(
state.config.port,
startup_started
.elapsed()
.as_millis()
.min(u128::from(u64::MAX)) as u64,
state.config.cloud.enabled,
));
if config.update.check {
let current = env!("CARGO_PKG_VERSION").to_string();
let state_for_update = state.clone();
tokio::spawn(async move {
if let Some(latest) = update::check_for_update(¤t).await {
tracing::warn!(code = crate::error::ERR_VERSION_OUTDATED, latest_version = %latest, "Update available: run `npx openlatch@latest`");
state_for_update.set_available_update(latest);
}
});
}
let state_for_evict = state.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
state_for_evict.dedup.evict_expired();
}
});
let _watcher_guard;
let _poll_handle;
if let Ok(agent) = crate::hooks::detect_agent() {
let settings_path = match &agent {
crate::hooks::DetectedAgent::ClaudeCode { settings_path, .. } => settings_path.clone(),
};
let openlatch_dir = crate::config::openlatch_dir();
let token_file = openlatch_dir.join("daemon.token");
reconciler::run_startup_reconcile(&settings_path, &openlatch_dir, config.port);
let (reconcile_tx, reconcile_rx) = tokio::sync::mpsc::channel(100);
let sinks = reconciler::TamperSinks {
logger: state.tamper_logger.clone(),
cloud_tx: state.cloud_tx.clone(),
agent_id: state.config.agent_id.clone().unwrap_or_default(),
client_version: env!("CARGO_PKG_VERSION").to_string(),
};
let r = reconciler::Reconciler::new_with_sinks(
reconcile_rx,
settings_path.clone(),
openlatch_dir,
config.port,
token_file,
sinks,
);
tokio::spawn(r.run());
_watcher_guard = match watcher::spawn_watcher(&settings_path, reconcile_tx.clone()) {
Ok(w) => {
tracing::info!("filesystem watcher active");
Some(w)
}
Err(e) => {
tracing::warn!(error = %e, "filesystem watcher failed — falling back to poll-only");
None
}
};
_poll_handle = Some(watcher::spawn_poll_fallback(reconcile_tx));
tracing::info!("reconciler started (reactive watcher + 30s poll)");
} else {
_watcher_guard = None;
_poll_handle = None;
}
let hook_routes = Router::new()
.route("/hooks", post(handlers::ingest_cloudevent))
.route_layer(middleware::from_fn_with_state(
state.clone(),
auth::bearer_auth,
));
let shutdown_route = Router::new()
.route("/shutdown", post(handlers::shutdown_handler))
.route_layer(middleware::from_fn_with_state(
state.clone(),
auth::bearer_auth,
));
let public_routes = Router::new()
.route("/health", get(handlers::health))
.route("/metrics", get(handlers::metrics));
let app = Router::new()
.merge(hook_routes)
.merge(shutdown_route)
.merge(public_routes)
.layer(DefaultBodyLimit::max(1_048_576))
.with_state(state.clone());
axum::serve(listener, app)
.with_graceful_shutdown(async move {
tokio::select! {
_ = signal_handler() => {
tracing::info!("received OS shutdown signal");
}
_ = shutdown_rx => {
tracing::info!("received shutdown via /shutdown endpoint");
}
}
})
.await?;
let uptime_secs = state.started_at.elapsed().as_secs();
let events = state
.event_counter
.load(std::sync::atomic::Ordering::Relaxed);
crate::logging::daemon_log::log_shutdown(uptime_secs, events);
crate::telemetry::capture_global(crate::telemetry::Event::daemon_stopped(uptime_secs, events));
match Arc::try_unwrap(state) {
Ok(_state) => { }
Err(arc) => {
tracing::warn!(
strong_refs = Arc::strong_count(&arc),
"AppState still has references at shutdown — log drain may be incomplete"
);
drop(arc);
}
}
logger_handle.shutdown().await;
Ok((uptime_secs, events))
}
/// Format an uptime in seconds as a compact human-readable string.
///
/// Granularity coarsens as uptime grows: `"Ns"` under a minute,
/// `"MmNs"` under an hour, and `"HhMm"` (seconds dropped) thereafter.
pub fn format_uptime(secs: u64) -> String {
let (h, rem) = (secs / 3600, secs % 3600);
let (m, s) = (rem / 60, rem % 60);
match (h, m) {
(0, 0) => format!("{s}s"),
(0, _) => format!("{m}m{s}s"),
_ => format!("{h}h{m}m"),
}
}
/// Resolve when an OS shutdown signal is received.
///
/// On Unix this waits for either Ctrl-C (SIGINT) or SIGTERM; elsewhere
/// only Ctrl-C is available. Used as one arm of the graceful-shutdown
/// select in `serve_with_listener`.
///
/// # Panics
/// Panics if the signal handler cannot be registered — at that point the
/// daemon could never be shut down cleanly, so failing fast is preferred.
async fn signal_handler() {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm =
signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
tokio::select! {
_ = tokio::signal::ctrl_c() => {}
_ = sigterm.recv() => {}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c()
.await
.expect("failed to register ctrl_c handler");
}
}
#[cfg(test)]
mod tests {
use super::*;

/// Settings files managed by openlatch carry a `"_openlatch"` marker key.
#[test]
fn test_openlatch_marker_detected_in_settings() {
let with_hooks = r#"{"hooks": {"_openlatch": true, "preToolUse": []}}"#;
let without_hooks = r#"{"hooks": {"preToolUse": []}}"#;
assert!(with_hooks.contains("\"_openlatch\""));
assert!(!without_hooks.contains("\"_openlatch\""));
}

#[test]
fn test_format_uptime_seconds_only() {
for (secs, want) in [(0, "0s"), (45, "45s"), (59, "59s")] {
assert_eq!(format_uptime(secs), want);
}
}

#[test]
fn test_format_uptime_minutes_and_seconds() {
for (secs, want) in [(60, "1m0s"), (192, "3m12s"), (3599, "59m59s")] {
assert_eq!(format_uptime(secs), want);
}
}

#[test]
fn test_format_uptime_hours_and_minutes() {
// Seconds are intentionally dropped once uptime reaches an hour.
for (secs, want) in [(3600, "1h0m"), (8094, "2h14m"), (7200, "2h0m")] {
assert_eq!(format_uptime(secs), want);
}
}
}