camel_function/protocol/
client.rs1use super::*;
2use crate::provider::{FunctionHealthStatus, ProviderError};
3use camel_api::function::*;
4use std::time::Duration;
5
6pub struct ProtocolClient {
7 http: reqwest::Client,
8}
9
10impl Default for ProtocolClient {
11 fn default() -> Self {
12 Self::new()
13 }
14}
15
16impl ProtocolClient {
17 pub fn new() -> Self {
18 Self {
19 http: reqwest::Client::builder()
20 .timeout(Duration::from_secs(30))
21 .build()
22 .expect("reqwest client"), }
24 }
25
26 pub async fn health(&self, endpoint: &str) -> Result<FunctionHealthStatus, ProviderError> {
27 let url = format!("{}/health", endpoint);
28 let resp = self
29 .http
30 .get(&url)
31 .send()
32 .await
33 .map_err(|e| ProviderError::HealthFailed(format!("health request failed: {e}")))?;
34 if resp.status().is_success() {
35 let health: HealthResponse = resp
36 .json()
37 .await
38 .map_err(|e| ProviderError::HealthFailed(format!("decode health response: {e}")))?;
39 if health.status == "ready" {
40 Ok(FunctionHealthStatus::Healthy)
41 } else {
42 Ok(FunctionHealthStatus::Unhealthy(health.status))
43 }
44 } else {
45 Ok(FunctionHealthStatus::Unhealthy(format!(
46 "status {}",
47 resp.status()
48 )))
49 }
50 }
51
52 pub async fn register(
53 &self,
54 endpoint: &str,
55 def: &FunctionDefinition,
56 ) -> Result<(), ProviderError> {
57 let url = format!("{}/register", endpoint);
58 let body = RegisterRequest {
59 function_id: def.id.0.clone(),
60 runtime: def.runtime.clone(),
61 source: def.source.clone(),
62 timeout_ms: def.timeout_ms,
63 };
64 let resp =
65 self.http.post(&url).json(&body).send().await.map_err(|e| {
66 ProviderError::RegisterFailed(format!("register request failed: {e}"))
67 })?;
68 if resp.status().is_success() {
69 Ok(())
70 } else {
71 let err_resp: ErrorResponse = resp.json().await.map_err(|e| {
72 ProviderError::RegisterFailed(format!("decode register error: {e}"))
73 })?;
74 Err(ProviderError::RegisterFailed(format!(
75 "{}: {}",
76 err_resp.kind, err_resp.error
77 )))
78 }
79 }
80
81 pub async fn invoke(
82 &self,
83 endpoint: &str,
84 id: &FunctionId,
85 exchange: &camel_api::Exchange,
86 timeout: Duration,
87 ) -> Result<InvokeResponse, ProviderError> {
88 let url = format!("{}/invoke", endpoint);
89 let wire = ExchangeWire {
90 function_id: id.0.clone(),
91 correlation_id: exchange.correlation_id.clone(),
92 body: BodyWire::from_body(&exchange.input.body),
93 headers: exchange.input.headers.clone(),
94 properties: exchange.properties.clone(),
95 timeout_ms: timeout.as_millis() as u64,
96 };
97 let resp = self
98 .http
99 .post(&url)
100 .json(&wire)
101 .timeout(timeout)
102 .send()
103 .await
104 .map_err(|e| ProviderError::InvokeFailed(format!("invoke request failed: {e}")))?;
105 let invoke_resp: InvokeResponse = resp
106 .json()
107 .await
108 .map_err(|e| ProviderError::InvokeFailed(format!("decode invoke response: {e}")))?;
109 Ok(invoke_resp)
110 }
111
112 pub async fn unregister(&self, endpoint: &str, id: &FunctionId) -> Result<(), ProviderError> {
113 let url = format!("{}/unregister", endpoint);
114 let body = serde_json::json!({ "function_id": id.0.as_str() });
115 let resp = self.http.post(&url).json(&body).send().await.map_err(|e| {
116 ProviderError::UnregisterFailed(format!("unregister request failed: {e}"))
117 })?;
118 if resp.status().is_success() {
119 Ok(())
120 } else {
121 Err(ProviderError::UnregisterFailed(format!(
122 "status {}",
123 resp.status()
124 )))
125 }
126 }
127
128 pub async fn shutdown(&self, endpoint: &str) -> Result<(), ProviderError> {
129 let url = format!("{}/shutdown", endpoint);
130 let resp =
131 self.http.post(&url).send().await.map_err(|e| {
132 ProviderError::ShutdownFailed(format!("shutdown request failed: {e}"))
133 })?;
134 if resp.status().is_success() {
135 Ok(())
136 } else {
137 Err(ProviderError::ShutdownFailed(format!(
138 "status {}",
139 resp.status()
140 )))
141 }
142 }
143}