photonic_interface_mqtt/
lib.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use bytes::Bytes;
6use futures::StreamExt;
7use palette::rgb::Rgb;
8use photonic::attr::Range;
9use photonic::input::{AnyInputValue, InputSink, Trigger};
10use rumqttc::{AsyncClient, Event, Incoming, LastWill, MqttOptions, QoS};
11use tokio_stream::StreamMap;
12
13use photonic::interface::{Interface, Introspection};
14
/// Topic namespace under which all MQTT topics of this interface live.
///
/// Wraps a borrowed prefix that is stored without a trailing `/`, so topics
/// can be built by joining the prefix and a suffix with exactly one slash.
struct Realm<'a>(&'a str);

impl<'a> Realm<'a> {
    /// Creates a realm from `value`, dropping a single trailing `/` if present.
    ///
    /// Only one slash is removed: `"a//"` becomes the prefix `"a/"`.
    pub fn from(value: &'a str) -> Self {
        // `strip_suffix` returns `None` when there is no trailing slash, in
        // which case the value is used unchanged.
        Self(value.strip_suffix('/').unwrap_or(value))
    }

    /// Returns the full topic `<realm>/<suffix>`.
    pub fn topic(&self, suffix: impl AsRef<str>) -> String {
        format!("{}/{}", self.0, suffix.as_ref())
    }
}
26
27pub struct MQTT {
28 pub mqtt_options: MqttOptions,
29
30 pub realm: String,
31}
32
33impl MQTT {
34 pub fn with_url(url: impl Into<String>) -> Result<Self> {
35 let mut mqtt_options = MqttOptions::parse_url(url)?;
36 mqtt_options.set_keep_alive(Duration::from_secs(5));
37 mqtt_options.set_clean_session(true);
38
39 return Ok(Self {
40 mqtt_options,
41 realm: "photonic".into(), });
43 }
44
45 pub fn with_realm(mut self, realm: impl Into<String>) -> Self {
46 self.realm = realm.into();
47 return self;
48 }
49}
50
51impl Interface for MQTT {
52 async fn listen(mut self, introspection: Arc<Introspection>) -> Result<()> {
53 let realm = Realm::from(&self.realm);
54
55 self.mqtt_options.set_last_will(LastWill {
56 topic: realm.topic("status"),
57 message: Bytes::from("offline"),
58 qos: QoS::AtLeastOnce,
59 retain: true,
60 });
61
62 self.mqtt_options.set_keep_alive(Duration::from_secs(5));
63
64 let (client, mut event_loop) = AsyncClient::new(self.mqtt_options.clone(), 10);
65
66 let mut inputs = introspection
67 .inputs
68 .iter()
69 .map(|(name, input)| (realm.topic(format!("input/{name}")), input.subscribe()))
70 .collect::<StreamMap<_, _>>();
71
72 loop {
73 tokio::select! {
74 Some((topic, value)) = inputs.next() => {
75 let value = match value {
76 AnyInputValue::Trigger => String::new(),
77 AnyInputValue::Boolean(value) => value.to_string(),
78 AnyInputValue::Integer(value) => value.to_string(),
79 AnyInputValue::Decimal(value) => value.to_string(),
80 AnyInputValue::Color(value) => format!("#{:06x}", value.into_format::<u8>()),
81 AnyInputValue::IntegerRange(value) => value.to_string(),
82 AnyInputValue::DecimalRange(value) => value.to_string(),
83 AnyInputValue::ColorRange(value) => value.map(|value| format!("#{:06x}", value.into_format::<u8>())).to_string(),
84 };
85 client.publish(topic, QoS::AtLeastOnce, false, value).await?;
86 }
87
88 event = event_loop.poll() => match event {
89 Ok(Event::Incoming(Incoming::ConnAck(_))) => {
90 client.subscribe(realm.topic("input/+/set"), QoS::AtLeastOnce).await?;
92
93 client.publish_bytes(realm.topic("status"), QoS::AtLeastOnce, true, Bytes::from("online")).await?;
95 }
96
97 Ok(Event::Incoming(Incoming::Publish(publish))) => {
98 let input = introspection.inputs.iter()
99 .find_map(|(name, input)| (realm.topic(format!("input/{name}/set")) == publish.topic).then_some(input));
100
101 let input = match input {
102 Some(input) => input,
103 None => {
104 eprintln!("Got notification for unknown topic: {}", publish.topic);
105 continue;
106 }
107 };
108
109 let payload = match String::from_utf8(publish.payload.to_vec()) {
110 Ok(payload) => payload,
111 Err(err) => {
112 eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, publish.payload, err);
113 continue;
114 }
115 };
116
117 let res: Result<()> = (async { match input.sink() {
118 InputSink::Trigger(sink) => sink.send(Trigger::next()).await,
119 InputSink::Boolean(sink) => sink.send(payload.parse()?).await,
120 InputSink::Integer(sink) => sink.send(payload.parse()?).await,
121 InputSink::Decimal(sink) => sink.send(payload.parse()?).await,
122 InputSink::Color(sink) => sink.send(payload.parse::<Rgb<_, u8>>()?.into_format()).await,
123 InputSink::IntegerRange(sink) => sink.send(payload.parse()?).await,
124 InputSink::DecimalRange(sink) => sink.send(payload.parse()?).await,
125 InputSink::ColorRange(sink) => sink.send(payload.parse::<Range<Rgb<_, u8>>>()?.map(Rgb::into_format)).await,
126 }}).await;
127
128 match res {
129 Ok(()) => {}
130 Err(err) => {
131 eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, payload, err);
132 continue;
133 }
134 }
135 }
136
137 Ok(_) => {}
138
139 Err(err) => {
140 eprintln!("MQTT error: {err}");
141 tokio::time::sleep(Duration::from_secs(5)).await;
142 }
143 }
144 }
145 }
146 }
147}