1use base64::Engine;
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
/// Wire payload describing a function: its id, runtime name, source text and timeout.
///
/// NOTE(review): presumably sent to the runner to register a function before it can be
/// invoked — confirm against the `client` module.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RegisterRequest {
    /// Identifier of the function.
    pub function_id: String,
    /// Runtime name (the tests in this file use `"deno"`).
    pub runtime: String,
    /// Source text of the function.
    pub source: String,
    /// Timeout, in milliseconds.
    pub timeout_ms: u64,
}
16
/// Serializable representation of a message body.
///
/// Serde encodes this enum adjacently tagged: the lowercase variant name is stored under
/// `"kind"` and the variant's payload (when it has one) under `"value"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase", tag = "kind", content = "value")]
pub enum BodyWire {
    /// No body.
    Empty,
    /// Text payload.
    Text(String),
    /// Arbitrary JSON value.
    Json(serde_json::Value),
    /// Binary payload carried as base64 text (`from_body` encodes it with the base64
    /// `STANDARD` engine and `to_body` decodes it with the same engine).
    Bytes(String),
    /// XML payload carried as a string.
    Xml(String),
}
34
35impl BodyWire {
36 pub fn from_body(body: &camel_api::Body) -> Self {
37 match body {
38 camel_api::Body::Empty => BodyWire::Empty,
39 camel_api::Body::Text(s) => BodyWire::Text(s.clone()),
40 camel_api::Body::Json(v) => BodyWire::Json(v.clone()),
41 camel_api::Body::Bytes(b) => {
42 BodyWire::Bytes(base64::engine::general_purpose::STANDARD.encode(b))
43 }
44 camel_api::Body::Xml(s) => BodyWire::Xml(s.clone()),
45 camel_api::Body::Stream(_) => {
46 tracing::debug!("stream body cannot cross process boundary, mapping to Empty");
47 BodyWire::Empty
48 }
49 }
50 }
51
52 pub fn to_body(&self) -> camel_api::Body {
53 match self {
54 BodyWire::Empty => camel_api::Body::Empty,
55 BodyWire::Text(s) => camel_api::Body::Text(s.clone()),
56 BodyWire::Json(v) => camel_api::Body::Json(v.clone()),
57 BodyWire::Bytes(b64) => match base64::engine::general_purpose::STANDARD.decode(b64) {
58 Ok(bytes) => camel_api::Body::Bytes(bytes::Bytes::from(bytes)),
59 Err(e) => {
60 tracing::warn!(error = %e, "invalid base64 in wire body, falling back to Empty");
61 camel_api::Body::Empty
62 }
63 },
64 BodyWire::Xml(s) => camel_api::Body::Xml(s.clone()),
65 }
66 }
67
68 pub fn to_patch_body(self) -> camel_api::function::PatchBody {
69 use camel_api::function::PatchBody;
70 match self {
71 BodyWire::Empty => PatchBody::Empty,
72 BodyWire::Text(s) => PatchBody::Text(s),
73 BodyWire::Json(v) => PatchBody::Json(v),
74 BodyWire::Bytes(_) | BodyWire::Xml(_) => PatchBody::Empty,
75 }
76 }
77}
78
/// Wire form of an exchange: target function, correlation id, body, headers, properties
/// and a timeout.
///
/// NOTE(review): presumably the request payload for a single function invocation — confirm
/// against the `client` module.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ExchangeWire {
    /// Identifier of the function this exchange is addressed to.
    pub function_id: String,
    /// Correlation identifier carried with the exchange.
    pub correlation_id: String,
    /// Message body in wire form.
    pub body: BodyWire,
    /// Header values keyed by header name.
    pub headers: HashMap<String, serde_json::Value>,
    /// Property values keyed by property name.
    pub properties: HashMap<String, serde_json::Value>,
    /// Timeout, in milliseconds.
    pub timeout_ms: u64,
}
92
/// Result of an invocation: a success flag plus an optional patch and an optional error.
///
/// `patch` and `error` are left out of the serialized output when they are `None`.
/// The tests in this file pair `ok: true` with a patch and `ok: false` with an error; the
/// type itself does not enforce that pairing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvokeResponse {
    /// Success flag.
    pub ok: bool,
    /// Patch attached to the response, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub patch: Option<PatchWire>,
    /// Error details attached to the response, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<ErrorWire>,
}
105
106#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
111pub struct PatchWire {
112 pub body: Option<BodyWire>,
113 pub headers_set: Vec<(String, serde_json::Value)>,
114 pub headers_removed: Vec<String>,
115 pub properties_set: Vec<(String, serde_json::Value)>,
116}
117
118impl PatchWire {
119 pub fn to_exchange_patch(self) -> camel_api::function::ExchangePatch {
120 camel_api::function::ExchangePatch {
121 body: self.body.map(BodyWire::to_patch_body),
122 headers_set: self.headers_set,
123 headers_removed: self.headers_removed,
124 properties_set: self.properties_set,
125 }
126 }
127}
128
/// Error details carried inside an [`InvokeResponse`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ErrorWire {
    /// Error category as a free-form string (the tests in this file use `"user_error"`).
    pub kind: String,
    /// Error message text.
    pub message: String,
    /// Optional stack trace text.
    pub stack: Option<String>,
}
139
/// Health report: a status string plus a list of registered entries.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HealthResponse {
    /// Status text (the tests in this file use `"ok"`).
    pub status: String,
    /// Registered entries — presumably function ids (the tests use values such as `"fn-a"`);
    /// confirm against the producer of this response.
    pub registered: Vec<String>,
}
149
/// Error payload consisting of a message and a category string.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ErrorResponse {
    /// Error message text (the tests in this file use `"function not found"`).
    pub error: String,
    /// Error category as a free-form string (the tests in this file use `"not_registered"`).
    pub kind: String,
}
159
160pub mod client;
161
162pub use client::ProtocolClient;
163
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `value` to a JSON string and parses it back, panicking if either step fails.
    fn json_roundtrip<T>(value: &T) -> T
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        let encoded = serde_json::to_string(value).unwrap();
        serde_json::from_str(&encoded).unwrap()
    }

    /// Encodes raw bytes as base64 text with the `STANDARD` engine.
    fn b64_encode(raw: &[u8]) -> String {
        base64::engine::general_purpose::STANDARD.encode(raw)
    }

    /// Decodes base64 text with the `STANDARD` engine, panicking on malformed input.
    fn b64_decode(text: &str) -> Vec<u8> {
        base64::engine::general_purpose::STANDARD
            .decode(text)
            .unwrap()
    }

    /// Builds an exchange with fixed ids, one header, one property and the given body.
    fn make_exchange_wire(body: BodyWire) -> ExchangeWire {
        ExchangeWire {
            function_id: String::from("fn-abc"),
            correlation_id: String::from("corr-001"),
            body,
            headers: HashMap::from([(
                String::from("content-type"),
                serde_json::json!("text/plain"),
            )]),
            properties: HashMap::from([(String::from("retry-count"), serde_json::json!(3))]),
            timeout_ms: 3000,
        }
    }

    #[test]
    fn test_register_request_roundtrip() {
        let original = RegisterRequest {
            function_id: String::from("fn-123"),
            runtime: String::from("deno"),
            source: String::from("export default function(ex) { return ex; }"),
            timeout_ms: 5000,
        };
        assert_eq!(original, json_roundtrip(&original));
    }

    #[test]
    fn test_exchange_wire_roundtrip_text() {
        let original = make_exchange_wire(BodyWire::Text(String::from("hello world")));
        assert_eq!(original, json_roundtrip(&original));
    }

    #[test]
    fn test_exchange_wire_roundtrip_json() {
        let payload = serde_json::json!({"key": "value"});
        let original = make_exchange_wire(BodyWire::Json(payload));
        assert_eq!(original, json_roundtrip(&original));
    }

    #[test]
    fn test_exchange_wire_roundtrip_bytes() {
        let payload: &[u8] = b"binary data here";
        let original = make_exchange_wire(BodyWire::Bytes(b64_encode(payload)));
        let decoded = json_roundtrip(&original);
        assert_eq!(original, decoded);

        // The base64 text must still decode to the exact input bytes after the JSON trip.
        match &decoded.body {
            BodyWire::Bytes(b64) => assert_eq!(b64_decode(b64), payload),
            _ => panic!("expected Bytes variant"),
        }
    }

    #[test]
    fn test_exchange_wire_roundtrip_xml() {
        let original = make_exchange_wire(BodyWire::Xml(String::from(
            "<root><item>1</item></root>",
        )));
        assert_eq!(original, json_roundtrip(&original));
    }

    #[test]
    fn test_exchange_wire_roundtrip_empty() {
        let original = make_exchange_wire(BodyWire::Empty);
        assert_eq!(original, json_roundtrip(&original));
    }

    #[test]
    fn test_invoke_response_ok() {
        let sent_patch = PatchWire {
            body: Some(BodyWire::Text(String::from("processed"))),
            headers_set: vec![(String::from("x-custom"), serde_json::json!("added"))],
            headers_removed: vec![String::from("x-old")],
            properties_set: vec![(String::from("status"), serde_json::json!("done"))],
        };
        let original = InvokeResponse {
            ok: true,
            patch: Some(sent_patch),
            error: None,
        };

        let decoded = json_roundtrip(&original);
        assert_eq!(original, decoded);
        assert!(decoded.ok);

        let received_patch = decoded.patch.as_ref().unwrap();
        assert!(received_patch.body.is_some());
    }

    #[test]
    fn test_invoke_response_error() {
        let sent_error = ErrorWire {
            kind: String::from("user_error"),
            message: String::from("ReferenceError: x is not defined"),
            stack: Some(String::from("at main (file:///fn.ts:3:1)")),
        };
        let original = InvokeResponse {
            ok: false,
            patch: None,
            error: Some(sent_error),
        };

        let decoded = json_roundtrip(&original);
        assert_eq!(original, decoded);
        assert!(!decoded.ok);

        let received_error = decoded.error.unwrap();
        assert_eq!(received_error.kind, "user_error");
        assert!(received_error.stack.is_some());
    }

    #[test]
    fn test_health_response() {
        let original = HealthResponse {
            status: String::from("ok"),
            registered: vec![String::from("fn-a"), String::from("fn-b")],
        };
        let decoded = json_roundtrip(&original);
        assert_eq!(original, decoded);
        assert_eq!(decoded.registered.len(), 2);
    }

    #[test]
    fn test_error_response() {
        let original = ErrorResponse {
            error: String::from("function not found"),
            kind: String::from("not_registered"),
        };
        assert_eq!(original, json_roundtrip(&original));
    }

    #[test]
    fn test_patch_wire() {
        let original = PatchWire {
            body: Some(BodyWire::Json(serde_json::json!({"updated": true}))),
            headers_set: vec![(String::from("x-new"), serde_json::json!("val"))],
            headers_removed: vec![String::from("x-old")],
            properties_set: vec![(String::from("key"), serde_json::json!(42))],
        };
        assert_eq!(original, json_roundtrip(&original));
    }

    #[test]
    fn test_body_wire_serde_lowercase() {
        let wire = BodyWire::Text(String::from("hello"));
        // The raw JSON text is inspected here, so the roundtrip helper is not used.
        let json = serde_json::to_string(&wire).unwrap();

        let has_lowercase_tag = json.contains("\"text\"");
        assert!(
            has_lowercase_tag,
            "expected lowercase variant name, got: {json}"
        );

        let has_camel_case_tag = json.contains("\"Text\"");
        assert!(
            !has_camel_case_tag,
            "should not have UpperCamelCase variant"
        );

        let decoded: BodyWire = serde_json::from_str(&json).unwrap();
        assert_eq!(wire, decoded);
    }

    #[test]
    fn test_body_wire_bytes_base64_roundtrip() {
        const RAW: [u8; 6] = [0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE];
        let wire = BodyWire::Bytes(b64_encode(&RAW));

        // Through JSON: the base64 text must survive and decode to the original bytes.
        match json_roundtrip(&wire) {
            BodyWire::Bytes(b64) => assert_eq!(b64_decode(&b64), RAW),
            _ => panic!("expected Bytes variant after roundtrip"),
        }

        // Through `to_body`: the decoded body must hold the original bytes.
        match wire.to_body() {
            camel_api::Body::Bytes(b) => assert_eq!(b.to_vec(), RAW),
            _ => panic!("expected Body::Bytes from to_body()"),
        }
    }

    #[test]
    fn test_body_wire_from_body_roundtrip() {
        let cases = vec![
            ("Empty", camel_api::Body::Empty),
            ("Text", camel_api::Body::Text("hello world".into())),
            (
                "Json",
                camel_api::Body::Json(serde_json::json!({"key": "value"})),
            ),
            (
                "Xml",
                camel_api::Body::Xml("<root><item>1</item></root>".into()),
            ),
        ];
        for (name, body) in cases {
            let roundtripped = BodyWire::from_body(&body).to_body();
            assert_eq!(body, roundtripped, "roundtrip failed for {name}");
        }

        // Bytes are checked by comparing the raw contents after the roundtrip.
        const RAW: [u8; 4] = [0xDE, 0xAD, 0xBE, 0xEF];
        let bytes_body = camel_api::Body::Bytes(bytes::Bytes::from(RAW.to_vec()));
        match BodyWire::from_body(&bytes_body).to_body() {
            camel_api::Body::Bytes(b) => assert_eq!(b.to_vec(), RAW),
            _ => panic!("expected Body::Bytes after Bytes roundtrip"),
        }
    }

    #[test]
    fn test_body_wire_from_body_stream_maps_to_empty() {
        use camel_api::{StreamBody, StreamMetadata};
        use futures::stream;

        let chunks = vec![Ok(bytes::Bytes::from("stream data"))];
        let stream_body = camel_api::Body::Stream(StreamBody {
            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream::iter(
                chunks,
            ))))),
            metadata: StreamMetadata::default(),
        });

        assert_eq!(BodyWire::from_body(&stream_body), BodyWire::Empty);
    }

    #[test]
    fn test_body_wire_to_body_from_body_text() {
        let body = BodyWire::Text(String::from("hello world")).to_body();
        let back_on_wire = BodyWire::from_body(&body);

        assert_eq!(back_on_wire, BodyWire::Text(String::from("hello world")));
    }
}