1use std::collections::HashMap;
5use std::sync::Arc;
6
7use slim_datapath::api::{ProtoMessage, ProtoPublishType};
8use slim_datapath::messages::Name as SlimName;
9
10use slim_session::SessionError;
11
12use crate::Name;
14
15#[derive(Debug, Clone, PartialEq, uniffi::Record)]
21pub struct MessageContext {
22 pub source_name: Arc<Name>,
24 pub destination_name: Option<Arc<Name>>,
26 pub payload_type: String,
28 pub metadata: HashMap<String, String>,
30 pub input_connection: u64,
32 pub identity: String,
34}
35
36impl MessageContext {
37 pub fn new(
39 source: Name,
40 destination: Option<Name>,
41 payload_type: String,
42 metadata: HashMap<String, String>,
43 input_connection: u64,
44 identity: String,
45 ) -> Self {
46 Self {
47 source_name: Arc::new(source),
48 destination_name: destination.map(Arc::new),
49 payload_type,
50 metadata,
51 input_connection,
52 identity,
53 }
54 }
55
56 pub fn source_as_slim_name(&self) -> SlimName {
58 self.source_name.as_ref().into()
59 }
60
61 pub fn from_proto_message(msg: ProtoMessage) -> Result<(Self, Vec<u8>), SessionError> {
71 let Some(ProtoPublishType(publish)) = msg.message_type.as_ref() else {
72 return Err(SessionError::MessageTypeUnexpected(Box::new(msg)));
73 };
74
75 let source = Name::from(&msg.get_source());
77 let destination = Some(Name::from(&msg.get_dst()));
78 let input_connection = msg.get_incoming_conn();
79 let payload_bytes = publish
80 .msg
81 .as_ref()
82 .and_then(|c| c.as_application_payload().ok())
83 .map(|p| p.blob.clone())
84 .unwrap_or_default();
85 let payload_type = publish
86 .msg
87 .as_ref()
88 .and_then(|c| c.as_application_payload().ok())
89 .map(|p| {
90 if p.payload_type.is_empty() {
91 "msg".to_string()
92 } else {
93 p.payload_type.clone()
94 }
95 })
96 .unwrap_or_else(|| "msg".to_string());
97 let metadata = msg.get_metadata_map();
98 let identity = msg.get_identity();
99
100 let ctx = Self::new(
101 source,
102 destination,
103 payload_type,
104 metadata,
105 input_connection,
106 identity,
107 );
108 Ok((ctx, payload_bytes))
109 }
110}
111
112#[derive(Debug, Clone, uniffi::Record)]
114pub struct ReceivedMessage {
115 pub context: MessageContext,
116 pub payload: Vec<u8>,
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use slim_datapath::api::{
        ApplicationPayload, ProtoMessage, ProtoPublish, ProtoPublishType, SessionHeader, SlimHeader,
    };
    use std::collections::HashMap;

    /// Builds a shared bindings-level [`Name`] from three components.
    fn ffi_name(parts: [&str; 3]) -> Arc<Name> {
        Arc::new(Name::new(
            parts[0].to_string(),
            parts[1].to_string(),
            parts[2].to_string(),
        ))
    }

    /// Builds a datapath-level [`SlimName`] from three components.
    fn slim_name(parts: [&str; 3]) -> SlimName {
        SlimName::from_strings(parts)
    }

    /// Assembles a publish [`ProtoMessage`] with the given addressing,
    /// incoming connection, application payload and metadata.
    fn create_test_proto_message(
        source: SlimName,
        dest: SlimName,
        connection_id: u64,
        payload: Vec<u8>,
        content_type: String,
        metadata: HashMap<String, String>,
    ) -> ProtoMessage {
        let content = ApplicationPayload::new(&content_type, payload).as_content();

        let mut slim_header = SlimHeader::default();
        slim_header.set_source(&source);
        slim_header.set_destination(&dest);

        let publish = ProtoPublish {
            header: Some(slim_header),
            session: Some(SessionHeader::default()),
            msg: Some(content),
        };

        let mut proto_msg = ProtoMessage {
            message_type: Some(ProtoPublishType(publish)),
            metadata,
        };
        proto_msg.set_incoming_conn(Some(connection_id));

        proto_msg
    }

    /// `MessageContext::new` stores every argument in the matching field.
    #[tokio::test]
    async fn test_message_context_creation() {
        let source = ffi_name(["org", "namespace", "sender"]);
        let destination = Some(ffi_name(["org", "namespace", "receiver"]));
        let mut metadata = HashMap::new();
        metadata.insert("test".to_string(), "value".to_string());

        let ctx = MessageContext::new(
            source.as_ref().clone(),
            destination.as_ref().map(|n| n.as_ref().clone()),
            "application/json".to_string(),
            metadata.clone(),
            42,
            "test-identity".to_string(),
        );

        assert_eq!(ctx.source_name, source);
        assert_eq!(ctx.destination_name, destination);
        assert_eq!(ctx.payload_type, "application/json");
        assert_eq!(ctx.metadata, metadata);
        assert_eq!(ctx.input_connection, 42);
        assert_eq!(ctx.identity, "test-identity");
    }

    /// A fully populated publish message converts into the expected context
    /// and payload.
    #[tokio::test]
    async fn test_from_proto_message_success() {
        let payload_data = b"test payload".to_vec();
        let content_type = "application/json".to_string();
        let source_slim = slim_name(["org", "sender", "service"]);
        let dest_slim = slim_name(["org", "receiver", "service"]);
        let connection_id = 12345u64;
        let identity = "test-identity".to_string();

        let source_ffi = ffi_name(["org", "sender", "service"]);
        let dest_ffi = ffi_name(["org", "receiver", "service"]);

        let mut metadata = HashMap::new();
        metadata.insert("trace_id".to_string(), "abc123".to_string());
        metadata.insert("user_id".to_string(), "user456".to_string());

        let mut proto_msg = create_test_proto_message(
            source_slim,
            dest_slim,
            connection_id,
            payload_data.clone(),
            content_type.clone(),
            metadata.clone(),
        );
        proto_msg
            .get_slim_header_mut()
            .set_identity(identity.clone());

        // `expect` surfaces the actual error if the conversion fails.
        let (ctx, payload) = MessageContext::from_proto_message(proto_msg)
            .expect("publish message should convert");

        assert_eq!(ctx.source_name, source_ffi);
        assert_eq!(ctx.destination_name, Some(dest_ffi));
        assert_eq!(ctx.payload_type, content_type);
        assert_eq!(ctx.metadata, metadata);
        assert_eq!(ctx.input_connection, connection_id);
        assert_eq!(ctx.identity, identity);

        assert_eq!(payload, payload_data);
    }

    /// An empty content type falls back to the default `"msg"`.
    #[tokio::test]
    async fn test_from_proto_message_with_default_content_type() {
        let source_slim = slim_name(["org", "sender", "service"]);
        let dest_slim = slim_name(["org", "receiver", "service"]);
        let payload_data = b"test payload".to_vec();

        let proto_msg = create_test_proto_message(
            source_slim,
            dest_slim,
            42,
            payload_data,
            String::new(),
            HashMap::new(),
        );

        let (ctx, _) = MessageContext::from_proto_message(proto_msg)
            .expect("publish message should convert");
        assert_eq!(ctx.payload_type, "msg");
    }

    /// A publish without content yields the default type and an empty payload.
    #[tokio::test]
    async fn test_from_proto_message_with_no_content() {
        let source_slim = slim_name(["org", "sender", "service"]);
        let dest_slim = slim_name(["org", "receiver", "service"]);

        let mut slim_header = SlimHeader::default();
        slim_header.set_source(&source_slim);
        slim_header.set_destination(&dest_slim);

        let publish = ProtoPublish {
            header: Some(slim_header),
            session: Some(SessionHeader::default()),
            msg: None,
        };

        let mut proto_msg = ProtoMessage {
            message_type: Some(ProtoPublishType(publish)),
            ..Default::default()
        };
        proto_msg.set_incoming_conn(Some(42));

        let (ctx, payload) = MessageContext::from_proto_message(proto_msg)
            .expect("publish message should convert");
        assert_eq!(ctx.payload_type, "msg");
        assert_eq!(payload, Vec::<u8>::new());
    }

    /// A message with no message type is rejected with `MessageTypeUnexpected`.
    #[tokio::test]
    async fn test_from_proto_message_unsupported_message_type() {
        let proto_msg = ProtoMessage {
            message_type: None,
            ..Default::default()
        };

        let result = MessageContext::from_proto_message(proto_msg);
        assert!(result.is_err_and(|e| matches!(e, SessionError::MessageTypeUnexpected(_))));
    }

    /// Empty metadata is carried through as an empty map.
    #[tokio::test]
    async fn test_from_proto_message_with_empty_metadata() {
        let source_slim = slim_name(["test", "source", "v1"]);
        let dest_slim = slim_name(["test", "dest", "v1"]);
        let source_ffi = ffi_name(["test", "source", "v1"]);
        let dest_ffi = ffi_name(["test", "dest", "v1"]);
        let payload_data = b"test".to_vec();

        let proto_msg = create_test_proto_message(
            source_slim,
            dest_slim,
            99,
            payload_data.clone(),
            "text/plain".to_string(),
            HashMap::new(),
        );

        let (ctx, payload) = MessageContext::from_proto_message(proto_msg)
            .expect("publish message should convert");
        assert_eq!(ctx.source_name, source_ffi);
        assert_eq!(ctx.destination_name, Some(dest_ffi));
        assert_eq!(ctx.payload_type, "text/plain");
        assert_eq!(ctx.metadata, HashMap::new());
        assert_eq!(ctx.input_connection, 99);
        assert_eq!(payload, payload_data);
    }

    /// `ReceivedMessage` exposes its payload and context fields as constructed.
    #[test]
    fn test_received_message() {
        let msg = ReceivedMessage {
            context: MessageContext::new(
                Name::new_with_id("org".to_string(), "ns".to_string(), "app".to_string(), 123),
                Some(Name::new_with_id(
                    "org".to_string(),
                    "ns".to_string(),
                    "dest".to_string(),
                    456,
                )),
                "application/json".to_string(),
                HashMap::new(),
                789,
                "test-identity".to_string(),
            ),
            payload: b"hello world".to_vec(),
        };

        assert_eq!(msg.payload, b"hello world");
        assert_eq!(msg.context.input_connection, 789);
        assert_eq!(msg.context.payload_type, "application/json");
        assert_eq!(msg.context.identity, "test-identity");
    }
}