use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Notify, RwLock};
use tokio::task;
use tokio::time::{self, Duration};
use tracing::{debug, error, info, warn};

use crate::ac::AcSpec;
use crate::battery::{BatteryDC, BatterySummary};
use crate::ess::Ess;
use crate::pvinverter::{PvInverter, PvInverterSummary};
use crate::traits::HandleFrame;

pub mod ac;
pub mod battery;
pub mod ess;
pub mod pvinverter;
pub mod traits;
21
22#[derive(Debug, Clone, Default)]
23pub struct VictronData {
24 pub ac_input: AcSpec,
25 pub ac_output: AcSpec,
26 pub ess: Ess,
27 pub batteries_dc: HashMap<u16, BatteryDC>,
28 pub pv_inverters: HashMap<u16, PvInverter>,
29}
30
31pub struct VictronGx {
32 pub serial_number: String,
33 data: Arc<RwLock<VictronData>>,
34 client: AsyncClient,
35
36 shutdown: Arc<Notify>,
37
38 _eventloop_handle: task::JoinHandle<()>,
39 _keepalive_handle: task::JoinHandle<()>,
40}
41
42impl VictronGx {
43 pub async fn new(
44 client_id: &str,
45 host: &str,
46 port: u16,
47 serial_number: &str,
48 ) -> anyhow::Result<Self> {
49 let mut mqttoptions = MqttOptions::new(client_id, host, port);
50 mqttoptions.set_keep_alive(Duration::from_secs(30));
51
52 let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
53 let data = Arc::new(RwLock::new(VictronData::default()));
54 let shutdown = Arc::new(Notify::new());
55
56 let keepalive_topic = format!("R/{}/keepalive", serial_number);
57 let client_clone = client.clone();
58 let shutdown_clone = shutdown.clone();
59
60 let _keepalive_handle = task::spawn(Self::keepalive_task(
61 client_clone,
62 keepalive_topic,
63 shutdown_clone,
64 ));
65
66 let topics = vec![format!("N/{}/#", serial_number)];
68
69 for topic in &topics {
70 client.subscribe(topic, QoS::AtMostOnce).await?;
71 }
72
73 let data_clone = data.clone();
74 let serial_number_cpy = serial_number.to_string();
75
76 let shutdown_clone = shutdown.clone();
77 let _eventloop_handle = task::spawn(async move {
78 loop {
79 tokio::select! {
80 _ = shutdown_clone.notified() => {
81 info!("Shutting down event loop");
82 break;
83 }
84 event = eventloop.poll() => match event {
85 Ok(Event::Incoming(Packet::Publish(publish))) => {
86 Self::handle_publish(&data_clone, &serial_number_cpy, &publish.topic, &publish.payload).await;
87 }
88 Ok(_) => {}
89 Err(e) => {
90 error!("MQTT event loop error: {:?}", e);
91 time::sleep(Duration::from_secs(5)).await;
92 }
93 }
94 }
95 }
96 });
97
98 Ok(Self {
108 serial_number: serial_number.to_string(),
109 data,
110 client,
111 shutdown,
112 _eventloop_handle,
113 _keepalive_handle,
114 })
115 }
116
117 async fn handle_publish(
118 data: &Arc<RwLock<VictronData>>,
119 serial_number: &str,
120 topic: &str,
121 payload: &[u8],
122 ) {
123 let payload_str = match std::str::from_utf8(payload) {
124 Ok(s) => s,
125 Err(e) => {
126 error!("Invalid UTF-8 payload: {:?}", e);
127 return;
128 }
129 };
130
131 let value = match serde_json::from_str::<Value>(payload_str) {
132 Ok(json) => json.get("value").and_then(|v| v.as_f64()),
133 Err(e) => {
134 error!("JSON parse error: {:?}", e);
135 None
136 }
137 };
138
139 let suffix = match topic.strip_prefix(&format!("N/{}/", serial_number)) {
140 Some(s) => s,
141 None => return,
142 };
143
144 let parts: Vec<&str> = suffix.split('/').collect();
145 let mut data = data.write().await;
146
147 match parts.as_slice() {
148 ["vebus", "275", "Ac", "ActiveIn", rest @ ..] => {
149 data.ac_input.handle_frame(rest, value)
150 }
151 ["vebus", "275", "Ac", "Out", rest @ ..] => data.ac_output.handle_frame(rest, value),
152 ["battery", id, rest @ ..] => {
153 let id: u16 = id.parse().unwrap_or(0);
154 let battery = data.batteries_dc.entry(id).or_default();
155 battery.handle_frame(rest, value);
156 }
157 ["pvinverter", id, rest @ ..] => {
158 let id: u16 = id.parse().unwrap_or(0);
159 let inverter = data.pv_inverters.entry(id).or_default();
160 inverter.handle_frame(rest, value);
161 }
162 ["vebus", "275", "Hub4", rest @ ..] => {
163 data.ess.handle_frame(rest, value);
164 }
165 _ => {
166 debug!(
167 "Unhandled topic: {}, parts: {:?}, value: {:?}",
168 topic, parts, value
169 );
170 }
171 }
172 }
173 async fn keepalive_task(client: AsyncClient, topic: String, shutdown: Arc<Notify>) {
176 let mut interval = time::interval(Duration::from_secs(10));
177 loop {
178 tokio::select! {
179 _ = shutdown.notified() => {
180 info!("Shutting down keepalive task");
181 break;
182 }
183 _ = interval.tick() => {
184 if let Err(e) = client.publish(&topic, QoS::AtLeastOnce, false, "").await {
185 error!("Keepalive publish failed: {:?}", e);
186 }
187 }
188 }
189 }
190 }
191
192 pub async fn shutdown(&self) {
193 self.shutdown.notify_waiters();
194 }
195
196 pub async fn get_ac_input(&self) -> AcSpec {
197 self.data.read().await.ac_input.clone()
198 }
199 pub async fn get_ac_output(&self) -> AcSpec {
200 self.data.read().await.ac_output.clone()
201 }
202
203 pub async fn get_batteries_dc(&self) -> Vec<(u16, BatteryDC)> {
204 self.data
205 .read()
206 .await
207 .batteries_dc
208 .iter()
209 .map(|(id, batt)| (*id, batt.clone()))
210 .collect()
211 }
212
213 pub async fn get_battery_summary(&self) -> BatterySummary {
215 let data = self.data.read().await;
216 BatterySummary::from_batteries(data.batteries_dc.values())
217 }
218
219 pub async fn get_pv_inverters(&self) -> Vec<(u16, PvInverter)> {
220 self.data
221 .read()
222 .await
223 .pv_inverters
224 .iter()
225 .map(|(id, inv)| (*id, inv.clone()))
226 .collect()
227 }
228
229 pub async fn get_pv_inverter_summary(&self) -> PvInverterSummary {
231 let data = self.data.read().await;
232 PvInverterSummary::from_pv_inverters(data.pv_inverters.values())
233 }
234
235 pub async fn set_ac_power_setpoint(&self) {
237 let topic = "W/028102353a50/settings/0/Settings/CGwacs/AcPowerSetPoint";
240 let content = r#"{"value": 1234}"#;
241 if let Err(e) = self
242 .client
243 .publish(topic, QoS::AtLeastOnce, false, content)
244 .await
245 {
246 error!("Failed to set AC power setpoint: {:?}", e);
247 }
248 }
249
250 pub async fn get_ess(&self) -> Ess {
251 self.data.read().await.ess.clone()
252 }
253
254 pub async fn ess_set_setpoint(&self, value: f64) -> anyhow::Result<()> {
255 let topic = format!(
256 "W/{}/settings/0/Settings/CGwacs/AcPowerSetPoint",
257 self.serial_number
258 );
259 let content = serde_json::json!({ "value": value }).to_string();
260
261 self.client
262 .publish(topic, QoS::AtLeastOnce, false, content)
263 .await?;
264
265 Ok(())
266 }
267
268 pub async fn send_mqtt(&self) -> anyhow::Result<()> {
269 let topic = "W/028102353a50/vebus/275/Mode";
270 let topic = "W/028102353a50/vebus/275/Hub4/L1/AcPowerSetpoint";
271 let topic = "W/028102353a50/settings/0/Settings/CGwacs/AcPowerSetPoint";
272 let content = r#"{"value": 0}"#;
273
274 self.client
275 .publish(topic, QoS::AtLeastOnce, false, content)
276 .await?;
277
278 Ok(())
279 }
280}