astrid-events 0.1.1

Event bus for Astrid secure agent runtime
Documentation
//! Cross-boundary IPC message schemas and payloads, plus the per-source publish quota limiter.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

/// A cross-boundary message sent over the event bus between WASM guests and the host.
///
/// Build one with [`IpcMessage::new`], which leaves the message unsigned, and attach a
/// signature afterwards with [`IpcMessage::with_signature`] when one is needed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IpcMessage {
    /// Topic pattern or exact match (e.g., `astrid.cli.input`).
    pub topic: String,
    /// Standardized payload structure; see [`IpcPayload`] for the available variants.
    pub payload: IpcPayload,
    /// Optional cryptographic signature for stateless verification across a distributed swarm.
    ///
    /// `None` for messages built with [`IpcMessage::new`] until
    /// [`IpcMessage::with_signature`] is called.
    pub signature: Option<Vec<u8>>,
    /// Identifier of the sender plugin or agent.
    pub source_id: Uuid,
    /// Timestamp when the message was dispatched.
    ///
    /// [`IpcMessage::new`] sets this to the UTC time at which the message is constructed.
    pub timestamp: DateTime<Utc>,
}

impl IpcMessage {
    /// Build an unsigned message for `topic` carrying `payload` on behalf of `source_id`.
    ///
    /// The timestamp is read from the current UTC clock and `signature` starts out as `None`.
    #[must_use]
    pub fn new(topic: impl Into<String>, payload: IpcPayload, source_id: Uuid) -> Self {
        let topic = topic.into();
        let timestamp = Utc::now();

        Self {
            topic,
            payload,
            signature: None,
            source_id,
            timestamp,
        }
    }

    /// Consume the message and return it carrying `signature` for swarm verification.
    ///
    /// Any signature that was already attached is replaced.
    #[must_use]
    pub fn with_signature(self, signature: Vec<u8>) -> Self {
        Self {
            signature: Some(signature),
            ..self
        }
    }
}

/// Standardized cross-boundary payload schemas.
///
/// Serialized in serde's internally tagged form: every variant carries a `type` field whose
/// value is the variant name in snake_case (for example `UserInput` becomes `"user_input"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum IpcPayload {
    /// User input provided via a frontend (CLI, Telegram).
    UserInput {
        /// The raw text input.
        text: String,
        /// Optional extra context.
        ///
        /// Left out of the serialized form when `None`, and read back as `None` when the
        /// field is absent from the input.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        context: Option<Value>,
    },
    /// A response generated by an agent.
    AgentResponse {
        /// The text output.
        text: String,
        /// True if this is the final response in a chain.
        is_final: bool,
    },
    /// An interceptor request for capability approval.
    ApprovalRequired {
        /// The action being requested.
        action: String,
        /// The resource target.
        resource: String,
        /// Justification for the request.
        reason: String,
    },
    /// Arbitrary JSON data for unstructured plugins.
    Custom {
        /// Raw JSON value, carried as-is with no schema imposed by this type.
        data: Value,
    },
}

/// Errors that can occur when checking IPC quota.
///
/// Returned by [`IpcRateLimiter::check_quota`].
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum QuotaError {
    /// The plugin has exceeded its rate limit: accepting the payload would push the source
    /// past its byte budget for the current window.
    #[error("Rate limit exceeded")]
    RateLimited,
    /// The payload exceeds the maximum allowed size for a single message, regardless of how
    /// much of the source's budget is left.
    #[error("Payload too large")]
    PayloadTooLarge,
}

/// Simple fixed-window byte-budget rate limiter for IPC publish events.
///
/// Each source has a byte counter that is reset once its window is at least one second old.
/// Nothing is refilled gradually, so this is a fixed-window counter rather than a token bucket.
#[derive(Debug)]
pub struct IpcRateLimiter {
    /// Per-source accounting: when the current window started and the bytes accepted in it.
    state: dashmap::DashMap<Uuid, (std::time::Instant, usize)>,
    /// When stale entries were last pruned from `state`; starts at the construction time.
    last_prune: std::sync::Mutex<std::time::Instant>,
}

impl IpcRateLimiter {
    /// Largest single payload accepted, in bytes (5 MiB). Guards against OOM.
    const MAX_PAYLOAD_BYTES: usize = 5 * 1024 * 1024;
    /// Total bytes one source may publish within a single window (10 MiB).
    const MAX_BYTES_PER_WINDOW: usize = 10 * 1024 * 1024;
    /// Length of the accounting window, in whole seconds.
    const WINDOW_SECS: u64 = 1;
    /// Pruning is only considered once more than this many sources are tracked.
    const PRUNE_THRESHOLD: usize = 1000;
    /// Whole seconds that must be exceeded between two prunes.
    const PRUNE_INTERVAL_SECS: u64 = 60;

    /// Create a new IPC rate limiter with no tracked sources.
    #[must_use]
    pub fn new() -> Self {
        Self {
            state: dashmap::DashMap::new(),
            last_prune: std::sync::Mutex::new(std::time::Instant::now()),
        }
    }

    /// Check if a plugin (`source_id`) is allowed to publish a payload of `size_bytes`.
    ///
    /// On success the payload's size is charged against the source's budget for the current
    /// window. A rejected payload is not charged. Only bytes are counted; the number of
    /// messages is not limited.
    ///
    /// # Errors
    ///
    /// Returns [`QuotaError::PayloadTooLarge`] if `size_bytes` is above the per-payload cap,
    /// or [`QuotaError::RateLimited`] if accepting it would exceed the per-window byte budget.
    pub fn check_quota(&self, source_id: Uuid, size_bytes: usize) -> Result<(), QuotaError> {
        // Checked first so an oversized payload never creates or touches a tracking entry.
        if size_bytes > Self::MAX_PAYLOAD_BYTES {
            return Err(QuotaError::PayloadTooLarge);
        }

        let now = std::time::Instant::now();

        // Must run before taking the entry below: pruning walks the whole map, and the entry
        // guard keeps part of it locked for as long as it is alive.
        self.prune_stale(now);

        let mut entry = self.state.entry(source_id).or_insert((now, 0));
        let (window_start, used_bytes) = &mut *entry;

        // Start a fresh window once the current one has run its full length.
        if now.saturating_duration_since(*window_start).as_secs() >= Self::WINDOW_SECS {
            *window_start = now;
            *used_bytes = 0;
        }

        // Saturating so an absurd `size_bytes` cannot wrap around and slip under the cap.
        let total = (*used_bytes).saturating_add(size_bytes);
        if total > Self::MAX_BYTES_PER_WINDOW {
            return Err(QuotaError::RateLimited);
        }

        *used_bytes = total;
        Ok(())
    }

    /// Lazily drop entries whose window has expired so a shared limiter does not grow forever.
    ///
    /// Does nothing unless the map is above the prune threshold, the prune lock can be taken
    /// without blocking, and more than the prune interval has passed since the last prune.
    fn prune_stale(&self, now: std::time::Instant) {
        if self.state.len() <= Self::PRUNE_THRESHOLD {
            return;
        }

        // `try_lock` rather than `lock`: if another caller holds the lock (or it is poisoned)
        // this call skips pruning instead of stalling the publish path.
        let Ok(mut last_pruned_at) = self.last_prune.try_lock() else {
            return;
        };

        // Debounce: a full-map sweep is O(N), so it runs at most once per interval.
        if now.saturating_duration_since(*last_pruned_at).as_secs() <= Self::PRUNE_INTERVAL_SECS {
            return;
        }

        *last_pruned_at = now;
        // Keep only sources whose window is still open; expired ones would be reset on their
        // next publish anyway, so dropping them loses no accounting.
        self.state.retain(|_, window| {
            now.saturating_duration_since(window.0).as_secs() < Self::WINDOW_SECS
        });
    }
}

impl Default for IpcRateLimiter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// One mebibyte, the unit the sizes in these tests are expressed in.
    const MIB: usize = 1024 * 1024;

    /// A payload under the per-message cap passes; one above it is rejected outright.
    #[test]
    fn test_ipc_rate_limiter_size() {
        let limiter = IpcRateLimiter::new();
        let plugin = Uuid::new_v4();

        // 1 MiB is accepted.
        let small = limiter.check_quota(plugin, MIB);
        assert_eq!(small, Ok(()));

        // 6 MiB is rejected as too large for a single payload.
        let oversized = limiter.check_quota(plugin, 6 * MIB);
        assert_eq!(oversized, Err(QuotaError::PayloadTooLarge));
    }

    /// Payloads that are individually fine are rejected once their running total within one
    /// window exceeds the budget. Relies on all three calls landing within the same second.
    #[test]
    fn test_ipc_rate_limiter_frequency() {
        let limiter = IpcRateLimiter::new();
        let plugin = Uuid::new_v4();
        let chunk = 4 * MIB;

        // First chunk: 4 MiB used.
        assert_eq!(limiter.check_quota(plugin, chunk), Ok(()));

        // Second chunk: 8 MiB used, still within the 10 MiB budget.
        assert_eq!(limiter.check_quota(plugin, chunk), Ok(()));

        // Third chunk would bring the total to 12 MiB, so it is rate-limited.
        assert_eq!(limiter.check_quota(plugin, chunk), Err(QuotaError::RateLimited));
    }

    /// A new message is unsigned, and `with_signature` attaches the given bytes.
    #[test]
    fn test_ipc_message_signature() {
        let payload = IpcPayload::AgentResponse {
            text: "hello".into(),
            is_final: true,
        };
        let unsigned = IpcMessage::new("test.topic", payload, Uuid::new_v4());
        assert!(unsigned.signature.is_none());

        let signed = unsigned.with_signature(vec![1, 2, 3]);
        assert_eq!(signed.signature, Some(vec![1, 2, 3]));
    }
}