camel_function/protocol/
client.rs1use super::*;
2use crate::provider::{HealthReport, ProviderError};
3use camel_api::function::*;
4use std::time::Duration;
5
6pub struct ProtocolClient {
7 http: reqwest::Client,
8}
9
10impl Default for ProtocolClient {
11 fn default() -> Self {
12 Self::new()
13 }
14}
15
16impl ProtocolClient {
17 pub fn new() -> Self {
18 Self {
19 http: reqwest::Client::builder()
20 .timeout(Duration::from_secs(30))
21 .build()
22 .expect("reqwest client"),
23 }
24 }
25
26 pub async fn health(&self, endpoint: &str) -> Result<HealthReport, ProviderError> {
27 let url = format!("{}/health", endpoint);
28 let resp = self
29 .http
30 .get(&url)
31 .send()
32 .await
33 .map_err(|e| ProviderError::HealthFailed(format!("health request failed: {e}")))?;
34 if resp.status().is_success() {
35 let health: HealthResponse = resp
36 .json()
37 .await
38 .map_err(|e| ProviderError::HealthFailed(format!("decode health response: {e}")))?;
39 if health.status == "ready" {
40 Ok(HealthReport::Healthy)
41 } else {
42 Ok(HealthReport::Unhealthy(health.status))
43 }
44 } else {
45 Ok(HealthReport::Unhealthy(format!("status {}", resp.status())))
46 }
47 }
48
49 pub async fn register(
50 &self,
51 endpoint: &str,
52 def: &FunctionDefinition,
53 ) -> Result<(), ProviderError> {
54 let url = format!("{}/register", endpoint);
55 let body = RegisterRequest {
56 function_id: def.id.0.clone(),
57 runtime: def.runtime.clone(),
58 source: def.source.clone(),
59 timeout_ms: def.timeout_ms,
60 };
61 let resp =
62 self.http.post(&url).json(&body).send().await.map_err(|e| {
63 ProviderError::RegisterFailed(format!("register request failed: {e}"))
64 })?;
65 if resp.status().is_success() {
66 Ok(())
67 } else {
68 let err_resp: ErrorResponse = resp.json().await.map_err(|e| {
69 ProviderError::RegisterFailed(format!("decode register error: {e}"))
70 })?;
71 Err(ProviderError::RegisterFailed(format!(
72 "{}: {}",
73 err_resp.kind, err_resp.error
74 )))
75 }
76 }
77
78 pub async fn invoke(
79 &self,
80 endpoint: &str,
81 id: &FunctionId,
82 exchange: &camel_api::Exchange,
83 timeout: Duration,
84 ) -> Result<InvokeResponse, ProviderError> {
85 let url = format!("{}/invoke", endpoint);
86 let wire = ExchangeWire {
87 function_id: id.0.clone(),
88 correlation_id: exchange.correlation_id.clone(),
89 body: BodyWire::from_body(&exchange.input.body),
90 headers: exchange.input.headers.clone(),
91 properties: exchange.properties.clone(),
92 timeout_ms: timeout.as_millis() as u64,
93 };
94 let resp = self
95 .http
96 .post(&url)
97 .json(&wire)
98 .timeout(timeout)
99 .send()
100 .await
101 .map_err(|e| ProviderError::InvokeFailed(format!("invoke request failed: {e}")))?;
102 let invoke_resp: InvokeResponse = resp
103 .json()
104 .await
105 .map_err(|e| ProviderError::InvokeFailed(format!("decode invoke response: {e}")))?;
106 Ok(invoke_resp)
107 }
108
109 pub async fn unregister(&self, endpoint: &str, id: &FunctionId) -> Result<(), ProviderError> {
110 let url = format!("{}/unregister", endpoint);
111 let body = serde_json::json!({ "function_id": id.0.as_str() });
112 let resp = self.http.post(&url).json(&body).send().await.map_err(|e| {
113 ProviderError::UnregisterFailed(format!("unregister request failed: {e}"))
114 })?;
115 if resp.status().is_success() {
116 Ok(())
117 } else {
118 Err(ProviderError::UnregisterFailed(format!(
119 "status {}",
120 resp.status()
121 )))
122 }
123 }
124
125 pub async fn shutdown(&self, endpoint: &str) -> Result<(), ProviderError> {
126 let url = format!("{}/shutdown", endpoint);
127 let resp =
128 self.http.post(&url).send().await.map_err(|e| {
129 ProviderError::ShutdownFailed(format!("shutdown request failed: {e}"))
130 })?;
131 if resp.status().is_success() {
132 Ok(())
133 } else {
134 Err(ProviderError::ShutdownFailed(format!(
135 "status {}",
136 resp.status()
137 )))
138 }
139 }
140}