1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use bytes::Bytes;
use protocol::{commands::SubCommand, Command, CommandError};

/// UNSUB unsubscribes the connection from the specified subject, or auto-unsubscribes after the
/// specified number of messages has been received.
///
/// On the wire the command is a single line terminated by `\r\n`: the literal `UNSUB`, the
/// subscription ID and, when present, the message limit, separated by tabs when serialized.
#[derive(Debug, Clone, PartialEq, Builder)]
pub struct UnsubCommand {
    /// The unique alphanumeric subscription ID of the subject to unsubscribe from
    // The builder setter accepts any value convertible into a `String`.
    #[builder(setter(into))]
    pub sid: String,
    /// An optional number of messages to wait for before automatically unsubscribing
    // Falls back to the type's default (`None`) when not set on the builder.
    #[builder(default)]
    pub max_msgs: Option<u32>,
}

impl UnsubCommand {
    pub fn builder() -> UnsubCommandBuilder {
        UnsubCommandBuilder::default()
    }
}

impl From<SubCommand> for UnsubCommand {
    /// Turns a subscription command into the matching unsubscription: the subscription ID is
    /// moved over as-is and no message limit is set.
    fn from(cmd: SubCommand) -> Self {
        let sid = cmd.sid;

        UnsubCommand {
            max_msgs: None,
            sid,
        }
    }
}

impl Command for UnsubCommand {
    const CMD_NAME: &'static [u8] = b"UNSUB";

    fn into_vec(self) -> Result<Bytes, CommandError> {
        let mm = if let Some(max_msgs) = self.max_msgs {
            format!("\t{}", max_msgs)
        } else {
            "".into()
        };

        Ok(format!("UNSUB\t{}{}\r\n", self.sid, mm).as_bytes().into())
    }

    fn try_parse(buf: &[u8]) -> Result<Self, CommandError> {
        let len = buf.len();

        if buf[len - 2..] != [b'\r', b'\n'] {
            return Err(CommandError::IncompleteCommandError);
        }

        let whole_command = ::std::str::from_utf8(&buf[..len - 2])?;
        let mut split = whole_command.split_whitespace();
        let cmd = split.next().ok_or_else(|| CommandError::CommandMalformed)?;
        // Check if we're still on the right command
        if cmd.as_bytes() != Self::CMD_NAME {
            return Err(CommandError::CommandMalformed);
        }

        let sid: String = split.next().ok_or_else(|| CommandError::CommandMalformed)?.into();

        let max_msgs: Option<u32> = if let Some(mm) = split.next() {
            Some(mm.parse()?)
        } else {
            None
        };

        Ok(UnsubCommand { sid, max_msgs })
    }
}

#[cfg(test)]
mod tests {
    use super::{UnsubCommand, UnsubCommandBuilder};
    use protocol::Command;

    /// Reference wire form of an UNSUB for subscription `pouet` without a message limit.
    static DEFAULT_UNSUB: &'static str = "UNSUB\tpouet\r\n";

    /// Parsing the reference line yields the subscription ID and no message limit.
    #[test]
    fn it_parses() {
        let parsed = UnsubCommand::try_parse(DEFAULT_UNSUB.as_bytes());
        assert!(parsed.is_ok());

        let UnsubCommand { sid, max_msgs } = parsed.unwrap();
        assert_eq!(&sid, "pouet");
        assert!(max_msgs.is_none());
    }

    /// Serializing a builder-made command reproduces the reference line.
    #[test]
    fn it_stringifies() {
        let serialized = UnsubCommandBuilder::default()
            .sid("pouet")
            .build()
            .unwrap()
            .into_vec();
        assert!(serialized.is_ok());

        assert_eq!(DEFAULT_UNSUB, serialized.unwrap());
    }
}