1use super::{Conf, Mqttest, ConnInfo};
5use mqttrs::{encode, Connect, Protocol};
6use std::{future::Future,
7 net::{IpAddr, SocketAddr}};
8use tokio::{io::AsyncWriteExt, net::TcpStream, runtime::Builder};
9
10async fn client(port: u16) -> Result<(), Box<dyn std::error::Error>> {
12 let sock = SocketAddr::from((IpAddr::from([127, 0, 0, 1]), port));
14 let mut stream = TcpStream::connect(sock).await?;
15 let mut buf = Vec::new();
17 let pkt = Connect { protocol: Protocol::MQTT311,
18 keep_alive: 60,
19 client_id: String::from("test"),
20 clean_session: true,
21 last_will: None,
22 username: None,
23 password: None };
24 encode(&pkt.into(), &mut buf)?;
25 stream.write_all(&buf).await?;
26 Ok(())
29}
30
31fn block_on<T>(f: impl Future<Output = T>) -> T {
33 let _ = env_logger::builder().is_test(true).parse_filters("debug").try_init();
34 Builder::new().basic_scheduler().enable_all().build().unwrap().block_on(f)
35}
36
37#[test]
38fn connect() {
39 let conns: Vec<ConnInfo> = block_on(async {
40 let conf = Conf::new().max_connect(Some(1));
42 let srv = Mqttest::start(conf).await.expect("Failed listen");
44 client(srv.port).await.expect("client failure");
46 srv.fut.await.unwrap()
48 });
49 assert_eq!(1, conns.len());
51}