zedmq/codec/protocol.rs
1use std::io::{self, Read, Write};
2
3use crate::stream::Transport;
4
5use super::FrameBuf;
6
7/// A builder struct used to handle `greeting` and `handshake` steps.
8pub(crate) struct ZMTP {
9 // security: Option<()>,
10 transport: Transport,
11}
12
13impl ZMTP {
14 /// The "connect" side of the connection.
15 pub fn connect<F>(produce_transport: F) -> io::Result<Self>
16 where
17 F: FnOnce() -> io::Result<Transport>,
18 {
19 let transport = produce_transport()?;
20
21 Ok(Self {
22 // security: None,
23 transport,
24 })
25 }
26
27 /// Perform the greeting step of the ZMTP spec.
28 pub fn greet(mut self, (_major, _minor, _): (u8, u8, u8), as_server: bool) -> io::Result<Self> {
29 let (partial, remaining) = {
30 let mut greeting = crate::codec::Greeting::build();
31 greeting.as_server(as_server);
32 // greeting.security(self.security)
33 greeting.into_parts()
34 };
35
36 // Send partial greeting
37 self.transport.write(&partial)?;
38
39 // Inspect remote partial greeting.
40 {
41 let mut buf = [0u8; 12];
42 let n = self.transport.read(&mut buf)?;
43 assert_eq!(n, 12, "{:?}", buf);
44
45 // let peer_major = buf[10];
46
47 // if peer_major != major {
48 // return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "peer major is not the same as us."))
49 // }
50
51 // let peer_minor = buf[11];
52
53 // if peer_minor > minor {
54 // return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "peer minor is higher than us."))
55 // }
56 }
57
58 // Send remaining greeting
59 self.transport.write(&remaining)?;
60
61 Ok(self)
62 }
63
64 pub fn ready<'b>(self, socket_type: &'b str) -> io::Result<Transport> {
65 let Self { mut transport, .. } = self;
66
67 {
68 // Read the remaining remote greeting.
69 let mut buf = [0u8; 52];
70 let n = transport.read(&mut buf[..])?;
71 assert_eq!(n, 52);
72 // TODO: parse, this contains the security mechanism (by default NULL) and some extra metadata.
73
74 // Inspect remote handshake.
75 let mut buf = [0u8; 64];
76 let _n = transport.read(&mut buf)?;
77
78 // dbg!((super::Frame { bytes: &buf[..n] }.try_into_command()).unwrap());
79
80 // TODO: validate handshake, this contains (for NULL security mechanisms) the following properties:
81 // - Socket-Type {type} i.e. PUSH, PULL, DEALER, ROUTER, PAIR
82 // - Identity; only if WE are ROUTER and they are using a ROUTER compatible socket type with a custom routing id.
83 }
84
85 // Send handshake
86
87 let handshake = {
88 let properties = vec![("Socket-Type", socket_type)];
89
90 FrameBuf::short_command("READY", Some(properties))
91 };
92
93 transport.write(handshake.as_ref())?;
94
95 Ok(transport)
96 }
97}