use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use secrecy::SecretString;
use tokio::sync::mpsc;
use super::{CloudConfig, CloudError, CloudEvent, CloudState, CredentialProvider};
use crate::core::cloud::envelope::build_cloud_headers;
/// Time between cloud health probes in non-test builds: 60 seconds.
/// The worker loop fires a probe once this much time has passed since the last tick.
#[cfg(not(test))]
const CLOUD_HEALTH_INTERVAL: Duration = Duration::from_secs(60);
/// Test-build override of the health-probe interval: 50 milliseconds, so the
/// tests can observe several health ticks within a short real-time wait.
#[cfg(test)]
const CLOUD_HEALTH_INTERVAL: Duration = Duration::from_millis(50);
/// Builds the HTTP client used for all cloud requests.
///
/// The connect and total timeouts come from `config.timeout_connect_ms` and
/// `config.timeout_total_ms` (both in milliseconds). At most four idle
/// connections are kept per host, and TLS is provided by rustls.
///
/// # Panics
///
/// Panics if the underlying `reqwest` builder fails to produce a client.
pub fn build_cloud_client(config: &CloudConfig) -> reqwest::Client {
    let connect_timeout = Duration::from_millis(config.timeout_connect_ms);
    let total_timeout = Duration::from_millis(config.timeout_total_ms);

    let builder = reqwest::Client::builder()
        .connect_timeout(connect_timeout)
        .timeout(total_timeout)
        .pool_max_idle_per_host(4)
        .use_rustls_tls();

    builder
        .build()
        .expect("failed to build cloud reqwest client")
}
/// Forwards `CloudEvent`s received on `rx` to the cloud ingest endpoint until
/// the channel is closed.
///
/// The loop multiplexes three things:
///
/// * **Events** — each event is POSTed via `post_event` unless the worker is
///   paused by an auth error or has no API key.
/// * **Credential polling** — every `config.credential_poll_interval_ms` the
///   provider is asked for the key again (on the blocking thread pool). Any
///   poll that yields a key replaces the current key and clears a pending
///   auth-error pause; a poll that yields nothing leaves the current key as is.
/// * **Health probes** — every `CLOUD_HEALTH_INTERVAL` a GET health check is
///   issued; a success is reported through `cloud_state.record_health_ok()`.
///
/// Failure handling per POST outcome:
///
/// * `AuthError` — pause POSTs, set `cloud_state.auth_error`, persist the flag
///   to `cloud_state.json` in `openlatch_dir`.
/// * `RateLimit` / `ServerError` / `Network` — wait, retry exactly once; a
///   retry that succeeds is recorded as a successful forward, a retry that
///   fails is recorded as a drop.
/// * `ClientError` — the event is dropped without a retry.
///
/// The function never returns an error; every failure is logged and the loop
/// keeps running.
pub async fn run_cloud_worker(
    mut rx: mpsc::Receiver<CloudEvent>,
    credential_provider: Arc<dyn CredentialProvider>,
    config: CloudConfig,
    cloud_state: CloudState,
    openlatch_dir: PathBuf,
) {
    let client = build_cloud_client(&config);
    let credential_poll_interval = Duration::from_millis(config.credential_poll_interval_ms);

    // `retrieve` is a synchronous call, so it runs on the blocking pool. If the
    // blocking task cannot be joined the worker behaves as if no key exists.
    let provider = credential_provider.clone();
    let mut current_key: Option<SecretString> =
        tokio::task::spawn_blocking(move || provider.retrieve())
            .await
            .unwrap_or(None);
    cloud_state.set_no_credential(current_key.is_none());

    // Local mirror of `cloud_state.auth_error`; while set, events are skipped.
    let mut auth_error = false;
    // The startup warning below counts as the first "missing key" warning.
    let mut missing_key_warned = current_key.is_none();
    if current_key.is_none() {
        tracing::warn!(
            code = "OL-1200",
            "cloud worker: no API key available — cloud forwarding disabled until you run \
             'openlatch auth login' or set [cloud] enabled = false in config.toml (fail-open, \
             events are still logged locally)"
        );
    }

    let mut last_credential_poll = tokio::time::Instant::now();
    let mut last_health_tick = tokio::time::Instant::now();

    loop {
        // --- Credential refresh -------------------------------------------
        if last_credential_poll.elapsed() >= credential_poll_interval {
            let provider = credential_provider.clone();
            let new_key = tokio::task::spawn_blocking(move || provider.retrieve())
                .await
                .unwrap_or(None);
            let key_changed = match (&current_key, &new_key) {
                (None, None) => false,
                (Some(_), None) | (None, Some(_)) => true,
                (Some(old), Some(new)) => {
                    use secrecy::ExposeSecret;
                    old.expose_secret() != new.expose_secret()
                }
            };
            // Only a poll that actually returns a key updates the worker; an
            // empty result keeps whatever key was loaded before.
            if new_key.is_some() {
                if key_changed {
                    tracing::info!(
                        "cloud worker: credential refreshed — resetting auth_error state"
                    );
                }
                // The pause is lifted on every poll that yields a key, whether
                // or not the key differs from the previous one.
                if auth_error {
                    auth_error = false;
                    cloud_state
                        .auth_error
                        .store(false, std::sync::atomic::Ordering::Relaxed);
                    if let Err(e) = persist_cloud_state(&openlatch_dir, false) {
                        tracing::warn!(error = %e, "cloud worker: failed to persist cloud_state.json on credential refresh");
                    }
                }
                missing_key_warned = false;
                current_key = new_key;
            }
            cloud_state.set_no_credential(current_key.is_none());
            last_credential_poll = tokio::time::Instant::now();
        }

        // --- Wait for an event, a poll deadline, or a health deadline -----
        let event = tokio::select! {
            maybe = rx.recv() => {
                match maybe {
                    Some(event) => event,
                    None => {
                        tracing::info!("cloud worker: channel closed, exiting");
                        return;
                    }
                }
            }
            _ = tokio::time::sleep_until(
                last_credential_poll + credential_poll_interval
            ) => {
                // Jump back to the top so the credential poll above runs.
                continue;
            }
            _ = tokio::time::sleep_until(
                last_health_tick + CLOUD_HEALTH_INTERVAL
            ) => {
                last_health_tick = tokio::time::Instant::now();
                match cloud_health_check(&client, &config).await {
                    Ok(()) => cloud_state.record_health_ok(),
                    Err(e) => tracing::debug!(error = %e, "cloud health check failed"),
                }
                continue;
            }
        };

        // --- Gate: paused by auth error, or no key ------------------------
        if auth_error {
            tracing::debug!(
                code = "OL-1201",
                "cloud worker: auth_error active — skipping POST for event"
            );
            continue;
        }
        let key = match &current_key {
            Some(k) => k.clone(),
            None => {
                // Warn once per "missing key" episode, then drop to debug level.
                if !missing_key_warned {
                    tracing::warn!(
                        code = "OL-1200",
                        "cloud worker: no API key available — cloud forwarding disabled until \
                         you run 'openlatch auth login' or set [cloud] enabled = false in \
                         config.toml (fail-open, events are still logged locally)"
                    );
                    missing_key_warned = true;
                } else {
                    tracing::debug!(
                        code = "OL-1200",
                        "cloud worker: skipping event — still no API key available"
                    );
                }
                continue;
            }
        };

        // --- Forward the event --------------------------------------------
        match post_event(&client, &config, &key, &event, &openlatch_dir).await {
            Ok(()) => {
                tracing::debug!("cloud worker: event forwarded successfully");
                cloud_state.record_successful_forward();
            }
            Err(CloudError::AuthError) => {
                tracing::warn!(
                    code = "OL-1201",
                    "cloud worker: auth error (401/403) — pausing POSTs until credential refresh"
                );
                auth_error = true;
                cloud_state
                    .auth_error
                    .store(true, std::sync::atomic::Ordering::Relaxed);
                if let Err(e) = persist_cloud_state(&openlatch_dir, true) {
                    tracing::warn!(error = %e, "cloud worker: failed to persist cloud_state.json");
                }
            }
            Err(CloudError::RateLimit { retry_after_secs }) => {
                tracing::warn!(
                    code = "OL-1202",
                    retry_after_secs,
                    "cloud worker: rate limited (429) — sleeping then retrying once"
                );
                // NOTE(review): the wait is whatever the server asked for and is
                // not capped; the worker handles nothing else while it sleeps.
                tokio::time::sleep(Duration::from_secs(retry_after_secs)).await;
                match post_event(&client, &config, &key, &event, &openlatch_dir).await {
                    Ok(()) => {
                        // A delivered retry is still a delivered event.
                        tracing::debug!("cloud worker: event forwarded successfully after retry");
                        cloud_state.record_successful_forward();
                    }
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "cloud worker: event dropped after rate-limit retry"
                        );
                        cloud_state.record_drop();
                    }
                }
            }
            Err(CloudError::ServerError) => {
                tracing::warn!(
                    code = "OL-1200",
                    "cloud worker: server error (5xx) — retrying once after delay"
                );
                tokio::time::sleep(Duration::from_millis(config.retry_delay_ms)).await;
                match post_event(&client, &config, &key, &event, &openlatch_dir).await {
                    Ok(()) => {
                        tracing::debug!("cloud worker: event forwarded successfully after retry");
                        cloud_state.record_successful_forward();
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "cloud worker: event dropped after 5xx retry");
                        cloud_state.record_drop();
                    }
                }
            }
            Err(CloudError::Network) => {
                tracing::warn!(
                    code = "OL-1200",
                    "cloud worker: network error — retrying once after delay"
                );
                tokio::time::sleep(Duration::from_millis(config.retry_delay_ms)).await;
                match post_event(&client, &config, &key, &event, &openlatch_dir).await {
                    Ok(()) => {
                        tracing::debug!("cloud worker: event forwarded successfully after retry");
                        cloud_state.record_successful_forward();
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "cloud worker: event dropped after network retry");
                        cloud_state.record_drop();
                    }
                }
            }
            Err(CloudError::ClientError(code)) => {
                // Other 4xx responses are not retried, and this path does not
                // call `record_drop`.
                tracing::warn!(
                    http_status = code,
                    "cloud worker: unexpected 4xx — dropping event (no retry)"
                );
            }
        }
    }
}
/// Probes the cloud health endpoint with a single GET request.
///
/// The URL is `config.api_url` (trailing slashes removed) followed by
/// `/api/v1/health`.
///
/// # Errors
///
/// Returns the `reqwest::Error` when the request cannot be sent or when the
/// response status is rejected by `error_for_status`.
async fn cloud_health_check(
    client: &reqwest::Client,
    config: &CloudConfig,
) -> Result<(), reqwest::Error> {
    let base = config.api_url.trim_end_matches('/');
    let url = format!("{base}/api/v1/health");

    // The response body is not needed; only the status decides the outcome.
    client.get(&url).send().await?.error_for_status()?;
    Ok(())
}
/// POSTs one event to `{api_url}/api/v1/events/ingest` and classifies the outcome.
///
/// The event's envelope is cloned and, when it is a JSON object, gains an
/// `agentid` field holding `event.agent_id`. The request body is a JSON array
/// containing that single envelope. Each call generates a fresh UUID v7
/// request id, which is passed to `build_cloud_headers` along with `key`.
///
/// `_openlatch_dir` is accepted but not used.
///
/// # Errors
///
/// * `CloudError::Network` — the request could not be sent.
/// * `CloudError::AuthError` — status 401 or 403.
/// * `CloudError::RateLimit` — status 429; the delay is the `Retry-After`
///   header parsed as whole seconds, or `config.rate_limit_default_secs` when
///   the header is missing or not a plain integer.
/// * `CloudError::ServerError` — status 500–599.
/// * `CloudError::ClientError` — any other non-success status.
async fn post_event(
    client: &reqwest::Client,
    config: &CloudConfig,
    key: &SecretString,
    event: &CloudEvent,
    _openlatch_dir: &Path,
) -> Result<(), CloudError> {
    let request_id = uuid::Uuid::now_v7().to_string();

    // Work on a copy so the caller's event can be re-sent unchanged on retry.
    let mut payload = event.envelope.clone();
    if let serde_json::Value::Object(fields) = &mut payload {
        fields.insert(
            "agentid".to_string(),
            serde_json::Value::String(event.agent_id.clone()),
        );
    }
    let body = serde_json::Value::Array(vec![payload]);

    let headers = build_cloud_headers(key, &request_id);
    let base = config.api_url.trim_end_matches('/');
    let url = format!("{base}/api/v1/events/ingest");

    let request = client.post(&url).headers(headers).json(&body);
    let response = match request.send().await {
        Ok(response) => response,
        Err(_) => return Err(CloudError::Network),
    };

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }

    let failure = match status.as_u16() {
        401 | 403 => CloudError::AuthError,
        429 => {
            let retry_after_secs = match response.headers().get("Retry-After") {
                Some(raw) => raw
                    .to_str()
                    .ok()
                    .and_then(|text| text.parse::<u64>().ok())
                    .unwrap_or(config.rate_limit_default_secs),
                None => config.rate_limit_default_secs,
            };
            CloudError::RateLimit { retry_after_secs }
        }
        500..=599 => CloudError::ServerError,
        code => CloudError::ClientError(code),
    };
    Err(failure)
}
/// Writes `cloud_state.json` into `openlatch_dir`, creating the directory
/// (and any missing parents) first.
///
/// The file holds a single JSON object with the `auth_error` flag and an
/// `updated_at` timestamp formatted as `YYYY-MM-DDTHH:MM:SSZ`, derived from
/// the system clock's seconds since the Unix epoch. The content is written to
/// `cloud_state.json.tmp` and then renamed over `cloud_state.json`.
///
/// # Errors
///
/// Returns the underlying I/O error if creating the directory, writing the
/// temporary file, or renaming it fails.
pub fn persist_cloud_state(openlatch_dir: &Path, auth_error: bool) -> std::io::Result<()> {
    use std::time::{SystemTime, UNIX_EPOCH};

    std::fs::create_dir_all(openlatch_dir)?;

    // A clock set before the epoch yields a zero duration instead of an error.
    let epoch_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let (year, month, day) = days_to_ymd(epoch_secs / 86400);
    let second_of_day = epoch_secs % 86400;
    let h = second_of_day / 3600;
    let m = (second_of_day / 60) % 60;
    let s = second_of_day % 60;
    let updated_at = format!("{year:04}-{month:02}-{day:02}T{h:02}:{m:02}:{s:02}Z");

    let content = format!(
        "{{\"auth_error\":{},\"updated_at\":\"{}\"}}\n",
        auth_error, updated_at
    );

    // Write to a sibling temp file, then rename it into place.
    let staging_path = openlatch_dir.join("cloud_state.json.tmp");
    let target_path = openlatch_dir.join("cloud_state.json");
    std::fs::write(&staging_path, content.as_bytes())?;
    std::fs::rename(&staging_path, &target_path)
}
/// Converts a count of days since 1970-01-01 into a `(year, month, day)`
/// triple in the proleptic Gregorian calendar.
///
/// Month is 1–12 and day is 1–31. The arithmetic works in 400-year eras of
/// 146 097 days whose internal year starts on March 1, so the leap day is the
/// last day of the internal year.
fn days_to_ymd(days: u64) -> (u64, u64, u64) {
    const DAYS_PER_ERA: u64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: u64 = 719_468;

    let shifted = days + EPOCH_SHIFT;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;

    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };

    // January and February belong to the next calendar year of the internal year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);
    (year, month, day)
}
// Unit tests for client construction, state persistence, and the worker loop.
// The worker tests run against a local `mockito` server and use real-time
// sleeps, so the wait durations below are part of what each test checks.
#[cfg(test)]
mod tests {
use super::*;
use secrecy::SecretString;
use std::sync::Mutex;
use tokio::sync::mpsc;
// In-memory `CredentialProvider` whose key can be swapped while a worker runs.
struct TestCredentialProvider {
key: Mutex<Option<String>>,
}
impl TestCredentialProvider {
// Provider that returns `key` from the start.
fn with_key(key: &str) -> Arc<Self> {
Arc::new(Self {
key: Mutex::new(Some(key.to_string())),
})
}
// Provider that returns no key until `set_key` is called.
fn empty() -> Arc<Self> {
Arc::new(Self {
key: Mutex::new(None),
})
}
// Replaces the stored key; later `retrieve` calls return the new value.
fn set_key(&self, key: &str) {
*self.key.lock().unwrap() = Some(key.to_string());
}
}
impl CredentialProvider for TestCredentialProvider {
// Returns the stored key, or `None` if no key is set or the mutex is poisoned.
fn retrieve(&self) -> Option<SecretString> {
self.key
.lock()
.ok()
.and_then(|g| g.as_ref().map(|k| SecretString::from(k.clone())))
}
}
// Smoke test: building a client from the default config must not panic.
// No property of the resulting client is inspected.
#[test]
fn test_build_cloud_client_creates_client_with_pool_max_idle_per_host() {
let config = CloudConfig::default();
let _client = build_cloud_client(&config);
}
// The sender is dropped before the worker starts, so the worker must observe
// the closed channel and return within the 1 s timeout.
#[tokio::test]
async fn test_worker_exits_cleanly_when_channel_closed() {
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::with_key("test-key");
let state = CloudState::new();
let dir = tempfile::tempdir().unwrap();
drop(tx);
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
CloudConfig::default(),
state,
dir.path().to_path_buf(),
));
let result = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
assert!(
result.is_ok(),
"worker must exit when channel is closed (timed out waiting)"
);
}
// With no credential, the worker consumes the queued event without posting
// and still exits once the channel closes.
#[tokio::test]
async fn test_worker_skips_posts_when_no_credential_available() {
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::empty();
let state = CloudState::new();
let dir = tempfile::tempdir().unwrap();
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
CloudConfig::default(),
state,
dir.path().to_path_buf(),
));
let _ = tx
.send(CloudEvent {
envelope: serde_json::json!({}),
agent_id: "agt_test".to_string(),
})
.await;
drop(tx);
let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
assert!(result.is_ok(), "worker must exit cleanly");
}
// The hand-formatted state file must parse as JSON and carry both fields.
#[test]
fn test_persist_cloud_state_writes_valid_json_with_auth_error_true() {
let dir = tempfile::tempdir().unwrap();
persist_cloud_state(dir.path(), true).expect("persist must succeed");
let content = std::fs::read_to_string(dir.path().join("cloud_state.json"))
.expect("cloud_state.json must exist");
let parsed: serde_json::Value = serde_json::from_str(&content).expect("must be valid JSON");
assert_eq!(
parsed["auth_error"], true,
"auth_error must be true: {content}"
);
assert!(
parsed["updated_at"].as_str().is_some(),
"updated_at must be present: {content}"
);
}
// Same as above for the `false` flag value.
#[test]
fn test_persist_cloud_state_writes_valid_json_with_auth_error_false() {
let dir = tempfile::tempdir().unwrap();
persist_cloud_state(dir.path(), false).expect("persist must succeed");
let content = std::fs::read_to_string(dir.path().join("cloud_state.json"))
.expect("cloud_state.json must exist");
let parsed: serde_json::Value = serde_json::from_str(&content).expect("must be valid JSON");
assert_eq!(parsed["auth_error"], false);
}
// The target directory (three levels deep) does not exist beforehand;
// `persist_cloud_state` must create it.
#[test]
fn test_persist_cloud_state_creates_parent_directory_if_missing() {
let base = tempfile::tempdir().unwrap();
let nested = base.path().join("a").join("b").join("c");
assert!(!nested.exists());
persist_cloud_state(&nested, false).expect("must create directories and write");
assert!(nested.join("cloud_state.json").exists());
}
// A 401 from the ingest endpoint must set the shared `auth_error` flag and
// persist `auth_error: true` to `cloud_state.json`.
#[tokio::test]
async fn test_worker_auth_error_set_when_credential_available_but_server_returns_401() {
use std::sync::atomic::Ordering;
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/api/v1/events/ingest")
.with_status(401)
.with_body("{}")
.create_async()
.await;
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::with_key("test-api-key");
let state = CloudState::new();
let dir = tempfile::tempdir().unwrap();
let config = CloudConfig {
api_url: server.url(),
..Default::default()
};
let state_clone = state.clone();
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
config,
state_clone,
dir.path().to_path_buf(),
));
let _ = tx
.send(CloudEvent {
envelope: serde_json::json!({"id": "evt_test"}),
agent_id: "agt_test".to_string(),
})
.await;
// Give the worker time to send the POST and process the response.
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
assert!(
state.auth_error.load(Ordering::Relaxed),
"auth_error must be true after 401 response"
);
let state_path = dir.path().join("cloud_state.json");
assert!(state_path.exists(), "cloud_state.json must be written");
let content = std::fs::read_to_string(&state_path).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&content).unwrap();
assert_eq!(parsed["auth_error"], true);
drop(tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
mock.assert_async().await;
}
// A 500 must trigger exactly one retry: the mock expects two POSTs in total
// (the original attempt plus the retry). `retry_delay_ms` is set to 10 so both
// attempts fit inside the 300 ms wait.
#[tokio::test]
async fn test_worker_retries_once_on_5xx_then_drops() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/api/v1/events/ingest")
.with_status(500)
.with_body("{}")
.expect(2) .create_async()
.await;
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::with_key("test-api-key");
let state = CloudState::new();
let dir = tempfile::tempdir().unwrap();
let config = CloudConfig {
api_url: server.url(),
retry_delay_ms: 10, ..Default::default()
};
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
config,
state,
dir.path().to_path_buf(),
));
let _ = tx
.send(CloudEvent {
envelope: serde_json::json!({}),
agent_id: "agt_test".to_string(),
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
drop(tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
mock.assert_async().await;
}
// Two mocks share the ingest route: a 429 carrying `Retry-After: 1` and a 200,
// each expected exactly once. The 1500 ms wait covers the one-second back-off
// plus the retry.
// NOTE(review): this relies on mockito serving the 429 mock before the 200
// mock for the same route — confirm against mockito's matching rules.
#[tokio::test]
async fn test_worker_honors_retry_after_header_on_429() {
let mut server = mockito::Server::new_async().await;
let mock_429 = server
.mock("POST", "/api/v1/events/ingest")
.with_status(429)
.with_header("Retry-After", "1")
.with_body("{}")
.expect(1)
.create_async()
.await;
let mock_200 = server
.mock("POST", "/api/v1/events/ingest")
.with_status(200)
.with_body("{\"status\":\"accepted\"}")
.expect(1)
.create_async()
.await;
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::with_key("test-api-key");
let state = CloudState::new();
let dir = tempfile::tempdir().unwrap();
let config = CloudConfig {
api_url: server.url(),
..Default::default()
};
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
config,
state,
dir.path().to_path_buf(),
));
let _ = tx
.send(CloudEvent {
envelope: serde_json::json!({}),
agent_id: "agt_test".to_string(),
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
drop(tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
mock_429.assert_async().await;
mock_200.assert_async().await;
}
// Starts with three recorded drops and sends no events. After 250 ms (several
// 50 ms test-build health intervals) a successful health probe must have reset
// `consecutive_drops` while leaving the lifetime `drop_count` at 3.
#[tokio::test]
async fn test_health_tick_clears_consecutive_drops_on_2xx() {
let mut server = mockito::Server::new_async().await;
let mock_health = server
.mock("GET", "/api/v1/health")
.with_status(200)
.with_body(r#"{"status":"ok"}"#)
.expect_at_least(1)
.create_async()
.await;
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::with_key("test-api-key");
let state = CloudState::new();
state.record_drop();
state.record_drop();
state.record_drop();
assert_eq!(state.consecutive_drops(), 3);
assert_eq!(state.drop_count(), 3);
let dir = tempfile::tempdir().unwrap();
let config = CloudConfig {
api_url: server.url(),
..Default::default()
};
let state_clone = state.clone();
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
config,
state_clone,
dir.path().to_path_buf(),
));
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
assert_eq!(
state.consecutive_drops(),
0,
"health tick must clear consecutive_drops on 2xx"
);
assert_eq!(
state.drop_count(),
3,
"lifetime drop_count must be preserved"
);
drop(tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
mock_health.assert_async().await;
}
// Counterpart to the test above: when the health endpoint answers 500, the
// two pre-recorded consecutive drops must remain untouched.
#[tokio::test]
async fn test_health_tick_does_not_clear_on_5xx() {
let mut server = mockito::Server::new_async().await;
let mock_health = server
.mock("GET", "/api/v1/health")
.with_status(500)
.with_body("{}")
.expect_at_least(1)
.create_async()
.await;
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::with_key("test-api-key");
let state = CloudState::new();
state.record_drop();
state.record_drop();
let dir = tempfile::tempdir().unwrap();
let config = CloudConfig {
api_url: server.url(),
..Default::default()
};
let state_clone = state.clone();
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
config,
state_clone,
dir.path().to_path_buf(),
));
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
assert_eq!(
state.consecutive_drops(),
2,
"failed health probe must not reset consecutive_drops"
);
drop(tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
mock_health.assert_async().await;
}
// The health endpoint is mocked to fail (500) so that only the successful
// ingest POST can be responsible for clearing `consecutive_drops`.
#[tokio::test]
async fn test_successful_post_clears_consecutive_drops() {
let mut server = mockito::Server::new_async().await;
let _mock_health = server
.mock("GET", "/api/v1/health")
.with_status(500)
.with_body("{}")
.create_async()
.await;
let mock_ingest = server
.mock("POST", "/api/v1/events/ingest")
.with_status(200)
.with_body(r#"{"status":"accepted"}"#)
.expect(1)
.create_async()
.await;
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::with_key("test-api-key");
let state = CloudState::new();
state.record_drop();
state.record_drop();
state.record_drop();
let dir = tempfile::tempdir().unwrap();
let config = CloudConfig {
api_url: server.url(),
..Default::default()
};
let state_clone = state.clone();
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
config,
state_clone,
dir.path().to_path_buf(),
));
let _ = tx
.send(CloudEvent {
envelope: serde_json::json!({"id": "evt_test"}),
agent_id: "agt_test".to_string(),
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
assert_eq!(
state.consecutive_drops(),
0,
"successful forward must clear consecutive_drops"
);
assert_eq!(state.drop_count(), 3, "lifetime drop_count preserved");
assert_eq!(state.forwarded_count(), 1);
drop(tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
mock_ingest.assert_async().await;
}
// Credential hot-reload: the worker starts without a key, so the first three
// events are skipped. The key is then installed, the test waits longer than
// the 50 ms credential poll interval, and a fourth event is sent. The ingest
// mock expects exactly one POST and `forwarded_count` must be 1.
#[tokio::test]
async fn test_worker_recovers_when_credential_appears_after_startup() {
let mut server = mockito::Server::new_async().await;
let mock_ingest = server
.mock("POST", "/api/v1/events/ingest")
.with_status(200)
.with_body(r#"{"status":"accepted"}"#)
.expect(1)
.create_async()
.await;
let (tx, rx) = mpsc::channel::<CloudEvent>(10);
let provider = TestCredentialProvider::empty();
let state = CloudState::new();
let dir = tempfile::tempdir().unwrap();
let config = CloudConfig {
api_url: server.url(),
credential_poll_interval_ms: 50,
..Default::default()
};
// Keep a second handle so the key can be set after the worker owns `provider`.
let provider_handle = provider.clone();
let handle = tokio::spawn(run_cloud_worker(
rx,
provider,
config,
state.clone(),
dir.path().to_path_buf(),
));
for i in 0..3 {
let _ = tx
.send(CloudEvent {
envelope: serde_json::json!({"id": format!("evt_{i}")}),
agent_id: "agt_test".to_string(),
})
.await;
}
// 30 ms is shorter than the 50 ms poll interval, so no reload has happened yet.
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
assert_eq!(
state.forwarded_count(),
0,
"events must not be forwarded before a credential is available"
);
provider_handle.set_key("late-arriving-key");
// Wait long enough for at least one credential poll to pick up the new key.
tokio::time::sleep(std::time::Duration::from_millis(120)).await;
let _ = tx
.send(CloudEvent {
envelope: serde_json::json!({"id": "evt_after_reload"}),
agent_id: "agt_test".to_string(),
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
drop(tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
mock_ingest.assert_async().await;
assert_eq!(
state.forwarded_count(),
1,
"exactly one event should be forwarded after credential hot-reload"
);
}
}