eventdbx 3.19.4

Immutable, event-sourced, nosql, write-side database system.
Documentation
use std::{future::Future, result::Result as StdResult, time::Duration};

use reqwest::Client;
use serde_json::{Map, Value as JsonValue};
use tokio::{
    runtime::{Handle, Runtime},
    task::block_in_place,
};

use crate::{
    config::HttpPluginConfig,
    error::{EventError, Result},
};

use super::{Plugin, PluginDelivery};

pub(super) struct HttpPlugin {
    config: HttpPluginConfig,
    client: StdResult<Client, String>,
}

impl HttpPlugin {
    pub(super) fn new(config: HttpPluginConfig) -> Self {
        let client: StdResult<_, String> = Client::builder()
            .timeout(Duration::from_secs(5))
            .build()
            .map_err(|err| err.to_string());
        Self { config, client }
    }

    fn client(&self) -> Result<&Client> {
        self.client
            .as_ref()
            .map_err(|err| EventError::Config(format!("failed to build http client: {err}")))
    }

    fn run_with_client<F, Fut, T>(&self, operation: F) -> Result<T>
    where
        F: FnOnce(Client) -> Fut,
        Fut: Future<Output = Result<T>>,
    {
        if let Ok(handle) = Handle::try_current() {
            let client = self.client()?.clone();
            block_in_place(|| handle.block_on(operation(client)))
        } else {
            let client = self.client()?.clone();
            let runtime = Runtime::new().map_err(|err| {
                EventError::Storage(format!("failed to build http runtime: {err}"))
            })?;
            runtime.block_on(operation(client))
        }
    }

    fn resolved_endpoint(&self) -> String {
        let endpoint = self.config.endpoint.trim();
        if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
            endpoint.to_string()
        } else if self.config.https {
            format!("https://{}", endpoint)
        } else {
            format!("http://{}", endpoint)
        }
    }

    pub(super) fn ensure_ready(&self) -> Result<()> {
        let resolved = self.resolved_endpoint();
        let headers: Vec<(String, String)> = self
            .config
            .headers
            .iter()
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect();

        let response = self.run_with_client(|client| {
            let resolved = resolved.clone();
            let headers = headers.clone();
            async move {
                let mut request = client.head(&resolved);
                for (key, value) in headers {
                    request = request.header(&key, &value);
                }
                request
                    .send()
                    .await
                    .map_err(|err| EventError::Storage(err.to_string()))
            }
        })?;

        if !response.status().is_success() {
            return Err(EventError::Storage(format!(
                "http plugin readiness check failed ({})",
                response.status()
            )));
        }

        Ok(())
    }
}

impl Plugin for HttpPlugin {
    fn name(&self) -> &'static str {
        "http"
    }

    fn notify_event(&self, delivery: PluginDelivery<'_>) -> Result<()> {
        let resolved = self.resolved_endpoint();
        let headers: Vec<(String, String)> = self
            .config
            .headers
            .iter()
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect();

        let mut body = Map::new();
        if let Some(record) = delivery.record {
            body.insert(
                "event".to_string(),
                serde_json::to_value(record).unwrap_or(JsonValue::Null),
            );
        }
        if let Some(state) = delivery.state {
            body.insert(
                "state".to_string(),
                serde_json::to_value(state).unwrap_or(JsonValue::Null),
            );
        }
        if let Some(schema) = delivery.schema {
            body.insert(
                "schema".to_string(),
                serde_json::to_value(schema).unwrap_or(JsonValue::Null),
            );
        }

        if body.is_empty() {
            return Ok(());
        }

        let payload = JsonValue::Object(body);

        self.run_with_client(|client| {
            let resolved = resolved.clone();
            let headers = headers.clone();
            let payload = payload.clone();
            async move {
                let mut request = client.post(&resolved);
                for (key, value) in headers {
                    request = request.header(&key, &value);
                }
                let response = request
                    .json(&payload)
                    .send()
                    .await
                    .map_err(|err| EventError::Storage(err.to_string()))?;

                if response.status().is_success() {
                    return Ok(());
                }

                let status = response.status();
                let body = response
                    .text()
                    .await
                    .map_err(|err| EventError::Storage(err.to_string()))?;
                let body = body.trim();
                let body = if body.is_empty() {
                    String::new()
                } else {
                    let mut truncated = String::new();
                    let mut chars = body.chars();
                    for _ in 0..256 {
                        if let Some(ch) = chars.next() {
                            truncated.push(ch);
                        } else {
                            break;
                        }
                    }
                    if chars.next().is_some() {
                        truncated.push('');
                    }
                    truncated
                };

                let mut message = format!("http plugin request failed ({status})");
                if !body.is_empty() {
                    message.push_str(": ");
                    message.push_str(&body);
                }
                Err(EventError::Storage(message))
            }
        })?;
        Ok(())
    }
}