Skip to main content

slim_bindings/
message_context.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use slim_datapath::api::{ProtoMessage, ProtoPublishType};
8use slim_datapath::messages::Name as SlimName;
9
10use slim_session::SessionError;
11
12// Import the FFI Name type for use in MessageContext fields
13use crate::Name;
14
15/// Generic message context for language bindings (UniFFI-compatible)
16///
17/// Provides routing and descriptive metadata needed for replying,
18/// auditing, and instrumentation across different language bindings.
19/// This type is exported to foreign languages via UniFFI.
20#[derive(Debug, Clone, PartialEq, uniffi::Record)]
21pub struct MessageContext {
22    /// Fully-qualified sender identity
23    pub source_name: Arc<Name>,
24    /// Fully-qualified destination identity (may be empty for broadcast/group scenarios)
25    pub destination_name: Option<Arc<Name>>,
26    /// Logical/semantic type (defaults to "msg" if unspecified)
27    pub payload_type: String,
28    /// Arbitrary key/value pairs supplied by the sender (e.g. tracing IDs)
29    pub metadata: HashMap<String, String>,
30    /// Numeric identifier of the inbound connection carrying the message
31    pub input_connection: u64,
32    /// Identity contained in the message
33    pub identity: String,
34}
35
36impl MessageContext {
37    /// Create a new MessageContext
38    pub fn new(
39        source: Name,
40        destination: Option<Name>,
41        payload_type: String,
42        metadata: HashMap<String, String>,
43        input_connection: u64,
44        identity: String,
45    ) -> Self {
46        Self {
47            source_name: Arc::new(source),
48            destination_name: destination.map(Arc::new),
49            payload_type,
50            metadata,
51            input_connection,
52            identity,
53        }
54    }
55
56    /// Get the source name as a SlimName (for internal use)
57    pub fn source_as_slim_name(&self) -> SlimName {
58        self.source_name.as_ref().into()
59    }
60
61    /// Build a `MessageContext` plus the raw payload bytes from a low-level
62    /// `ProtoMessage`. Returns an error if the message type is unsupported
63    /// (i.e. not a publish payload).
64    ///
65    /// On success:
66    /// * The context captures source/destination identities
67    /// * `payload_type` defaults to "msg" if unset
68    /// * `metadata` is copied from the underlying protocol envelope
69    /// * The returned `Vec<u8>` is the raw application payload
70    pub fn from_proto_message(msg: ProtoMessage) -> Result<(Self, Vec<u8>), SessionError> {
71        let Some(ProtoPublishType(publish)) = msg.message_type.as_ref() else {
72            return Err(SessionError::MessageTypeUnexpected(Box::new(msg)));
73        };
74
75        // Convert SlimName to FFI Name
76        let source = Name::from(&msg.get_source());
77        let destination = Some(Name::from(&msg.get_dst()));
78        let input_connection = msg.get_incoming_conn();
79        let payload_bytes = publish
80            .msg
81            .as_ref()
82            .and_then(|c| c.as_application_payload().ok())
83            .map(|p| p.blob.clone())
84            .unwrap_or_default();
85        let payload_type = publish
86            .msg
87            .as_ref()
88            .and_then(|c| c.as_application_payload().ok())
89            .map(|p| {
90                if p.payload_type.is_empty() {
91                    "msg".to_string()
92                } else {
93                    p.payload_type.clone()
94                }
95            })
96            .unwrap_or_else(|| "msg".to_string());
97        let metadata = msg.get_metadata_map();
98        let identity = msg.get_identity();
99
100        let ctx = Self::new(
101            source,
102            destination,
103            payload_type,
104            metadata,
105            input_connection,
106            identity,
107        );
108        Ok((ctx, payload_bytes))
109    }
110}
111
112/// Received message containing context and payload
113#[derive(Debug, Clone, uniffi::Record)]
114pub struct ReceivedMessage {
115    pub context: MessageContext,
116    pub payload: Vec<u8>,
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use slim_datapath::api::{
        ApplicationPayload, ProtoMessage, ProtoPublish, ProtoPublishType, SessionHeader, SlimHeader,
    };
    use std::collections::HashMap;

    /// Build an `Arc`-wrapped FFI `Name` from three string components
    /// (the FFI counterpart of the `SlimName` built by `slim_name`).
    fn ffi_name(parts: [&str; 3]) -> Arc<Name> {
        let [first, second, third] = parts.map(str::to_string);
        Arc::new(Name::new(first, second, third))
    }

    /// Build the `SlimName` used when constructing a `ProtoMessage`.
    fn slim_name(parts: [&str; 3]) -> SlimName {
        SlimName::from_strings(parts)
    }

    /// Assemble a publish-type `ProtoMessage` with the given endpoints,
    /// incoming connection id, application payload, and metadata.
    fn create_test_proto_message(
        source: SlimName,
        dest: SlimName,
        connection_id: u64,
        payload: Vec<u8>,
        content_type: String,
        metadata: HashMap<String, String>,
    ) -> ProtoMessage {
        let mut header = SlimHeader::default();
        header.set_source(&source);
        header.set_destination(&dest);

        let mut message = ProtoMessage {
            message_type: Some(ProtoPublishType(ProtoPublish {
                header: Some(header),
                session: Some(SessionHeader::default()),
                msg: Some(ApplicationPayload::new(&content_type, payload).as_content()),
            })),
            metadata,
        };
        message.set_incoming_conn(Some(connection_id));
        message
    }

    /// `MessageContext::new` stores every argument in the matching field.
    #[tokio::test]
    async fn test_message_context_creation() {
        let sender = ffi_name(["org", "namespace", "sender"]);
        let receiver = ffi_name(["org", "namespace", "receiver"]);
        let metadata = HashMap::from([("test".to_string(), "value".to_string())]);

        let ctx = MessageContext::new(
            (*sender).clone(),
            Some((*receiver).clone()),
            "application/json".to_string(),
            metadata.clone(),
            42,
            "test-identity".to_string(),
        );

        assert_eq!(ctx.source_name, sender);
        assert_eq!(ctx.destination_name, Some(receiver));
        assert_eq!(ctx.payload_type, "application/json");
        assert_eq!(ctx.metadata, metadata);
        assert_eq!(ctx.input_connection, 42);
        assert_eq!(ctx.identity, "test-identity");
    }

    /// A fully populated publish message yields a matching context and payload.
    #[tokio::test]
    async fn test_from_proto_message_success() {
        // Inputs
        let connection_id = 12345u64;
        let identity = "test-identity".to_string();
        let content_type = "application/json".to_string();
        let payload_data = b"test payload".to_vec();
        let metadata = HashMap::from([
            ("trace_id".to_string(), "abc123".to_string()),
            ("user_id".to_string(), "user456".to_string()),
        ]);

        // FFI names the context is expected to carry
        let expected_source = ffi_name(["org", "sender", "service"]);
        let expected_dest = ffi_name(["org", "receiver", "service"]);

        let mut proto_msg = create_test_proto_message(
            slim_name(["org", "sender", "service"]),
            slim_name(["org", "receiver", "service"]),
            connection_id,
            payload_data.clone(),
            content_type.clone(),
            metadata.clone(),
        );
        // The identity lives in the SLIM header, so set it after construction.
        proto_msg
            .get_slim_header_mut()
            .set_identity(identity.clone());

        let result = MessageContext::from_proto_message(proto_msg);
        assert!(result.is_ok());
        let (ctx, payload) = result.unwrap();

        // Context fields (names are compared as FFI names)
        assert_eq!(ctx.source_name, expected_source);
        assert_eq!(ctx.destination_name, Some(expected_dest));
        assert_eq!(ctx.payload_type, content_type);
        assert_eq!(ctx.metadata, metadata);
        assert_eq!(ctx.input_connection, connection_id);
        assert_eq!(ctx.identity, identity);

        // Payload bytes
        assert_eq!(payload, payload_data);
    }

    /// An empty content type is reported as "msg".
    #[tokio::test]
    async fn test_from_proto_message_with_default_content_type() {
        let proto_msg = create_test_proto_message(
            slim_name(["org", "sender", "service"]),
            slim_name(["org", "receiver", "service"]),
            42,
            b"test payload".to_vec(),
            String::new(), // empty content type
            HashMap::new(),
        );

        let result = MessageContext::from_proto_message(proto_msg);
        assert!(result.is_ok());

        let (ctx, _payload) = result.unwrap();
        assert_eq!(ctx.payload_type, "msg");
    }

    /// A publish message without content yields "msg" and an empty payload.
    #[tokio::test]
    async fn test_from_proto_message_with_no_content() {
        let mut header = SlimHeader::default();
        header.set_source(&slim_name(["org", "sender", "service"]));
        header.set_destination(&slim_name(["org", "receiver", "service"]));

        // Publish message whose `msg` content is absent
        let mut proto_msg = ProtoMessage {
            message_type: Some(ProtoPublishType(ProtoPublish {
                header: Some(header),
                session: Some(SessionHeader::default()),
                msg: None,
            })),
            ..Default::default()
        };
        proto_msg.set_incoming_conn(Some(42));

        let result = MessageContext::from_proto_message(proto_msg);
        assert!(result.is_ok());

        let (ctx, payload) = result.unwrap();
        assert_eq!(ctx.payload_type, "msg");
        assert!(payload.is_empty());
    }

    /// A message without a publish message type is rejected.
    #[tokio::test]
    async fn test_from_proto_message_unsupported_message_type() {
        let proto_msg = ProtoMessage {
            message_type: None, // not a publish message
            ..Default::default()
        };

        let result = MessageContext::from_proto_message(proto_msg);
        assert!(matches!(
            result,
            Err(SessionError::MessageTypeUnexpected(_))
        ));
    }

    /// Empty metadata on the message results in empty metadata on the context.
    #[tokio::test]
    async fn test_from_proto_message_with_empty_metadata() {
        let expected_source = ffi_name(["test", "source", "v1"]);
        let expected_dest = ffi_name(["test", "dest", "v1"]);
        let payload_data = b"test".to_vec();

        let proto_msg = create_test_proto_message(
            slim_name(["test", "source", "v1"]),
            slim_name(["test", "dest", "v1"]),
            99,
            payload_data.clone(),
            "text/plain".to_string(),
            HashMap::new(), // no metadata
        );

        let result = MessageContext::from_proto_message(proto_msg);
        assert!(result.is_ok());

        let (ctx, payload) = result.unwrap();
        assert_eq!(ctx.source_name, expected_source);
        assert_eq!(ctx.destination_name, Some(expected_dest));
        assert_eq!(ctx.payload_type, "text/plain");
        assert!(ctx.metadata.is_empty());
        assert_eq!(ctx.input_connection, 99);
        assert_eq!(payload, payload_data);
    }

    /// `ReceivedMessage` exposes the context and payload it was built with.
    #[test]
    fn test_received_message() {
        let source = Name::new_with_id("org".to_string(), "ns".to_string(), "app".to_string(), 123);
        let destination =
            Name::new_with_id("org".to_string(), "ns".to_string(), "dest".to_string(), 456);

        let context = MessageContext::new(
            source,
            Some(destination),
            "application/json".to_string(),
            HashMap::new(),
            789,
            "test-identity".to_string(),
        );
        let msg = ReceivedMessage {
            context,
            payload: b"hello world".to_vec(),
        };

        assert_eq!(msg.payload, b"hello world");
        assert_eq!(msg.context.input_connection, 789);
        assert_eq!(msg.context.payload_type, "application/json");
        assert_eq!(msg.context.identity, "test-identity");
    }
}