openlatch-client 0.1.13

The open-source security layer for AI agents — client forwarder
//! Cloud event forwarding — types, configuration, and module declarations.
//!
//! This is a strict leaf module. It MUST NOT import from `daemon/`, `cli/`, or
//! any other `core/` sibling module. Only external crates and std are allowed.
//!
//! The cloud forwarding pipeline:
//! 1. Daemon handler calls `try_send(CloudEvent)` into an mpsc channel (non-blocking)
//! 2. `worker::run_cloud_worker` consumes events and POSTs to the cloud API
//! 3. Auth state is persisted to `~/.openlatch/cloud_state.json` for cross-process visibility

pub mod envelope;
pub mod tamper;
pub mod worker;

use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering},
    Arc,
};

use secrecy::SecretString;

// ---------------------------------------------------------------------------
// Credential provider abstraction
// ---------------------------------------------------------------------------

/// Source of the API key used by the cloud worker.
///
/// The worker holds an `Arc<dyn CredentialProvider>` and asks it for the key
/// whenever it needs one. Production code plugs in a `KeyringCredentialStore`
/// adapter; tests use `TestCredentialProvider`, which hands back a fixed key.
///
/// This deliberately mirrors the shape of `core::auth::CredentialStore::retrieve()`.
/// `cloud/` is a leaf module and may not depend on `auth/`, so the trait is
/// redeclared here and the daemon adapts between the two.
pub trait CredentialProvider: Send + Sync {
    /// Fetch the current API key, or `None` when no credential is available.
    fn retrieve(&self) -> Option<SecretString>;
}

// ---------------------------------------------------------------------------
// CloudConfig — cloud worker configuration
// ---------------------------------------------------------------------------

/// Tunables for the cloud forwarding worker.
///
/// The daemon fills this in from the fields of `config::CloudConfig`. It is
/// declared here rather than in `config/` so that this module stays a leaf.
#[derive(Debug, Clone)]
pub struct CloudConfig {
    /// Base URL of the cloud API (default `"https://app.openlatch.ai/api"`).
    pub api_url: String,
    /// TCP connect timeout, in milliseconds (default 5 000).
    pub timeout_connect_ms: u64,
    /// Overall per-request timeout, in milliseconds (default 30 000).
    pub timeout_total_ms: u64,
    /// Retries attempted after a network error or 5xx; `0` disables retrying
    /// (default 1).
    pub retry_count: u32,
    /// Pause between retries, in milliseconds (default 2 000).
    pub retry_delay_ms: u64,
    /// Capacity of the bounded channel feeding the worker (default 1 000).
    pub channel_size: usize,
    /// Backoff, in seconds, used when a rate-limited response carries no
    /// Retry-After header (default 30).
    pub rate_limit_default_secs: u64,
    /// Interval, in milliseconds, at which the worker re-reads the credential
    /// from its provider (default 60 000, i.e. 60 s). Unit tests shrink it so
    /// reload behavior can be exercised without waiting in real time.
    pub credential_poll_interval_ms: u64,
}

impl Default for CloudConfig {
    fn default() -> Self {
        const DEFAULT_API_URL: &str = "https://app.openlatch.ai/api";

        CloudConfig {
            api_url: String::from(DEFAULT_API_URL),
            timeout_connect_ms: 5_000,
            timeout_total_ms: 30_000,
            retry_count: 1,
            retry_delay_ms: 2_000,
            channel_size: 1_000,
            rate_limit_default_secs: 30,
            credential_poll_interval_ms: 60_000,
        }
    }
}

// ---------------------------------------------------------------------------
// CloudEvent — data passed through the mpsc channel
// ---------------------------------------------------------------------------

/// Unit of work handed to the cloud worker over the mpsc channel.
///
/// The daemon handler builds one after every verdict and enqueues it with
/// `try_send()`. `envelope` holds a JSON-serialized CloudEvents v1.0.2
/// structured-mode object. The raw agent payload is already nested under
/// `envelope.data`, and OpenLatch metadata is carried as CloudEvents extension
/// attributes. The worker adds `agent_id` as the `agentid` extension attribute
/// right before it POSTs.
#[derive(Debug, Clone)]
pub struct CloudEvent {
    /// The `EventEnvelope`, already privacy-filtered and serialized to JSON.
    pub envelope: serde_json::Value,
    /// Install identifier of this agent (`agt_<uuid>`), taken from config.
    /// It is empty until initialized. The worker writes it into the envelope
    /// as the CloudEvents `agentid` extension attribute.
    pub agent_id: String,
}

// ---------------------------------------------------------------------------
// CloudError — errors that can occur during cloud forwarding
// ---------------------------------------------------------------------------

/// Errors returned by the cloud forwarding worker.
///
/// Each variant corresponds to one class of HTTP outcome, or, for
/// [`CloudError::Network`], to the absence of any response. The type
/// implements [`std::error::Error`], so it can be boxed as
/// `Box<dyn std::error::Error>`, propagated with `?`, or wrapped by
/// error-context helpers alongside other error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CloudError {
    /// Cloud returned 401 or 403 — API key is invalid or revoked.
    AuthError,
    /// Cloud returned 429 — rate limit exceeded; honor the Retry-After delay.
    RateLimit {
        /// Seconds to wait before retrying (parsed from Retry-After header or default).
        retry_after_secs: u64,
    },
    /// Cloud returned a 5xx server error.
    ServerError,
    /// Network error before receiving a response (DNS, connection refused, timeout).
    Network,
    /// Cloud returned an unexpected 4xx client error (not 401, 403, or 429).
    /// Carries the HTTP status code.
    ClientError(u16),
}

impl std::fmt::Display for CloudError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CloudError::AuthError => {
                write!(f, "cloud auth error (401/403) — API key invalid or revoked")
            }
            CloudError::RateLimit { retry_after_secs } => {
                write!(
                    f,
                    "cloud rate limit (429) — retry after {retry_after_secs}s"
                )
            }
            CloudError::ServerError => write!(f, "cloud server error (5xx)"),
            CloudError::Network => write!(f, "cloud network error — endpoint unreachable"),
            CloudError::ClientError(code) => write!(f, "cloud client error ({code})"),
        }
    }
}

// No variant wraps an underlying error, so the default `source()` (always
// `None`) is correct; `Display` above supplies the message.
impl std::error::Error for CloudError {}

// ---------------------------------------------------------------------------
// CloudState — shared in-process auth/credential flags and forwarding metrics
// ---------------------------------------------------------------------------

/// In-process cloud forwarding state: auth and credential flags plus the
/// forwarding counters.
///
/// Every field is an `Arc`-wrapped atomic, so `.clone()` yields a cheap handle
/// to the *same* underlying values. Updates made through one clone are
/// visible through all the others.
///
/// - `auth_error` is read by `openlatch status` (via AppState) to report
///   whether cloud forwarding is currently blocked by an auth error. It is
///   also persisted to `cloud_state.json` for cross-process visibility
///   (CLOUD-08).
/// - `forwarded_count` and `last_sync_secs` are updated by the worker on
///   each successful 2xx response and exposed via the `/metrics` endpoint.
/// - `drop_count` counts events dropped after all retries are exhausted
///   (network error, 5xx, or rate-limit retry failure).
#[derive(Debug, Clone)]
pub struct CloudState {
    /// Set while the last cloud POST was rejected with 401/403; the worker
    /// skips POSTs while this is true.
    pub auth_error: Arc<AtomicBool>,
    /// Running total of events the cloud accepted (2xx responses).
    pub forwarded_count: Arc<AtomicU64>,
    /// Unix epoch seconds of the most recent successful forward (0 = never).
    pub last_sync_secs: Arc<AtomicU64>,
    /// Running total of events dropped once retries ran out
    /// (network/5xx/rate-limit).
    pub drop_count: Arc<AtomicU64>,
    /// Drops since the last recovery signal (successful forward or health
    /// probe). Unlike `drop_count`, this goes back to zero on recovery, which
    /// is why it drives the live `cloud_status` classification in the
    /// metrics endpoint.
    pub consecutive_drops: Arc<AtomicU64>,
    /// Set while the worker has no API key loaded and is silently skipping
    /// POSTs. This differs from `auth_error`, where the server rejected a key
    /// that was loaded. Here no key has been loaded at all (e.g. the user
    /// never authenticated). Reported as `cloud_status: "no_credential"` so
    /// observers don't see a bare `"connected"` while nothing is forwarded.
    pub no_credential: Arc<AtomicBool>,
}

impl CloudState {
    /// Build fresh state with no auth error and every counter at zero.
    /// `no_credential` starts set until the worker's first credential poll.
    pub fn new() -> Self {
        let zeroed = || Arc::new(AtomicU64::new(0));
        Self {
            auth_error: Arc::new(AtomicBool::new(false)),
            forwarded_count: zeroed(),
            last_sync_secs: zeroed(),
            drop_count: zeroed(),
            consecutive_drops: zeroed(),
            // Pessimistic start: `/metrics` should say `no_credential` during
            // the startup window, before the worker has polled the provider
            // and cleared this flag.
            no_credential: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Reports whether the worker currently lacks an API key.
    pub fn is_no_credential(&self) -> bool {
        self.no_credential.load(Ordering::Relaxed)
    }

    /// Update the `no_credential` flag. The worker calls this every time it
    /// polls the credential provider.
    pub fn set_no_credential(&self, missing: bool) {
        self.no_credential.store(missing, Ordering::Relaxed)
    }

    /// Reports whether an auth error (401/403) is currently in effect.
    pub fn is_auth_error(&self) -> bool {
        self.auth_error.load(Ordering::Relaxed)
    }

    /// Record one successful cloud forward.
    ///
    /// This bumps `forwarded_count` and stamps `last_sync_secs` with the
    /// current Unix time (0 if the clock reads before the epoch). It also
    /// resets the `consecutive_drops` streak, since a success is a recovery
    /// signal.
    pub fn record_successful_forward(&self) {
        use std::time::{SystemTime, UNIX_EPOCH};

        self.forwarded_count.fetch_add(1, Ordering::Relaxed);
        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|since_epoch| since_epoch.as_secs())
            .unwrap_or(0);
        self.last_sync_secs.store(now_secs, Ordering::Relaxed);
        self.consecutive_drops.store(0, Ordering::Relaxed);
    }

    /// Record that a background health probe against the cloud succeeded.
    /// This clears the drop streak so `cloud_status` can recover before the
    /// next hook event arrives. The cumulative `drop_count` is left untouched.
    pub fn record_health_ok(&self) {
        self.consecutive_drops.store(0, Ordering::Relaxed)
    }

    /// Number of events successfully forwarded so far.
    pub fn forwarded_count(&self) -> u64 {
        self.forwarded_count.load(Ordering::Relaxed)
    }

    /// Unix epoch seconds of the last successful forward (0 = never).
    pub fn last_sync_secs(&self) -> u64 {
        self.last_sync_secs.load(Ordering::Relaxed)
    }

    /// Count one dropped event, i.e. one silently discarded after all retries
    /// were exhausted. Increments both the cumulative total and the current
    /// streak.
    pub fn record_drop(&self) {
        for counter in [&self.drop_count, &self.consecutive_drops] {
            counter.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Cumulative number of dropped events since daemon start.
    pub fn drop_count(&self) -> u64 {
        self.drop_count.load(Ordering::Relaxed)
    }

    /// Number of drops since the last successful forward or health probe.
    pub fn consecutive_drops(&self) -> u64 {
        self.consecutive_drops.load(Ordering::Relaxed)
    }
}

impl Default for CloudState {
    /// Same as [`CloudState::new`]. The derive cannot be used because
    /// `no_credential` must start as `true`.
    fn default() -> Self {
        CloudState::new()
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cloud_config_default_produces_expected_values() {
        let cfg = CloudConfig::default();
        assert_eq!(cfg.api_url, "https://app.openlatch.ai/api");
        assert_eq!(cfg.timeout_connect_ms, 5000);
        assert_eq!(cfg.timeout_total_ms, 30000);
        assert_eq!(cfg.channel_size, 1000);
        assert_eq!(cfg.retry_delay_ms, 2000);
        assert_eq!(cfg.retry_count, 1);
        assert_eq!(cfg.rate_limit_default_secs, 30);
        assert_eq!(cfg.credential_poll_interval_ms, 60_000);
    }

    #[test]
    fn test_cloud_error_variants_have_display() {
        // AuthError
        let e = CloudError::AuthError;
        assert!(format!("{e}").contains("auth error"));

        // RateLimit
        let e = CloudError::RateLimit {
            retry_after_secs: 42,
        };
        let s = format!("{e}");
        assert!(s.contains("rate limit") && s.contains("42"));

        // ServerError
        let e = CloudError::ServerError;
        assert!(format!("{e}").contains("server error"));

        // Network
        let e = CloudError::Network;
        assert!(format!("{e}").contains("network"));

        // ClientError
        let e = CloudError::ClientError(404);
        let s = format!("{e}");
        assert!(s.contains("404"));
    }

    #[test]
    fn test_cloud_state_defaults_to_no_auth_error() {
        let state = CloudState::new();
        assert!(!state.is_auth_error());
    }

    #[test]
    fn test_cloud_state_reflects_atomic_transitions() {
        let state = CloudState::new();
        state.auth_error.store(true, Ordering::Relaxed);
        assert!(state.is_auth_error());
        state.auth_error.store(false, Ordering::Relaxed);
        assert!(!state.is_auth_error());
    }

    #[test]
    fn test_cloud_state_no_credential_defaults_true_and_toggles() {
        let state = CloudState::new();
        assert!(
            state.is_no_credential(),
            "new CloudState must start in no_credential state until the worker polls"
        );
        state.set_no_credential(false);
        assert!(!state.is_no_credential());
        state.set_no_credential(true);
        assert!(state.is_no_credential());
    }

    #[test]
    fn test_cloud_event_fields_accessible() {
        let evt = CloudEvent {
            envelope: serde_json::json!({"id": "evt_123"}),
            agent_id: "agt_abc".to_string(),
        };
        assert_eq!(evt.agent_id, "agt_abc");
        assert_eq!(evt.envelope["id"], "evt_123");
    }

    #[test]
    fn test_cloud_state_new_initializes_forwarded_count_to_zero() {
        let state = CloudState::new();
        assert_eq!(state.forwarded_count(), 0);
    }

    #[test]
    fn test_cloud_state_new_initializes_last_sync_secs_to_zero() {
        let state = CloudState::new();
        assert_eq!(state.last_sync_secs(), 0);
    }

    #[test]
    fn test_cloud_state_record_successful_forward_increments_count() {
        let state = CloudState::new();
        state.record_successful_forward();
        assert_eq!(state.forwarded_count(), 1);
        state.record_successful_forward();
        assert_eq!(state.forwarded_count(), 2);
    }

    #[test]
    fn test_cloud_state_record_successful_forward_sets_last_sync_secs() {
        use std::time::{SystemTime, UNIX_EPOCH};
        let before = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let state = CloudState::new();
        state.record_successful_forward();
        let after = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let recorded = state.last_sync_secs();
        assert!(
            recorded >= before,
            "last_sync_secs should be >= before: {recorded} < {before}"
        );
        assert!(
            recorded <= after,
            "last_sync_secs should be <= after: {recorded} > {after}"
        );
    }

    // -- Drop accounting: cumulative `drop_count` vs. resettable streak ------

    #[test]
    fn test_cloud_state_new_initializes_drop_counters_to_zero() {
        let state = CloudState::new();
        assert_eq!(state.drop_count(), 0);
        assert_eq!(state.consecutive_drops(), 0);
    }

    #[test]
    fn test_cloud_state_record_drop_increments_both_counters() {
        let state = CloudState::new();
        state.record_drop();
        state.record_drop();
        assert_eq!(state.drop_count(), 2);
        assert_eq!(state.consecutive_drops(), 2);
    }

    #[test]
    fn test_cloud_state_record_health_ok_resets_only_consecutive_drops() {
        let state = CloudState::new();
        state.record_drop();
        state.record_health_ok();
        assert_eq!(state.consecutive_drops(), 0);
        assert_eq!(
            state.drop_count(),
            1,
            "cumulative drop_count must survive a health probe"
        );
    }

    #[test]
    fn test_cloud_state_successful_forward_resets_consecutive_drops() {
        let state = CloudState::new();
        state.record_drop();
        state.record_drop();
        state.record_successful_forward();
        assert_eq!(state.consecutive_drops(), 0);
        assert_eq!(
            state.drop_count(),
            2,
            "cumulative drop_count must survive a successful forward"
        );
    }

    // -- Construction and sharing --------------------------------------------

    #[test]
    fn test_cloud_state_default_matches_new() {
        let state = CloudState::default();
        assert!(!state.is_auth_error());
        assert!(state.is_no_credential());
        assert_eq!(state.forwarded_count(), 0);
        assert_eq!(state.last_sync_secs(), 0);
        assert_eq!(state.drop_count(), 0);
        assert_eq!(state.consecutive_drops(), 0);
    }

    #[test]
    fn test_cloud_state_clones_share_underlying_atomics() {
        // CloudState is handed to the worker and to the metrics endpoint as
        // clones; both must observe the same counters.
        let state = CloudState::new();
        let handle = state.clone();
        handle.record_successful_forward();
        handle.record_drop();
        handle.set_no_credential(false);
        assert_eq!(state.forwarded_count(), 1);
        assert_eq!(state.drop_count(), 1);
        assert_eq!(state.consecutive_drops(), 1);
        assert!(!state.is_no_credential());
    }
}