//! Configuration, message and address types used by the NSQ client.
use async_std::io;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::sync::Arc;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::cmp::{Eq, PartialEq};
use std::hash::{Hash, Hasher};

/// Client configuration as reported by the nsqd server in its response.
///
/// The struct is (de)serialized through serde, so every field name is also
/// the key used in the serialized form and must not be renamed.
// NOTE(review): the field set looks like the body nsqd returns for an
// `IDENTIFY` request; the meanings and units below are inferred from the
// names only — confirm against the NSQ protocol documentation.
#[derive(Debug, Deserialize, Serialize)]
pub struct NsqdClientConfig {
    /// Presumably the largest RDY count the server accepts — TODO confirm.
    pub max_rdy_count: u64,
    /// Version string reported by the server.
    pub version: String,
    /// Presumably the upper bound for a per-message timeout — TODO confirm unit.
    pub max_msg_timeout: u64,
    /// Presumably the message timeout in effect — TODO confirm unit.
    pub msg_timeout: u64,
    /// TLS flag from the response (presumably "TLS will be used" — confirm).
    pub tls_v1: bool,
    /// Deflate compression flag from the response.
    pub deflate: bool,
    /// Deflate level from the response.
    pub deflate_level: u8,
    /// Presumably the highest deflate level the server supports — TODO confirm.
    pub max_deflate_level: u8,
    /// Snappy compression flag from the response.
    pub snappy: bool,
    /// Sample rate from the response (presumably a percentage — confirm).
    pub sample_rate: u64,
    /// Whether the server reports that authentication is required.
    pub auth_required: bool,
    /// Output buffer size from the response (presumably bytes — confirm).
    pub output_buffer_size: u64,
    /// Output buffer timeout from the response (unit not visible here).
    pub output_buffer_timeout: u64,
}

/// Identify settings the client describes itself with.
///
/// (De)serialized through serde; field names are the serialized keys.
// NOTE(review): presumably sent as the body of the NSQ `IDENTIFY` command —
// the sending code is not in this file, confirm against the caller.
#[derive(Debug, Deserialize, Serialize)]
pub struct IdentifyConfig {
    /// Identifier of this client.
    pub client_id: String,
    /// Heartbeat interval (presumably milliseconds — TODO confirm).
    pub heartbeat_interval: u64,
    /// Output buffer timeout (presumably milliseconds — TODO confirm).
    pub output_buffer_timeout: u64,
    /// Message timeout (presumably milliseconds — TODO confirm).
    pub msg_timeout: u64,
    /// User agent string of this client.
    pub user_agent: String,
}

/// Address of an nsqd server as configured on the client side.
#[derive(Debug, Deserialize, Serialize)]
pub struct ClientNsqdAddressConfig {
    /// Host name of the nsqd server.
    pub hostname: String,
    /// TCP port of the nsqd server (stored as `u32`; no range check is done here).
    pub tcp_port: u32,
}

/// HTTP endpoint settings as configured on the client side.
// NOTE(review): presumably the nsqlookupd HTTP address plus the topic and
// channel to look up / subscribe to — confirm against the caller.
#[derive(Debug, Deserialize, Serialize)]
pub struct ClientHttpdAddressConfig {
    /// Address of the HTTP endpoint.
    pub address: String,
    /// Topic name.
    pub topic: String,
    /// Channel name.
    pub nsq_channel: String,
}

impl IdentifyConfig {
    pub fn new(
        client_id: String,
        heartbeat_interval: u64,
        output_buffer_timeout: u64,
        msg_timeout: u64,
        user_agent: String,
    ) -> IdentifyConfig {
        IdentifyConfig {
            client_id,
            heartbeat_interval,
            output_buffer_timeout,
            msg_timeout,
            user_agent,
        }
    }
}

/// TOML configuration the client uses for its initial connection to the
/// lookupd server.
#[derive(Debug, Deserialize, Serialize)]
pub struct NsqHttpdInitConfig {
    /// Identify settings of the client.
    pub identify: IdentifyConfig,
    /// HTTP endpoint settings. The spelling `httpd_adress` is the serialized
    /// key and a public field name, so it is kept as is.
    pub httpd_adress: ClientHttpdAddressConfig,
}

/// TOML configuration the client uses for its initial connection to an nsqd
/// server.
#[derive(Debug, Deserialize, Serialize)]
pub struct NsqdInitConfig {
    /// Identify settings of the client.
    pub identify: IdentifyConfig,
    /// Address of the nsqd server. The spelling `nsqd_adress` is the
    /// serialized key and a public field name, so it is kept as is.
    pub nsqd_adress: ClientNsqdAddressConfig,
}

/// A message received from the server, together with the connection it
/// arrived on.
#[derive(Debug)]
pub struct Msg {
    /// Message timestamp (presumably nanoseconds since the Unix epoch as in
    /// the NSQ wire format — TODO confirm against the decoder).
    pub ts: i64,
    /// Attempt counter of the message.
    pub attempts: u16,
    /// Raw message id; written verbatim into the `FIN`/`REQ`/`TOUCH` commands.
    pub id: Vec<u8>,
    /// Raw message payload.
    pub body: Vec<u8>,
    /// Shared handle to the TCP connection the responses are written to.
    pub stream: Arc<TcpStream>,
}
impl Msg {
    /// finish the msg
    pub async fn finish(&self) -> io::Result<()> {
        let mut msg = Vec::new();
        msg.put(&b"FIN "[..]);
        msg.put(&self.id);
        msg.put(&b"\n"[..]);
        let writer = &mut &*self.stream;
        writer.write_all(&msg).await?;
        writer.write_all("RDY 2\n".as_bytes()).await
    }
    /// requeue the msg
    pub async fn req(&self, timeout: i32) -> io::Result<()> {
        let mut msg = Vec::new();
        let timeout = timeout.to_string();
        msg.put(&b"REQ "[..]);
        msg.put(&self.id);
        msg.put(&b" "[..]);
        msg.put(timeout.as_bytes());
        msg.put(&b"\n"[..]);
        let writer = &mut &*self.stream;
        writer.write_all(&msg).await?;
        writer.write_all("RDY 2\n".as_bytes()).await
    }
    /// touch the msg
    pub async fn touch(&self) -> io::Result<()> {
        let mut msg = Vec::new();
        msg.put(&b"TOUCH "[..]);
        msg.put(&self.id);
        msg.put(&b"\n"[..]);
        let writer = &mut &*self.stream;
        writer.write_all(&msg).await?;
        writer.write_all("RDY 2\n".as_bytes()).await
    }
}

/// Address (or addresses) the client connects to.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged,
/// duplicated and compared.
// NOTE(review): the variant meanings below are inferred from their names —
// confirm against the code that matches on this enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Address {
    /// Several addresses (presumably nsqd TCP addresses).
    ReaderdAddresses(Vec<String>),
    /// A single address (presumably an nsqd TCP address).
    ReaderdAddr(String),
    /// A single HTTP address (presumably an nsqlookupd endpoint).
    HttpdAddress(String),
}

/// Description of one nsqd instance.
///
/// (De)serialized through serde; field names are the serialized keys.
/// The `PartialEq` and `Hash` implementations in this file only look at
/// `broadcast_address`, `tcp_port`, `http_port` and `version`.
#[derive(Serialize, Deserialize, Debug)]
pub struct NsqdConfig {
    /// Broadcast address of the instance; part of equality and hash.
    pub broadcast_address: String,
    /// Host name of the instance; ignored by equality and hash.
    pub hostname: String,
    /// Remote address of the instance; ignored by equality and hash.
    pub remote_address: String,
    /// TCP port of the instance; part of equality and hash.
    pub tcp_port: u32,
    /// HTTP port of the instance; part of equality and hash.
    pub http_port: u32,
    /// Version string of the instance; part of equality and hash.
    pub version: String,
}

/// Equality used to detect duplicate nsqd entries.
///
/// Two configs are equal when `broadcast_address`, `tcp_port`, `http_port`
/// and `version` all match; `hostname` and `remote_address` are ignored.
impl PartialEq for NsqdConfig {
    fn eq(&self, other: &Self) -> bool {
        self.broadcast_address == other.broadcast_address
            && self.tcp_port == other.tcp_port
            && self.http_port == other.http_port
            && self.version == other.version
    }
}

// Marker impl: declares the hand-written `PartialEq` above to be a full
// equivalence relation (every compared field is a `String` or `u32`).
impl Eq for NsqdConfig {}

impl Hash for NsqdConfig {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.broadcast_address.hash(state);
        self.tcp_port.hash(state);
        self.http_port.hash(state);
        self.version.hash(state);
    }
}

/// Lookup data describing channels and the nsqd producers behind them.
///
/// (De)serialized through serde; field names are the serialized keys.
/// Derives `Debug` like the other serde structs in this file so the value
/// can be logged.
// NOTE(review): presumably the payload of an nsqlookupd lookup response —
// confirm against the code that deserializes it.
#[derive(Debug, Serialize, Deserialize)]
pub struct NsqLookupdConfig {
    /// Channel names.
    pub channels: Vec<String>,
    /// The nsqd instances listed as producers.
    pub producers: Vec<NsqdConfig>,
}