zedmq/codec/
protocol.rs

1use std::io::{self, Read, Write};
2
3use crate::stream::Transport;
4
5use super::FrameBuf;
6
7/// A builder struct used to handle `greeting` and `handshake` steps.
8pub(crate) struct ZMTP {
9    // security: Option<()>,
10    transport: Transport,
11}
12
13impl ZMTP {
14    /// The "connect" side of the connection.
15    pub fn connect<F>(produce_transport: F) -> io::Result<Self>
16    where
17        F: FnOnce() -> io::Result<Transport>,
18    {
19        let transport = produce_transport()?;
20
21        Ok(Self {
22            // security: None,
23            transport,
24        })
25    }
26
27    /// Perform the greeting step of the ZMTP spec.
28    pub fn greet(mut self, (_major, _minor, _): (u8, u8, u8), as_server: bool) -> io::Result<Self> {
29        let (partial, remaining) = {
30            let mut greeting = crate::codec::Greeting::build();
31            greeting.as_server(as_server);
32            // greeting.security(self.security)
33            greeting.into_parts()
34        };
35
36        // Send partial greeting
37        self.transport.write(&partial)?;
38
39        // Inspect remote partial greeting.
40        {
41            let mut buf = [0u8; 12];
42            let n = self.transport.read(&mut buf)?;
43            assert_eq!(n, 12, "{:?}", buf);
44
45            // let peer_major = buf[10];
46
47            // if peer_major != major {
48            //     return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "peer major is not the same as us."))
49            // }
50
51            // let peer_minor = buf[11];
52
53            // if peer_minor > minor {
54            //     return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "peer minor is higher than us."))
55            // }
56        }
57
58        // Send remaining greeting
59        self.transport.write(&remaining)?;
60
61        Ok(self)
62    }
63
64    pub fn ready<'b>(self, socket_type: &'b str) -> io::Result<Transport> {
65        let Self { mut transport, .. } = self;
66
67        {
68            // Read the remaining remote greeting.
69            let mut buf = [0u8; 52];
70            let n = transport.read(&mut buf[..])?;
71            assert_eq!(n, 52);
72            // TODO: parse, this contains the security mechanism (by default NULL) and some extra metadata.
73
74            // Inspect remote handshake.
75            let mut buf = [0u8; 64];
76            let _n = transport.read(&mut buf)?;
77
78            // dbg!((super::Frame { bytes: &buf[..n] }.try_into_command()).unwrap());
79
80            // TODO: validate handshake, this contains (for NULL security mechanisms) the following properties:
81            //  - Socket-Type {type} i.e. PUSH, PULL, DEALER, ROUTER, PAIR
82            //  - Identity; only if WE are ROUTER and they are using a ROUTER compatible socket type with a custom routing id.
83        }
84
85        // Send handshake
86
87        let handshake = {
88            let properties = vec![("Socket-Type", socket_type)];
89
90            FrameBuf::short_command("READY", Some(properties))
91        };
92
93        transport.write(handshake.as_ref())?;
94
95        Ok(transport)
96    }
97}