bevy_mqtt 0.3.3

A simple MQTT client for Bevy ECS
Documentation

bevy_mqtt

crates.io MIT/Apache 2.0 crates.io CI Documentation

A mqtt client Plugin for Bevy game engine.

Example

first run as mqtt broker like mosquitto

then run the example

use std::time::SystemTime;

use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
    rumqttc::{MqttOptions, QoS},
    MqttClient, MqttClientError, MqttClientState, MqttConnectError, MqttEvent, MqttPlugin,
    MqttPublishOutgoing, MqttSetting, SubscribeTopic, TopicMessage,
};
use bevy_state::prelude::OnEnter;
use bincode::ErrorKind;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Message {
    i: usize,
    time: SystemTime,
}

impl From<&Message> for Vec<u8> {
    fn from(value: &Message) -> Self {
        bincode::serialize(value).unwrap()
    }
}

impl From<Message> for Vec<u8> {
    fn from(value: Message) -> Self {
        bincode::serialize(&value).unwrap()
    }
}

impl TryFrom<&[u8]> for Message {
    type Error = Box<ErrorKind>;

    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
        bincode::deserialize(value)
    }
}

fn main() {
    App::new()
        .insert_resource(MqttSetting {
            mqtt_options: MqttOptions::new("mqtt-serde", "127.0.0.1", 1883),
            cap: 10,
        })
        .add_plugins((MinimalPlugins, MqttPlugin, LogPlugin::default()))
        .add_systems(Update, (handle_message, handle_error))
        .add_systems(
            OnEnter(MqttClientState::Connected),
            (sub_topic_direct, sub_topic_by_component),
        )
        .add_systems(
            Update,
            publish_message.run_if(on_timer(std::time::Duration::from_secs(1))),
        )
        .run();
}

fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
    for event in mqtt_event.read() {
        match &event.0 {
            rumqttc::Event::Incoming(income) => match income {
                rumqttc::Incoming::Publish(publish) => {
                    let message: Message = bincode::deserialize(&publish.payload).unwrap();
                    println!("Received Publish: {:?}", message);
                }
                _ => {
                    println!("Incoming: {:?}", income);
                }
            },
            rumqttc::Event::Outgoing(_) => {}
        }
    }
}

fn handle_error(
    mut connect_errors: EventReader<MqttConnectError>,
    mut client_errors: EventReader<MqttClientError>,
) {
    for error in connect_errors.read() {
        println!("connect Error: {:?}", error);
    }

    for error in client_errors.read() {
        println!("client Error: {:?}", error);
    }
}

/// there are two ways to subscribe to a topic
/// 1. Directly subscribe to a topic
/// 2. Subscribe to a topic by component
fn sub_topic_direct(client: Res<MqttClient>) {
    client
        .try_subscribe("hello/mqtt", QoS::AtMostOnce)
        .expect("subscribe failed");
}

fn sub_topic_by_component(mut commands: Commands) {
    commands
        .spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
        .observe(|topic_message: Trigger<TopicMessage>| {
            println!(
                "topic: {} received : {:?}",
                topic_message.event().topic,
                topic_message.event().payload
            );
        });
}

fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
    let mut list = vec![];
    for i in 0..3 {
        let message = Message {
            i,
            time: SystemTime::now(),
        };

        list.push(MqttPublishOutgoing {
            topic: "hello/mqtt".to_string(),
            qos: QoS::AtLeastOnce,
            retain: false,
            payload: message.into(),
        });
        list.push(MqttPublishOutgoing {
            topic: "bevy/mqtt".to_string(),
            qos: QoS::AtLeastOnce,
            retain: false,
            payload: Message {
                i: 999,
                time: SystemTime::now(),
            }
                .into(),
        });
    }

    pub_events.send_batch(list);
}

Supported Versions

bevy bevy_mqtt
0.14 0.2, 0.3
0.13 0.1

License

Dual-licensed under either

  • MIT
  • Apache 2.0