Skip to main content

frequenz_microgrid/client/
microgrid_client_handle.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4//! A clonable client handle for the microgrid API.
5//!
6//! Instructions received by this handle are sent to the microgrid client actor,
7//! which owns the connection to the microgrid API service.
8
9use chrono::TimeDelta;
10use tokio::sync::{broadcast, mpsc, oneshot};
11use tonic::transport::Channel;
12
13use crate::{
14    Bounds, Error,
15    client::MicrogridApiClient,
16    metric::Metric,
17    proto::{
18        common::metrics::Bounds as PbBounds,
19        common::microgrid::electrical_components::{
20            ElectricalComponent, ElectricalComponentCategory, ElectricalComponentConnection,
21            ElectricalComponentTelemetry,
22        },
23        microgrid::microgrid_client::MicrogridClient,
24    },
25};
26
27use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor};
28
29/// A handle to the microgrid client connection.
30///
31/// This handle can be cloned as many times as needed, and each clone will share
32/// the same underlying connection to the microgrid API.
33#[derive(Clone)]
34pub struct MicrogridClientHandle {
35    instructions_tx: mpsc::Sender<Instruction>,
36}
37
38impl MicrogridClientHandle {
39    /// Creates a new `MicrogridClientHandle` that connects to the microgrid API
40    /// at the specified URL.
41    pub async fn try_new(url: impl Into<String>) -> Result<Self, Error> {
42        let client = match MicrogridClient::<Channel>::connect(url.into()).await {
43            Ok(t) => t,
44            Err(e) => {
45                tracing::error!("Could not connect to server: {e}");
46                return Err(Error::connection_failure(format!(
47                    "Could not connect to server: {e}"
48                )));
49            }
50        };
51
52        Ok(Self::new_from_client(client))
53    }
54
55    pub fn new_from_client(client: impl MicrogridApiClient) -> Self {
56        let (instructions_tx, instructions_rx) = mpsc::channel(100);
57        tokio::spawn(MicrogridClientActor::new_from_client(client, instructions_rx).run());
58        Self { instructions_tx }
59    }
60
61    /// Returns a telemetry stream from an electrical component with a given ID.
62    ///
63    /// When a connection to the API service is lost, reconnecting is handled
64    /// automatically, and the receiver will resume receiving data from the
65    /// component once the connection is re-established.
66    pub async fn receive_electrical_component_telemetry_stream(
67        &self,
68        electrical_component_id: u64,
69    ) -> Result<broadcast::Receiver<ElectricalComponentTelemetry>, Error> {
70        let (response_tx, response_rx) = oneshot::channel();
71
72        self.instructions_tx
73            .send(Instruction::ReceiveElectricalComponentTelemetryStream {
74                electrical_component_id,
75                response_tx,
76            })
77            .await
78            .map_err(|_| Error::internal("failed to send instruction"))?;
79
80        response_rx
81            .await
82            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))
83    }
84
85    /// Lists the electrical components in the local microgrid.
86    ///
87    /// If provided, the filters for component IDs and categories have an `AND`
88    /// relationship with one another, meaning that they are applied serially,
89    /// but the elements within a single filter list have an `OR` relationship with
90    /// each other.
91    ///
92    /// For example, if `ids` = [1, 2, 3], and `categories` = [
93    ///    `ComponentCategory::COMPONENT_CATEGORY_INVERTER`,
94    ///    `ComponentCategory::COMPONENT_CATEGORY_BATTERY`
95    /// ],
96    /// then the results will consist of elements that
97    /// have the IDs 1, OR 2, OR 3,
98    /// AND
99    /// are of the categories `ComponentCategory::COMPONENT_CATEGORY_INVERTER` OR
100    /// `ComponentCategory::COMPONENT_CATEGORY_BATTERY`.
101    ///
102    /// If a filter list is empty, then that filter is not applied.
103    pub async fn list_electrical_components(
104        &self,
105        electrical_component_ids: Vec<u64>,
106        electrical_component_categories: Vec<ElectricalComponentCategory>,
107    ) -> Result<Vec<ElectricalComponent>, Error> {
108        let (response_tx, response_rx) = oneshot::channel();
109
110        self.instructions_tx
111            .send(Instruction::ListElectricalComponents {
112                response_tx,
113                electrical_component_ids,
114                electrical_component_categories,
115            })
116            .await
117            .map_err(|_| Error::internal("failed to send instruction"))?;
118
119        response_rx
120            .await
121            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
122    }
123
124    /// Lists the connections between the electrical components in a microgrid,
125    /// denoted by `(start, end)`.
126    ///
127    /// The direction of a connection is always away from the grid endpoint,
128    /// i.e. aligned with the direction of positive current according to the
129    /// passive sign convention:
130    /// https://en.wikipedia.org/wiki/Passive_sign_convention
131    ///
132    /// If provided, the `start` and `end` filters have an `AND` relationship
133    /// between each other, meaning that they are applied serially, but an `OR`
134    /// relationship with other elements in the same list.  For example, if
135    /// `start` = `[1, 2, 3]`, and `end` = `[4, 5, 6]`, then the result should
136    /// have all the connections where
137    ///
138    /// * each `start` component ID is either `1`, `2`, OR `3`,
139    ///   AND
140    /// * each `end` component ID is either `4`, `5`, OR `6`.
141    pub async fn list_electrical_component_connections(
142        &self,
143        source_electrical_component_ids: Vec<u64>,
144        destination_electrical_component_ids: Vec<u64>,
145    ) -> Result<Vec<ElectricalComponentConnection>, Error> {
146        let (response_tx, response_rx) = oneshot::channel();
147
148        self.instructions_tx
149            .send(Instruction::ListElectricalComponentConnections {
150                response_tx,
151                source_electrical_component_ids,
152                destination_electrical_component_ids,
153            })
154            .await
155            .map_err(|_| Error::internal("failed to send instruction"))?;
156
157        response_rx
158            .await
159            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
160    }
161
162    /// Augments the overall bounds for a given metric of a given electrical
163    /// component with the provided bounds.
164    /// Returns the UTC time at which the provided bounds will expire and its
165    /// effects will no longer be visible in the overall bounds for the
166    /// given metric.
167    ///
168    /// The request parameters allows users to select a duration until
169    /// which the bounds will stay in effect. If no duration is provided, then the
170    /// bounds will be removed after a default duration of 5 seconds.
171    ///
172    /// Inclusion bounds give the range that the system will try to keep the
173    /// metric within. If the metric goes outside of these bounds, the system will
174    /// try to bring it back within the bounds.
175    /// If the bounds for a metric are [\`lower_1`, `upper_1`],
176    /// [`lower_2`, `upper_2`](<`lower_1`, `upper_1`],
177    /// [`lower_2`, `upper_2`>), then this metric's `value` needs to comply with
178    /// the constraints
179    /// `lower_1 <= value <= upper_1` OR `lower_2 <= value <= upper_2`.
180    ///
181    /// If multiple inclusion bounds have been provided for a metric, then the
182    /// overlapping bounds are merged into a single bound, and non-overlapping
183    /// bounds are kept separate.
184    /// E.g. if the bounds are [0, 10], [5, 15], [20, 30](<0, 10], [5, 15], [20, 30>), then the resulting
185    /// bounds will be [0, 15], [20, 30](<0, 15], [20, 30>).
186    ///
187    /// The following diagram illustrates how bounds are applied:
188    ///
189    /// ```text,
190    ///  lower_1  upper_1
191    /// <----|========|--------|========|-------->
192    ///                    lower_2  upper_2
193    /// ```
194    ///
195    /// The bounds in this example are `[[lower_1, upper_1], [lower_2, upper_2]]`.
196    /// ---- values here are considered out of range.
197    /// ==== values here are considered within range.
198    ///
199    /// Note that for power metrics, regardless of the bounds, 0W is always
200    /// allowed.
201    pub async fn augment_electrical_component_bounds<M, I>(
202        &self,
203        electrical_component_id: u64,
204        #[allow(unused_variables)] target_metric: M,
205        bounds: Vec<I>,
206        request_lifetime: Option<TimeDelta>,
207    ) -> Result<Option<chrono::DateTime<chrono::Utc>>, Error>
208    where
209        M: Metric,
210        Bounds<M::QuantityType>: Into<PbBounds>,
211        I: Into<Bounds<M::QuantityType>>,
212    {
213        let (response_tx, response_rx) = oneshot::channel();
214
215        self.instructions_tx
216            .send(Instruction::AugmentElectricalComponentBounds {
217                response_tx,
218                electrical_component_id,
219                target_metric: M::METRIC,
220                bounds: bounds.into_iter().map(|x| x.into().into()).collect(),
221                request_lifetime,
222            })
223            .await
224            .map_err(|_| Error::internal("failed to send instruction"))?;
225
226        response_rx
227            .await
228            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
229    }
230}
231
232#[cfg(test)]
233mod tests {
234
235    use tokio::time::Instant;
236
237    use crate::{
238        MicrogridClientHandle,
239        client::test_utils::{MockComponent, MockMicrogridApiClient},
240        proto::common::{
241            metrics::{SimpleMetricValue, metric_value_variant},
242            microgrid::electrical_components::ElectricalComponentCategory,
243        },
244    };
245
246    fn new_client_handle() -> MicrogridClientHandle {
247        let api_client = MockMicrogridApiClient::new(
248            // Grid connection point
249            MockComponent::grid(1).with_children(vec![
250                // Main meter
251                MockComponent::meter(2)
252                    .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
253                    .with_children(vec![
254                        // PV meter
255                        MockComponent::meter(3).with_children(vec![
256                            // PV inverter
257                            MockComponent::pv_inverter(4),
258                        ]),
259                        // Battery meter
260                        MockComponent::meter(5).with_children(vec![
261                            // Battery inverter
262                            MockComponent::battery_inverter(6).with_children(vec![
263                                // Battery
264                                MockComponent::battery(7),
265                            ]),
266                        ]),
267                    ]),
268            ]),
269        );
270
271        MicrogridClientHandle::new_from_client(api_client)
272    }
273
274    #[tokio::test]
275    async fn test_list_electrical_components() {
276        let handle = new_client_handle();
277
278        let components = handle
279            .list_electrical_components(vec![], vec![])
280            .await
281            .unwrap();
282        let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
283        assert_eq!(component_ids, vec![1, 2, 3, 4, 5, 6, 7]);
284    }
285
286    #[tokio::test]
287    async fn test_list_electrical_components_with_filters() {
288        let handle = new_client_handle();
289
290        let components = handle
291            .list_electrical_components(vec![1, 2], vec![])
292            .await
293            .unwrap();
294        let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
295        assert_eq!(component_ids, vec![1, 2]);
296
297        let components = handle
298            .list_electrical_components(
299                vec![],
300                vec![
301                    ElectricalComponentCategory::Meter,
302                    ElectricalComponentCategory::Battery,
303                ],
304            )
305            .await
306            .unwrap();
307        let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
308        assert_eq!(component_ids, vec![2, 3, 5, 7]);
309
310        let components = handle
311            .list_electrical_components(
312                vec![2, 3, 4],
313                vec![
314                    ElectricalComponentCategory::Meter,
315                    ElectricalComponentCategory::Battery,
316                ],
317            )
318            .await
319            .unwrap();
320        let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
321        assert_eq!(component_ids, vec![2, 3]);
322    }
323
324    #[tokio::test]
325    async fn test_list_electrical_component_connections() {
326        let handle = new_client_handle();
327
328        let connections = handle
329            .list_electrical_component_connections(vec![], vec![])
330            .await
331            .unwrap();
332
333        let connection_tuples: Vec<(u64, u64)> = connections
334            .iter()
335            .map(|c| {
336                (
337                    c.source_electrical_component_id,
338                    c.destination_electrical_component_id,
339                )
340            })
341            .collect();
342
343        assert_eq!(
344            connection_tuples,
345            vec![(1, 2), (2, 3), (3, 4), (2, 5), (5, 6), (6, 7)]
346        );
347    }
348
349    #[tokio::test(start_paused = true)]
350    async fn test_receive_component_telemetry_stream() {
351        let handle = new_client_handle();
352
353        let start = Instant::now();
354        let mut telemetry_rx = handle
355            .receive_electrical_component_telemetry_stream(2)
356            .await
357            .unwrap();
358
359        let mut values = vec![];
360        let mut elapsed_millis = vec![];
361        for _ in 0..10 {
362            let telemetry = telemetry_rx.recv().await.unwrap();
363            values.push(
364                if let metric_value_variant::MetricValueVariant::SimpleMetric(SimpleMetricValue {
365                    value,
366                }) = telemetry.metric_samples[0]
367                    .value
368                    .as_ref()
369                    .unwrap()
370                    .metric_value_variant
371                    .as_ref()
372                    .unwrap()
373                    .clone()
374                {
375                    value
376                } else {
377                    panic!("Unexpected metric value variant for live data");
378                },
379            );
380            elapsed_millis.push(start.elapsed().as_millis());
381        }
382
383        // Check that received values are as expected
384        assert_eq!(
385            values,
386            vec![
387                4.0, 5.0, 6.0, 7.0, 7.0, 7.0,
388                // repeats because the client stream closes and the actor reconnects
389                4.0, 5.0, 6.0, 7.0
390            ]
391        );
392
393        // Check that reconnect delays are as expected
394        assert_eq!(
395            elapsed_millis,
396            vec![
397                0, 200, 400, 600, 800, 1000,
398                // reconnect delay of 3000 ms, before receiving more samples
399                4000, 4200, 4400, 4600,
400            ]
401        );
402    }
403}