Skip to main content

camel_function/protocol/
mod.rs

1use base64::Engine;
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5// ---------------------------------------------------------------------------
6// RegisterRequest
7// ---------------------------------------------------------------------------
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
10pub struct RegisterRequest {
11    pub function_id: String,
12    pub runtime: String,
13    pub source: String,
14    pub timeout_ms: u64,
15}
16
17// ---------------------------------------------------------------------------
18// BodyWire
19// ---------------------------------------------------------------------------
20
21/// Wire representation of a message body.
22///
23/// v1 supports `Empty`, `Text`, `Json`. `Bytes` (base64) and `Xml` are
24/// forward-looking extensions for future protocol versions.
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26#[serde(rename_all = "lowercase", tag = "kind", content = "value")]
27pub enum BodyWire {
28    Empty,
29    Text(String),
30    Json(serde_json::Value),
31    Bytes(String),
32    Xml(String),
33}
34
35impl BodyWire {
36    pub fn from_body(body: &camel_api::Body) -> Self {
37        match body {
38            camel_api::Body::Empty => BodyWire::Empty,
39            camel_api::Body::Text(s) => BodyWire::Text(s.clone()),
40            camel_api::Body::Json(v) => BodyWire::Json(v.clone()),
41            camel_api::Body::Bytes(b) => {
42                BodyWire::Bytes(base64::engine::general_purpose::STANDARD.encode(b))
43            }
44            camel_api::Body::Xml(s) => BodyWire::Xml(s.clone()),
45            camel_api::Body::Stream(_) => {
46                tracing::debug!("stream body cannot cross process boundary, mapping to Empty");
47                BodyWire::Empty
48            }
49        }
50    }
51
52    pub fn to_body(&self) -> camel_api::Body {
53        match self {
54            BodyWire::Empty => camel_api::Body::Empty,
55            BodyWire::Text(s) => camel_api::Body::Text(s.clone()),
56            BodyWire::Json(v) => camel_api::Body::Json(v.clone()),
57            BodyWire::Bytes(b64) => match base64::engine::general_purpose::STANDARD.decode(b64) {
58                Ok(bytes) => camel_api::Body::Bytes(bytes::Bytes::from(bytes)),
59                Err(e) => {
60                    tracing::warn!(error = %e, "invalid base64 in wire body, falling back to Empty");
61                    camel_api::Body::Empty
62                }
63            },
64            BodyWire::Xml(s) => camel_api::Body::Xml(s.clone()),
65        }
66    }
67
68    pub fn to_patch_body(self) -> Result<camel_api::function::PatchBody, camel_api::CamelError> {
69        use camel_api::function::PatchBody;
70        match self {
71            BodyWire::Empty => Ok(PatchBody::Empty),
72            BodyWire::Text(s) => Ok(PatchBody::Text(s)),
73            BodyWire::Json(v) => Ok(PatchBody::Json(v)),
74            BodyWire::Bytes(_) => Err(camel_api::CamelError::ProcessorError(
75                "unsupported body type for function: Bytes".into(),
76            )),
77            BodyWire::Xml(_) => Err(camel_api::CamelError::ProcessorError(
78                "unsupported body type for function: Xml".into(),
79            )),
80        }
81    }
82}
83
84// ---------------------------------------------------------------------------
85// ExchangeWire
86// ---------------------------------------------------------------------------
87
88#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
89pub struct ExchangeWire {
90    pub function_id: String,
91    pub correlation_id: String,
92    pub body: BodyWire,
93    pub headers: HashMap<String, serde_json::Value>,
94    pub properties: HashMap<String, serde_json::Value>,
95    pub timeout_ms: u64,
96}
97
98// ---------------------------------------------------------------------------
99// InvokeResponse
100// ---------------------------------------------------------------------------
101
102#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
103pub struct InvokeResponse {
104    pub ok: bool,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub patch: Option<PatchWire>,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub error: Option<ErrorWire>,
109}
110
111// ---------------------------------------------------------------------------
112// PatchWire
113// ---------------------------------------------------------------------------
114
115#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
116pub struct PatchWire {
117    pub body: Option<BodyWire>,
118    pub headers_set: Vec<(String, serde_json::Value)>,
119    pub headers_removed: Vec<String>,
120    pub properties_set: Vec<(String, serde_json::Value)>,
121}
122
123impl PatchWire {
124    pub fn to_exchange_patch(
125        self,
126    ) -> Result<camel_api::function::ExchangePatch, camel_api::CamelError> {
127        let body = self.body.map(BodyWire::to_patch_body).transpose()?;
128        Ok(camel_api::function::ExchangePatch {
129            body,
130            headers_set: self.headers_set,
131            headers_removed: self.headers_removed,
132            properties_set: self.properties_set,
133        })
134    }
135}
136
137// ---------------------------------------------------------------------------
138// ErrorWire
139// ---------------------------------------------------------------------------
140
141#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
142pub struct ErrorWire {
143    pub kind: String,
144    pub message: String,
145    pub stack: Option<String>,
146}
147
148// ---------------------------------------------------------------------------
149// HealthResponse
150// ---------------------------------------------------------------------------
151
152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
153pub struct HealthResponse {
154    pub status: String,
155    pub registered: Vec<String>,
156}
157
158// ---------------------------------------------------------------------------
159// ErrorResponse
160// ---------------------------------------------------------------------------
161
162#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
163pub struct ErrorResponse {
164    pub error: String,
165    pub kind: String,
166}
167
168pub mod client;
169
170pub use client::ProtocolClient;
171
172// ---------------------------------------------------------------------------
173// Tests
174// ---------------------------------------------------------------------------
175
176#[cfg(test)]
177mod tests {
178    use super::*;
179
180    #[test]
181    fn test_register_request_roundtrip() {
182        let req = RegisterRequest {
183            function_id: "fn-123".into(),
184            runtime: "deno".into(),
185            source: "export default function(ex) { return ex; }".into(),
186            timeout_ms: 5000,
187        };
188        let json = serde_json::to_string(&req).unwrap();
189        let decoded: RegisterRequest = serde_json::from_str(&json).unwrap();
190        assert_eq!(req, decoded);
191    }
192
193    fn make_exchange_wire(body: BodyWire) -> ExchangeWire {
194        let mut headers = HashMap::new();
195        headers.insert("content-type".into(), serde_json::json!("text/plain"));
196        let mut properties = HashMap::new();
197        properties.insert("retry-count".into(), serde_json::json!(3));
198        ExchangeWire {
199            function_id: "fn-abc".into(),
200            correlation_id: "corr-001".into(),
201            body,
202            headers,
203            properties,
204            timeout_ms: 3000,
205        }
206    }
207
208    #[test]
209    fn test_exchange_wire_roundtrip_text() {
210        let wire = make_exchange_wire(BodyWire::Text("hello world".into()));
211        let json = serde_json::to_string(&wire).unwrap();
212        let decoded: ExchangeWire = serde_json::from_str(&json).unwrap();
213        assert_eq!(wire, decoded);
214    }
215
216    #[test]
217    fn test_exchange_wire_roundtrip_json() {
218        let wire = make_exchange_wire(BodyWire::Json(serde_json::json!({"key": "value"})));
219        let json = serde_json::to_string(&wire).unwrap();
220        let decoded: ExchangeWire = serde_json::from_str(&json).unwrap();
221        assert_eq!(wire, decoded);
222    }
223
224    #[test]
225    fn test_exchange_wire_roundtrip_bytes() {
226        let original = b"binary data here";
227        let encoded = base64::engine::general_purpose::STANDARD.encode(original);
228        let wire = make_exchange_wire(BodyWire::Bytes(encoded));
229        let json = serde_json::to_string(&wire).unwrap();
230        let decoded: ExchangeWire = serde_json::from_str(&json).unwrap();
231        assert_eq!(wire, decoded);
232        // Verify base64 roundtrip
233        if let BodyWire::Bytes(b64) = &decoded.body {
234            let decoded_bytes = base64::engine::general_purpose::STANDARD
235                .decode(b64)
236                .unwrap();
237            assert_eq!(decoded_bytes, original);
238        } else {
239            panic!("expected Bytes variant");
240        }
241    }
242
243    #[test]
244    fn test_exchange_wire_roundtrip_xml() {
245        let wire = make_exchange_wire(BodyWire::Xml("<root><item>1</item></root>".into()));
246        let json = serde_json::to_string(&wire).unwrap();
247        let decoded: ExchangeWire = serde_json::from_str(&json).unwrap();
248        assert_eq!(wire, decoded);
249    }
250
251    #[test]
252    fn test_exchange_wire_roundtrip_empty() {
253        let wire = make_exchange_wire(BodyWire::Empty);
254        let json = serde_json::to_string(&wire).unwrap();
255        let decoded: ExchangeWire = serde_json::from_str(&json).unwrap();
256        assert_eq!(wire, decoded);
257    }
258
259    #[test]
260    fn test_invoke_response_ok() {
261        let resp = InvokeResponse {
262            ok: true,
263            patch: Some(PatchWire {
264                body: Some(BodyWire::Text("processed".into())),
265                headers_set: vec![("x-custom".into(), serde_json::json!("added"))],
266                headers_removed: vec!["x-old".into()],
267                properties_set: vec![("status".into(), serde_json::json!("done"))],
268            }),
269            error: None,
270        };
271        let json = serde_json::to_string(&resp).unwrap();
272        let decoded: InvokeResponse = serde_json::from_str(&json).unwrap();
273        assert_eq!(resp, decoded);
274        assert!(decoded.ok);
275        assert!(decoded.patch.as_ref().unwrap().body.is_some());
276    }
277
278    #[test]
279    fn test_invoke_response_error() {
280        let resp = InvokeResponse {
281            ok: false,
282            patch: None,
283            error: Some(ErrorWire {
284                kind: "user_error".into(),
285                message: "ReferenceError: x is not defined".into(),
286                stack: Some("at main (file:///fn.ts:3:1)".into()),
287            }),
288        };
289        let json = serde_json::to_string(&resp).unwrap();
290        let decoded: InvokeResponse = serde_json::from_str(&json).unwrap();
291        assert_eq!(resp, decoded);
292        assert!(!decoded.ok);
293        let err = decoded.error.unwrap();
294        assert_eq!(err.kind, "user_error");
295        assert!(err.stack.is_some());
296    }
297
298    #[test]
299    fn test_health_response() {
300        let resp = HealthResponse {
301            status: "ok".into(),
302            registered: vec!["fn-a".into(), "fn-b".into()],
303        };
304        let json = serde_json::to_string(&resp).unwrap();
305        let decoded: HealthResponse = serde_json::from_str(&json).unwrap();
306        assert_eq!(resp, decoded);
307        assert_eq!(decoded.registered.len(), 2);
308    }
309
310    #[test]
311    fn test_error_response() {
312        let resp = ErrorResponse {
313            error: "function not found".into(),
314            kind: "not_registered".into(),
315        };
316        let json = serde_json::to_string(&resp).unwrap();
317        let decoded: ErrorResponse = serde_json::from_str(&json).unwrap();
318        assert_eq!(resp, decoded);
319    }
320
321    #[test]
322    fn test_patch_wire() {
323        let patch = PatchWire {
324            body: Some(BodyWire::Json(serde_json::json!({"updated": true}))),
325            headers_set: vec![("x-new".into(), serde_json::json!("val"))],
326            headers_removed: vec!["x-old".into()],
327            properties_set: vec![("key".into(), serde_json::json!(42))],
328        };
329        let json = serde_json::to_string(&patch).unwrap();
330        let decoded: PatchWire = serde_json::from_str(&json).unwrap();
331        assert_eq!(patch, decoded);
332    }
333
334    #[test]
335    fn test_body_wire_serde_lowercase() {
336        let wire = BodyWire::Text("hello".into());
337        let json = serde_json::to_string(&wire).unwrap();
338        assert!(
339            json.contains("\"text\""),
340            "expected lowercase variant name, got: {json}"
341        );
342        assert!(
343            !json.contains("\"Text\""),
344            "should not have UpperCamelCase variant"
345        );
346        let decoded: BodyWire = serde_json::from_str(&json).unwrap();
347        assert_eq!(wire, decoded);
348    }
349
350    #[test]
351    fn test_body_wire_bytes_base64_roundtrip() {
352        let original_bytes = vec![0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE];
353        let encoded = base64::engine::general_purpose::STANDARD.encode(&original_bytes);
354        let wire = BodyWire::Bytes(encoded.clone());
355
356        let json = serde_json::to_string(&wire).unwrap();
357        let decoded: BodyWire = serde_json::from_str(&json).unwrap();
358
359        if let BodyWire::Bytes(b64) = &decoded {
360            let roundtrip = base64::engine::general_purpose::STANDARD
361                .decode(b64)
362                .unwrap();
363            assert_eq!(roundtrip, original_bytes);
364        } else {
365            panic!("expected Bytes variant after roundtrip");
366        }
367
368        // Also verify to_body conversion
369        let body = wire.to_body();
370        if let camel_api::Body::Bytes(b) = body {
371            assert_eq!(b.to_vec(), original_bytes);
372        } else {
373            panic!("expected Body::Bytes from to_body()");
374        }
375    }
376
377    #[test]
378    fn test_body_wire_from_body_roundtrip() {
379        let bodies = vec![
380            ("Empty", camel_api::Body::Empty),
381            ("Text", camel_api::Body::Text("hello world".into())),
382            (
383                "Json",
384                camel_api::Body::Json(serde_json::json!({"key": "value"})),
385            ),
386            (
387                "Xml",
388                camel_api::Body::Xml("<root><item>1</item></root>".into()),
389            ),
390        ];
391
392        for (name, body) in bodies {
393            let wire = BodyWire::from_body(&body);
394            let roundtripped = wire.to_body();
395            assert_eq!(body, roundtripped, "roundtrip failed for {name}");
396        }
397
398        // Bytes need special handling since from_body base64-encodes
399        let original_bytes = vec![0xDE, 0xAD, 0xBE, 0xEF];
400        let body = camel_api::Body::Bytes(bytes::Bytes::from(original_bytes.clone()));
401        let wire = BodyWire::from_body(&body);
402        let roundtripped = wire.to_body();
403        if let camel_api::Body::Bytes(b) = roundtripped {
404            assert_eq!(b.to_vec(), original_bytes);
405        } else {
406            panic!("expected Body::Bytes after Bytes roundtrip");
407        }
408    }
409
410    #[test]
411    fn test_body_wire_from_body_stream_maps_to_empty() {
412        use camel_api::{StreamBody, StreamMetadata};
413        use futures::stream;
414
415        let chunks = vec![Ok(bytes::Bytes::from("stream data"))];
416        let stream_body = camel_api::Body::Stream(StreamBody {
417            stream: std::sync::Arc::new(tokio::sync::Mutex::new(Some(Box::pin(stream::iter(
418                chunks,
419            ))))),
420            metadata: StreamMetadata::default(),
421        });
422
423        let wire = BodyWire::from_body(&stream_body);
424        assert!(matches!(wire, BodyWire::Empty));
425    }
426
427    #[test]
428    fn test_body_wire_to_body_from_body_text() {
429        let wire = BodyWire::Text("hello world".into());
430        let body = wire.to_body();
431        let wire2 = BodyWire::from_body(&body);
432
433        assert!(matches!(wire2, BodyWire::Text(ref s) if s == "hello world"));
434    }
435}