ntex-mqtt 0.2.0

MQTT Client/Server framework for v5 and v3.1.1 protocols
Documentation
use bytes::Bytes;
use bytestring::ByteString;
use futures::future::ok;
use ntex::server;
use ntex::service::Service;

use ntex_mqtt::v3::{client, Connect, ConnectAck, MqttServer, Publish, Session};

struct St;

async fn connect<Io>(packet: Connect<Io>) -> Result<ConnectAck<Io, St>, ()> {
    println!("CONNECT: {:?}", packet);
    Ok(packet.ack(St, false))
}

#[ntex::test]
async fn test_simple() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "ntex_mqtt=trace,ntex_codec=info,ntex=trace");
    env_logger::init();

    let srv = server::test_server(|| MqttServer::new(connect).publish(|_t| ok(())).finish());

    struct Client;

    let client = client::Client::new(ByteString::from_static("user"))
        .state(|ack: client::ConnectAck<_>| async move {
            ack.sink()
                .publish(ByteString::from_static("#"), Bytes::new())
                .at_most_once();
            ack.sink().close();
            Ok(ack.state(Client))
        })
        .finish(ntex::fn_factory_with_config(|session: Session<Client>| {
            let session = session.clone();

            ok::<_, ()>(ntex::into_service(move |_t: Publish| {
                session.sink().close();
                async { Ok(()) }
            }))
        }));

    let conn = ntex::connect::Connector::default()
        .call(ntex::connect::Connect::with(String::new(), srv.addr()))
        .await
        .unwrap();

    client.call(conn).await.unwrap();

    Ok(())
}