frequenz_microgrid/client/microgrid_client_handle.rs
1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4//! A clonable client handle for the microgrid API.
5//!
6//! Instructions received by this handle are sent to the microgrid client actor,
7//! which owns the connection to the microgrid API service.
8
9use chrono::TimeDelta;
10use tokio::sync::{broadcast, mpsc, oneshot};
11use tonic::transport::{Channel, Endpoint};
12
13use crate::{
14 Bounds, Error,
15 client::MicrogridApiClient,
16 client::proto::{
17 common::metrics::Bounds as PbBounds,
18 common::microgrid::electrical_components::{
19 ElectricalComponent, ElectricalComponentCategory, ElectricalComponentConnection,
20 ElectricalComponentTelemetry,
21 },
22 microgrid::microgrid_client::MicrogridClient,
23 },
24 metric::Metric,
25};
26
27use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor};
28
29/// A handle to the microgrid client connection.
30///
31/// This handle can be cloned as many times as needed, and each clone will share
32/// the same underlying connection to the microgrid API.
33#[derive(Clone)]
34pub struct MicrogridClientHandle {
35 instructions_tx: mpsc::Sender<Instruction>,
36}
37
38impl MicrogridClientHandle {
39 /// Creates a new `MicrogridClientHandle` for the microgrid API at the
40 /// specified URL.
41 ///
42 /// The connection is established lazily on the first RPC, so this method
43 /// succeeds even when no server is reachable yet. Per-call errors will
44 /// surface from the individual RPC methods, and the actor's per-stream
45 /// retry loop will keep attempting to reconnect telemetry streams.
46 ///
47 /// Returns an error only if `url` is not a valid endpoint URL.
48 pub async fn try_new(url: impl Into<String>) -> Result<Self, Error> {
49 let url = url.into();
50 let channel = Endpoint::from_shared(url.clone())
51 .map_err(|e| {
52 Error::connection_failure(format!("Invalid microgrid API URL {url}: {e}"))
53 })?
54 .connect_lazy();
55 Ok(Self::new_from_client(MicrogridClient::<Channel>::new(
56 channel,
57 )))
58 }
59
60 pub fn new_from_client(client: impl MicrogridApiClient) -> Self {
61 let (instructions_tx, instructions_rx) = mpsc::channel(100);
62 tokio::spawn(MicrogridClientActor::new_from_client(client, instructions_rx).run());
63 Self { instructions_tx }
64 }
65
66 /// Returns a telemetry stream from an electrical component with a given ID.
67 ///
68 /// When a connection to the API service is lost, reconnecting is handled
69 /// automatically, and the receiver will resume receiving data from the
70 /// component once the connection is re-established.
71 pub async fn receive_electrical_component_telemetry_stream(
72 &self,
73 electrical_component_id: u64,
74 ) -> Result<broadcast::Receiver<ElectricalComponentTelemetry>, Error> {
75 let (response_tx, response_rx) = oneshot::channel();
76
77 self.instructions_tx
78 .send(Instruction::ReceiveElectricalComponentTelemetryStream {
79 electrical_component_id,
80 response_tx,
81 })
82 .await
83 .map_err(|_| Error::internal("failed to send instruction"))?;
84
85 response_rx
86 .await
87 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))
88 }
89
90 /// Lists the electrical components in the local microgrid.
91 ///
92 /// If provided, the filters for component IDs and categories have an `AND`
93 /// relationship with one another, meaning that they are applied serially,
94 /// but the elements within a single filter list have an `OR` relationship with
95 /// each other.
96 ///
97 /// For example, if `ids` = [1, 2, 3], and `categories` = [
98 /// `ComponentCategory::COMPONENT_CATEGORY_INVERTER`,
99 /// `ComponentCategory::COMPONENT_CATEGORY_BATTERY`
100 /// ],
101 /// then the results will consist of elements that
102 /// have the IDs 1, OR 2, OR 3,
103 /// AND
104 /// are of the categories `ComponentCategory::COMPONENT_CATEGORY_INVERTER` OR
105 /// `ComponentCategory::COMPONENT_CATEGORY_BATTERY`.
106 ///
107 /// If a filter list is empty, then that filter is not applied.
108 pub async fn list_electrical_components(
109 &self,
110 electrical_component_ids: Vec<u64>,
111 electrical_component_categories: Vec<ElectricalComponentCategory>,
112 ) -> Result<Vec<ElectricalComponent>, Error> {
113 let (response_tx, response_rx) = oneshot::channel();
114
115 self.instructions_tx
116 .send(Instruction::ListElectricalComponents {
117 response_tx,
118 electrical_component_ids,
119 electrical_component_categories,
120 })
121 .await
122 .map_err(|_| Error::internal("failed to send instruction"))?;
123
124 response_rx
125 .await
126 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
127 }
128
129 /// Lists the connections between the electrical components in a microgrid,
130 /// denoted by `(start, end)`.
131 ///
132 /// The direction of a connection is always away from the grid endpoint,
133 /// i.e. aligned with the direction of positive current according to the
134 /// [passive sign
135 /// convention](https://en.wikipedia.org/wiki/Passive_sign_convention).
136 ///
137 /// If provided, the `start` and `end` filters have an `AND` relationship
138 /// between each other, meaning that they are applied serially, but an `OR`
139 /// relationship with other elements in the same list. For example, if
140 /// `start` = `[1, 2, 3]`, and `end` = `[4, 5, 6]`, then the result should
141 /// have all the connections where
142 ///
143 /// * each `start` component ID is either `1`, `2`, OR `3`,
144 /// AND
145 /// * each `end` component ID is either `4`, `5`, OR `6`.
146 pub async fn list_electrical_component_connections(
147 &self,
148 source_electrical_component_ids: Vec<u64>,
149 destination_electrical_component_ids: Vec<u64>,
150 ) -> Result<Vec<ElectricalComponentConnection>, Error> {
151 let (response_tx, response_rx) = oneshot::channel();
152
153 self.instructions_tx
154 .send(Instruction::ListElectricalComponentConnections {
155 response_tx,
156 source_electrical_component_ids,
157 destination_electrical_component_ids,
158 })
159 .await
160 .map_err(|_| Error::internal("failed to send instruction"))?;
161
162 response_rx
163 .await
164 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
165 }
166
167 /// Augments the overall bounds for a given metric of a given electrical
168 /// component with the provided bounds.
169 /// Returns the UTC time at which the provided bounds will expire and its
170 /// effects will no longer be visible in the overall bounds for the
171 /// given metric.
172 ///
173 /// The request parameters allows users to select a duration until
174 /// which the bounds will stay in effect. If no duration is provided, then the
175 /// bounds will be removed after a default duration of 5 seconds.
176 ///
177 /// Inclusion bounds give the range that the system will try to keep the
178 /// metric within. If the metric goes outside of these bounds, the system will
179 /// try to bring it back within the bounds.
180 /// If the bounds for a metric are [\`lower_1`, `upper_1`],
181 /// [`lower_2`, `upper_2`](<`lower_1`, `upper_1`],
182 /// [`lower_2`, `upper_2`>), then this metric's `value` needs to comply with
183 /// the constraints
184 /// `lower_1 <= value <= upper_1` OR `lower_2 <= value <= upper_2`.
185 ///
186 /// If multiple inclusion bounds have been provided for a metric, then the
187 /// overlapping bounds are merged into a single bound, and non-overlapping
188 /// bounds are kept separate.
189 /// E.g. if the bounds are [0, 10], [5, 15], [20, 30](<0, 10], [5, 15], [20, 30>), then the resulting
190 /// bounds will be [0, 15], [20, 30](<0, 15], [20, 30>).
191 ///
192 /// The following diagram illustrates how bounds are applied:
193 ///
194 /// ```text,
195 /// lower_1 upper_1
196 /// <----|========|--------|========|-------->
197 /// lower_2 upper_2
198 /// ```
199 ///
200 /// The bounds in this example are `[[lower_1, upper_1], [lower_2, upper_2]]`.
201 /// ---- values here are considered out of range.
202 /// ==== values here are considered within range.
203 ///
204 /// Note that for power metrics, regardless of the bounds, 0W is always
205 /// allowed.
206 pub async fn augment_electrical_component_bounds<M, I>(
207 &self,
208 electrical_component_id: u64,
209 #[allow(unused_variables)] target_metric: M,
210 bounds: Vec<I>,
211 request_lifetime: Option<TimeDelta>,
212 ) -> Result<Option<chrono::DateTime<chrono::Utc>>, Error>
213 where
214 M: Metric,
215 Bounds<M::QuantityType>: Into<PbBounds>,
216 I: Into<Bounds<M::QuantityType>>,
217 {
218 let (response_tx, response_rx) = oneshot::channel();
219
220 self.instructions_tx
221 .send(Instruction::AugmentElectricalComponentBounds {
222 response_tx,
223 electrical_component_id,
224 target_metric: M::METRIC,
225 bounds: bounds.into_iter().map(|x| x.into().into()).collect(),
226 request_lifetime,
227 })
228 .await
229 .map_err(|_| Error::internal("failed to send instruction"))?;
230
231 response_rx
232 .await
233 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
234 }
235}
236
#[cfg(test)]
mod tests {
    use tokio::time::Instant;

    use crate::{
        MicrogridClientHandle,
        client::proto::common::{
            metrics::{SimpleMetricValue, metric_value_variant},
            microgrid::electrical_components::ElectricalComponentCategory,
        },
        client::test_utils::{MockComponent, MockMicrogridApiClient},
    };

    /// Builds a handle backed by a mock API client with this topology:
    ///
    /// ```text
    /// grid(1) -> meter(2) -> meter(3) -> pv_inverter(4)
    ///                     -> meter(5) -> battery_inverter(6) -> battery(7)
    /// ```
    ///
    /// Only the main meter (2) is configured with power samples.
    fn new_client_handle() -> MicrogridClientHandle {
        // PV branch: PV meter feeding a PV inverter.
        let pv_branch =
            MockComponent::meter(3).with_children(vec![MockComponent::pv_inverter(4)]);

        // Battery branch: battery meter, battery inverter, battery.
        let battery_inverter =
            MockComponent::battery_inverter(6).with_children(vec![MockComponent::battery(7)]);
        let battery_branch = MockComponent::meter(5).with_children(vec![battery_inverter]);

        // Main meter sits between the grid connection point and both branches.
        let main_meter = MockComponent::meter(2)
            .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
            .with_children(vec![pv_branch, battery_branch]);

        // Grid connection point is the root of the component graph.
        let grid = MockComponent::grid(1).with_children(vec![main_meter]);

        MicrogridClientHandle::new_from_client(MockMicrogridApiClient::new(grid))
    }

    /// Lists components through `handle` with the given filters and returns
    /// just their IDs, in the order they were received.
    async fn listed_ids(
        handle: &MicrogridClientHandle,
        ids: Vec<u64>,
        categories: Vec<ElectricalComponentCategory>,
    ) -> Vec<u64> {
        handle
            .list_electrical_components(ids, categories)
            .await
            .unwrap()
            .into_iter()
            .map(|component| component.id)
            .collect()
    }

    #[tokio::test]
    async fn test_list_electrical_components() {
        let handle = new_client_handle();

        let all_ids = listed_ids(&handle, vec![], vec![]).await;
        assert_eq!(all_ids, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_list_electrical_components_with_filters() {
        let handle = new_client_handle();

        // ID filter only.
        let by_id = listed_ids(&handle, vec![1, 2], vec![]).await;
        assert_eq!(by_id, vec![1, 2]);

        // Category filter only.
        let by_category = listed_ids(
            &handle,
            vec![],
            vec![
                ElectricalComponentCategory::Meter,
                ElectricalComponentCategory::Battery,
            ],
        )
        .await;
        assert_eq!(by_category, vec![2, 3, 5, 7]);

        // Both filters: a component must satisfy each of them.
        let by_both = listed_ids(
            &handle,
            vec![2, 3, 4],
            vec![
                ElectricalComponentCategory::Meter,
                ElectricalComponentCategory::Battery,
            ],
        )
        .await;
        assert_eq!(by_both, vec![2, 3]);
    }

    #[tokio::test]
    async fn test_list_electrical_component_connections() {
        let handle = new_client_handle();

        let connection_tuples: Vec<(u64, u64)> = handle
            .list_electrical_component_connections(vec![], vec![])
            .await
            .unwrap()
            .into_iter()
            .map(|connection| {
                (
                    connection.source_electrical_component_id,
                    connection.destination_electrical_component_id,
                )
            })
            .collect();

        assert_eq!(
            connection_tuples,
            vec![(1, 2), (2, 3), (3, 4), (2, 5), (5, 6), (6, 7)]
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_receive_component_telemetry_stream() {
        let handle = new_client_handle();

        // The reference instant is taken before subscribing so that the
        // elapsed times below are measured from the start of the request.
        let start = Instant::now();
        let mut telemetry_rx = handle
            .receive_electrical_component_telemetry_stream(2)
            .await
            .unwrap();

        let mut values = Vec::with_capacity(10);
        let mut elapsed_millis = Vec::with_capacity(10);
        for _ in 0..10 {
            let telemetry = telemetry_rx.recv().await.unwrap();

            // Borrow the variant of the first metric sample.
            let variant = telemetry.metric_samples[0]
                .value
                .as_ref()
                .unwrap()
                .metric_value_variant
                .as_ref()
                .unwrap();
            let value = match variant {
                metric_value_variant::MetricValueVariant::SimpleMetric(SimpleMetricValue {
                    value,
                }) => *value,
                _ => panic!("Unexpected metric value variant for live data"),
            };

            values.push(value);
            elapsed_millis.push(start.elapsed().as_millis());
        }

        // Check that received values are as expected
        assert_eq!(
            values,
            vec![
                4.0, 5.0, 6.0, 7.0, 7.0, 7.0,
                // repeats because the client stream closes and the actor reconnects
                4.0, 5.0, 6.0, 7.0
            ]
        );

        // Check that reconnect delays are as expected
        assert_eq!(
            elapsed_millis,
            vec![
                0, 200, 400, 600, 800, 1000,
                // reconnect delay of 3000 ms, before receiving more samples
                4000, 4200, 4400, 4600,
            ]
        );
    }
}