1use crate::{ContentType, codec::encode_message};
4use bytes::Bytes;
5use once_cell::sync::Lazy;
6use synapse_primitives::{PackedVersion, Uuid};
7use synapse_proto::{
8 HealthPull, HealthResponse, HttpEndpointDeregister, HttpEndpointRegister, HttpRegistrationAck,
9 InterfaceDeregister, InterfaceRegister, MessageKind, RegistrationAck, RpcRequest, RpcResponse,
10 RpcStatus, SynapseMessage, synapse_message,
11};
12
13pub static PROTOCOL_VERSION: Lazy<PackedVersion> =
15 Lazy::new(|| PackedVersion::new(0, 0, 1).expect("valid version"));
16
/// Upper bound, in bytes, on an RPC request payload (1 MiB).
///
/// `create_rpc_request` refuses to encode a request whose payload is longer
/// than this.
pub const MAX_RPC_PAYLOAD_SIZE: usize = 1 << 20;

/// Size limit of 4 MiB, in bytes.
///
/// NOTE(review): not referenced in this file; presumably the cap for batched
/// payloads — confirm against the callers that enforce it.
pub const MAX_BATCH_PAYLOAD_SIZE: usize = 4 << 20;
22
23pub fn create_rpc_request(
27 request_id: Uuid,
28 rpc_request: RpcRequest,
29 content_type: ContentType,
30) -> anyhow::Result<Bytes> {
31 if rpc_request.payload.len() > MAX_RPC_PAYLOAD_SIZE {
32 anyhow::bail!(
33 "RPC request payload too large: {} bytes (max {})",
34 rpc_request.payload.len(),
35 MAX_RPC_PAYLOAD_SIZE
36 );
37 }
38
39 let msg = SynapseMessage {
40 protocol_version: PROTOCOL_VERSION.as_u32(),
41 kind: MessageKind::RpcRequest as i32,
42 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
43 message: Some(synapse_message::Message::RpcRequest(rpc_request)),
44 };
45
46 encode_message(&msg, content_type)
47}
48
49pub fn create_rpc_response(
51 request_id: &[u8],
52 rpc_response: RpcResponse,
53 content_type: ContentType,
54) -> anyhow::Result<Bytes> {
55 let msg = SynapseMessage {
56 protocol_version: PROTOCOL_VERSION.as_u32(),
57 kind: MessageKind::RpcResponse as i32,
58 request_id: Bytes::copy_from_slice(request_id),
59 message: Some(synapse_message::Message::RpcResponse(rpc_response)),
60 };
61
62 encode_message(&msg, content_type)
63}
64
65pub fn ok_response(payload: Bytes) -> RpcResponse {
67 RpcResponse {
68 status: RpcStatus::Ok as i32,
69 payload,
70 error: None,
71 headers: vec![],
72 responded_at_unix_ms: chrono::Utc::now().timestamp_millis(),
73 }
74}
75
76pub fn error_response(status: RpcStatus, code: u32, message: impl Into<String>) -> RpcResponse {
78 RpcResponse {
79 status: status as i32,
80 payload: Bytes::new(),
81 error: Some(synapse_proto::RpcError {
82 code,
83 message: message.into(),
84 details: vec![],
85 }),
86 headers: vec![],
87 responded_at_unix_ms: chrono::Utc::now().timestamp_millis(),
88 }
89}
90
91pub fn extract_rpc_request(msg: SynapseMessage) -> Option<(Bytes, RpcRequest)> {
93 match msg.message {
94 Some(synapse_message::Message::RpcRequest(req)) => Some((msg.request_id, req)),
95 _ => None,
96 }
97}
98
99pub fn extract_rpc_response(msg: SynapseMessage) -> Option<(Bytes, RpcResponse)> {
101 match msg.message {
102 Some(synapse_message::Message::RpcResponse(resp)) => Some((msg.request_id, resp)),
103 _ => None,
104 }
105}
106
107pub fn create_interface_register(
113 request_id: Uuid,
114 register: InterfaceRegister,
115 content_type: ContentType,
116) -> anyhow::Result<Bytes> {
117 let msg = SynapseMessage {
118 protocol_version: PROTOCOL_VERSION.as_u32(),
119 kind: MessageKind::InterfaceRegister as i32,
120 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
121 message: Some(synapse_message::Message::InterfaceRegister(register)),
122 };
123 encode_message(&msg, content_type)
124}
125
126pub fn create_interface_deregister(
128 request_id: Uuid,
129 deregister: InterfaceDeregister,
130 content_type: ContentType,
131) -> anyhow::Result<Bytes> {
132 let msg = SynapseMessage {
133 protocol_version: PROTOCOL_VERSION.as_u32(),
134 kind: MessageKind::InterfaceDeregister as i32,
135 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
136 message: Some(synapse_message::Message::InterfaceDeregister(deregister)),
137 };
138 encode_message(&msg, content_type)
139}
140
141pub fn create_registration_ack(
143 request_id: &[u8],
144 success: bool,
145 error_message: Option<String>,
146 content_type: ContentType,
147) -> anyhow::Result<Bytes> {
148 let msg = SynapseMessage {
149 protocol_version: PROTOCOL_VERSION.as_u32(),
150 kind: MessageKind::RegistrationAck as i32,
151 request_id: Bytes::copy_from_slice(request_id),
152 message: Some(synapse_message::Message::RegistrationAck(RegistrationAck {
153 success,
154 error_message: error_message.unwrap_or_default(),
155 })),
156 };
157 encode_message(&msg, content_type)
158}
159
160pub fn extract_interface_register(msg: SynapseMessage) -> Option<(Bytes, InterfaceRegister)> {
162 match msg.message {
163 Some(synapse_message::Message::InterfaceRegister(reg)) => Some((msg.request_id, reg)),
164 _ => None,
165 }
166}
167
168pub fn extract_interface_deregister(msg: SynapseMessage) -> Option<(Bytes, InterfaceDeregister)> {
170 match msg.message {
171 Some(synapse_message::Message::InterfaceDeregister(dereg)) => Some((msg.request_id, dereg)),
172 _ => None,
173 }
174}
175
176pub fn extract_registration_ack(msg: SynapseMessage) -> Option<(Bytes, RegistrationAck)> {
178 match msg.message {
179 Some(synapse_message::Message::RegistrationAck(ack)) => Some((msg.request_id, ack)),
180 _ => None,
181 }
182}
183
184pub fn create_health_pull(request_id: Uuid, content_type: ContentType) -> anyhow::Result<Bytes> {
190 let msg = SynapseMessage {
191 protocol_version: PROTOCOL_VERSION.as_u32(),
192 kind: MessageKind::HealthPull as i32,
193 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
194 message: Some(synapse_message::Message::HealthPull(HealthPull {
195 pull: true,
196 })),
197 };
198 encode_message(&msg, content_type)
199}
200
201pub fn create_health_response(
203 request_id: &[u8],
204 response: HealthResponse,
205 content_type: ContentType,
206) -> anyhow::Result<Bytes> {
207 let msg = SynapseMessage {
208 protocol_version: PROTOCOL_VERSION.as_u32(),
209 kind: MessageKind::HealthResponse as i32,
210 request_id: Bytes::copy_from_slice(request_id),
211 message: Some(synapse_message::Message::HealthResponse(response)),
212 };
213 encode_message(&msg, content_type)
214}
215
216pub fn extract_health_pull(msg: SynapseMessage) -> Option<(Bytes, HealthPull)> {
218 match msg.message {
219 Some(synapse_message::Message::HealthPull(pull)) => Some((msg.request_id, pull)),
220 _ => None,
221 }
222}
223
224pub fn extract_health_response(msg: SynapseMessage) -> Option<(Bytes, HealthResponse)> {
226 match msg.message {
227 Some(synapse_message::Message::HealthResponse(resp)) => Some((msg.request_id, resp)),
228 _ => None,
229 }
230}
231
232pub fn create_http_endpoint_register(
238 request_id: Uuid,
239 register: HttpEndpointRegister,
240 content_type: ContentType,
241) -> anyhow::Result<Bytes> {
242 let msg = SynapseMessage {
243 protocol_version: PROTOCOL_VERSION.as_u32(),
244 kind: MessageKind::HttpEndpointRegister as i32,
245 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
246 message: Some(synapse_message::Message::HttpEndpointRegister(register)),
247 };
248 encode_message(&msg, content_type)
249}
250
251pub fn create_http_endpoint_deregister(
253 request_id: Uuid,
254 deregister: HttpEndpointDeregister,
255 content_type: ContentType,
256) -> anyhow::Result<Bytes> {
257 let msg = SynapseMessage {
258 protocol_version: PROTOCOL_VERSION.as_u32(),
259 kind: MessageKind::HttpEndpointDeregister as i32,
260 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
261 message: Some(synapse_message::Message::HttpEndpointDeregister(deregister)),
262 };
263 encode_message(&msg, content_type)
264}
265
266pub fn create_http_registration_ack(
268 request_id: &[u8],
269 success: bool,
270 error_message: Option<String>,
271 content_type: ContentType,
272) -> anyhow::Result<Bytes> {
273 let msg = SynapseMessage {
274 protocol_version: PROTOCOL_VERSION.as_u32(),
275 kind: MessageKind::HttpRegistrationAck as i32,
276 request_id: Bytes::copy_from_slice(request_id),
277 message: Some(synapse_message::Message::HttpRegistrationAck(
278 HttpRegistrationAck {
279 success,
280 error_message: error_message.unwrap_or_default(),
281 },
282 )),
283 };
284 encode_message(&msg, content_type)
285}
286
287pub fn extract_http_registration_ack(msg: SynapseMessage) -> Option<(Bytes, HttpRegistrationAck)> {
289 match msg.message {
290 Some(synapse_message::Message::HttpRegistrationAck(ack)) => Some((msg.request_id, ack)),
291 _ => None,
292 }
293}
294
295pub enum ParsedMessage {
301 RpcRequest {
302 request_id: Bytes,
303 request: RpcRequest,
304 },
305 RpcResponse {
306 request_id: Bytes,
307 response: RpcResponse,
308 },
309 InterfaceRegister {
310 request_id: Bytes,
311 register: InterfaceRegister,
312 },
313 InterfaceDeregister {
314 request_id: Bytes,
315 deregister: InterfaceDeregister,
316 },
317 RegistrationAck {
318 request_id: Bytes,
319 ack: RegistrationAck,
320 },
321 HealthPull {
322 request_id: Bytes,
323 },
324 HealthResponse {
325 request_id: Bytes,
326 response: HealthResponse,
327 },
328 HttpEndpointRegister {
329 request_id: Bytes,
330 register: HttpEndpointRegister,
331 },
332 HttpEndpointDeregister {
333 request_id: Bytes,
334 deregister: HttpEndpointDeregister,
335 },
336 HttpRegistrationAck {
337 request_id: Bytes,
338 ack: HttpRegistrationAck,
339 },
340 Unknown {
341 kind: i32,
342 },
343}
344
345pub fn parse_message(msg: SynapseMessage) -> ParsedMessage {
347 let request_id = msg.request_id.clone();
348
349 match msg.message {
350 Some(synapse_message::Message::RpcRequest(req)) => ParsedMessage::RpcRequest {
351 request_id,
352 request: req,
353 },
354 Some(synapse_message::Message::RpcResponse(resp)) => ParsedMessage::RpcResponse {
355 request_id,
356 response: resp,
357 },
358 Some(synapse_message::Message::InterfaceRegister(reg)) => {
359 ParsedMessage::InterfaceRegister {
360 request_id,
361 register: reg,
362 }
363 }
364 Some(synapse_message::Message::InterfaceDeregister(dereg)) => {
365 ParsedMessage::InterfaceDeregister {
366 request_id,
367 deregister: dereg,
368 }
369 }
370 Some(synapse_message::Message::RegistrationAck(ack)) => {
371 ParsedMessage::RegistrationAck { request_id, ack }
372 }
373 Some(synapse_message::Message::HealthPull(_)) => ParsedMessage::HealthPull { request_id },
374 Some(synapse_message::Message::HealthResponse(resp)) => ParsedMessage::HealthResponse {
375 request_id,
376 response: resp,
377 },
378 Some(synapse_message::Message::HttpEndpointRegister(reg)) => {
379 ParsedMessage::HttpEndpointRegister {
380 request_id,
381 register: reg,
382 }
383 }
384 Some(synapse_message::Message::HttpEndpointDeregister(dereg)) => {
385 ParsedMessage::HttpEndpointDeregister {
386 request_id,
387 deregister: dereg,
388 }
389 }
390 Some(synapse_message::Message::HttpRegistrationAck(ack)) => {
391 ParsedMessage::HttpRegistrationAck { request_id, ack }
392 }
393 _ => ParsedMessage::Unknown { kind: msg.kind },
394 }
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use synapse_primitives::Uuid;

    /// Fresh random request id for the builder tests.
    fn test_request_id() -> Uuid {
        Uuid::new_v4()
    }

    /// Fixed RPC request shared by the tests below.
    fn test_rpc_request() -> RpcRequest {
        let payload = Bytes::from("test payload");
        RpcRequest {
            interface_id: 42,
            method_id: 7,
            headers: Vec::new(),
            payload,
            sent_at_unix_ms: 1000,
        }
    }

    // ---- response constructors ----

    #[test]
    fn test_ok_response_status() {
        let response = ok_response(Bytes::from("data"));
        assert_eq!(response.status, RpcStatus::Ok as i32);
    }

    #[test]
    fn test_ok_response_payload_preserved() {
        let expected = Bytes::from("my payload");
        let response = ok_response(expected.clone());
        assert_eq!(response.payload, expected);
    }

    #[test]
    fn test_ok_response_no_error() {
        let response = ok_response(Bytes::new());
        assert!(response.error.is_none());
    }

    #[test]
    fn test_ok_response_timestamp_set() {
        let response = ok_response(Bytes::new());
        assert!(response.responded_at_unix_ms > 0);
    }

    #[test]
    fn test_error_response_status() {
        let response = error_response(RpcStatus::InvalidRequest, 400, "bad input");
        assert_eq!(response.status, RpcStatus::InvalidRequest as i32);
    }

    #[test]
    fn test_error_response_code_and_message() {
        let response = error_response(RpcStatus::Error, 3001, "custom error");
        let error = response.error.unwrap();
        assert_eq!(error.code, 3001);
        assert_eq!(error.message, "custom error");
    }

    #[test]
    fn test_error_response_empty_payload() {
        let response = error_response(RpcStatus::Timeout, 504, "timed out");
        assert!(response.payload.is_empty());
    }

    // ---- envelope builders ----

    #[test]
    fn test_create_rpc_request_roundtrip_json() {
        let request_id = test_request_id();
        let request = test_rpc_request();
        let wire = create_rpc_request(request_id, request.clone(), ContentType::Json).unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        assert_eq!(envelope.protocol_version, PROTOCOL_VERSION.as_u32());
        assert_eq!(envelope.kind, MessageKind::RpcRequest as i32);
        let Some(synapse_message::Message::RpcRequest(inner)) = envelope.message else {
            panic!("Expected RpcRequest message");
        };
        assert_eq!(inner.interface_id, 42);
        assert_eq!(inner.method_id, 7);
    }

    #[test]
    fn test_create_rpc_request_roundtrip_protobuf() {
        use prost::Message;

        let request_id = test_request_id();
        let request = test_rpc_request();
        let wire = create_rpc_request(request_id, request, ContentType::Protobuf).unwrap();
        let envelope = SynapseMessage::decode(wire).unwrap();

        assert_eq!(envelope.kind, MessageKind::RpcRequest as i32);
        assert!(matches!(
            envelope.message,
            Some(synapse_message::Message::RpcRequest(_))
        ));
    }

    #[test]
    fn test_create_rpc_response_roundtrip() {
        let request_id = Uuid::new_v4();
        let response = ok_response(Bytes::from("response data"));
        let wire =
            create_rpc_response(request_id.as_bytes(), response, ContentType::Json).unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        assert_eq!(envelope.kind, MessageKind::RpcResponse as i32);
        let Some(synapse_message::Message::RpcResponse(inner)) = envelope.message else {
            panic!("Expected RpcResponse message");
        };
        assert_eq!(inner.status, RpcStatus::Ok as i32);
    }

    #[test]
    fn test_create_interface_register_roundtrip() {
        let request_id = test_request_id();
        let registration = InterfaceRegister {
            interface_id: 100,
            interface_version: 1_000_000,
            method_ids: vec![1, 2, 3],
            instance_id: Bytes::from(vec![0u8; 16]),
            capabilities: None,
            service_name: "test-svc".to_string(),
            interface_name: "test.Interface".to_string(),
            http_endpoint: "http://localhost:9001/rpc".to_string(),
            method_names: vec!["A".into(), "B".into(), "C".into()],
        };
        let wire =
            create_interface_register(request_id, registration, ContentType::Json).unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        assert_eq!(envelope.kind, MessageKind::InterfaceRegister as i32);
        assert!(matches!(
            envelope.message,
            Some(synapse_message::Message::InterfaceRegister(_))
        ));
    }

    #[test]
    fn test_create_registration_ack_success() {
        let request_id = Uuid::new_v4();
        let wire =
            create_registration_ack(request_id.as_bytes(), true, None, ContentType::Json)
                .unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        let Some(synapse_message::Message::RegistrationAck(ack)) = envelope.message else {
            panic!("Expected RegistrationAck");
        };
        assert!(ack.success);
        assert!(ack.error_message.is_empty());
    }

    #[test]
    fn test_create_registration_ack_failure() {
        let request_id = Uuid::new_v4();
        let reason = Some("duplicate interface".to_string());
        let wire =
            create_registration_ack(request_id.as_bytes(), false, reason, ContentType::Json)
                .unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        let Some(synapse_message::Message::RegistrationAck(ack)) = envelope.message else {
            panic!("Expected RegistrationAck");
        };
        assert!(!ack.success);
        assert_eq!(ack.error_message, "duplicate interface");
    }

    #[test]
    fn test_create_health_pull_roundtrip() {
        let request_id = test_request_id();
        let wire = create_health_pull(request_id, ContentType::Json).unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        assert_eq!(envelope.kind, MessageKind::HealthPull as i32);
        assert!(matches!(
            envelope.message,
            Some(synapse_message::Message::HealthPull(_))
        ));
    }

    #[test]
    fn test_create_health_response_roundtrip() {
        let request_id = Uuid::new_v4();
        let health = HealthResponse {
            instance_id: Bytes::from(vec![0u8; 16]),
            status: 1,
            version: "0.1.0".to_string(),
            uptime_ms: 5000,
            message: "all good".to_string(),
        };
        let wire =
            create_health_response(request_id.as_bytes(), health, ContentType::Json).unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        assert_eq!(envelope.kind, MessageKind::HealthResponse as i32);
        let Some(synapse_message::Message::HealthResponse(inner)) = envelope.message else {
            panic!("Expected HealthResponse");
        };
        assert_eq!(inner.version, "0.1.0");
        assert_eq!(inner.uptime_ms, 5000);
    }

    // ---- extractors ----

    /// Wraps `message` in an envelope with the given `kind` and an all-zero
    /// 16-byte request id.
    fn make_synapse_msg(kind: MessageKind, message: synapse_message::Message) -> SynapseMessage {
        let request_id = Bytes::from(vec![0u8; 16]);
        SynapseMessage {
            protocol_version: PROTOCOL_VERSION.as_u32(),
            kind: kind as i32,
            request_id,
            message: Some(message),
        }
    }

    #[test]
    fn test_extract_rpc_request_correct() {
        let body = synapse_message::Message::RpcRequest(test_rpc_request());
        let envelope = make_synapse_msg(MessageKind::RpcRequest, body);
        assert!(extract_rpc_request(envelope).is_some());
    }

    #[test]
    fn test_extract_rpc_request_wrong_type() {
        let body = synapse_message::Message::RpcResponse(ok_response(Bytes::new()));
        let envelope = make_synapse_msg(MessageKind::RpcResponse, body);
        assert!(extract_rpc_request(envelope).is_none());
    }

    #[test]
    fn test_extract_rpc_response_correct() {
        let body = synapse_message::Message::RpcResponse(ok_response(Bytes::new()));
        let envelope = make_synapse_msg(MessageKind::RpcResponse, body);
        assert!(extract_rpc_response(envelope).is_some());
    }

    #[test]
    fn test_extract_rpc_response_wrong_type() {
        let body = synapse_message::Message::RpcRequest(test_rpc_request());
        let envelope = make_synapse_msg(MessageKind::RpcRequest, body);
        assert!(extract_rpc_response(envelope).is_none());
    }

    #[test]
    fn test_extract_registration_ack_correct() {
        let body = synapse_message::Message::RegistrationAck(RegistrationAck {
            success: true,
            error_message: String::new(),
        });
        let envelope = make_synapse_msg(MessageKind::RegistrationAck, body);
        let (_, ack) = extract_registration_ack(envelope).unwrap();
        assert!(ack.success);
    }

    #[test]
    fn test_extract_health_pull_correct() {
        let body = synapse_message::Message::HealthPull(HealthPull { pull: true });
        let envelope = make_synapse_msg(MessageKind::HealthPull, body);
        assert!(extract_health_pull(envelope).is_some());
    }

    #[test]
    fn test_extract_health_response_correct() {
        let body = synapse_message::Message::HealthResponse(HealthResponse {
            instance_id: Bytes::new(),
            status: 0,
            version: String::new(),
            uptime_ms: 0,
            message: String::new(),
        });
        let envelope = make_synapse_msg(MessageKind::HealthResponse, body);
        assert!(extract_health_response(envelope).is_some());
    }

    // ---- parse_message ----

    #[test]
    fn test_parse_message_rpc_request() {
        let body = synapse_message::Message::RpcRequest(test_rpc_request());
        let envelope = make_synapse_msg(MessageKind::RpcRequest, body);
        assert!(matches!(
            parse_message(envelope),
            ParsedMessage::RpcRequest { .. }
        ));
    }

    #[test]
    fn test_parse_message_rpc_response() {
        let body = synapse_message::Message::RpcResponse(ok_response(Bytes::new()));
        let envelope = make_synapse_msg(MessageKind::RpcResponse, body);
        assert!(matches!(
            parse_message(envelope),
            ParsedMessage::RpcResponse { .. }
        ));
    }

    #[test]
    fn test_parse_message_interface_register() {
        let body = synapse_message::Message::InterfaceRegister(InterfaceRegister::default());
        let envelope = make_synapse_msg(MessageKind::InterfaceRegister, body);
        assert!(matches!(
            parse_message(envelope),
            ParsedMessage::InterfaceRegister { .. }
        ));
    }

    #[test]
    fn test_parse_message_unknown() {
        let envelope = SynapseMessage {
            protocol_version: 1,
            kind: 9999,
            request_id: Bytes::new(),
            message: None,
        };
        assert!(matches!(
            parse_message(envelope),
            ParsedMessage::Unknown { kind: 9999 }
        ));
    }

    // ---- HTTP registration ----

    #[test]
    fn test_create_http_registration_ack_success() {
        let request_id = Uuid::new_v4();
        let wire =
            create_http_registration_ack(request_id.as_bytes(), true, None, ContentType::Json)
                .unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        assert_eq!(envelope.kind, MessageKind::HttpRegistrationAck as i32);
        let Some(synapse_message::Message::HttpRegistrationAck(ack)) = envelope.message else {
            panic!("Expected HttpRegistrationAck");
        };
        assert!(ack.success);
        assert!(ack.error_message.is_empty());
    }

    #[test]
    fn test_create_http_registration_ack_failure() {
        let request_id = Uuid::new_v4();
        let reason = Some("permission denied".to_string());
        let wire = create_http_registration_ack(
            request_id.as_bytes(),
            false,
            reason,
            ContentType::Json,
        )
        .unwrap();
        let envelope: SynapseMessage = serde_json::from_slice(&wire).unwrap();

        let Some(synapse_message::Message::HttpRegistrationAck(ack)) = envelope.message else {
            panic!("Expected HttpRegistrationAck");
        };
        assert!(!ack.success);
        assert_eq!(ack.error_message, "permission denied");
    }

    #[test]
    fn test_extract_http_registration_ack_correct() {
        let body = synapse_message::Message::HttpRegistrationAck(HttpRegistrationAck {
            success: true,
            error_message: String::new(),
        });
        let envelope = make_synapse_msg(MessageKind::HttpRegistrationAck, body);
        let (_, ack) = extract_http_registration_ack(envelope).unwrap();
        assert!(ack.success);
    }

    #[test]
    fn test_extract_http_registration_ack_wrong_type() {
        let body = synapse_message::Message::RpcRequest(test_rpc_request());
        let envelope = make_synapse_msg(MessageKind::RpcRequest, body);
        assert!(extract_http_registration_ack(envelope).is_none());
    }

    #[test]
    fn test_parse_message_http_endpoint_register() {
        let body =
            synapse_message::Message::HttpEndpointRegister(HttpEndpointRegister::default());
        let envelope = make_synapse_msg(MessageKind::HttpEndpointRegister, body);
        assert!(matches!(
            parse_message(envelope),
            ParsedMessage::HttpEndpointRegister { .. }
        ));
    }

    #[test]
    fn test_parse_message_http_endpoint_deregister() {
        let body =
            synapse_message::Message::HttpEndpointDeregister(HttpEndpointDeregister::default());
        let envelope = make_synapse_msg(MessageKind::HttpEndpointDeregister, body);
        assert!(matches!(
            parse_message(envelope),
            ParsedMessage::HttpEndpointDeregister { .. }
        ));
    }

    #[test]
    fn test_parse_message_http_registration_ack() {
        let body = synapse_message::Message::HttpRegistrationAck(HttpRegistrationAck {
            success: true,
            error_message: String::new(),
        });
        let envelope = make_synapse_msg(MessageKind::HttpRegistrationAck, body);
        assert!(matches!(
            parse_message(envelope),
            ParsedMessage::HttpRegistrationAck { .. }
        ));
    }
}