stormchaser-model 0.1.0

A robust, distributed workflow engine for event-driven and human-triggered workflows.
Documentation
//! Authentication and authorization models and OPA client.

use anyhow::{Context, Result};
use async_trait::async_trait;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use tracing::debug;
use uuid::Uuid;

/// Extracted claims from a JWT token.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
    /// Subject (User ID) of the token.
    pub sub: String, // User ID
    /// Optional email address of the user.
    pub email: Option<String>,
    /// Expiration time as a Unix timestamp.
    pub exp: usize, // Expiration time
}

/// Trait for executing Open Policy Agent (OPA) policies compiled to WebAssembly.
#[async_trait]
pub trait OpaWasmExecutor: Send + Sync {
    /// Evaluates a WASM-compiled OPA policy against the given input.
    async fn evaluate(&self, entrypoint: &str, input: &Value) -> Result<bool>;
}

/// Client for interacting with an Open Policy Agent (OPA) server or WASM module.
#[derive(Clone)]
pub struct OpaClient {
    url: Option<String>,
    http_client: ClientWithMiddleware,
    wasm_executor: Option<Arc<dyn OpaWasmExecutor>>,
    entrypoint: String,
}

impl std::fmt::Debug for OpaClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpaClient")
            .field("url", &self.url)
            .field("wasm_configured", &self.wasm_executor.is_some())
            .field("entrypoint", &self.entrypoint)
            .finish()
    }
}

#[derive(Debug, Serialize)]
struct OpaInput<T> {
    input: T,
}

#[derive(Debug, Deserialize)]
struct OpaResponse {
    result: bool,
}

impl OpaClient {
    /// Creates a new OpaClient with the given URL and TLS configuration.
    pub fn new(url: Option<String>, tls_config: Option<Arc<rustls::ClientConfig>>) -> Self {
        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
        let mut builder = reqwest::Client::builder();

        if let Some(config) = tls_config {
            builder = builder.use_preconfigured_tls(config);
        }

        let http_client =
            ClientBuilder::new(builder.build().unwrap_or_else(|_| reqwest::Client::new()))
                .with(RetryTransientMiddleware::new_with_policy(retry_policy))
                .build();

        Self {
            url,
            http_client,
            wasm_executor: None,
            entrypoint: "stormchaser/allow".to_string(),
        }
    }

    /// Configures the client to use a WASM executor.
    pub fn with_wasm_executor(mut self, executor: Arc<dyn OpaWasmExecutor>) -> Self {
        self.wasm_executor = Some(executor);
        self
    }

    /// Sets the entrypoint for the OPA policy.
    pub fn with_entrypoint(mut self, entrypoint: String) -> Self {
        self.entrypoint = entrypoint;
        self
    }

    /// Returns true if the client is configured with either a URL or a WASM executor.
    pub fn is_configured(&self) -> bool {
        self.url.is_some() || self.wasm_executor.is_some()
    }

    /// Flexible check that sends any serializable context to OPA
    pub async fn check_context<T: Serialize>(&self, context: T) -> Result<bool> {
        // 1. Try WASM executor first if configured
        if let Some(executor) = &self.wasm_executor {
            let context_val = serde_json::to_value(&context)?;
            return executor.evaluate(&self.entrypoint, &context_val).await;
        }

        // 2. Fallback to HTTP OPA if configured
        let url = match &self.url {
            Some(url) => url,
            None => return Ok(true), // No-op if not configured
        };

        debug!("Checking OPA policy at {} with custom context", url);

        let input = OpaInput { input: context };

        let response = self
            .http_client
            .post(url)
            .json(&input)
            .send()
            .await
            .context("Failed to reach OPA server")?;

        if !response.status().is_success() {
            return Err(anyhow::anyhow!(
                "OPA server returned error: {}",
                response.status()
            ));
        }

        let opa_res: OpaResponse = response
            .json()
            .await
            .context("Failed to parse OPA response")?;

        Ok(opa_res.result)
    }
}

/// Trait for authorizing requests using an Open Policy Agent (OPA).
#[async_trait]
pub trait OpaAuthorizer: Send + Sync {
    /// Checks the given context against the OPA policy.
    async fn check(&self, context: ApiOpaContext<'_>) -> Result<bool>;
    /// Returns true if the authorizer is properly configured.
    fn is_configured(&self) -> bool;
}

#[async_trait]
impl OpaAuthorizer for OpaClient {
    async fn check(&self, context: ApiOpaContext<'_>) -> Result<bool> {
        self.check_context(context).await
    }
    fn is_configured(&self) -> bool {
        self.is_configured()
    }
}

/// Context for OPA checks in the API
#[derive(Debug, Serialize)]
pub struct ApiOpaContext<'a> {
    /// The requested path.
    pub path: &'a str,
    /// The HTTP method.
    pub method: &'a str,
    /// The optional authentication token.
    pub token: Option<&'a str>,
}

/// Context for OPA checks in the Engine after DSL parsing
#[derive(Debug, Serialize)]
pub struct EngineOpaContext {
    /// Associated workflow run ID.
    pub run_id: Uuid,
    /// Identifier of the user who initiated the run.
    pub initiating_user: String,
    /// Full parsed abstract syntax tree of the workflow.
    pub workflow_ast: Value,
    /// JSON inputs for the workflow run.
    pub inputs: Value,
}