1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// NOTE(review): the whole `tokio_e2e` module below is commented out, so none of
// it is compiled or run. If re-enabled it would be a test-only module gated on
// the `tokio` feature containing a single multi-threaded tokio test that:
//   1. installs a global tracing subscriber filtered to "none,mqrstt=trace",
//   2. builds connect options for the public host "broker.emqx.io" on port 1883,
//   3. spawns three tasks (network loop, event handler with a `Nop` handler,
//      and a sender that subscribes to "mqrstt" and publishes "123456789" to
//      the topic "test" with `QoS::ExactlyOnce`),
//   4. joins the three task handles and prints the result with `dbg!`.
// The test contains no assertions. On the success path the sender task awaits
// a `pending` future, so the final join does not look like it can complete
// unless something fails — confirm the intended shutdown before re-enabling.
// NOTE(review): the sender uses `smol::future::pending` although the module is
// gated only on `tokio` — verify `smol` is available in that configuration.
// NOTE(review): `set_global_default(..).expect(..)` presumably panics if a
// global subscriber was already installed by another test in the same test
// binary — confirm against the tracing documentation.
// #[cfg(feature = "tokio")]
// #[cfg(test)]
// mod tokio_e2e {
// use futures_concurrency::future::Join;
// use tracing::Level;
// use tracing_subscriber::FmtSubscriber;
// use crate::tests::stages::Nop;
// use crate::{
// connect_options::ConnectOptions, create_tokio_tcp, error::ClientError,
// event_handler::EventHandler, packets::QoS,
// };
// use crate::tests::stages::qos_2::TestPubQoS2;
// #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
// async fn test_pub_qos_2() {
// let filter = tracing_subscriber::filter::EnvFilter::new("none,mqrstt=trace");
// let subscriber = FmtSubscriber::builder()
// .with_env_filter(filter)
// .with_max_level(Level::TRACE)
// .with_line_number(true)
// .finish();
// tracing::subscriber::set_global_default(subscriber)
// .expect("setting default subscriber failed");
// NOTE(review): `new_with_tls_config` is called with port 1883 and `None` as
// the last argument — presumably "no TLS config"; confirm against the
// `ConnectOptions` API, which is not visible here.
// let opt = ConnectOptions::new_with_tls_config(
// "broker.emqx.io".to_string(),
// 1883,
// "test123123".to_string(),
// None,
// );
// // let opt = ConnectOptions::new("127.0.0.1".to_string(), 1883, "test123123".to_string(), None);
// let (mut mqtt_network, handler, client) = create_tokio_tcp(opt);
// let network = tokio::task::spawn(async move { dbg!(mqtt_network.run().await) });
// let event_handler = tokio::task::spawn(async move {
// let mut custom_handler = Nop{};
// dbg!(handler.handle(&mut custom_handler).await)
// });
// let sender = tokio::task::spawn(async move {
// client.subscribe("mqrstt").await.unwrap();
// client
// .publish(QoS::ExactlyOnce, false, "test".to_string(), "123456789")
// .await?;
// let lol = smol::future::pending::<Result<(), ClientError>>();
// lol.await
// });
// dbg!((network, event_handler, sender).join().await);
// }
// }
// NOTE(review): the whole `smol_rustls_e2e` module below is commented out, so
// none of it is compiled or run. If re-enabled it would be a test-only module
// gated on both the `smol` and `smol-rustls` features containing a single
// synchronous test that:
//   1. installs a global tracing subscriber filtered to "none,mqrstt=trace",
//   2. builds a `RustlsConfig::Simple` whose CA is `EMQX_CERT`, with no ALPN
//      and no client auth,
//   3. builds connect options for the public host "broker.emqx.io" on port 8883,
//   4. blocks on `mqtt_network.run()` with `smol::block_on` and unwraps the
//      result.
// The handler and client are bound to `_`-prefixed names and never used, so
// despite the name `test_pub_tcp_qos_2` nothing is subscribed or published
// here, and the test contains no assertions beyond the final `unwrap`.
// NOTE(review): `set_global_default(..).expect(..)` presumably panics if a
// global subscriber was already installed by another test in the same test
// binary — confirm against the tracing documentation.
// #[cfg(all(feature = "smol", feature = "smol-rustls"))]
// #[cfg(test)]
// mod smol_rustls_e2e {
// use tracing::Level;
// use tracing_subscriber::FmtSubscriber;
// use crate::{
// connect_options::ConnectOptions, connections::transport::RustlsConfig, create_smol_rustls,
// tests::resources::EMQX_CERT,
// };
// #[test]
// fn test_pub_tcp_qos_2() {
// let filter = tracing_subscriber::filter::EnvFilter::new("none,mqrstt=trace");
// let subscriber = FmtSubscriber::builder()
// .with_env_filter(filter)
// .with_max_level(Level::TRACE)
// .with_line_number(true)
// .finish();
// tracing::subscriber::set_global_default(subscriber)
// .expect("setting default subscriber failed");
// let config = RustlsConfig::Simple {
// ca: EMQX_CERT.to_vec(),
// alpn: None,
// client_auth: None,
// };
// let opt = ConnectOptions::new("broker.emqx.io".to_string(), 8883, "test123123".to_string());
// let (mut mqtt_network, _handler, _client) = create_smol_rustls(opt, config);
// smol::block_on(mqtt_network.run()).unwrap()
// }
// }