victron_gx/
lib.rs

1use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
2use serde_json::Value;
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::{Notify, RwLock};
6use tokio::task;
7use tokio::time::{self, Duration};
8use tracing::{debug, error, info};
9
10use crate::ac::AcSpec;
11use crate::battery::{BatteryDC, BatterySummary};
12use crate::ess::Ess;
13use crate::pvinverter::{PvInverter, PvInverterSummary};
14use crate::traits::HandleFrame;
15
16pub mod ac;
17pub mod battery;
18pub mod ess;
19pub mod pvinverter;
20pub mod traits;
21
22#[derive(Debug, Clone, Default)]
23pub struct VictronData {
24    pub ac_input: AcSpec,
25    pub ac_output: AcSpec,
26    pub ess: Ess,
27    pub batteries_dc: HashMap<u16, BatteryDC>,
28    pub pv_inverters: HashMap<u16, PvInverter>,
29}
30
31pub struct VictronGx {
32    pub serial_number: String,
33    data: Arc<RwLock<VictronData>>,
34    client: AsyncClient,
35
36    shutdown: Arc<Notify>,
37
38    _eventloop_handle: task::JoinHandle<()>,
39    _keepalive_handle: task::JoinHandle<()>,
40}
41
42impl VictronGx {
43    pub async fn new(
44        client_id: &str,
45        host: &str,
46        port: u16,
47        serial_number: &str,
48    ) -> anyhow::Result<Self> {
49        let mut mqttoptions = MqttOptions::new(client_id, host, port);
50        mqttoptions.set_keep_alive(Duration::from_secs(30));
51
52        let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
53        let data = Arc::new(RwLock::new(VictronData::default()));
54        let shutdown = Arc::new(Notify::new());
55
56        let keepalive_topic = format!("R/{}/keepalive", serial_number);
57        let client_clone = client.clone();
58        let shutdown_clone = shutdown.clone();
59
60        let _keepalive_handle = task::spawn(Self::keepalive_task(
61            client_clone,
62            keepalive_topic,
63            shutdown_clone,
64        ));
65
66        // Subscribe to topics with the serial number
67        let topics = vec![format!("N/{}/#", serial_number)];
68
69        for topic in &topics {
70            client.subscribe(topic, QoS::AtMostOnce).await?;
71        }
72
73        let data_clone = data.clone();
74        let serial_number_cpy = serial_number.to_string();
75
76        let shutdown_clone = shutdown.clone();
77        let _eventloop_handle = task::spawn(async move {
78            loop {
79                tokio::select! {
80                    _ = shutdown_clone.notified() => {
81                        info!("Shutting down event loop");
82                        break;
83                    }
84                    event = eventloop.poll() => match event {
85                        Ok(Event::Incoming(Packet::Publish(publish))) => {
86                            Self::handle_publish(&data_clone, &serial_number_cpy, &publish.topic, &publish.payload).await;
87                        }
88                        Ok(_) => {}
89                        Err(e) => {
90                            error!("MQTT event loop error: {:?}", e);
91                            time::sleep(Duration::from_secs(5)).await;
92                        }
93                    }
94                }
95            }
96        });
97
98        // Schedules
99        /*
100        N/028102353a50/settings/0/Settings/CGwacs/BatteryLife/Schedule/Charge/0/AllowDischarge -> b'{"value": 0}'
101        N/028102353a50/settings/0/Settings/CGwacs/BatteryLife/Schedule/Charge/0/Day -> b'{"value": 7}'
102        N/028102353a50/settings/0/Settings/CGwacs/BatteryLife/Schedule/Charge/0/Duration -> b'{"value": 13800}'
103        N/028102353a50/settings/0/Settings/CGwacs/BatteryLife/Schedule/Charge/0/Soc -> b'{"value": 55}'
104        N/028102353a50/settings/0/Settings/CGwacs/BatteryLife/Schedule/Charge/0/Start -> b'{"value": 7200}'
105        */
106
107        Ok(Self {
108            serial_number: serial_number.to_string(),
109            data,
110            client,
111            shutdown,
112            _eventloop_handle,
113            _keepalive_handle,
114        })
115    }
116
117    async fn handle_publish(
118        data: &Arc<RwLock<VictronData>>,
119        serial_number: &str,
120        topic: &str,
121        payload: &[u8],
122    ) {
123        let payload_str = match std::str::from_utf8(payload) {
124            Ok(s) => s,
125            Err(e) => {
126                error!("Invalid UTF-8 payload: {:?}", e);
127                return;
128            }
129        };
130
131        let value = match serde_json::from_str::<Value>(payload_str) {
132            Ok(json) => json.get("value").and_then(|v| v.as_f64()),
133            Err(e) => {
134                error!("JSON parse error: {:?}", e);
135                None
136            }
137        };
138
139        let suffix = match topic.strip_prefix(&format!("N/{}/", serial_number)) {
140            Some(s) => s,
141            None => return,
142        };
143
144        let parts: Vec<&str> = suffix.split('/').collect();
145        let mut data = data.write().await;
146
147        match parts.as_slice() {
148            ["vebus", "275", "Ac", "ActiveIn", rest @ ..] => {
149                data.ac_input.handle_frame(rest, value)
150            }
151            ["vebus", "275", "Ac", "Out", rest @ ..] => data.ac_output.handle_frame(rest, value),
152            ["battery", id, rest @ ..] => {
153                let id: u16 = id.parse().unwrap_or(0);
154                let battery = data.batteries_dc.entry(id).or_default();
155                battery.handle_frame(rest, value);
156            }
157            ["pvinverter", id, rest @ ..] => {
158                let id: u16 = id.parse().unwrap_or(0);
159                let inverter = data.pv_inverters.entry(id).or_default();
160                inverter.handle_frame(rest, value);
161            }
162            ["vebus", "275", "Hub4", rest @ ..] => {
163                data.ess.handle_frame(rest, value);
164            }
165            _ => {
166                debug!(
167                    "Unhandled topic: {}, parts: {:?}, value: {:?}",
168                    topic, parts, value
169                );
170            }
171        }
172    }
173    /// The MQTT server "shuts-down" (basically stops publishing anything) if
174    /// no messages are received by it for a minute or so
175    async fn keepalive_task(client: AsyncClient, topic: String, shutdown: Arc<Notify>) {
176        let mut interval = time::interval(Duration::from_secs(10));
177        loop {
178            tokio::select! {
179                _ = shutdown.notified() => {
180                    info!("Shutting down keepalive task");
181                    break;
182                }
183                _ = interval.tick() => {
184                    if let Err(e) = client.publish(&topic, QoS::AtLeastOnce, false, "").await {
185                        error!("Keepalive publish failed: {:?}", e);
186                    }
187                }
188            }
189        }
190    }
191
192    pub async fn shutdown(&self) {
193        self.shutdown.notify_waiters();
194    }
195
196    pub async fn get_ac_input(&self) -> AcSpec {
197        self.data.read().await.ac_input.clone()
198    }
199    pub async fn get_ac_output(&self) -> AcSpec {
200        self.data.read().await.ac_output.clone()
201    }
202
203    pub async fn get_batteries_dc(&self) -> Vec<(u16, BatteryDC)> {
204        self.data
205            .read()
206            .await
207            .batteries_dc
208            .iter()
209            .map(|(id, batt)| (*id, batt.clone()))
210            .collect()
211    }
212
213    /// Returns a summary of all batteries (average voltage, average temperature, total power)
214    pub async fn get_battery_summary(&self) -> BatterySummary {
215        let data = self.data.read().await;
216        BatterySummary::from_batteries(data.batteries_dc.values())
217    }
218
219    pub async fn get_pv_inverters(&self) -> Vec<(u16, PvInverter)> {
220        self.data
221            .read()
222            .await
223            .pv_inverters
224            .iter()
225            .map(|(id, inv)| (*id, inv.clone()))
226            .collect()
227    }
228
229    /// Returns a summary of all pv inverters (total power)
230    pub async fn get_pv_inverter_summary(&self) -> PvInverterSummary {
231        let data = self.data.read().await;
232        PvInverterSummary::from_pv_inverters(data.pv_inverters.values())
233    }
234
235    // TODO FIXME
236    pub async fn set_ac_power_setpoint(&self) {
237        // XXX this just sets the setpoint. But the power going into the batteries
238        // depends on the consumption of the house
239        let topic = "W/028102353a50/settings/0/Settings/CGwacs/AcPowerSetPoint";
240        let content = r#"{"value": 1234}"#;
241        if let Err(e) = self
242            .client
243            .publish(topic, QoS::AtLeastOnce, false, content)
244            .await
245        {
246            error!("Failed to set AC power setpoint: {:?}", e);
247        }
248    }
249
250    pub async fn get_ess(&self) -> Ess {
251        self.data.read().await.ess.clone()
252    }
253
254    pub async fn ess_set_setpoint(&self, value: f64) -> anyhow::Result<()> {
255        let topic = format!(
256            "W/{}/settings/0/Settings/CGwacs/AcPowerSetPoint",
257            self.serial_number
258        );
259        let content = serde_json::json!({ "value": value }).to_string();
260
261        self.client
262            .publish(topic, QoS::AtLeastOnce, false, content)
263            .await?;
264
265        Ok(())
266    }
267
268    pub async fn send_mqtt(&self) -> anyhow::Result<()> {
269        let topic = "W/028102353a50/vebus/275/Mode";
270        let topic = "W/028102353a50/vebus/275/Hub4/L1/AcPowerSetpoint";
271        let topic = "W/028102353a50/settings/0/Settings/CGwacs/AcPowerSetPoint";
272        let content = r#"{"value": 0}"#;
273
274        self.client
275            .publish(topic, QoS::AtLeastOnce, false, content)
276            .await?;
277
278        Ok(())
279    }
280}