Skip to main content

camel_function/protocol/
client.rs

1use super::*;
2use crate::provider::{FunctionHealthStatus, ProviderError};
3use camel_api::function::*;
4use std::time::Duration;
5
6pub struct ProtocolClient {
7    http: reqwest::Client,
8}
9
10impl Default for ProtocolClient {
11    fn default() -> Self {
12        Self::new()
13    }
14}
15
16impl ProtocolClient {
17    pub fn new() -> Self {
18        Self {
19            http: reqwest::Client::builder()
20                .timeout(Duration::from_secs(30))
21                .build()
22                .expect("reqwest client"), // allow-unwrap
23        }
24    }
25
26    pub async fn health(&self, endpoint: &str) -> Result<FunctionHealthStatus, ProviderError> {
27        let url = format!("{}/health", endpoint);
28        let resp = self
29            .http
30            .get(&url)
31            .send()
32            .await
33            .map_err(|e| ProviderError::HealthFailed(format!("health request failed: {e}")))?;
34        if resp.status().is_success() {
35            let health: HealthResponse = resp
36                .json()
37                .await
38                .map_err(|e| ProviderError::HealthFailed(format!("decode health response: {e}")))?;
39            if health.status == "ready" {
40                Ok(FunctionHealthStatus::Healthy)
41            } else {
42                Ok(FunctionHealthStatus::Unhealthy(health.status))
43            }
44        } else {
45            Ok(FunctionHealthStatus::Unhealthy(format!(
46                "status {}",
47                resp.status()
48            )))
49        }
50    }
51
52    pub async fn register(
53        &self,
54        endpoint: &str,
55        def: &FunctionDefinition,
56    ) -> Result<(), ProviderError> {
57        let url = format!("{}/register", endpoint);
58        let body = RegisterRequest {
59            function_id: def.id.0.clone(),
60            runtime: def.runtime.clone(),
61            source: def.source.clone(),
62            timeout_ms: def.timeout_ms,
63        };
64        let resp =
65            self.http.post(&url).json(&body).send().await.map_err(|e| {
66                ProviderError::RegisterFailed(format!("register request failed: {e}"))
67            })?;
68        if resp.status().is_success() {
69            Ok(())
70        } else {
71            let err_resp: ErrorResponse = resp.json().await.map_err(|e| {
72                ProviderError::RegisterFailed(format!("decode register error: {e}"))
73            })?;
74            Err(ProviderError::RegisterFailed(format!(
75                "{}: {}",
76                err_resp.kind, err_resp.error
77            )))
78        }
79    }
80
81    pub async fn invoke(
82        &self,
83        endpoint: &str,
84        id: &FunctionId,
85        exchange: &camel_api::Exchange,
86        timeout: Duration,
87    ) -> Result<InvokeResponse, ProviderError> {
88        let url = format!("{}/invoke", endpoint);
89        let wire = ExchangeWire {
90            function_id: id.0.clone(),
91            correlation_id: exchange.correlation_id.clone(),
92            body: BodyWire::from_body(&exchange.input.body),
93            headers: exchange.input.headers.clone(),
94            properties: exchange.properties.clone(),
95            timeout_ms: timeout.as_millis() as u64,
96        };
97        let resp = self
98            .http
99            .post(&url)
100            .json(&wire)
101            .timeout(timeout)
102            .send()
103            .await
104            .map_err(|e| ProviderError::InvokeFailed(format!("invoke request failed: {e}")))?;
105        let invoke_resp: InvokeResponse = resp
106            .json()
107            .await
108            .map_err(|e| ProviderError::InvokeFailed(format!("decode invoke response: {e}")))?;
109        Ok(invoke_resp)
110    }
111
112    pub async fn unregister(&self, endpoint: &str, id: &FunctionId) -> Result<(), ProviderError> {
113        let url = format!("{}/unregister", endpoint);
114        let body = serde_json::json!({ "function_id": id.0.as_str() });
115        let resp = self.http.post(&url).json(&body).send().await.map_err(|e| {
116            ProviderError::UnregisterFailed(format!("unregister request failed: {e}"))
117        })?;
118        if resp.status().is_success() {
119            Ok(())
120        } else {
121            Err(ProviderError::UnregisterFailed(format!(
122                "status {}",
123                resp.status()
124            )))
125        }
126    }
127
128    pub async fn shutdown(&self, endpoint: &str) -> Result<(), ProviderError> {
129        let url = format!("{}/shutdown", endpoint);
130        let resp =
131            self.http.post(&url).send().await.map_err(|e| {
132                ProviderError::ShutdownFailed(format!("shutdown request failed: {e}"))
133            })?;
134        if resp.status().is_success() {
135            Ok(())
136        } else {
137            Err(ProviderError::ShutdownFailed(format!(
138                "status {}",
139                resp.status()
140            )))
141        }
142    }
143}