photonic_interface_mqtt/
lib.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use bytes::Bytes;
6use futures::StreamExt;
7use palette::rgb::Rgb;
8use photonic::attr::Range;
9use photonic::input::{AnyInputValue, InputSink, Trigger};
10use rumqttc::{AsyncClient, Event, Incoming, LastWill, MqttOptions, QoS};
11use tokio_stream::StreamMap;
12
13use photonic::interface::{Interface, Introspection};
14
15struct Realm<'a>(&'a str);
16
17impl<'a> Realm<'a> {
18    pub fn from(value: &'a str) -> Self {
19        return Self(if value.ends_with('/') { &value[0..value.len() - 1] } else { value });
20    }
21
22    pub fn topic(&self, suffix: impl AsRef<str>) -> String {
23        return format!("{}/{}", self.0, suffix.as_ref());
24    }
25}
26
27pub struct MQTT {
28    pub mqtt_options: MqttOptions,
29
30    pub realm: String,
31}
32
33impl MQTT {
34    pub fn with_url(url: impl Into<String>) -> Result<Self> {
35        let mut mqtt_options = MqttOptions::parse_url(url)?;
36        mqtt_options.set_keep_alive(Duration::from_secs(5));
37        mqtt_options.set_clean_session(true);
38
39        return Ok(Self {
40            mqtt_options,
41            realm: "photonic".into(), // TODO: Extract realm from URL
42        });
43    }
44
45    pub fn with_realm(mut self, realm: impl Into<String>) -> Self {
46        self.realm = realm.into();
47        return self;
48    }
49}
50
51impl Interface for MQTT {
52    async fn listen(mut self, introspection: Arc<Introspection>) -> Result<()> {
53        let realm = Realm::from(&self.realm);
54
55        self.mqtt_options.set_last_will(LastWill {
56            topic: realm.topic("status"),
57            message: Bytes::from("offline"),
58            qos: QoS::AtLeastOnce,
59            retain: true,
60        });
61
62        self.mqtt_options.set_keep_alive(Duration::from_secs(5));
63
64        let (client, mut event_loop) = AsyncClient::new(self.mqtt_options.clone(), 10);
65
66        let mut inputs = introspection
67            .inputs
68            .iter()
69            .map(|(name, input)| (realm.topic(format!("input/{name}")), input.subscribe()))
70            .collect::<StreamMap<_, _>>();
71
72        loop {
73            tokio::select! {
74                Some((topic, value)) = inputs.next() => {
75                    let value = match value {
76                        AnyInputValue::Trigger => String::new(),
77                        AnyInputValue::Boolean(value) => value.to_string(),
78                        AnyInputValue::Integer(value) => value.to_string(),
79                        AnyInputValue::Decimal(value) => value.to_string(),
80                        AnyInputValue::Color(value) => format!("#{:06x}", value.into_format::<u8>()),
81                        AnyInputValue::IntegerRange(value) => value.to_string(),
82                        AnyInputValue::DecimalRange(value) => value.to_string(),
83                        AnyInputValue::ColorRange(value) => value.map(|value| format!("#{:06x}", value.into_format::<u8>())).to_string(),
84                    };
85                    client.publish(topic, QoS::AtLeastOnce, false, value).await?;
86                }
87
88                event = event_loop.poll() => match event {
89                    Ok(Event::Incoming(Incoming::ConnAck(_))) => {
90                        // Subscribe to all input topics
91                        client.subscribe(realm.topic("input/+/set"), QoS::AtLeastOnce).await?;
92
93                        // Report online status
94                        client.publish_bytes(realm.topic("status"), QoS::AtLeastOnce, true, Bytes::from("online")).await?;
95                    }
96
97                    Ok(Event::Incoming(Incoming::Publish(publish))) => {
98                        let input = introspection.inputs.iter()
99                            .find_map(|(name, input)| (realm.topic(format!("input/{name}/set")) == publish.topic).then_some(input));
100
101                        let input = match input {
102                            Some(input) => input,
103                            None => {
104                                eprintln!("Got notification for unknown topic: {}", publish.topic);
105                                continue;
106                            }
107                        };
108
109                        let payload = match String::from_utf8(publish.payload.to_vec()) {
110                            Ok(payload) => payload,
111                            Err(err) => {
112                                eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, publish.payload, err);
113                                continue;
114                            }
115                        };
116
117                        let res: Result<()> = (async { match input.sink() {
118                            InputSink::Trigger(sink) => sink.send(Trigger::next()).await,
119                            InputSink::Boolean(sink) => sink.send(payload.parse()?).await,
120                            InputSink::Integer(sink) => sink.send(payload.parse()?).await,
121                            InputSink::Decimal(sink) => sink.send(payload.parse()?).await,
122                            InputSink::Color(sink) => sink.send(payload.parse::<Rgb<_, u8>>()?.into_format()).await,
123                            InputSink::IntegerRange(sink) => sink.send(payload.parse()?).await,
124                            InputSink::DecimalRange(sink) => sink.send(payload.parse()?).await,
125                            InputSink::ColorRange(sink) => sink.send(payload.parse::<Range<Rgb<_, u8>>>()?.map(Rgb::into_format)).await,
126                        }}).await;
127
128                        match res {
129                            Ok(()) => {}
130                            Err(err) => {
131                                eprintln!("⇄ Invalid value on '{}' = {:?}: {}", publish.topic, payload, err);
132                                continue;
133                            }
134                        }
135                    }
136
137                    Ok(_) => {}
138
139                    Err(err) => {
140                        eprintln!("MQTT error: {err}");
141                        tokio::time::sleep(Duration::from_secs(5)).await;
142                    }
143                }
144            }
145        }
146    }
147}