camel-function 0.9.0

Function runtime service for out-of-process function execution
Documentation
use super::*;
use crate::provider::{HealthReport, ProviderError};
use camel_api::function::*;
use std::time::Duration;

pub struct ProtocolClient {
    http: reqwest::Client,
}

impl Default for ProtocolClient {
    fn default() -> Self {
        Self::new()
    }
}

impl ProtocolClient {
    pub fn new() -> Self {
        Self {
            http: reqwest::Client::builder()
                .timeout(Duration::from_secs(30))
                .build()
                .expect("reqwest client"),
        }
    }

    pub async fn health(&self, endpoint: &str) -> Result<HealthReport, ProviderError> {
        let url = format!("{}/health", endpoint);
        let resp = self
            .http
            .get(&url)
            .send()
            .await
            .map_err(|e| ProviderError::HealthFailed(format!("health request failed: {e}")))?;
        if resp.status().is_success() {
            let health: HealthResponse = resp
                .json()
                .await
                .map_err(|e| ProviderError::HealthFailed(format!("decode health response: {e}")))?;
            if health.status == "ready" {
                Ok(HealthReport::Healthy)
            } else {
                Ok(HealthReport::Unhealthy(health.status))
            }
        } else {
            Ok(HealthReport::Unhealthy(format!("status {}", resp.status())))
        }
    }

    pub async fn register(
        &self,
        endpoint: &str,
        def: &FunctionDefinition,
    ) -> Result<(), ProviderError> {
        let url = format!("{}/register", endpoint);
        let body = RegisterRequest {
            function_id: def.id.0.clone(),
            runtime: def.runtime.clone(),
            source: def.source.clone(),
            timeout_ms: def.timeout_ms,
        };
        let resp =
            self.http.post(&url).json(&body).send().await.map_err(|e| {
                ProviderError::RegisterFailed(format!("register request failed: {e}"))
            })?;
        if resp.status().is_success() {
            Ok(())
        } else {
            let err_resp: ErrorResponse = resp.json().await.map_err(|e| {
                ProviderError::RegisterFailed(format!("decode register error: {e}"))
            })?;
            Err(ProviderError::RegisterFailed(format!(
                "{}: {}",
                err_resp.kind, err_resp.error
            )))
        }
    }

    pub async fn invoke(
        &self,
        endpoint: &str,
        id: &FunctionId,
        exchange: &camel_api::Exchange,
        timeout: Duration,
    ) -> Result<InvokeResponse, ProviderError> {
        let url = format!("{}/invoke", endpoint);
        let wire = ExchangeWire {
            function_id: id.0.clone(),
            correlation_id: exchange.correlation_id.clone(),
            body: BodyWire::from_body(&exchange.input.body),
            headers: exchange.input.headers.clone(),
            properties: exchange.properties.clone(),
            timeout_ms: timeout.as_millis() as u64,
        };
        let resp = self
            .http
            .post(&url)
            .json(&wire)
            .timeout(timeout)
            .send()
            .await
            .map_err(|e| ProviderError::InvokeFailed(format!("invoke request failed: {e}")))?;
        let invoke_resp: InvokeResponse = resp
            .json()
            .await
            .map_err(|e| ProviderError::InvokeFailed(format!("decode invoke response: {e}")))?;
        Ok(invoke_resp)
    }

    pub async fn unregister(&self, endpoint: &str, id: &FunctionId) -> Result<(), ProviderError> {
        let url = format!("{}/unregister", endpoint);
        let body = serde_json::json!({ "function_id": id.0.as_str() });
        let resp = self.http.post(&url).json(&body).send().await.map_err(|e| {
            ProviderError::UnregisterFailed(format!("unregister request failed: {e}"))
        })?;
        if resp.status().is_success() {
            Ok(())
        } else {
            Err(ProviderError::UnregisterFailed(format!(
                "status {}",
                resp.status()
            )))
        }
    }

    pub async fn shutdown(&self, endpoint: &str) -> Result<(), ProviderError> {
        let url = format!("{}/shutdown", endpoint);
        let resp =
            self.http.post(&url).send().await.map_err(|e| {
                ProviderError::ShutdownFailed(format!("shutdown request failed: {e}"))
            })?;
        if resp.status().is_success() {
            Ok(())
        } else {
            Err(ProviderError::ShutdownFailed(format!(
                "status {}",
                resp.status()
            )))
        }
    }
}