1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use bytes::Bytes;
use protocol::{Command, CommandError};
use serde_json as json;

/// The CONNECT message is the client version of the INFO message. Once the client has established a TCP/IP
/// socket connection with the NATS server, and an INFO message has been received from the server, the client
/// may send a CONNECT message to the NATS server to provide more information about the current connection as
/// well as security information.
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize, Builder)]
#[builder(default)]
pub struct ConnectCommand {
    /// Turns on +OK protocol acknowledgements.
    pub verbose: bool,
    /// Turns on additional strict format checking, e.g. for properly formed subjects
    pub pedantic: bool,
    /// Indicates whether the client requires an SSL connection.
    pub tls_required: bool,
    /// Client authorization token (if auth_required is set)
    #[serde(skip_serializing_if = "Option::is_none")]
    auth_token: Option<String>,
    /// Connection username (if auth_required is set)
    #[serde(skip_serializing_if = "Option::is_none")]
    user: Option<String>,
    /// Connection password (if auth_required is set)
    #[serde(skip_serializing_if = "Option::is_none")]
    pass: Option<String>,
    /// Optional client name
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(default = "self.default_name()?")]
    pub name: Option<String>,
    /// The implementation language of the client.
    #[builder(default = "self.default_lang()?", setter(into))]
    pub lang: String,
    /// The version of the client.
    #[builder(default = "self.default_ver()?", setter(into))]
    pub version: String,
    /// optional int. Sending 0 (or absent) indicates client supports original protocol. Sending 1 indicates that the
    /// client supports dynamic reconfiguration of cluster topology changes by asynchronously receiving INFO messages
    /// with known servers it can reconnect to.
    #[serde(skip_serializing_if = "Option::is_none")]
    protocol: Option<u8>,
    /// Optional boolean. If set to true, the server (version 1.2.0+) will not send originating messages from this
    /// connection to its own subscriptions. Clients should set this to true only for server supporting this feature,
    /// which is when proto in the INFO protocol is set to at least 1.
    #[serde(skip_serializing_if = "Option::is_none")]
    echo: Option<bool>,
}

impl ConnectCommand {
    pub fn builder() -> ConnectCommandBuilder {
        ConnectCommandBuilder::default()
    }
}

impl ConnectCommandBuilder {
    fn default_name(&self) -> Result<Option<String>, String> {
        Ok(Some("nitox".into()))
    }

    fn default_ver(&self) -> Result<String, String> {
        match ::std::env::var("CARGO_PKG_VERSION") {
            Ok(v) => Ok(v),
            Err(_) => Ok("0.1.x".into()),
        }
    }

    fn default_lang(&self) -> Result<String, String> {
        Ok("rust".into())
    }
}

impl Command for ConnectCommand {
    const CMD_NAME: &'static [u8] = b"CONNECT";

    fn into_vec(self) -> Result<Bytes, CommandError> {
        Ok(format!("CONNECT\t{}\r\n", json::to_string(&self)?).as_bytes().into())
    }

    fn try_parse(buf: &[u8]) -> Result<ConnectCommand, CommandError> {
        let len = buf.len();

        if buf[len - 2..] != [b'\r', b'\n'] {
            return Err(CommandError::IncompleteCommandError);
        }
        // Check if we're still on the right command
        if buf[..7] != *Self::CMD_NAME {
            return Err(CommandError::CommandMalformed);
        }

        Ok(json::from_slice(&buf[7..len - 2])?)
    }
}

#[cfg(test)]
mod tests {
    use super::{ConnectCommand, ConnectCommandBuilder};
    use protocol::Command;

    static DEFAULT_CONNECT: &'static str = "CONNECT\t{\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"nitox\",\"lang\":\"rust\",\"version\":\"1.0.0\"}\r\n";

    #[test]
    fn it_parses() {
        let parse_res = ConnectCommand::try_parse(DEFAULT_CONNECT.as_bytes());
        assert!(parse_res.is_ok());
        let cmd = parse_res.unwrap();
        assert_eq!(cmd.verbose, false);
        assert_eq!(cmd.pedantic, false);
        assert_eq!(cmd.tls_required, false);
        assert!(cmd.name.is_some());
        assert_eq!(&cmd.name.unwrap(), "nitox");
        assert_eq!(&cmd.lang, "rust");
        assert_eq!(&cmd.version, "1.0.0");
    }

    #[test]
    fn it_stringifies() {
        let cmd = ConnectCommandBuilder::default()
            .lang("rust")
            .version("1.0.0")
            .name(Some("nitox".into()))
            .build()
            .unwrap();

        let cmd_bytes_res = cmd.into_vec();
        assert!(cmd_bytes_res.is_ok());
        let cmd_bytes = cmd_bytes_res.unwrap();

        assert_eq!(DEFAULT_CONNECT, cmd_bytes);
    }
}