use ntex::service::{fn_factory_with_config, fn_service, ServiceFactory};
use ntex_mqtt::v5;
/// Per-connection session state used by this example.
///
/// It carries no data; `main` passes it to `handshake.ack(..)` and the routed
/// publish factory receives it wrapped in `v5::Session<MySession>`.
#[derive(Clone)]
struct MySession;
/// Unit error type shared by every service defined in this file.
#[derive(Debug)]
struct ServerError;

/// Allows a `()` error value to be turned into a [`ServerError`]
/// (for example through `.into()`).
impl From<()> for ServerError {
    fn from((): ()) -> Self {
        Self
    }
}
impl std::convert::TryFrom<ServerError> for v5::PublishAck {
type Error = ServerError;
fn try_from(err: ServerError) -> Result<Self, Self::Error> {
Err(err)
}
}
/// Runs an MQTT v5 server on `127.0.0.1:1883` with a single worker and
/// topic-based routing of incoming publish packets.
///
/// Routing table built below:
/// * `topic1`, `topic2`, `topic3` — handled by a service created per session
///   through `fn_factory_with_config`.
/// * `topic4/{id}/files` — handled by a closure that also logs the `id`
///   path segment.
/// * anything else — handled by the default service given to `v5::Router::new`.
///
/// Every handler logs the packet and acknowledges it with `p.ack()`.
///
/// # Errors
///
/// Returns the `std::io::Error` produced by binding the listener or by
/// running the server.
#[ntex::main]
async fn main() -> std::io::Result<()> {
    // Install the verbose default filter only when the caller has not chosen
    // one, so an explicitly provided `RUST_LOG` is respected.
    if std::env::var_os("RUST_LOG").is_none() {
        std::env::set_var("RUST_LOG", "ntex=trace,ntex_mqtt=trace,routing_v5=trace");
    }
    env_logger::init();

    ntex::server::Server::build()
        .bind("mqtt", "127.0.0.1:1883", |_| {
            v5::MqttServer::new(
                // Handshake service: accept every connection with an empty session.
                fn_service(|handshake: v5::Handshake| async move {
                    log::info!("new mqtt v5 connection: {:?}", handshake);
                    Ok::<_, ServerError>(handshake.ack(MySession))
                })
                .map_init_err(|_| ServerError),
            )
            .publish(
                v5::Router::new(
                    // Default publish service passed to the router itself.
                    fn_service(|p: v5::Publish| async move {
                        log::info!("incoming publish: {:?} -> {:?}", p.id(), p.topic());
                        Ok::<_, ServerError>(p.ack())
                    })
                    .map_init_err(|_| ServerError),
                )
                .resource(
                    ["topic1", "topic2", "topic3"],
                    // The factory receives the session and builds the handler service.
                    fn_factory_with_config(|_: v5::Session<MySession>| async {
                        Ok::<_, ServerError>(fn_service(|p: v5::Publish| async move {
                            log::info!("incoming publish for {:?} -> {:?}", p.topic(), p.id());
                            Ok(p.ack())
                        }))
                    }),
                )
                .resource(["topic4/{id}/files"], |p: v5::Publish| async move {
                    // `id` is declared in the resource pattern above; presumably it is
                    // always captured when this route matches, hence the `unwrap`.
                    let id = p.topic().get("id").unwrap();
                    log::info!(
                        "incoming publish for {:?} -> {:?} ({:?})",
                        p.topic(),
                        p.id(),
                        id
                    );
                    Ok(p.ack())
                }),
            )
            .finish()
        })?
        .workers(1)
        .run()
        .await
}