1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
use env_logger::builder;
use log::*;
use mqttest::{Conf, Mqttest};
use std::time::Duration;
use structopt::{clap::AppSettings::*, StructOpt};
/// Optional duration read from a command-line value.
///
/// Holds `None` when the value starts with `-` (the marker used on the command line for
/// "disabled / not set"), otherwise `Some` duration built from the value read as milliseconds.
#[derive(Debug)]
struct OptDuration(Option<Duration>);

impl std::str::FromStr for OptDuration {
    type Err = std::num::ParseIntError;

    /// Parses `s` as either the `-` marker or an integer number of milliseconds.
    ///
    /// # Errors
    ///
    /// Returns the integer parse error when `s` does not start with `-` and is not a valid
    /// unsigned integer. This includes the empty string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `starts_with` rather than `&s[0..1]`: byte-slicing panics on an empty string and on a
        // string whose first character is wider than one byte.
        if s.starts_with('-') {
            return Ok(Self(None));
        }
        let millis: u64 = s.parse()?;
        Ok(Self(Some(Duration::from_millis(millis))))
    }
}
/// Optional count read from a command-line value.
///
/// Holds `None` when the value starts with `-` (the marker used on the command line for
/// "disabled / not set"), otherwise `Some` with the value parsed as a `usize`.
#[derive(Debug)]
struct OptUsize(Option<usize>);

impl std::str::FromStr for OptUsize {
    type Err = std::num::ParseIntError;

    /// Parses `s` as either the `-` marker or an unsigned integer.
    ///
    /// # Errors
    ///
    /// Returns the integer parse error when `s` does not start with `-` and is not a valid
    /// unsigned integer. This includes the empty string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `starts_with` rather than `&s[0..1]`: byte-slicing panics on an empty string and on a
        // string whose first character is wider than one byte.
        if s.starts_with('-') {
            return Ok(Self(None));
        }
        let count: usize = s.parse()?;
        Ok(Self(Some(count)))
    }
}
// Command-line options of the `mqttest` binary.
//
// NOTE: structopt turns the `///` doc comments on the fields below into the `--help` text, so
// they are user-facing strings. Maintainer notes belong in plain `//` comments like this one.
#[derive(StructOpt, Debug)]
#[structopt(name = "mqttest", help_message = "Prints help information. Use --help for more details.", global_settings = &[ColoredHelp, DeriveDisplayOrder, UnifiedHelpMessage])]
struct Opt {
    /// Increase log level (info -> debug -> trace). Shorthand for "--log debug" or "--log trace".
    #[structopt(short = "v", parse(from_occurrences))]
    verbose: usize,
    /// Detailed log level. See https://docs.rs/env_logger/0.7.1/env_logger/index.html.
    #[structopt(long = "log", default_value = "", hide_default_value = true)]
    log: String,
    /// Ports to try to listen on, stopping at the first successful one.
    #[structopt(short = "p",
                value_name = "PORT",
                use_delimiter = true,
                value_delimiter = "-",
                max_values = 2,
                default_value = "1883-2000")]
    ports: Vec<u16>,
    /// Resend packet during connection if ack takes longer than this.
    ///
    /// This only concerns resending during a live connection: resending at connection start (if
    /// session was restored) always happens immediately. Use "-" to disable resending.
    ///
    /// The second value is for MQTT5 clients. MQTT5 forbids resending during connection, only set
    /// an MQTT5 value for testing purposes. MQTT3 doesn't specify a behaviour, but many
    /// client/servers do resend non-acked packets during connection.
    #[structopt(long = "ack-timeouts",
                value_name = "ms",
                default_value = "5000,-",
                use_delimiter = true,
                min_values = 1,
                max_values = 2)]
    ack_timeouts: Vec<OptDuration>,
    /// Delay before sending publish and subscribe acks.
    #[structopt(long = "ack-delay", value_name = "ms", default_value = "0")]
    ack_delay: u64,
    /// Dump packets to file.
    ///
    /// The filename can contain a `{c}` placeholder that will be replaced by the connection
    /// number. The dump format is json and corresponds to `pub struct mqttest::DumpMeta` in the
    /// rust lib.
    #[structopt(long = "dump", value_name = "DUMP")]
    dumps: Vec<String>,
    /// Decode command for publish payload.
    ///
    /// The argument should be a command that reads raw payload from stdin, and writes the
    /// corresponding utf8/json to stdout. If decoding fails, it should output diagnostics to stderr
    /// and exit with a non-zero value.
    #[structopt(long = "dump-decode", value_name = "CMD")]
    dump_decode: Option<String>,
    /// Be stricter about optional MQTT behaviours.
    ///
    /// [MQTT-3.1.3-5]: Reject client_ids longer than 23 chars or not matching [0-9a-zA-Z].
    /// [MQTT-3.1.3-6]: Reject empty client_ids.
    #[structopt(long = "strict")]
    strict: bool,
    /// Reject clients whose client_id does not start with this prefix
    #[structopt(long = "idprefix", default_value = "")]
    idprefix: String,
    /// Reject clients who didn't supply this username:password
    ///
    /// Note that MQTT allows passwords to be binary but we only accept UTF-8.
    #[structopt(long = "userpass")]
    userpass: Option<String>,
    /// Only accept up to N connections, and stop the server afterwards.
    #[structopt(long = "max-connect", short = "c", value_name = "count", default_value = "-")]
    max_connect: OptUsize,
    /// Disconnect the client after receiving that many packets.
    ///
    /// Use "-" for no disconnect. Multiple values apply to subsequent connections.
    /// This just closes the TCP stream, without sending an mqtt disconnect packet.
    #[structopt(long = "max-pkt", value_name = "count", default_value = "-", use_delimiter = true)]
    max_pkt: Vec<OptUsize>,
    /// Delay before max-pkt disconnection.
    ///
    /// Useful if you want to receive the server response before disconnection. Use "-" for no
    /// delay.
    #[structopt(long = "max-pkt-delay", value_name = "ms", default_value = "-")]
    max_pkt_delay: OptDuration,
    /// Disconnect the client after a certain time.
    ///
    /// Use "-" for no disconnect. Multiple values apply to subsequent connections.
    /// This just closes the TCP stream, without sending an mqtt disconnect packet.
    #[structopt(long = "max-time", value_name = "ms", default_value = "-", use_delimiter = true)]
    max_time: Vec<OptDuration>,
    /// How long to retain the session after disconnection.
    ///
    /// Use "-" to use the client-specified value: CONNECT.clean_session (MQTT3) or
    /// CONNECT.session_expiry (MQTT5). Multiple values apply to subsequent connections.
    #[structopt(long = "session-expire",
                value_name = "ms",
                default_value = "-",
                use_delimiter = true)]
    sess_expire: Vec<OptDuration>,
    // /// Warn/Error if client reuses an MQTT id from the previous N packets.
    // #[structopt(long = "oldid",
    // value_name = "W/E",
    // use_delimiter = true,
    // value_delimiter = "/",
    // max_values = 2,
    // default_value = "10/1")]
    // oldid: Vec<usize>,
    // /// Send more acks than expected
    // #[structopt(long = "ackdup", value_name = "N", use_delimiter = true, default_value = "0")]
    // ackduplicate: Vec<usize>,
}
// FIXME: Define exit codes
/// Program entry point.
///
/// Parses the command line into [`Opt`], initialises logging, translates the options into a
/// `Conf`, starts the server and waits for its future to complete.
///
/// # Errors
///
/// Propagates any error returned by `Mqttest::start` or by awaiting `server.fut`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let opt = Opt::from_args();

    // The number of `-v` flags selects the base level; `--log` directives are parsed after it.
    let base_level = match opt.verbose {
        0 => LevelFilter::Info,
        1 => LevelFilter::Debug,
        _ => LevelFilter::Trace,
    };
    builder().filter_level(base_level)
             .format_timestamp_micros()
             .parse_filters(&opt.log)
             .init();
    trace!("Cli {:?}", opt);

    // The first and last parsed values delimit the inclusive range of ports to try.
    let first_port = opt.ports[0];
    let last_port = opt.ports[opt.ports.len() - 1];

    // First value is passed as the first timeout; a missing second value means `None`.
    let primary_ack_timeout = opt.ack_timeouts[0].0;
    let secondary_ack_timeout = opt.ack_timeouts.get(1).and_then(|timeout| timeout.0);

    let conf = Conf::new().ports(first_port..=last_port)
                          .dump_files(opt.dumps)
                          .dump_decode(opt.dump_decode)
                          .ack_timeouts(primary_ack_timeout, secondary_ack_timeout)
                          .ack_delay(Duration::from_millis(opt.ack_delay))
                          .strict(opt.strict)
                          .idprefix(opt.idprefix)
                          .userpass(opt.userpass)
                          .max_connect(opt.max_connect.0)
                          .max_pkt(opt.max_pkt.into_iter().map(|count| count.0).collect())
                          .max_pkt_delay(opt.max_pkt_delay.0)
                          .max_time(opt.max_time.into_iter().map(|limit| limit.0).collect())
                          .sess_expire(opt.sess_expire.into_iter().map(|ttl| ttl.0).collect());

    let server = Mqttest::start(conf).await?;
    server.fut.await?;
    info!("Done");
    Ok(())
}