Skip to main content

synapse_rpc/
message.rs

1//! Message construction helpers for Synapse protocol
2
3use crate::{ContentType, codec::encode_message};
4use bytes::Bytes;
5use once_cell::sync::Lazy;
6use synapse_primitives::{PackedVersion, Uuid};
7use synapse_proto::{
8    HealthPull, HealthResponse, HttpEndpointDeregister, HttpEndpointRegister, HttpRegistrationAck,
9    InterfaceDeregister, InterfaceRegister, MessageKind, RegistrationAck, RpcRequest, RpcResponse,
10    RpcStatus, SynapseMessage, synapse_message,
11};
12
13/// Current protocol version
14pub static PROTOCOL_VERSION: Lazy<PackedVersion> =
15    Lazy::new(|| PackedVersion::new(0, 0, 1).expect("valid version"));
16
17/// Maximum RPC message payload size (1 MB)
18pub const MAX_RPC_PAYLOAD_SIZE: usize = 1024 * 1024;
19
20/// Maximum batch message payload size (4 MB)
21pub const MAX_BATCH_PAYLOAD_SIZE: usize = 4 * 1024 * 1024;
22
23/// Create an RPC request envelope
24///
25/// Returns an error if the payload exceeds `MAX_RPC_PAYLOAD_SIZE` (1 MB).
26pub fn create_rpc_request(
27    request_id: Uuid,
28    rpc_request: RpcRequest,
29    content_type: ContentType,
30) -> anyhow::Result<Bytes> {
31    if rpc_request.payload.len() > MAX_RPC_PAYLOAD_SIZE {
32        anyhow::bail!(
33            "RPC request payload too large: {} bytes (max {})",
34            rpc_request.payload.len(),
35            MAX_RPC_PAYLOAD_SIZE
36        );
37    }
38
39    let msg = SynapseMessage {
40        protocol_version: PROTOCOL_VERSION.as_u32(),
41        kind: MessageKind::RpcRequest as i32,
42        request_id: Bytes::copy_from_slice(request_id.as_bytes()),
43        message: Some(synapse_message::Message::RpcRequest(rpc_request)),
44    };
45
46    encode_message(&msg, content_type)
47}
48
49/// Create an RPC response envelope
50pub fn create_rpc_response(
51    request_id: &[u8],
52    rpc_response: RpcResponse,
53    content_type: ContentType,
54) -> anyhow::Result<Bytes> {
55    let msg = SynapseMessage {
56        protocol_version: PROTOCOL_VERSION.as_u32(),
57        kind: MessageKind::RpcResponse as i32,
58        request_id: Bytes::copy_from_slice(request_id),
59        message: Some(synapse_message::Message::RpcResponse(rpc_response)),
60    };
61
62    encode_message(&msg, content_type)
63}
64
65/// Create a success RPC response
66pub fn ok_response(payload: Bytes) -> RpcResponse {
67    RpcResponse {
68        status: RpcStatus::Ok as i32,
69        payload,
70        error: None,
71        headers: vec![],
72        responded_at_unix_ms: chrono::Utc::now().timestamp_millis(),
73    }
74}
75
76/// Create an error RPC response
77pub fn error_response(status: RpcStatus, code: u32, message: impl Into<String>) -> RpcResponse {
78    RpcResponse {
79        status: status as i32,
80        payload: Bytes::new(),
81        error: Some(synapse_proto::RpcError {
82            code,
83            message: message.into(),
84            details: vec![],
85        }),
86        headers: vec![],
87        responded_at_unix_ms: chrono::Utc::now().timestamp_millis(),
88    }
89}
90
91/// Extract RpcRequest from a SynapseMessage
92pub fn extract_rpc_request(msg: SynapseMessage) -> Option<(Bytes, RpcRequest)> {
93    match msg.message {
94        Some(synapse_message::Message::RpcRequest(req)) => Some((msg.request_id, req)),
95        _ => None,
96    }
97}
98
99/// Extract RpcResponse from a SynapseMessage
100pub fn extract_rpc_response(msg: SynapseMessage) -> Option<(Bytes, RpcResponse)> {
101    match msg.message {
102        Some(synapse_message::Message::RpcResponse(resp)) => Some((msg.request_id, resp)),
103        _ => None,
104    }
105}
106
107// ============================================================================
108// Control Messages - Registration
109// ============================================================================
110
111/// Create an interface registration message
112pub fn create_interface_register(
113    request_id: Uuid,
114    register: InterfaceRegister,
115    content_type: ContentType,
116) -> anyhow::Result<Bytes> {
117    let msg = SynapseMessage {
118        protocol_version: PROTOCOL_VERSION.as_u32(),
119        kind: MessageKind::InterfaceRegister as i32,
120        request_id: Bytes::copy_from_slice(request_id.as_bytes()),
121        message: Some(synapse_message::Message::InterfaceRegister(register)),
122    };
123    encode_message(&msg, content_type)
124}
125
126/// Create an interface deregistration message
127pub fn create_interface_deregister(
128    request_id: Uuid,
129    deregister: InterfaceDeregister,
130    content_type: ContentType,
131) -> anyhow::Result<Bytes> {
132    let msg = SynapseMessage {
133        protocol_version: PROTOCOL_VERSION.as_u32(),
134        kind: MessageKind::InterfaceDeregister as i32,
135        request_id: Bytes::copy_from_slice(request_id.as_bytes()),
136        message: Some(synapse_message::Message::InterfaceDeregister(deregister)),
137    };
138    encode_message(&msg, content_type)
139}
140
141/// Create a registration acknowledgment message
142pub fn create_registration_ack(
143    request_id: &[u8],
144    success: bool,
145    error_message: Option<String>,
146    content_type: ContentType,
147) -> anyhow::Result<Bytes> {
148    let msg = SynapseMessage {
149        protocol_version: PROTOCOL_VERSION.as_u32(),
150        kind: MessageKind::RegistrationAck as i32,
151        request_id: Bytes::copy_from_slice(request_id),
152        message: Some(synapse_message::Message::RegistrationAck(RegistrationAck {
153            success,
154            error_message: error_message.unwrap_or_default(),
155        })),
156    };
157    encode_message(&msg, content_type)
158}
159
160/// Extract InterfaceRegister from a SynapseMessage
161pub fn extract_interface_register(msg: SynapseMessage) -> Option<(Bytes, InterfaceRegister)> {
162    match msg.message {
163        Some(synapse_message::Message::InterfaceRegister(reg)) => Some((msg.request_id, reg)),
164        _ => None,
165    }
166}
167
168/// Extract InterfaceDeregister from a SynapseMessage
169pub fn extract_interface_deregister(msg: SynapseMessage) -> Option<(Bytes, InterfaceDeregister)> {
170    match msg.message {
171        Some(synapse_message::Message::InterfaceDeregister(dereg)) => Some((msg.request_id, dereg)),
172        _ => None,
173    }
174}
175
176/// Extract RegistrationAck from a SynapseMessage
177pub fn extract_registration_ack(msg: SynapseMessage) -> Option<(Bytes, RegistrationAck)> {
178    match msg.message {
179        Some(synapse_message::Message::RegistrationAck(ack)) => Some((msg.request_id, ack)),
180        _ => None,
181    }
182}
183
184// ============================================================================
185// Control Messages - Health
186// ============================================================================
187
188/// Create a health pull request
189pub fn create_health_pull(request_id: Uuid, content_type: ContentType) -> anyhow::Result<Bytes> {
190    let msg = SynapseMessage {
191        protocol_version: PROTOCOL_VERSION.as_u32(),
192        kind: MessageKind::HealthPull as i32,
193        request_id: Bytes::copy_from_slice(request_id.as_bytes()),
194        message: Some(synapse_message::Message::HealthPull(HealthPull {
195            pull: true,
196        })),
197    };
198    encode_message(&msg, content_type)
199}
200
201/// Create a health response
202pub fn create_health_response(
203    request_id: &[u8],
204    response: HealthResponse,
205    content_type: ContentType,
206) -> anyhow::Result<Bytes> {
207    let msg = SynapseMessage {
208        protocol_version: PROTOCOL_VERSION.as_u32(),
209        kind: MessageKind::HealthResponse as i32,
210        request_id: Bytes::copy_from_slice(request_id),
211        message: Some(synapse_message::Message::HealthResponse(response)),
212    };
213    encode_message(&msg, content_type)
214}
215
216/// Extract HealthPull from a SynapseMessage
217pub fn extract_health_pull(msg: SynapseMessage) -> Option<(Bytes, HealthPull)> {
218    match msg.message {
219        Some(synapse_message::Message::HealthPull(pull)) => Some((msg.request_id, pull)),
220        _ => None,
221    }
222}
223
224/// Extract HealthResponse from a SynapseMessage
225pub fn extract_health_response(msg: SynapseMessage) -> Option<(Bytes, HealthResponse)> {
226    match msg.message {
227        Some(synapse_message::Message::HealthResponse(resp)) => Some((msg.request_id, resp)),
228        _ => None,
229    }
230}
231
232// ============================================================================
233// Control Messages - HTTP Endpoint Registration
234// ============================================================================
235
236/// Create an HTTP endpoint registration message
237pub fn create_http_endpoint_register(
238    request_id: Uuid,
239    register: HttpEndpointRegister,
240    content_type: ContentType,
241) -> anyhow::Result<Bytes> {
242    let msg = SynapseMessage {
243        protocol_version: PROTOCOL_VERSION.as_u32(),
244        kind: MessageKind::HttpEndpointRegister as i32,
245        request_id: Bytes::copy_from_slice(request_id.as_bytes()),
246        message: Some(synapse_message::Message::HttpEndpointRegister(register)),
247    };
248    encode_message(&msg, content_type)
249}
250
251/// Create an HTTP endpoint deregistration message
252pub fn create_http_endpoint_deregister(
253    request_id: Uuid,
254    deregister: HttpEndpointDeregister,
255    content_type: ContentType,
256) -> anyhow::Result<Bytes> {
257    let msg = SynapseMessage {
258        protocol_version: PROTOCOL_VERSION.as_u32(),
259        kind: MessageKind::HttpEndpointDeregister as i32,
260        request_id: Bytes::copy_from_slice(request_id.as_bytes()),
261        message: Some(synapse_message::Message::HttpEndpointDeregister(deregister)),
262    };
263    encode_message(&msg, content_type)
264}
265
266/// Create an HTTP registration acknowledgment message
267pub fn create_http_registration_ack(
268    request_id: &[u8],
269    success: bool,
270    error_message: Option<String>,
271    content_type: ContentType,
272) -> anyhow::Result<Bytes> {
273    let msg = SynapseMessage {
274        protocol_version: PROTOCOL_VERSION.as_u32(),
275        kind: MessageKind::HttpRegistrationAck as i32,
276        request_id: Bytes::copy_from_slice(request_id),
277        message: Some(synapse_message::Message::HttpRegistrationAck(
278            HttpRegistrationAck {
279                success,
280                error_message: error_message.unwrap_or_default(),
281            },
282        )),
283    };
284    encode_message(&msg, content_type)
285}
286
287/// Extract HttpRegistrationAck from a SynapseMessage
288pub fn extract_http_registration_ack(msg: SynapseMessage) -> Option<(Bytes, HttpRegistrationAck)> {
289    match msg.message {
290        Some(synapse_message::Message::HttpRegistrationAck(ack)) => Some((msg.request_id, ack)),
291        _ => None,
292    }
293}
294
295// ============================================================================
296// Message Routing
297// ============================================================================
298
299/// Parsed SynapseMessage for routing
300pub enum ParsedMessage {
301    RpcRequest {
302        request_id: Bytes,
303        request: RpcRequest,
304    },
305    RpcResponse {
306        request_id: Bytes,
307        response: RpcResponse,
308    },
309    InterfaceRegister {
310        request_id: Bytes,
311        register: InterfaceRegister,
312    },
313    InterfaceDeregister {
314        request_id: Bytes,
315        deregister: InterfaceDeregister,
316    },
317    RegistrationAck {
318        request_id: Bytes,
319        ack: RegistrationAck,
320    },
321    HealthPull {
322        request_id: Bytes,
323    },
324    HealthResponse {
325        request_id: Bytes,
326        response: HealthResponse,
327    },
328    HttpEndpointRegister {
329        request_id: Bytes,
330        register: HttpEndpointRegister,
331    },
332    HttpEndpointDeregister {
333        request_id: Bytes,
334        deregister: HttpEndpointDeregister,
335    },
336    HttpRegistrationAck {
337        request_id: Bytes,
338        ack: HttpRegistrationAck,
339    },
340    Unknown {
341        kind: i32,
342    },
343}
344
345/// Parse a SynapseMessage into a typed enum for routing
346pub fn parse_message(msg: SynapseMessage) -> ParsedMessage {
347    let request_id = msg.request_id.clone();
348
349    match msg.message {
350        Some(synapse_message::Message::RpcRequest(req)) => ParsedMessage::RpcRequest {
351            request_id,
352            request: req,
353        },
354        Some(synapse_message::Message::RpcResponse(resp)) => ParsedMessage::RpcResponse {
355            request_id,
356            response: resp,
357        },
358        Some(synapse_message::Message::InterfaceRegister(reg)) => {
359            ParsedMessage::InterfaceRegister {
360                request_id,
361                register: reg,
362            }
363        }
364        Some(synapse_message::Message::InterfaceDeregister(dereg)) => {
365            ParsedMessage::InterfaceDeregister {
366                request_id,
367                deregister: dereg,
368            }
369        }
370        Some(synapse_message::Message::RegistrationAck(ack)) => {
371            ParsedMessage::RegistrationAck { request_id, ack }
372        }
373        Some(synapse_message::Message::HealthPull(_)) => ParsedMessage::HealthPull { request_id },
374        Some(synapse_message::Message::HealthResponse(resp)) => ParsedMessage::HealthResponse {
375            request_id,
376            response: resp,
377        },
378        Some(synapse_message::Message::HttpEndpointRegister(reg)) => {
379            ParsedMessage::HttpEndpointRegister {
380                request_id,
381                register: reg,
382            }
383        }
384        Some(synapse_message::Message::HttpEndpointDeregister(dereg)) => {
385            ParsedMessage::HttpEndpointDeregister {
386                request_id,
387                deregister: dereg,
388            }
389        }
390        Some(synapse_message::Message::HttpRegistrationAck(ack)) => {
391            ParsedMessage::HttpRegistrationAck { request_id, ack }
392        }
393        _ => ParsedMessage::Unknown { kind: msg.kind },
394    }
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use synapse_primitives::Uuid;

    // ========== fixtures ==========

    /// Fresh random request id for builders that take a `Uuid`.
    fn test_request_id() -> Uuid {
        Uuid::new_v4()
    }

    /// Small fixed request fixture (interface 42, method 7).
    fn test_rpc_request() -> RpcRequest {
        let payload = Bytes::from("test payload");
        RpcRequest {
            interface_id: 42,
            method_id: 7,
            headers: Vec::new(),
            payload,
            sent_at_unix_ms: 1000,
        }
    }

    /// Decode a JSON-encoded envelope, panicking on malformed input.
    fn decode_json(raw: &[u8]) -> SynapseMessage {
        serde_json::from_slice(raw).unwrap()
    }

    /// Envelope with a 16-byte all-zero request id around `message`.
    fn make_synapse_msg(kind: MessageKind, message: synapse_message::Message) -> SynapseMessage {
        SynapseMessage {
            protocol_version: PROTOCOL_VERSION.as_u32(),
            kind: kind as i32,
            request_id: Bytes::copy_from_slice(&[0u8; 16]),
            message: Some(message),
        }
    }

    /// Envelope holding the standard request fixture.
    fn rpc_request_envelope() -> SynapseMessage {
        make_synapse_msg(
            MessageKind::RpcRequest,
            synapse_message::Message::RpcRequest(test_rpc_request()),
        )
    }

    /// Envelope holding an empty successful response.
    fn rpc_response_envelope() -> SynapseMessage {
        make_synapse_msg(
            MessageKind::RpcResponse,
            synapse_message::Message::RpcResponse(ok_response(Bytes::new())),
        )
    }

    /// Envelope holding a successful HTTP registration ack.
    fn http_ack_envelope() -> SynapseMessage {
        let ack = HttpRegistrationAck {
            success: true,
            error_message: String::new(),
        };
        make_synapse_msg(
            MessageKind::HttpRegistrationAck,
            synapse_message::Message::HttpRegistrationAck(ack),
        )
    }

    // ========== ok_response / error_response ==========

    #[test]
    fn test_ok_response_status() {
        let status = ok_response(Bytes::from("data")).status;
        assert_eq!(status, RpcStatus::Ok as i32);
    }

    #[test]
    fn test_ok_response_payload_preserved() {
        let expected = Bytes::from("my payload");
        let actual = ok_response(expected.clone()).payload;
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_ok_response_no_error() {
        assert!(ok_response(Bytes::new()).error.is_none());
    }

    #[test]
    fn test_ok_response_timestamp_set() {
        let stamped = ok_response(Bytes::new()).responded_at_unix_ms;
        assert!(stamped > 0);
    }

    #[test]
    fn test_error_response_status() {
        let status = error_response(RpcStatus::InvalidRequest, 400, "bad input").status;
        assert_eq!(status, RpcStatus::InvalidRequest as i32);
    }

    #[test]
    fn test_error_response_code_and_message() {
        let failure = error_response(RpcStatus::Error, 3001, "custom error")
            .error
            .unwrap();
        assert_eq!(failure.code, 3001);
        assert_eq!(failure.message, "custom error");
    }

    #[test]
    fn test_error_response_empty_payload() {
        let payload = error_response(RpcStatus::Timeout, 504, "timed out").payload;
        assert!(payload.is_empty());
    }

    // ========== create + decode roundtrips ==========

    #[test]
    fn test_create_rpc_request_roundtrip_json() {
        let encoded =
            create_rpc_request(test_request_id(), test_rpc_request(), ContentType::Json).unwrap();
        let envelope = decode_json(&encoded);

        assert_eq!(envelope.protocol_version, PROTOCOL_VERSION.as_u32());
        assert_eq!(envelope.kind, MessageKind::RpcRequest as i32);

        let Some(synapse_message::Message::RpcRequest(inner)) = envelope.message else {
            panic!("Expected RpcRequest message");
        };
        assert_eq!(inner.interface_id, 42);
        assert_eq!(inner.method_id, 7);
    }

    #[test]
    fn test_create_rpc_request_roundtrip_protobuf() {
        use prost::Message;

        let encoded =
            create_rpc_request(test_request_id(), test_rpc_request(), ContentType::Protobuf)
                .unwrap();
        let envelope = SynapseMessage::decode(encoded).unwrap();

        assert_eq!(envelope.kind, MessageKind::RpcRequest as i32);
        let holds_request = matches!(
            envelope.message,
            Some(synapse_message::Message::RpcRequest(_))
        );
        assert!(holds_request);
    }

    #[test]
    fn test_create_rpc_response_roundtrip() {
        let request_id = Uuid::new_v4();
        let response = ok_response(Bytes::from("response data"));
        let encoded =
            create_rpc_response(request_id.as_bytes(), response, ContentType::Json).unwrap();
        let envelope = decode_json(&encoded);

        assert_eq!(envelope.kind, MessageKind::RpcResponse as i32);
        let Some(synapse_message::Message::RpcResponse(inner)) = envelope.message else {
            panic!("Expected RpcResponse message");
        };
        assert_eq!(inner.status, RpcStatus::Ok as i32);
    }

    #[test]
    fn test_create_interface_register_roundtrip() {
        let registration = InterfaceRegister {
            interface_id: 100,
            interface_version: 1_000_000,
            method_ids: vec![1, 2, 3],
            instance_id: Bytes::from(vec![0u8; 16]),
            capabilities: None,
            service_name: String::from("test-svc"),
            interface_name: String::from("test.Interface"),
            http_endpoint: String::from("http://localhost:9001/rpc"),
            method_names: vec!["A".into(), "B".into(), "C".into()],
        };
        let encoded =
            create_interface_register(test_request_id(), registration, ContentType::Json).unwrap();
        let envelope = decode_json(&encoded);

        assert_eq!(envelope.kind, MessageKind::InterfaceRegister as i32);
        let holds_register = matches!(
            envelope.message,
            Some(synapse_message::Message::InterfaceRegister(_))
        );
        assert!(holds_register);
    }

    #[test]
    fn test_create_registration_ack_success() {
        let request_id = Uuid::new_v4();
        let encoded =
            create_registration_ack(request_id.as_bytes(), true, None, ContentType::Json).unwrap();
        let envelope = decode_json(&encoded);

        let Some(synapse_message::Message::RegistrationAck(ack)) = envelope.message else {
            panic!("Expected RegistrationAck");
        };
        assert!(ack.success);
        assert!(ack.error_message.is_empty());
    }

    #[test]
    fn test_create_registration_ack_failure() {
        let request_id = Uuid::new_v4();
        let reason = Some(String::from("duplicate interface"));
        let encoded =
            create_registration_ack(request_id.as_bytes(), false, reason, ContentType::Json)
                .unwrap();
        let envelope = decode_json(&encoded);

        let Some(synapse_message::Message::RegistrationAck(ack)) = envelope.message else {
            panic!("Expected RegistrationAck");
        };
        assert!(!ack.success);
        assert_eq!(ack.error_message, "duplicate interface");
    }

    #[test]
    fn test_create_health_pull_roundtrip() {
        let encoded = create_health_pull(test_request_id(), ContentType::Json).unwrap();
        let envelope = decode_json(&encoded);

        assert_eq!(envelope.kind, MessageKind::HealthPull as i32);
        let holds_pull = matches!(
            envelope.message,
            Some(synapse_message::Message::HealthPull(_))
        );
        assert!(holds_pull);
    }

    #[test]
    fn test_create_health_response_roundtrip() {
        let request_id = Uuid::new_v4();
        let report = HealthResponse {
            instance_id: Bytes::from(vec![0u8; 16]),
            status: 1,
            version: String::from("0.1.0"),
            uptime_ms: 5000,
            message: String::from("all good"),
        };
        let encoded =
            create_health_response(request_id.as_bytes(), report, ContentType::Json).unwrap();
        let envelope = decode_json(&encoded);

        assert_eq!(envelope.kind, MessageKind::HealthResponse as i32);
        let Some(synapse_message::Message::HealthResponse(inner)) = envelope.message else {
            panic!("Expected HealthResponse");
        };
        assert_eq!(inner.version, "0.1.0");
        assert_eq!(inner.uptime_ms, 5000);
    }

    // ========== extract_* ==========

    #[test]
    fn test_extract_rpc_request_correct() {
        let extracted = extract_rpc_request(rpc_request_envelope());
        assert!(extracted.is_some());
    }

    #[test]
    fn test_extract_rpc_request_wrong_type() {
        let extracted = extract_rpc_request(rpc_response_envelope());
        assert!(extracted.is_none());
    }

    #[test]
    fn test_extract_rpc_response_correct() {
        let extracted = extract_rpc_response(rpc_response_envelope());
        assert!(extracted.is_some());
    }

    #[test]
    fn test_extract_rpc_response_wrong_type() {
        let extracted = extract_rpc_response(rpc_request_envelope());
        assert!(extracted.is_none());
    }

    #[test]
    fn test_extract_registration_ack_correct() {
        let body = synapse_message::Message::RegistrationAck(RegistrationAck {
            success: true,
            error_message: String::new(),
        });
        let envelope = make_synapse_msg(MessageKind::RegistrationAck, body);

        let ack = extract_registration_ack(envelope)
            .map(|(_, ack)| ack)
            .unwrap();
        assert!(ack.success);
    }

    #[test]
    fn test_extract_health_pull_correct() {
        let body = synapse_message::Message::HealthPull(HealthPull { pull: true });
        let envelope = make_synapse_msg(MessageKind::HealthPull, body);
        assert!(extract_health_pull(envelope).is_some());
    }

    #[test]
    fn test_extract_health_response_correct() {
        let empty_report = HealthResponse {
            instance_id: Bytes::new(),
            status: 0,
            version: String::new(),
            uptime_ms: 0,
            message: String::new(),
        };
        let envelope = make_synapse_msg(
            MessageKind::HealthResponse,
            synapse_message::Message::HealthResponse(empty_report),
        );
        assert!(extract_health_response(envelope).is_some());
    }

    // ========== parse_message ==========

    #[test]
    fn test_parse_message_rpc_request() {
        let parsed = parse_message(rpc_request_envelope());
        assert!(matches!(parsed, ParsedMessage::RpcRequest { .. }));
    }

    #[test]
    fn test_parse_message_rpc_response() {
        let parsed = parse_message(rpc_response_envelope());
        assert!(matches!(parsed, ParsedMessage::RpcResponse { .. }));
    }

    #[test]
    fn test_parse_message_interface_register() {
        let body = synapse_message::Message::InterfaceRegister(InterfaceRegister::default());
        let parsed = parse_message(make_synapse_msg(MessageKind::InterfaceRegister, body));
        assert!(matches!(parsed, ParsedMessage::InterfaceRegister { .. }));
    }

    #[test]
    fn test_parse_message_unknown() {
        // No body at all: routing must fall through and echo the raw kind.
        let bodiless = SynapseMessage {
            protocol_version: 1,
            kind: 9999,
            request_id: Bytes::new(),
            message: None,
        };
        let parsed = parse_message(bodiless);
        assert!(matches!(parsed, ParsedMessage::Unknown { kind: 9999 }));
    }

    // ========== HTTP endpoint registration ==========

    #[test]
    fn test_create_http_registration_ack_success() {
        let request_id = Uuid::new_v4();
        let encoded =
            create_http_registration_ack(request_id.as_bytes(), true, None, ContentType::Json)
                .unwrap();
        let envelope = decode_json(&encoded);

        assert_eq!(envelope.kind, MessageKind::HttpRegistrationAck as i32);
        let Some(synapse_message::Message::HttpRegistrationAck(ack)) = envelope.message else {
            panic!("Expected HttpRegistrationAck");
        };
        assert!(ack.success);
        assert!(ack.error_message.is_empty());
    }

    #[test]
    fn test_create_http_registration_ack_failure() {
        let request_id = Uuid::new_v4();
        let reason = Some(String::from("permission denied"));
        let encoded =
            create_http_registration_ack(request_id.as_bytes(), false, reason, ContentType::Json)
                .unwrap();
        let envelope = decode_json(&encoded);

        let Some(synapse_message::Message::HttpRegistrationAck(ack)) = envelope.message else {
            panic!("Expected HttpRegistrationAck");
        };
        assert!(!ack.success);
        assert_eq!(ack.error_message, "permission denied");
    }

    #[test]
    fn test_extract_http_registration_ack_correct() {
        let ack = extract_http_registration_ack(http_ack_envelope())
            .map(|(_, ack)| ack)
            .unwrap();
        assert!(ack.success);
    }

    #[test]
    fn test_extract_http_registration_ack_wrong_type() {
        let extracted = extract_http_registration_ack(rpc_request_envelope());
        assert!(extracted.is_none());
    }

    #[test]
    fn test_parse_message_http_endpoint_register() {
        let body = synapse_message::Message::HttpEndpointRegister(HttpEndpointRegister::default());
        let parsed = parse_message(make_synapse_msg(MessageKind::HttpEndpointRegister, body));
        assert!(matches!(parsed, ParsedMessage::HttpEndpointRegister { .. }));
    }

    #[test]
    fn test_parse_message_http_endpoint_deregister() {
        let body =
            synapse_message::Message::HttpEndpointDeregister(HttpEndpointDeregister::default());
        let parsed = parse_message(make_synapse_msg(MessageKind::HttpEndpointDeregister, body));
        assert!(matches!(
            parsed,
            ParsedMessage::HttpEndpointDeregister { .. }
        ));
    }

    #[test]
    fn test_parse_message_http_registration_ack() {
        let parsed = parse_message(http_ack_envelope());
        assert!(matches!(parsed, ParsedMessage::HttpRegistrationAck { .. }));
    }
}