Skip to main content

camel_function/protocol/
client.rs

1use super::*;
2use crate::provider::{HealthReport, ProviderError};
3use camel_api::function::*;
4use std::time::Duration;
5
6pub struct ProtocolClient {
7    http: reqwest::Client,
8}
9
10impl Default for ProtocolClient {
11    fn default() -> Self {
12        Self::new()
13    }
14}
15
16impl ProtocolClient {
17    pub fn new() -> Self {
18        Self {
19            http: reqwest::Client::builder()
20                .timeout(Duration::from_secs(30))
21                .build()
22                .expect("reqwest client"),
23        }
24    }
25
26    pub async fn health(&self, endpoint: &str) -> Result<HealthReport, ProviderError> {
27        let url = format!("{}/health", endpoint);
28        let resp = self
29            .http
30            .get(&url)
31            .send()
32            .await
33            .map_err(|e| ProviderError::HealthFailed(format!("health request failed: {e}")))?;
34        if resp.status().is_success() {
35            let health: HealthResponse = resp
36                .json()
37                .await
38                .map_err(|e| ProviderError::HealthFailed(format!("decode health response: {e}")))?;
39            if health.status == "ready" {
40                Ok(HealthReport::Healthy)
41            } else {
42                Ok(HealthReport::Unhealthy(health.status))
43            }
44        } else {
45            Ok(HealthReport::Unhealthy(format!("status {}", resp.status())))
46        }
47    }
48
49    pub async fn register(
50        &self,
51        endpoint: &str,
52        def: &FunctionDefinition,
53    ) -> Result<(), ProviderError> {
54        let url = format!("{}/register", endpoint);
55        let body = RegisterRequest {
56            function_id: def.id.0.clone(),
57            runtime: def.runtime.clone(),
58            source: def.source.clone(),
59            timeout_ms: def.timeout_ms,
60        };
61        let resp =
62            self.http.post(&url).json(&body).send().await.map_err(|e| {
63                ProviderError::RegisterFailed(format!("register request failed: {e}"))
64            })?;
65        if resp.status().is_success() {
66            Ok(())
67        } else {
68            let err_resp: ErrorResponse = resp.json().await.map_err(|e| {
69                ProviderError::RegisterFailed(format!("decode register error: {e}"))
70            })?;
71            Err(ProviderError::RegisterFailed(format!(
72                "{}: {}",
73                err_resp.kind, err_resp.error
74            )))
75        }
76    }
77
78    pub async fn invoke(
79        &self,
80        endpoint: &str,
81        id: &FunctionId,
82        exchange: &camel_api::Exchange,
83        timeout: Duration,
84    ) -> Result<InvokeResponse, ProviderError> {
85        let url = format!("{}/invoke", endpoint);
86        let wire = ExchangeWire {
87            function_id: id.0.clone(),
88            correlation_id: exchange.correlation_id.clone(),
89            body: BodyWire::from_body(&exchange.input.body),
90            headers: exchange.input.headers.clone(),
91            properties: exchange.properties.clone(),
92            timeout_ms: timeout.as_millis() as u64,
93        };
94        let resp = self
95            .http
96            .post(&url)
97            .json(&wire)
98            .timeout(timeout)
99            .send()
100            .await
101            .map_err(|e| ProviderError::InvokeFailed(format!("invoke request failed: {e}")))?;
102        let invoke_resp: InvokeResponse = resp
103            .json()
104            .await
105            .map_err(|e| ProviderError::InvokeFailed(format!("decode invoke response: {e}")))?;
106        Ok(invoke_resp)
107    }
108
109    pub async fn unregister(&self, endpoint: &str, id: &FunctionId) -> Result<(), ProviderError> {
110        let url = format!("{}/unregister", endpoint);
111        let body = serde_json::json!({ "function_id": id.0.as_str() });
112        let resp = self.http.post(&url).json(&body).send().await.map_err(|e| {
113            ProviderError::UnregisterFailed(format!("unregister request failed: {e}"))
114        })?;
115        if resp.status().is_success() {
116            Ok(())
117        } else {
118            Err(ProviderError::UnregisterFailed(format!(
119                "status {}",
120                resp.status()
121            )))
122        }
123    }
124
125    pub async fn shutdown(&self, endpoint: &str) -> Result<(), ProviderError> {
126        let url = format!("{}/shutdown", endpoint);
127        let resp =
128            self.http.post(&url).send().await.map_err(|e| {
129                ProviderError::ShutdownFailed(format!("shutdown request failed: {e}"))
130            })?;
131        if resp.status().is_success() {
132            Ok(())
133        } else {
134            Err(ProviderError::ShutdownFailed(format!(
135                "status {}",
136                resp.status()
137            )))
138        }
139    }
140}