use mqtt_protocol_core::mqtt;
use mqtt_protocol_core::mqtt::prelude::*;
use std::env;
use std::io::{Read, Write};
use std::net::TcpStream;
/// Publishes a single MQTT v5.0 message over a blocking TCP connection.
///
/// Command line: `<host> <port> <topic> <qos> <payload>`.
///
/// Flow: connect the TCP stream, send CONNECT, read one chunk of response
/// data, send PUBLISH, and (for QoS other than 0) read one more chunk of
/// response data before exiting.
///
/// # Errors
///
/// Returns an error when the port or QoS argument cannot be parsed, when the
/// QoS value is rejected by `Qos::try_from`, when the client id or topic name
/// is rejected by the packet builder, when a packet cannot be built, when no
/// packet id can be acquired, or when any socket operation fails.
///
/// The process exits with status 1 (after printing usage) when the number of
/// arguments is wrong.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    if args.len() != 6 {
        // Avoid indexing `args[0]`: the argument list can be empty, and the
        // usage message should never be replaced by an index panic.
        let program = args.first().map(String::as_str).unwrap_or("mqtt_publish");
        eprintln!("Usage: {program} <host> <port> <topic> <qos> <payload>");
        eprintln!("Example: {program} localhost 1883 test/topic 0 \"Hello World\"");
        std::process::exit(1);
    }

    let host = &args[1];
    let port: u16 = args[2].parse().map_err(|e| format!("Invalid port: {e}"))?;
    let topic = &args[3];
    let qos: u8 = args[4].parse().map_err(|e| format!("Invalid QoS: {e}"))?;
    let payload = &args[5];

    // The QoS value is user input, so report it as an error rather than
    // panicking. `format!` is required for `{qos}` to be interpolated; a
    // plain string literal would print the placeholder verbatim.
    let qos_level = mqtt::packet::Qos::try_from(qos)
        .map_err(|_| format!("Invalid QoS level '{qos}': must be 0, 1, or 2"))?;

    let mut stream = TcpStream::connect(format!("{host}:{port}"))?;
    println!("Connected to {host}:{port}");

    let mut connection = mqtt::Connection::<mqtt::role::Client>::new(mqtt::Version::V5_0);
    connection.set_auto_pub_response(true);

    let connect_packet = mqtt::packet::v5_0::Connect::builder()
        .client_id("mqtt_publish_example")
        .map_err(|e| format!("Invalid client ID: {e:?}"))?
        .build()
        .map_err(|e| format!("Failed to build CONNECT packet: {e:?}"))?;
    let events = connection.checked_send(connect_packet);
    handle_events(&mut stream, &mut connection, events)?;

    // Read whatever the broker sends in response to CONNECT.
    receive_once(&mut stream, &mut connection)?;

    // The topic comes from the command line, so a rejected topic name is
    // reported as an error instead of panicking.
    let mut publish_builder = mqtt::packet::v5_0::Publish::builder()
        .topic_name(topic)
        .map_err(|e| format!("Invalid topic name '{topic}': {e:?}"))?
        .qos(qos_level)
        .payload(payload.as_bytes());

    let needs_ack = qos_level != mqtt::packet::Qos::AtMostOnce;
    if needs_ack {
        // A packet id is only attached when the QoS is not AtMostOnce.
        let packet_id = connection
            .acquire_packet_id()
            .map_err(|e| format!("Failed to acquire packet ID: {e:?}"))?;
        publish_builder = publish_builder.packet_id(packet_id);
    }

    let publish_packet = publish_builder
        .build()
        .map_err(|e| format!("Failed to build PUBLISH packet: {e:?}"))?;
    let events = connection.checked_send(publish_packet);
    handle_events(&mut stream, &mut connection, events)?;

    if needs_ack {
        // NOTE(review): only a single read is performed here. A QoS 2
        // exchange presumably needs a further read to observe PUBCOMP after
        // the PUBREC response has been answered - confirm whether exiting
        // before that point is intended for this example.
        receive_once(&mut stream, &mut connection)?;
    }

    println!("Published message to topic '{topic}' with QoS {qos}: {payload}");
    Ok(())
}

/// Performs one blocking read (up to 1024 bytes) and, if any bytes arrived,
/// feeds them to `connection.recv` and dispatches the resulting events.
fn receive_once(
    stream: &mut TcpStream,
    connection: &mut mqtt::Connection<mqtt::role::Client>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut buffer = [0u8; 1024];
    let n = stream.read(&mut buffer)?;
    // A zero-length read yields nothing to decode, so it is skipped.
    if n > 0 {
        let mut cursor = mqtt::common::Cursor::new(&buffer[..n]);
        let events = connection.recv(&mut cursor);
        handle_events(stream, connection, events)?;
    }
    Ok(())
}
/// Processes the events produced by the MQTT connection, in order.
///
/// * `RequestSendPacket` - the packet is serialized with
///   `to_continuous_buffer` and written in full to `stream`.
/// * `NotifyPacketReceived` - the packet is logged; CONNACK, PUBACK, PUBREC
///   and PUBCOMP get a dedicated message, everything else a generic one.
/// * `RequestClose` - a message is logged and the remaining events are
///   not processed.
/// * All other events are only logged (errors go to stderr).
///
/// `_connection` is accepted but not used by this function.
///
/// # Errors
///
/// Returns the I/O error if writing a packet to `stream` fails.
fn handle_events(
    stream: &mut TcpStream,
    _connection: &mut mqtt::Connection<mqtt::role::Client>,
    events: Vec<mqtt::connection::Event>,
) -> Result<(), Box<dyn std::error::Error>> {
    use mqtt::connection::Event;
    use mqtt::packet::Packet;

    for current in events {
        match current {
            Event::RequestClose => {
                println!("Connection close requested");
                // Stop processing; anything queued after this is dropped.
                break;
            }
            Event::RequestSendPacket { packet, .. } => {
                let bytes = packet.to_continuous_buffer();
                stream.write_all(&bytes)?;
                println!("Sent packet: {}", packet.packet_type());
            }
            Event::NotifyPacketReceived(received) => match received {
                Packet::V5_0Connack(ack) => {
                    println!("CONNACK received: {:?}", ack.reason_code());
                }
                Packet::V5_0Puback(ack) => {
                    println!("PUBACK received for packet ID: {}", ack.packet_id());
                }
                Packet::V5_0Pubrec(rec) => {
                    println!("PUBREC received for packet ID: {}", rec.packet_id());
                }
                Packet::V5_0Pubcomp(comp) => {
                    println!("PUBCOMP received for packet ID: {}", comp.packet_id());
                }
                _ => {
                    println!("Received packet: {}", received.packet_type());
                }
            },
            Event::NotifyPacketIdReleased(id) => {
                println!("Packet ID {id} released");
            }
            Event::NotifyError(err) => {
                eprintln!("MQTT Error: {err:?}");
            }
            Event::RequestTimerReset { kind, duration_ms } => {
                println!("Timer reset requested: {kind:?} for {duration_ms} ms");
            }
            Event::RequestTimerCancel(kind) => {
                println!("Timer cancel requested: {kind:?}");
            }
        }
    }

    Ok(())
}