Skip to main content

frequenz_microgrid/client/
microgrid_client_handle.rs

// License: MIT
// Copyright © 2025 Frequenz Energy-as-a-Service GmbH

//! A clonable client handle for the microgrid API.
//!
//! Instructions received by this handle are sent to the microgrid client actor,
//! which owns the connection to the microgrid API service.
8
9use chrono::TimeDelta;
10use tokio::sync::{broadcast, mpsc, oneshot};
11use tonic::transport::{Channel, Endpoint};
12
13use crate::{
14    Bounds, Error,
15    client::MicrogridApiClient,
16    client::proto::{
17        common::metrics::Bounds as PbBounds,
18        common::microgrid::electrical_components::{
19            ElectricalComponent, ElectricalComponentCategory, ElectricalComponentConnection,
20            ElectricalComponentTelemetry,
21        },
22        microgrid::microgrid_client::MicrogridClient,
23    },
24    metric::Metric,
25};
26
27use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor};
28
29/// A handle to the microgrid client connection.
30///
31/// This handle can be cloned as many times as needed, and each clone will share
32/// the same underlying connection to the microgrid API.
33#[derive(Clone)]
34pub struct MicrogridClientHandle {
35    instructions_tx: mpsc::Sender<Instruction>,
36}
37
38impl MicrogridClientHandle {
39    /// Creates a new `MicrogridClientHandle` for the microgrid API at the
40    /// specified URL.
41    ///
42    /// The connection is established lazily on the first RPC, so this method
43    /// succeeds even when no server is reachable yet.  Per-call errors will
44    /// surface from the individual RPC methods, and the actor's per-stream
45    /// retry loop will keep attempting to reconnect telemetry streams.
46    ///
47    /// Returns an error only if `url` is not a valid endpoint URL.
48    pub async fn try_new(url: impl Into<String>) -> Result<Self, Error> {
49        let url = url.into();
50        let channel = Endpoint::from_shared(url.clone())
51            .map_err(|e| {
52                Error::connection_failure(format!("Invalid microgrid API URL {url}: {e}"))
53            })?
54            .connect_lazy();
55        Ok(Self::new_from_client(MicrogridClient::<Channel>::new(
56            channel,
57        )))
58    }
59
60    pub fn new_from_client(client: impl MicrogridApiClient) -> Self {
61        let (instructions_tx, instructions_rx) = mpsc::channel(100);
62        tokio::spawn(MicrogridClientActor::new_from_client(client, instructions_rx).run());
63        Self { instructions_tx }
64    }
65
66    /// Returns a telemetry stream from an electrical component with a given ID.
67    ///
68    /// When a connection to the API service is lost, reconnecting is handled
69    /// automatically, and the receiver will resume receiving data from the
70    /// component once the connection is re-established.
71    pub async fn receive_electrical_component_telemetry_stream(
72        &self,
73        electrical_component_id: u64,
74    ) -> Result<broadcast::Receiver<ElectricalComponentTelemetry>, Error> {
75        let (response_tx, response_rx) = oneshot::channel();
76
77        self.instructions_tx
78            .send(Instruction::ReceiveElectricalComponentTelemetryStream {
79                electrical_component_id,
80                response_tx,
81            })
82            .await
83            .map_err(|_| Error::internal("failed to send instruction"))?;
84
85        response_rx
86            .await
87            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))
88    }
89
90    /// Lists the electrical components in the local microgrid.
91    ///
92    /// If provided, the filters for component IDs and categories have an `AND`
93    /// relationship with one another, meaning that they are applied serially,
94    /// but the elements within a single filter list have an `OR` relationship with
95    /// each other.
96    ///
97    /// For example, if `ids` = [1, 2, 3], and `categories` = [
98    ///    `ComponentCategory::COMPONENT_CATEGORY_INVERTER`,
99    ///    `ComponentCategory::COMPONENT_CATEGORY_BATTERY`
100    /// ],
101    /// then the results will consist of elements that
102    /// have the IDs 1, OR 2, OR 3,
103    /// AND
104    /// are of the categories `ComponentCategory::COMPONENT_CATEGORY_INVERTER` OR
105    /// `ComponentCategory::COMPONENT_CATEGORY_BATTERY`.
106    ///
107    /// If a filter list is empty, then that filter is not applied.
108    pub async fn list_electrical_components(
109        &self,
110        electrical_component_ids: Vec<u64>,
111        electrical_component_categories: Vec<ElectricalComponentCategory>,
112    ) -> Result<Vec<ElectricalComponent>, Error> {
113        let (response_tx, response_rx) = oneshot::channel();
114
115        self.instructions_tx
116            .send(Instruction::ListElectricalComponents {
117                response_tx,
118                electrical_component_ids,
119                electrical_component_categories,
120            })
121            .await
122            .map_err(|_| Error::internal("failed to send instruction"))?;
123
124        response_rx
125            .await
126            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
127    }
128
129    /// Lists the connections between the electrical components in a microgrid,
130    /// denoted by `(start, end)`.
131    ///
132    /// The direction of a connection is always away from the grid endpoint,
133    /// i.e. aligned with the direction of positive current according to the
134    /// [passive sign
135    /// convention](https://en.wikipedia.org/wiki/Passive_sign_convention).
136    ///
137    /// If provided, the `start` and `end` filters have an `AND` relationship
138    /// between each other, meaning that they are applied serially, but an `OR`
139    /// relationship with other elements in the same list.  For example, if
140    /// `start` = `[1, 2, 3]`, and `end` = `[4, 5, 6]`, then the result should
141    /// have all the connections where
142    ///
143    /// * each `start` component ID is either `1`, `2`, OR `3`,
144    ///   AND
145    /// * each `end` component ID is either `4`, `5`, OR `6`.
146    pub async fn list_electrical_component_connections(
147        &self,
148        source_electrical_component_ids: Vec<u64>,
149        destination_electrical_component_ids: Vec<u64>,
150    ) -> Result<Vec<ElectricalComponentConnection>, Error> {
151        let (response_tx, response_rx) = oneshot::channel();
152
153        self.instructions_tx
154            .send(Instruction::ListElectricalComponentConnections {
155                response_tx,
156                source_electrical_component_ids,
157                destination_electrical_component_ids,
158            })
159            .await
160            .map_err(|_| Error::internal("failed to send instruction"))?;
161
162        response_rx
163            .await
164            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
165    }
166
167    /// Augments the overall bounds for a given metric of a given electrical
168    /// component with the provided bounds.
169    /// Returns the UTC time at which the provided bounds will expire and its
170    /// effects will no longer be visible in the overall bounds for the
171    /// given metric.
172    ///
173    /// The request parameters allows users to select a duration until
174    /// which the bounds will stay in effect. If no duration is provided, then the
175    /// bounds will be removed after a default duration of 5 seconds.
176    ///
177    /// Inclusion bounds give the range that the system will try to keep the
178    /// metric within. If the metric goes outside of these bounds, the system will
179    /// try to bring it back within the bounds.
180    /// If the bounds for a metric are [\`lower_1`, `upper_1`],
181    /// [`lower_2`, `upper_2`](<`lower_1`, `upper_1`],
182    /// [`lower_2`, `upper_2`>), then this metric's `value` needs to comply with
183    /// the constraints
184    /// `lower_1 <= value <= upper_1` OR `lower_2 <= value <= upper_2`.
185    ///
186    /// If multiple inclusion bounds have been provided for a metric, then the
187    /// overlapping bounds are merged into a single bound, and non-overlapping
188    /// bounds are kept separate.
189    /// E.g. if the bounds are [0, 10], [5, 15], [20, 30](<0, 10], [5, 15], [20, 30>), then the resulting
190    /// bounds will be [0, 15], [20, 30](<0, 15], [20, 30>).
191    ///
192    /// The following diagram illustrates how bounds are applied:
193    ///
194    /// ```text,
195    ///  lower_1  upper_1
196    /// <----|========|--------|========|-------->
197    ///                    lower_2  upper_2
198    /// ```
199    ///
200    /// The bounds in this example are `[[lower_1, upper_1], [lower_2, upper_2]]`.
201    /// ---- values here are considered out of range.
202    /// ==== values here are considered within range.
203    ///
204    /// Note that for power metrics, regardless of the bounds, 0W is always
205    /// allowed.
206    pub async fn augment_electrical_component_bounds<M, I>(
207        &self,
208        electrical_component_id: u64,
209        #[allow(unused_variables)] target_metric: M,
210        bounds: Vec<I>,
211        request_lifetime: Option<TimeDelta>,
212    ) -> Result<Option<chrono::DateTime<chrono::Utc>>, Error>
213    where
214        M: Metric,
215        Bounds<M::QuantityType>: Into<PbBounds>,
216        I: Into<Bounds<M::QuantityType>>,
217    {
218        let (response_tx, response_rx) = oneshot::channel();
219
220        self.instructions_tx
221            .send(Instruction::AugmentElectricalComponentBounds {
222                response_tx,
223                electrical_component_id,
224                target_metric: M::METRIC,
225                bounds: bounds.into_iter().map(|x| x.into().into()).collect(),
226                request_lifetime,
227            })
228            .await
229            .map_err(|_| Error::internal("failed to send instruction"))?;
230
231        response_rx
232            .await
233            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
234    }
235}
236
#[cfg(test)]
mod tests {

    use tokio::time::Instant;

    use crate::{
        MicrogridClientHandle,
        client::proto::common::{
            metrics::{SimpleMetricValue, metric_value_variant},
            microgrid::electrical_components::ElectricalComponentCategory,
        },
        client::test_utils::{MockComponent, MockMicrogridApiClient},
    };

    /// Builds a handle backed by a mock API client with this component graph:
    ///
    /// ```text
    /// grid(1) -> meter(2) -> meter(3) -> pv_inverter(4)
    ///                     -> meter(5) -> battery_inverter(6) -> battery(7)
    /// ```
    ///
    /// The main meter (2) is configured with the power samples
    /// `4.0, 5.0, 6.0, 7.0, 7.0, 7.0`.
    fn new_client_handle() -> MicrogridClientHandle {
        // PV meter with its PV inverter.
        let pv_branch = MockComponent::meter(3).with_children(vec![MockComponent::pv_inverter(4)]);

        // Battery meter with its battery inverter and battery.
        let battery_inverter =
            MockComponent::battery_inverter(6).with_children(vec![MockComponent::battery(7)]);
        let battery_branch = MockComponent::meter(5).with_children(vec![battery_inverter]);

        // Main meter, feeding both branches.
        let main_meter = MockComponent::meter(2)
            .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
            .with_children(vec![pv_branch, battery_branch]);

        // Grid connection point.
        let grid = MockComponent::grid(1).with_children(vec![main_meter]);

        MicrogridClientHandle::new_from_client(MockMicrogridApiClient::new(grid))
    }

    /// Lists the components matching the given filters and returns their IDs,
    /// in the order in which they were listed.
    async fn list_component_ids(
        handle: &MicrogridClientHandle,
        ids: Vec<u64>,
        categories: Vec<ElectricalComponentCategory>,
    ) -> Vec<u64> {
        handle
            .list_electrical_components(ids, categories)
            .await
            .unwrap()
            .into_iter()
            .map(|component| component.id)
            .collect()
    }

    #[tokio::test]
    async fn test_list_electrical_components() {
        let handle = new_client_handle();

        let listed = list_component_ids(&handle, vec![], vec![]).await;
        assert_eq!(listed, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[tokio::test]
    async fn test_list_electrical_components_with_filters() {
        let handle = new_client_handle();
        let meters_and_batteries = || {
            vec![
                ElectricalComponentCategory::Meter,
                ElectricalComponentCategory::Battery,
            ]
        };

        // Only the ID filter.
        let by_id = list_component_ids(&handle, vec![1, 2], vec![]).await;
        assert_eq!(by_id, vec![1, 2]);

        // Only the category filter.
        let by_category = list_component_ids(&handle, vec![], meters_and_batteries()).await;
        assert_eq!(by_category, vec![2, 3, 5, 7]);

        // Both filters together.
        let by_both = list_component_ids(&handle, vec![2, 3, 4], meters_and_batteries()).await;
        assert_eq!(by_both, vec![2, 3]);
    }

    #[tokio::test]
    async fn test_list_electrical_component_connections() {
        let handle = new_client_handle();

        let edges: Vec<(u64, u64)> = handle
            .list_electrical_component_connections(vec![], vec![])
            .await
            .unwrap()
            .into_iter()
            .map(|connection| {
                (
                    connection.source_electrical_component_id,
                    connection.destination_electrical_component_id,
                )
            })
            .collect();

        assert_eq!(
            edges,
            vec![(1, 2), (2, 3), (3, 4), (2, 5), (5, 6), (6, 7)]
        );
    }

    #[tokio::test(start_paused = true)]
    async fn test_receive_component_telemetry_stream() {
        use metric_value_variant::MetricValueVariant;

        const SAMPLE_COUNT: usize = 10;

        let handle = new_client_handle();

        let start = Instant::now();
        let mut telemetry_rx = handle
            .receive_electrical_component_telemetry_stream(2)
            .await
            .unwrap();

        let mut values = Vec::with_capacity(SAMPLE_COUNT);
        let mut elapsed_millis = Vec::with_capacity(SAMPLE_COUNT);
        for _ in 0..SAMPLE_COUNT {
            let telemetry = telemetry_rx.recv().await.unwrap();

            // Only the first metric sample of each message is inspected.
            let first_sample = telemetry.metric_samples[0].value.as_ref().unwrap();
            let variant = first_sample.metric_value_variant.as_ref().unwrap();
            let MetricValueVariant::SimpleMetric(SimpleMetricValue { value }) = variant else {
                panic!("Unexpected metric value variant for live data");
            };

            values.push(*value);
            elapsed_millis.push(start.elapsed().as_millis());
        }

        // Check that received values are as expected
        assert_eq!(
            values,
            vec![
                4.0, 5.0, 6.0, 7.0, 7.0, 7.0,
                // repeats because the client stream closes and the actor reconnects
                4.0, 5.0, 6.0, 7.0
            ]
        );

        // Check that reconnect delays are as expected
        assert_eq!(
            elapsed_millis,
            vec![
                0, 200, 400, 600, 800, 1000,
                // reconnect delay of 3000 ms, before receiving more samples
                4000, 4200, 4400, 4600,
            ]
        );
    }
}