//! Examples show how to handle different mqtt topics
use ntex_mqtt::v3;
#[derive(Clone)]
struct Session;
#[derive(Debug)]
struct ServerError;
impl From<()> for ServerError {
fn from(_: ()) -> Self {
ServerError
}
}
#[ntex::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "ntex=trace,ntex_mqtt=trace,routing=trace");
env_logger::init();
ntex::server::Server::build()
.bind("mqtt", "127.0.0.1:1883", |_| {
v3::MqttServer::new(|handshake: v3::Handshake| async move {
log::info!("new mqtt v3 connection: {:?}", handshake);
Ok::<_, ServerError>(handshake.ack(Session, false))
})
.publish(
// create router with default publish handler, default service handles
// all topics that are not recognized by router
v3::Router::new(|p: v3::Publish| async move {
log::info!("incoming publish: {:?} -> {:?}", p.id(), p.topic());
Ok(())
})
// this handler can handle topic1, topic2 and topic3 topics
.resource(["topic1", "topic2", "topic3"], |p: v3::Publish| async move {
log::info!("incoming publish for {:?} -> {:?}", p.topic(), p.id());
Ok(())
})
// this handler can handle topic with dynamic section
// ie `topic4/id1/files`, `topic4/id100/files`, etc
.resource(
["topic4/{id}/files"],
|p: v3::Publish| async move {
// get dynamic section from topic
let id = p.topic().get("id").unwrap();
log::info!(
"incoming publish for {:?} -> {:?} ({:?})",
p.topic(),
p.id(),
id
);
Ok(())
},
),
)
.finish()
})?
.workers(1)
.run()
.await
}