Skip to main content

camel_function/protocol/
mod.rs

1use base64::Engine;
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5// ---------------------------------------------------------------------------
6// RegisterRequest
7// ---------------------------------------------------------------------------
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
10pub struct RegisterRequest {
11    pub function_id: String,
12    pub runtime: String,
13    pub source: String,
14    pub timeout_ms: u64,
15}
16
17// ---------------------------------------------------------------------------
18// BodyWire
19// ---------------------------------------------------------------------------
20
21/// Wire representation of a message body.
22///
23/// v1 supports `Empty`, `Text`, `Json`. `Bytes` (base64) and `Xml` are
24/// forward-looking extensions for future protocol versions.
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26#[serde(rename_all = "lowercase", tag = "kind", content = "value")]
27pub enum BodyWire {
28    Empty,
29    Text(String),
30    Json(serde_json::Value),
31    Bytes(String),
32    Xml(String),
33}
34
35impl BodyWire {
36    pub fn from_body(body: &camel_api::Body) -> Self {
37        match body {
38            camel_api::Body::Empty => BodyWire::Empty,
39            camel_api::Body::Text(s) => BodyWire::Text(s.clone()),
40            camel_api::Body::Json(v) => BodyWire::Json(v.clone()),
41            camel_api::Body::Bytes(b) => {
42                BodyWire::Bytes(base64::engine::general_purpose::STANDARD.encode(b))
43            }
44            camel_api::Body::Xml(s) => BodyWire::Xml(s.clone()),
45            camel_api::Body::Stream(_) => {
46                tracing::debug!("stream body cannot cross process boundary, mapping to Empty");
47                BodyWire::Empty
48            }
49        }
50    }
51
52    pub fn to_body(&self) -> camel_api::Body {
53        match self {
54            BodyWire::Empty => camel_api::Body::Empty,
55            BodyWire::Text(s) => camel_api::Body::Text(s.clone()),
56            BodyWire::Json(v) => camel_api::Body::Json(v.clone()),
57            BodyWire::Bytes(b64) => match base64::engine::general_purpose::STANDARD.decode(b64) {
58                Ok(bytes) => camel_api::Body::Bytes(bytes::Bytes::from(bytes)),
59                Err(e) => {
60                    tracing::warn!(error = %e, "invalid base64 in wire body, falling back to Empty");
61                    camel_api::Body::Empty
62                }
63            },
64            BodyWire::Xml(s) => camel_api::Body::Xml(s.clone()),
65        }
66    }
67
68    pub fn to_patch_body(self) -> camel_api::function::PatchBody {
69        use camel_api::function::PatchBody;
70        match self {
71            BodyWire::Empty => PatchBody::Empty,
72            BodyWire::Text(s) => PatchBody::Text(s),
73            BodyWire::Json(v) => PatchBody::Json(v),
74            BodyWire::Bytes(_) | BodyWire::Xml(_) => PatchBody::Empty,
75        }
76    }
77}
78
79// ---------------------------------------------------------------------------
80// ExchangeWire
81// ---------------------------------------------------------------------------
82
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84pub struct ExchangeWire {
85    pub function_id: String,
86    pub correlation_id: String,
87    pub body: BodyWire,
88    pub headers: HashMap<String, serde_json::Value>,
89    pub properties: HashMap<String, serde_json::Value>,
90    pub timeout_ms: u64,
91}
92
93// ---------------------------------------------------------------------------
94// InvokeResponse
95// ---------------------------------------------------------------------------
96
97#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
98pub struct InvokeResponse {
99    pub ok: bool,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub patch: Option<PatchWire>,
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub error: Option<ErrorWire>,
104}
105
106// ---------------------------------------------------------------------------
107// PatchWire
108// ---------------------------------------------------------------------------
109
110#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
111pub struct PatchWire {
112    pub body: Option<BodyWire>,
113    pub headers_set: Vec<(String, serde_json::Value)>,
114    pub headers_removed: Vec<String>,
115    pub properties_set: Vec<(String, serde_json::Value)>,
116}
117
118impl PatchWire {
119    pub fn to_exchange_patch(self) -> camel_api::function::ExchangePatch {
120        camel_api::function::ExchangePatch {
121            body: self.body.map(BodyWire::to_patch_body),
122            headers_set: self.headers_set,
123            headers_removed: self.headers_removed,
124            properties_set: self.properties_set,
125        }
126    }
127}
128
129// ---------------------------------------------------------------------------
130// ErrorWire
131// ---------------------------------------------------------------------------
132
133#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
134pub struct ErrorWire {
135    pub kind: String,
136    pub message: String,
137    pub stack: Option<String>,
138}
139
140// ---------------------------------------------------------------------------
141// HealthResponse
142// ---------------------------------------------------------------------------
143
144#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
145pub struct HealthResponse {
146    pub status: String,
147    pub registered: Vec<String>,
148}
149
150// ---------------------------------------------------------------------------
151// ErrorResponse
152// ---------------------------------------------------------------------------
153
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
155pub struct ErrorResponse {
156    pub error: String,
157    pub kind: String,
158}
159
160pub mod client;
161
162pub use client::ProtocolClient;
163
164// ---------------------------------------------------------------------------
165// Tests
166// ---------------------------------------------------------------------------
167
#[cfg(test)]
mod tests {
    use super::*;
    use base64::engine::general_purpose::STANDARD;

    /// Serializes `value` to JSON, parses it back, asserts the two are equal
    /// and returns the decoded copy for further checks.
    fn assert_json_roundtrip<T>(value: &T) -> T
    where
        T: serde::Serialize + serde::de::DeserializeOwned + PartialEq + std::fmt::Debug,
    {
        let json = serde_json::to_string(value).expect("value should serialize to JSON");
        let decoded: T = serde_json::from_str(&json).expect("JSON should deserialize back");
        assert_eq!(value, &decoded);
        decoded
    }

    /// Builds an `ExchangeWire` with fixed ids, one header, one property and the given body.
    fn make_exchange_wire(body: BodyWire) -> ExchangeWire {
        let mut headers = HashMap::new();
        headers.insert("content-type".into(), serde_json::json!("text/plain"));
        let mut properties = HashMap::new();
        properties.insert("retry-count".into(), serde_json::json!(3));
        ExchangeWire {
            function_id: "fn-abc".into(),
            correlation_id: "corr-001".into(),
            body,
            headers,
            properties,
            timeout_ms: 3000,
        }
    }

    #[test]
    fn test_register_request_roundtrip() {
        assert_json_roundtrip(&RegisterRequest {
            function_id: "fn-123".into(),
            runtime: "deno".into(),
            source: "export default function(ex) { return ex; }".into(),
            timeout_ms: 5000,
        });
    }

    #[test]
    fn test_exchange_wire_roundtrip_text() {
        assert_json_roundtrip(&make_exchange_wire(BodyWire::Text("hello world".into())));
    }

    #[test]
    fn test_exchange_wire_roundtrip_json() {
        assert_json_roundtrip(&make_exchange_wire(BodyWire::Json(
            serde_json::json!({"key": "value"}),
        )));
    }

    #[test]
    fn test_exchange_wire_roundtrip_bytes() {
        let original = b"binary data here";
        let encoded = STANDARD.encode(original);
        let decoded = assert_json_roundtrip(&make_exchange_wire(BodyWire::Bytes(encoded)));

        // The base64 payload must still decode to the original bytes.
        match &decoded.body {
            BodyWire::Bytes(b64) => {
                let decoded_bytes = STANDARD.decode(b64).unwrap();
                assert_eq!(decoded_bytes, original);
            }
            _ => panic!("expected Bytes variant"),
        }
    }

    #[test]
    fn test_exchange_wire_roundtrip_xml() {
        assert_json_roundtrip(&make_exchange_wire(BodyWire::Xml(
            "<root><item>1</item></root>".into(),
        )));
    }

    #[test]
    fn test_exchange_wire_roundtrip_empty() {
        assert_json_roundtrip(&make_exchange_wire(BodyWire::Empty));
    }

    #[test]
    fn test_invoke_response_ok() {
        let decoded = assert_json_roundtrip(&InvokeResponse {
            ok: true,
            patch: Some(PatchWire {
                body: Some(BodyWire::Text("processed".into())),
                headers_set: vec![("x-custom".into(), serde_json::json!("added"))],
                headers_removed: vec!["x-old".into()],
                properties_set: vec![("status".into(), serde_json::json!("done"))],
            }),
            error: None,
        });
        assert!(decoded.ok);
        assert!(decoded.patch.and_then(|patch| patch.body).is_some());
    }

    #[test]
    fn test_invoke_response_error() {
        let decoded = assert_json_roundtrip(&InvokeResponse {
            ok: false,
            patch: None,
            error: Some(ErrorWire {
                kind: "user_error".into(),
                message: "ReferenceError: x is not defined".into(),
                stack: Some("at main (file:///fn.ts:3:1)".into()),
            }),
        });
        assert!(!decoded.ok);
        let err = decoded.error.expect("error payload should survive the roundtrip");
        assert_eq!(err.kind, "user_error");
        assert!(err.stack.is_some());
    }

    #[test]
    fn test_health_response() {
        let decoded = assert_json_roundtrip(&HealthResponse {
            status: "ok".into(),
            registered: vec!["fn-a".into(), "fn-b".into()],
        });
        assert_eq!(decoded.registered.len(), 2);
    }

    #[test]
    fn test_error_response() {
        assert_json_roundtrip(&ErrorResponse {
            error: "function not found".into(),
            kind: "not_registered".into(),
        });
    }

    #[test]
    fn test_patch_wire() {
        assert_json_roundtrip(&PatchWire {
            body: Some(BodyWire::Json(serde_json::json!({"updated": true}))),
            headers_set: vec![("x-new".into(), serde_json::json!("val"))],
            headers_removed: vec!["x-old".into()],
            properties_set: vec![("key".into(), serde_json::json!(42))],
        });
    }

    #[test]
    fn test_patch_wire_to_exchange_patch() {
        use camel_api::function::PatchBody;

        let patch = PatchWire {
            body: Some(BodyWire::Text("processed".into())),
            headers_set: vec![("x-new".into(), serde_json::json!("val"))],
            headers_removed: vec!["x-old".into()],
            properties_set: vec![("key".into(), serde_json::json!(42))],
        }
        .to_exchange_patch();

        assert!(matches!(patch.body, Some(PatchBody::Text(ref s)) if s == "processed"));
        assert_eq!(
            patch.headers_set,
            vec![("x-new".to_string(), serde_json::json!("val"))]
        );
        assert_eq!(patch.headers_removed, vec!["x-old".to_string()]);
        assert_eq!(
            patch.properties_set,
            vec![("key".to_string(), serde_json::json!(42))]
        );
    }

    #[test]
    fn test_body_wire_serde_lowercase() {
        let wire = BodyWire::Text("hello".into());
        let json = serde_json::to_string(&wire).unwrap();
        assert!(
            json.contains("\"text\""),
            "expected lowercase variant name, got: {json}"
        );
        assert!(
            !json.contains("\"Text\""),
            "should not have UpperCamelCase variant"
        );
        let decoded: BodyWire = serde_json::from_str(&json).unwrap();
        assert_eq!(wire, decoded);
    }

    #[test]
    fn test_body_wire_bytes_base64_roundtrip() {
        let original_bytes = vec![0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE];
        let wire = BodyWire::Bytes(STANDARD.encode(&original_bytes));

        match assert_json_roundtrip(&wire) {
            BodyWire::Bytes(b64) => {
                assert_eq!(STANDARD.decode(b64).unwrap(), original_bytes);
            }
            _ => panic!("expected Bytes variant after roundtrip"),
        }

        // Also verify to_body conversion
        match wire.to_body() {
            camel_api::Body::Bytes(b) => assert_eq!(b.to_vec(), original_bytes),
            _ => panic!("expected Body::Bytes from to_body()"),
        }
    }

    #[test]
    fn test_body_wire_to_body_invalid_base64_falls_back_to_empty() {
        // Spaces and '!' are outside the standard base64 alphabet, so decoding fails.
        let body = BodyWire::Bytes("not valid base64!".into()).to_body();
        assert!(matches!(body, camel_api::Body::Empty));
    }

    #[test]
    fn test_body_wire_to_patch_body() {
        use camel_api::function::PatchBody;

        assert!(matches!(
            BodyWire::Empty.to_patch_body(),
            PatchBody::Empty
        ));
        assert!(matches!(
            BodyWire::Text("hi".into()).to_patch_body(),
            PatchBody::Text(ref s) if s == "hi"
        ));
        let json = serde_json::json!({"k": 1});
        assert!(matches!(
            BodyWire::Json(json.clone()).to_patch_body(),
            PatchBody::Json(ref v) if *v == json
        ));
        // Bytes and Xml are not carried into patches; both collapse to Empty.
        assert!(matches!(
            BodyWire::Bytes("AAAA".into()).to_patch_body(),
            PatchBody::Empty
        ));
        assert!(matches!(
            BodyWire::Xml("<a/>".into()).to_patch_body(),
            PatchBody::Empty
        ));
    }

    #[test]
    fn test_body_wire_from_body_roundtrip() {
        let bodies = vec![
            ("Empty", camel_api::Body::Empty),
            ("Text", camel_api::Body::Text("hello world".into())),
            (
                "Json",
                camel_api::Body::Json(serde_json::json!({"key": "value"})),
            ),
            (
                "Xml",
                camel_api::Body::Xml("<root><item>1</item></root>".into()),
            ),
        ];

        for (name, body) in bodies {
            let roundtripped = BodyWire::from_body(&body).to_body();
            assert_eq!(body, roundtripped, "roundtrip failed for {name}");
        }

        // Bytes need special handling since from_body base64-encodes
        let original_bytes = vec![0xDE, 0xAD, 0xBE, 0xEF];
        let body = camel_api::Body::Bytes(bytes::Bytes::from(original_bytes.clone()));
        match BodyWire::from_body(&body).to_body() {
            camel_api::Body::Bytes(b) => assert_eq!(b.to_vec(), original_bytes),
            _ => panic!("expected Body::Bytes after Bytes roundtrip"),
        }
    }

    #[test]
    fn test_body_wire_from_body_stream_maps_to_empty() {
        use camel_api::{StreamBody, StreamMetadata};
        use futures::stream;

        let chunks = vec![Ok(bytes::Bytes::from("stream data"))];
        let stream_body = camel_api::Body::Stream(StreamBody {
            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream::iter(
                chunks,
            ))))),
            metadata: StreamMetadata::default(),
        });

        let wire = BodyWire::from_body(&stream_body);
        assert!(matches!(wire, BodyWire::Empty));
    }

    #[test]
    fn test_body_wire_to_body_from_body_text() {
        let body = BodyWire::Text("hello world".into()).to_body();
        let wire = BodyWire::from_body(&body);

        assert!(matches!(wire, BodyWire::Text(ref s) if s == "hello world"));
    }
}