pub mod envelope;
pub mod tamper;
pub mod worker;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use secrecy::SecretString;
/// Abstraction over anything that can hand back a secret credential.
///
/// Implementors are required to be `Send + Sync`.
pub trait CredentialProvider: Send + Sync {
/// Returns the credential wrapped in a [`SecretString`], or `None`.
///
/// NOTE(review): `None` presumably means "no credential currently stored" —
/// confirm against the implementors and the worker that calls this.
fn retrieve(&self) -> Option<SecretString>;
}
/// Tunable settings for the cloud module.
///
/// [`Default`] supplies the baseline values; units follow each field's
/// `_ms` / `_secs` suffix.
#[derive(Debug, Clone)]
pub struct CloudConfig {
    /// Base URL of the cloud API.
    pub api_url: String,
    /// Connect timeout, in milliseconds.
    pub timeout_connect_ms: u64,
    /// Total timeout, in milliseconds.
    pub timeout_total_ms: u64,
    /// Number of retries.
    pub retry_count: u32,
    /// Delay between retries, in milliseconds.
    pub retry_delay_ms: u64,
    /// Channel size (presumably the capacity of the event queue feeding the
    /// worker — confirm against the worker module).
    pub channel_size: usize,
    /// Default rate-limit wait, in seconds.
    pub rate_limit_default_secs: u64,
    /// Interval between credential polls, in milliseconds.
    pub credential_poll_interval_ms: u64,
}

impl Default for CloudConfig {
    /// Builds the baseline configuration.
    fn default() -> Self {
        let api_url = String::from("https://app.openlatch.ai/api");
        CloudConfig {
            api_url,
            // Timeouts (milliseconds).
            timeout_connect_ms: 5_000,
            timeout_total_ms: 30_000,
            // Retry policy.
            retry_count: 1,
            retry_delay_ms: 2_000,
            // Queueing, rate limiting and credential polling.
            channel_size: 1_000,
            rate_limit_default_secs: 30,
            credential_poll_interval_ms: 60_000,
        }
    }
}
/// A JSON envelope paired with the identifier of an agent.
#[derive(Debug, Clone)]
pub struct CloudEvent {
/// Envelope payload, held as an arbitrary JSON value.
pub envelope: serde_json::Value,
/// Agent identifier string.
pub agent_id: String,
}
/// Failure categories for a request to the cloud API.
///
/// The [`Display`](std::fmt::Display) text of each variant names the HTTP
/// status class it stands for. The type also implements
/// [`std::error::Error`], so it converts into `Box<dyn Error>` and works with
/// `?` in functions returning a boxed error.
#[derive(Debug)]
pub enum CloudError {
    /// Reported as a 401/403 failure: the API key is invalid or revoked.
    AuthError,
    /// Reported as a 429 failure.
    RateLimit {
        /// Number of seconds shown in the message as the retry delay.
        retry_after_secs: u64,
    },
    /// Reported as a 5xx failure.
    ServerError,
    /// Reported as a network failure: the endpoint is unreachable.
    Network,
    /// Any other client error; carries the numeric code shown in the message.
    ClientError(u16),
}

impl std::fmt::Display for CloudError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CloudError::AuthError => {
                write!(f, "cloud auth error (401/403) — API key invalid or revoked")
            }
            CloudError::RateLimit { retry_after_secs } => {
                write!(
                    f,
                    "cloud rate limit (429) — retry after {retry_after_secs}s"
                )
            }
            CloudError::ServerError => write!(f, "cloud server error (5xx)"),
            CloudError::Network => write!(f, "cloud network error — endpoint unreachable"),
            CloudError::ClientError(code) => write!(f, "cloud client error ({code})"),
        }
    }
}

// No variant wraps an underlying error, so the default `source()` (`None`)
// is the correct behaviour and the impl body stays empty.
impl std::error::Error for CloudError {}
/// Shared flags and counters describing cloud forwarding.
///
/// Every field is an `Arc`-wrapped atomic, so cloning a `CloudState` yields a
/// handle onto the same underlying values. All accesses in this impl use
/// `Ordering::Relaxed`.
#[derive(Debug, Clone)]
pub struct CloudState {
    /// Auth-error flag; starts `false`.
    pub auth_error: Arc<AtomicBool>,
    /// Number of successful forwards recorded so far.
    pub forwarded_count: Arc<AtomicU64>,
    /// Unix-epoch seconds of the latest recorded successful forward (0 until one happens).
    pub last_sync_secs: Arc<AtomicU64>,
    /// Total number of drops recorded.
    pub drop_count: Arc<AtomicU64>,
    /// Drops recorded since the last successful forward or health-ok reset.
    pub consecutive_drops: Arc<AtomicU64>,
    /// Missing-credential flag; starts `true`.
    pub no_credential: Arc<AtomicBool>,
}

impl CloudState {
    /// Creates a fresh state: counters at zero, `auth_error` clear,
    /// `no_credential` set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Reads the missing-credential flag.
    pub fn is_no_credential(&self) -> bool {
        self.no_credential.load(Ordering::Relaxed)
    }

    /// Writes the missing-credential flag.
    pub fn set_no_credential(&self, missing: bool) {
        self.no_credential.store(missing, Ordering::Relaxed);
    }

    /// Reads the auth-error flag.
    pub fn is_auth_error(&self) -> bool {
        self.auth_error.load(Ordering::Relaxed)
    }

    /// Records one successful forward: bumps the forwarded counter, stamps
    /// `last_sync_secs` with the current Unix time and clears the
    /// consecutive-drop streak.
    pub fn record_successful_forward(&self) {
        self.forwarded_count.fetch_add(1, Ordering::Relaxed);

        // A clock set before the Unix epoch yields 0 rather than panicking.
        let now_secs = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());

        self.last_sync_secs.store(now_secs, Ordering::Relaxed);
        self.consecutive_drops.store(0, Ordering::Relaxed);
    }

    /// Clears the consecutive-drop streak without touching any other counter.
    pub fn record_health_ok(&self) {
        self.consecutive_drops.store(0, Ordering::Relaxed);
    }

    /// Number of successful forwards recorded so far.
    pub fn forwarded_count(&self) -> u64 {
        self.forwarded_count.load(Ordering::Relaxed)
    }

    /// Unix-epoch seconds of the latest successful forward (0 if none yet).
    pub fn last_sync_secs(&self) -> u64 {
        self.last_sync_secs.load(Ordering::Relaxed)
    }

    /// Records one drop in both the total and the consecutive counters.
    pub fn record_drop(&self) {
        self.drop_count.fetch_add(1, Ordering::Relaxed);
        self.consecutive_drops.fetch_add(1, Ordering::Relaxed);
    }

    /// Total number of drops recorded.
    pub fn drop_count(&self) -> u64 {
        self.drop_count.load(Ordering::Relaxed)
    }

    /// Length of the current consecutive-drop streak.
    pub fn consecutive_drops(&self) -> u64 {
        self.consecutive_drops.load(Ordering::Relaxed)
    }
}

impl Default for CloudState {
    /// Same initial state as [`CloudState::new`].
    fn default() -> Self {
        let flag = |initial: bool| Arc::new(AtomicBool::new(initial));
        let zeroed_counter = || Arc::new(AtomicU64::new(0));

        CloudState {
            auth_error: flag(false),
            forwarded_count: zeroed_counter(),
            last_sync_secs: zeroed_counter(),
            drop_count: zeroed_counter(),
            consecutive_drops: zeroed_counter(),
            // Starts set; it is only cleared through `set_no_credential(false)`.
            no_credential: flag(true),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Current wall-clock time as whole seconds since the Unix epoch (0 if the
    /// clock reads earlier than the epoch).
    fn unix_now_secs() -> u64 {
        use std::time::{SystemTime, UNIX_EPOCH};
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Every default configuration field carries its documented baseline value.
    #[test]
    fn test_cloud_config_default_produces_expected_values() {
        let CloudConfig {
            api_url,
            timeout_connect_ms,
            timeout_total_ms,
            retry_count,
            retry_delay_ms,
            channel_size,
            rate_limit_default_secs,
            credential_poll_interval_ms,
        } = CloudConfig::default();

        assert_eq!(api_url, "https://app.openlatch.ai/api");
        assert_eq!(timeout_connect_ms, 5000);
        assert_eq!(timeout_total_ms, 30000);
        assert_eq!(channel_size, 1000);
        assert_eq!(retry_delay_ms, 2000);
        assert_eq!(retry_count, 1);
        assert_eq!(rate_limit_default_secs, 30);
        assert_eq!(credential_poll_interval_ms, 60_000);
    }

    /// Each error variant renders a message containing its distinguishing text.
    #[test]
    fn test_cloud_error_variants_have_display() {
        let auth = CloudError::AuthError.to_string();
        assert!(auth.contains("auth error"));

        let rate_limited = CloudError::RateLimit {
            retry_after_secs: 42,
        }
        .to_string();
        assert!(rate_limited.contains("rate limit") && rate_limited.contains("42"));

        let server = CloudError::ServerError.to_string();
        assert!(server.contains("server error"));

        let network = CloudError::Network.to_string();
        assert!(network.contains("network"));

        let client = CloudError::ClientError(404).to_string();
        assert!(client.contains("404"));
    }

    /// A new state starts with the auth-error flag cleared.
    #[test]
    fn test_cloud_state_defaults_to_no_auth_error() {
        assert!(!CloudState::new().is_auth_error());
    }

    /// `is_auth_error` mirrors whatever was last stored in the atomic.
    #[test]
    fn test_cloud_state_reflects_atomic_transitions() {
        let state = CloudState::new();
        for flag in [true, false] {
            state.auth_error.store(flag, Ordering::Relaxed);
            assert_eq!(state.is_auth_error(), flag);
        }
    }

    /// `no_credential` starts set and follows `set_no_credential`.
    #[test]
    fn test_cloud_state_no_credential_defaults_true_and_toggles() {
        let state = CloudState::new();
        assert!(
            state.is_no_credential(),
            "new CloudState must start in no_credential state until the worker polls"
        );

        state.set_no_credential(false);
        assert!(!state.is_no_credential());

        state.set_no_credential(true);
        assert!(state.is_no_credential());
    }

    /// Both `CloudEvent` fields can be read back after construction.
    #[test]
    fn test_cloud_event_fields_accessible() {
        let envelope = serde_json::json!({"id": "evt_123"});
        let agent_id = "agt_abc".to_string();
        let evt = CloudEvent { envelope, agent_id };

        assert_eq!(evt.agent_id, "agt_abc");
        assert_eq!(evt.envelope["id"], "evt_123");
    }

    /// The forwarded counter starts at zero.
    #[test]
    fn test_cloud_state_new_initializes_forwarded_count_to_zero() {
        assert_eq!(CloudState::new().forwarded_count(), 0);
    }

    /// The last-sync timestamp starts at zero.
    #[test]
    fn test_cloud_state_new_initializes_last_sync_secs_to_zero() {
        assert_eq!(CloudState::new().last_sync_secs(), 0);
    }

    /// Each successful forward bumps the counter by exactly one.
    #[test]
    fn test_cloud_state_record_successful_forward_increments_count() {
        let state = CloudState::new();
        for expected in 1..=2u64 {
            state.record_successful_forward();
            assert_eq!(state.forwarded_count(), expected);
        }
    }

    /// The recorded timestamp falls between clock readings taken before and
    /// after the call.
    #[test]
    fn test_cloud_state_record_successful_forward_sets_last_sync_secs() {
        let before = unix_now_secs();
        let state = CloudState::new();
        state.record_successful_forward();
        let after = unix_now_secs();

        let recorded = state.last_sync_secs();
        assert!(
            recorded >= before,
            "last_sync_secs should be >= before: {recorded} < {before}"
        );
        assert!(
            recorded <= after,
            "last_sync_secs should be <= after: {recorded} > {after}"
        );
    }
}