1use base64::Engine;
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
10pub struct RegisterRequest {
11 pub function_id: String,
12 pub runtime: String,
13 pub source: String,
14 pub timeout_ms: u64,
15}
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26#[serde(rename_all = "lowercase", tag = "kind", content = "value")]
27pub enum BodyWire {
28 Empty,
29 Text(String),
30 Json(serde_json::Value),
31 Bytes(String),
32 Xml(String),
33}
34
35impl BodyWire {
36 pub fn from_body(body: &camel_api::Body) -> Self {
37 match body {
38 camel_api::Body::Empty => BodyWire::Empty,
39 camel_api::Body::Text(s) => BodyWire::Text(s.clone()),
40 camel_api::Body::Json(v) => BodyWire::Json(v.clone()),
41 camel_api::Body::Bytes(b) => {
42 BodyWire::Bytes(base64::engine::general_purpose::STANDARD.encode(b))
43 }
44 camel_api::Body::Xml(s) => BodyWire::Xml(s.clone()),
45 camel_api::Body::Stream(_) => {
46 tracing::debug!("stream body cannot cross process boundary, mapping to Empty");
47 BodyWire::Empty
48 }
49 }
50 }
51
52 pub fn to_body(&self) -> camel_api::Body {
53 match self {
54 BodyWire::Empty => camel_api::Body::Empty,
55 BodyWire::Text(s) => camel_api::Body::Text(s.clone()),
56 BodyWire::Json(v) => camel_api::Body::Json(v.clone()),
57 BodyWire::Bytes(b64) => match base64::engine::general_purpose::STANDARD.decode(b64) {
58 Ok(bytes) => camel_api::Body::Bytes(bytes::Bytes::from(bytes)),
59 Err(e) => {
60 tracing::warn!(error = %e, "invalid base64 in wire body, falling back to Empty");
61 camel_api::Body::Empty
62 }
63 },
64 BodyWire::Xml(s) => camel_api::Body::Xml(s.clone()),
65 }
66 }
67
68 pub fn to_patch_body(self) -> Result<camel_api::function::PatchBody, camel_api::CamelError> {
69 use camel_api::function::PatchBody;
70 match self {
71 BodyWire::Empty => Ok(PatchBody::Empty),
72 BodyWire::Text(s) => Ok(PatchBody::Text(s)),
73 BodyWire::Json(v) => Ok(PatchBody::Json(v)),
74 BodyWire::Bytes(_) => Err(camel_api::CamelError::ProcessorError(
75 "unsupported body type for function: Bytes".into(),
76 )),
77 BodyWire::Xml(_) => Err(camel_api::CamelError::ProcessorError(
78 "unsupported body type for function: Xml".into(),
79 )),
80 }
81 }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
89pub struct ExchangeWire {
90 pub function_id: String,
91 pub correlation_id: String,
92 pub body: BodyWire,
93 pub headers: HashMap<String, serde_json::Value>,
94 pub properties: HashMap<String, serde_json::Value>,
95 pub timeout_ms: u64,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
103pub struct InvokeResponse {
104 pub ok: bool,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub patch: Option<PatchWire>,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 pub error: Option<ErrorWire>,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
116pub struct PatchWire {
117 pub body: Option<BodyWire>,
118 pub headers_set: Vec<(String, serde_json::Value)>,
119 pub headers_removed: Vec<String>,
120 pub properties_set: Vec<(String, serde_json::Value)>,
121}
122
123impl PatchWire {
124 pub fn to_exchange_patch(
125 self,
126 ) -> Result<camel_api::function::ExchangePatch, camel_api::CamelError> {
127 let body = self.body.map(BodyWire::to_patch_body).transpose()?;
128 Ok(camel_api::function::ExchangePatch {
129 body,
130 headers_set: self.headers_set,
131 headers_removed: self.headers_removed,
132 properties_set: self.properties_set,
133 })
134 }
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
142pub struct ErrorWire {
143 pub kind: String,
144 pub message: String,
145 pub stack: Option<String>,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
153pub struct HealthResponse {
154 pub status: String,
155 pub registered: Vec<String>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
163pub struct ErrorResponse {
164 pub error: String,
165 pub kind: String,
166}
167
168pub mod client;
169
170pub use client::ProtocolClient;
171
#[cfg(test)]
mod tests {
    use super::*;
    use base64::engine::general_purpose::STANDARD;

    /// Serializes `value` to a JSON string and parses it straight back,
    /// panicking if either step fails.
    fn via_json<T>(value: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let encoded = serde_json::to_string(value).unwrap();
        serde_json::from_str(&encoded).unwrap()
    }

    /// Builds an [`ExchangeWire`] fixture around `body` with one header and
    /// one property.
    fn make_exchange_wire(body: BodyWire) -> ExchangeWire {
        ExchangeWire {
            function_id: "fn-abc".into(),
            correlation_id: "corr-001".into(),
            body,
            headers: HashMap::from([(
                String::from("content-type"),
                serde_json::json!("text/plain"),
            )]),
            properties: HashMap::from([(String::from("retry-count"), serde_json::json!(3))]),
            timeout_ms: 3000,
        }
    }

    #[test]
    fn test_register_request_roundtrip() {
        let sent = RegisterRequest {
            function_id: "fn-123".into(),
            runtime: "deno".into(),
            source: "export default function(ex) { return ex; }".into(),
            timeout_ms: 5000,
        };
        let received = via_json(&sent);
        assert_eq!(sent, received);
    }

    #[test]
    fn test_exchange_wire_roundtrip_text() {
        let sent = make_exchange_wire(BodyWire::Text("hello world".into()));
        let received = via_json(&sent);
        assert_eq!(sent, received);
    }

    #[test]
    fn test_exchange_wire_roundtrip_json() {
        let sent = make_exchange_wire(BodyWire::Json(serde_json::json!({"key": "value"})));
        let received = via_json(&sent);
        assert_eq!(sent, received);
    }

    #[test]
    fn test_exchange_wire_roundtrip_bytes() {
        let payload = b"binary data here";
        let sent = make_exchange_wire(BodyWire::Bytes(STANDARD.encode(payload)));
        let received = via_json(&sent);
        assert_eq!(sent, received);

        // The base64 text must still decode to the original bytes.
        match &received.body {
            BodyWire::Bytes(b64) => {
                let raw = STANDARD.decode(b64).unwrap();
                assert_eq!(raw, payload);
            }
            _ => panic!("expected Bytes variant"),
        }
    }

    #[test]
    fn test_exchange_wire_roundtrip_xml() {
        let sent = make_exchange_wire(BodyWire::Xml("<root><item>1</item></root>".into()));
        let received = via_json(&sent);
        assert_eq!(sent, received);
    }

    #[test]
    fn test_exchange_wire_roundtrip_empty() {
        let sent = make_exchange_wire(BodyWire::Empty);
        let received = via_json(&sent);
        assert_eq!(sent, received);
    }

    #[test]
    fn test_invoke_response_ok() {
        let patch = PatchWire {
            body: Some(BodyWire::Text("processed".into())),
            headers_set: vec![("x-custom".into(), serde_json::json!("added"))],
            headers_removed: vec!["x-old".into()],
            properties_set: vec![("status".into(), serde_json::json!("done"))],
        };
        let sent = InvokeResponse {
            ok: true,
            patch: Some(patch),
            error: None,
        };

        let received = via_json(&sent);
        assert_eq!(sent, received);
        assert!(received.ok);
        assert!(received.patch.as_ref().unwrap().body.is_some());
    }

    #[test]
    fn test_invoke_response_error() {
        let failure = ErrorWire {
            kind: "user_error".into(),
            message: "ReferenceError: x is not defined".into(),
            stack: Some("at main (file:///fn.ts:3:1)".into()),
        };
        let sent = InvokeResponse {
            ok: false,
            patch: None,
            error: Some(failure),
        };

        let received = via_json(&sent);
        assert_eq!(sent, received);
        assert!(!received.ok);
        let err = received.error.unwrap();
        assert_eq!(err.kind, "user_error");
        assert!(err.stack.is_some());
    }

    #[test]
    fn test_health_response() {
        let sent = HealthResponse {
            status: "ok".into(),
            registered: vec!["fn-a".into(), "fn-b".into()],
        };
        let received = via_json(&sent);
        assert_eq!(sent, received);
        assert_eq!(received.registered.len(), 2);
    }

    #[test]
    fn test_error_response() {
        let sent = ErrorResponse {
            error: "function not found".into(),
            kind: "not_registered".into(),
        };
        let received = via_json(&sent);
        assert_eq!(sent, received);
    }

    #[test]
    fn test_patch_wire() {
        let sent = PatchWire {
            body: Some(BodyWire::Json(serde_json::json!({"updated": true}))),
            headers_set: vec![("x-new".into(), serde_json::json!("val"))],
            headers_removed: vec!["x-old".into()],
            properties_set: vec![("key".into(), serde_json::json!(42))],
        };
        let received = via_json(&sent);
        assert_eq!(sent, received);
    }

    #[test]
    fn test_body_wire_serde_lowercase() {
        let wire = BodyWire::Text("hello".into());
        let json = serde_json::to_string(&wire).unwrap();

        assert!(
            json.contains(r#""text""#),
            "expected lowercase variant name, got: {json}"
        );
        assert!(
            !json.contains(r#""Text""#),
            "should not have UpperCamelCase variant"
        );

        let decoded: BodyWire = serde_json::from_str(&json).unwrap();
        assert_eq!(wire, decoded);
    }

    #[test]
    fn test_body_wire_bytes_base64_roundtrip() {
        let payload: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE];
        let wire = BodyWire::Bytes(STANDARD.encode(&payload));

        // JSON roundtrip keeps the base64 text decodable to the same bytes.
        match via_json(&wire) {
            BodyWire::Bytes(b64) => {
                let raw = STANDARD.decode(b64).unwrap();
                assert_eq!(raw, payload);
            }
            _ => panic!("expected Bytes variant after roundtrip"),
        }

        // `to_body` must hand back the decoded bytes.
        match wire.to_body() {
            camel_api::Body::Bytes(raw) => assert_eq!(raw.to_vec(), payload),
            _ => panic!("expected Body::Bytes from to_body()"),
        }
    }

    #[test]
    fn test_body_wire_from_body_roundtrip() {
        let cases = vec![
            ("Empty", camel_api::Body::Empty),
            ("Text", camel_api::Body::Text("hello world".into())),
            (
                "Json",
                camel_api::Body::Json(serde_json::json!({"key": "value"})),
            ),
            (
                "Xml",
                camel_api::Body::Xml("<root><item>1</item></root>".into()),
            ),
        ];

        for (name, body) in cases {
            let roundtripped = BodyWire::from_body(&body).to_body();
            assert_eq!(body, roundtripped, "roundtrip failed for {name}");
        }

        // Bytes are checked by content after the base64 encode/decode pass.
        let payload: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
        let body = camel_api::Body::Bytes(bytes::Bytes::from(payload.clone()));
        match BodyWire::from_body(&body).to_body() {
            camel_api::Body::Bytes(raw) => assert_eq!(raw.to_vec(), payload),
            _ => panic!("expected Body::Bytes after Bytes roundtrip"),
        }
    }

    #[test]
    fn test_body_wire_from_body_stream_maps_to_empty() {
        use camel_api::{StreamBody, StreamMetadata};
        use futures::stream;

        let chunks = vec![Ok(bytes::Bytes::from("stream data"))];
        let source = Box::pin(stream::iter(chunks));
        let stream_body = camel_api::Body::Stream(StreamBody {
            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(source))),
            metadata: StreamMetadata::default(),
        });

        let wire = BodyWire::from_body(&stream_body);
        assert!(matches!(wire, BodyWire::Empty));
    }

    #[test]
    fn test_body_wire_to_body_from_body_text() {
        let body = BodyWire::Text("hello world".into()).to_body();
        let wire_again = BodyWire::from_body(&body);

        assert!(matches!(wire_again, BodyWire::Text(ref s) if s == "hello world"));
    }
}