frequenz_microgrid/client/
microgrid_client_handle.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4//! A clonable client handle for the microgrid API.
5//!
6//! Instructions received by this handle are sent to the microgrid client actor,
7//! which owns the connection to the microgrid API service.
8
9use tokio::sync::{broadcast, mpsc, oneshot};
10use tonic::transport::Channel;
11
12use crate::{
13    Error,
14    client::MicrogridApiClient,
15    proto::{
16        common::microgrid::electrical_components::{
17            ElectricalComponent, ElectricalComponentConnection, ElectricalComponentTelemetry,
18        },
19        microgrid::microgrid_client::MicrogridClient,
20    },
21};
22
23use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor};
24
25/// A handle to the microgrid client connection.
26///
27/// This handle can be cloned as many times as needed, and each clone will share
28/// the same underlying connection to the microgrid API.
29#[derive(Clone)]
30pub struct MicrogridClientHandle {
31    instructions_tx: mpsc::Sender<Instruction>,
32}
33
34impl MicrogridClientHandle {
35    /// Creates a new `MicrogridClientHandle` that connects to the microgrid API
36    /// at the specified URL.
37    pub async fn try_new(url: impl Into<String>) -> Result<Self, Error> {
38        let client = match MicrogridClient::<Channel>::connect(url.into()).await {
39            Ok(t) => t,
40            Err(e) => {
41                tracing::error!("Could not connect to server: {e}");
42                return Err(Error::connection_failure(format!(
43                    "Could not connect to server: {e}"
44                )));
45            }
46        };
47
48        Ok(Self::new_from_client(client))
49    }
50
51    pub fn new_from_client(client: impl MicrogridApiClient) -> Self {
52        let (instructions_tx, instructions_rx) = mpsc::channel(100);
53        tokio::spawn(MicrogridClientActor::new_from_client(client, instructions_rx).run());
54        Self { instructions_tx }
55    }
56
57    /// Returns a telemetry stream from an electrical component with a given ID.
58    ///
59    /// When a connection to the API service is lost, reconnecting is handled
60    /// automatically, and the receiver will resume receiving data from the
61    /// component once the connection is re-established.
62    pub async fn receive_electrical_component_telemetry_stream(
63        &self,
64        electrical_component_id: u64,
65    ) -> Result<broadcast::Receiver<ElectricalComponentTelemetry>, Error> {
66        let (response_tx, response_rx) = oneshot::channel();
67
68        self.instructions_tx
69            .send(Instruction::ReceiveElectricalComponentTelemetryStream {
70                electrical_component_id,
71                response_tx,
72            })
73            .await
74            .map_err(|_| Error::internal("failed to send instruction"))?;
75
76        response_rx
77            .await
78            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))
79    }
80
81    /// Lists the electrical components in the local microgrid.
82    ///
83    /// If provided, the filters for component IDs and categories have an `AND`
84    /// relationship with one another, meaning that they are applied serially,
85    /// but the elements within a single filter list have an `OR` relationship with
86    /// each other.
87    ///
88    /// For example, if `ids` = [1, 2, 3], and `categories` = [
89    ///    `ComponentCategory::COMPONENT_CATEGORY_INVERTER`,
90    ///    `ComponentCategory::COMPONENT_CATEGORY_BATTERY`
91    /// ],
92    /// then the results will consist of elements that
93    /// have the IDs 1, OR 2, OR 3,
94    /// AND
95    /// are of the categories `ComponentCategory::COMPONENT_CATEGORY_INVERTER` OR
96    /// `ComponentCategory::COMPONENT_CATEGORY_BATTERY`.
97    ///
98    /// If a filter list is empty, then that filter is not applied.
99    pub async fn list_electrical_components(
100        &self,
101        electrical_component_ids: Vec<u64>,
102        electrical_component_categories: Vec<i32>,
103    ) -> Result<Vec<ElectricalComponent>, Error> {
104        let (response_tx, response_rx) = oneshot::channel();
105
106        self.instructions_tx
107            .send(Instruction::ListElectricalComponents {
108                response_tx,
109                electrical_component_ids,
110                electrical_component_categories,
111            })
112            .await
113            .map_err(|_| Error::internal("failed to send instruction"))?;
114
115        response_rx
116            .await
117            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
118    }
119
120    /// Lists the connections between the electrical components in a microgrid,
121    /// denoted by `(start, end)`.
122    ///
123    /// The direction of a connection is always away from the grid endpoint,
124    /// i.e. aligned with the direction of positive current according to the
125    /// passive sign convention:
126    /// https://en.wikipedia.org/wiki/Passive_sign_convention
127    ///
128    /// If provided, the `start` and `end` filters have an `AND` relationship
129    /// between each other, meaning that they are applied serially, but an `OR`
130    /// relationship with other elements in the same list.  For example, if
131    /// `start` = `[1, 2, 3]`, and `end` = `[4, 5, 6]`, then the result should
132    /// have all the connections where
133    ///
134    /// * each `start` component ID is either `1`, `2`, OR `3`,
135    ///   AND
136    /// * each `end` component ID is either `4`, `5`, OR `6`.
137    pub async fn list_electrical_component_connections(
138        &self,
139        source_electrical_component_ids: Vec<u64>,
140        destination_electrical_component_ids: Vec<u64>,
141    ) -> Result<Vec<ElectricalComponentConnection>, Error> {
142        let (response_tx, response_rx) = oneshot::channel();
143
144        self.instructions_tx
145            .send(Instruction::ListElectricalComponentConnections {
146                response_tx,
147                source_electrical_component_ids,
148                destination_electrical_component_ids,
149            })
150            .await
151            .map_err(|_| Error::internal("failed to send instruction"))?;
152
153        response_rx
154            .await
155            .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
156    }
157}
158
#[cfg(test)]
mod tests {

    use tokio::time::Instant;

    use crate::{
        MicrogridClientHandle,
        client::test_utils::{MockComponent, MockMicrogridApiClient},
        proto::common::metrics::{SimpleMetricValue, metric_value_variant},
    };

    /// Builds a handle on top of a mock API client with this topology:
    ///
    /// ```text
    /// grid(1) -> meter(2) -+-> meter(3) -> pv_inverter(4)
    ///                      +-> meter(5) -> battery_inverter(6) -> battery(7)
    /// ```
    ///
    /// Only the main meter (2) is configured with power values.
    fn new_client_handle() -> MicrogridClientHandle {
        // PV branch: PV meter feeding a PV inverter.
        let pv_branch = MockComponent::meter(3).with_children(vec![MockComponent::pv_inverter(4)]);

        // Battery branch: battery meter -> battery inverter -> battery.
        let battery_branch = MockComponent::meter(5).with_children(vec![
            MockComponent::battery_inverter(6).with_children(vec![MockComponent::battery(7)]),
        ]);

        // Main meter, parent of both branches.
        let main_meter = MockComponent::meter(2)
            .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
            .with_children(vec![pv_branch, battery_branch]);

        // Grid connection point at the root of the graph.
        let grid = MockComponent::grid(1).with_children(vec![main_meter]);

        MicrogridClientHandle::new_from_client(MockMicrogridApiClient::new(grid))
    }

    /// Listing without filters returns every component of the mock graph.
    #[tokio::test]
    async fn test_list_electrical_components() {
        let handle = new_client_handle();

        let listed = handle
            .list_electrical_components(vec![], vec![])
            .await
            .unwrap();

        let ids: Vec<u64> = listed.iter().map(|component| component.id).collect();
        assert_eq!(ids, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    /// Listing without filters returns every connection of the mock graph.
    #[tokio::test]
    async fn test_list_electrical_component_connections() {
        let handle = new_client_handle();

        let listed = handle
            .list_electrical_component_connections(vec![], vec![])
            .await
            .unwrap();

        let mut edges: Vec<(u64, u64)> = Vec::new();
        for connection in listed.iter() {
            edges.push((
                connection.source_electrical_component_id,
                connection.destination_electrical_component_id,
            ));
        }

        assert_eq!(
            edges,
            vec![(1, 2), (2, 3), (3, 4), (2, 5), (5, 6), (6, 7)]
        );
    }

    /// The telemetry stream of the main meter delivers the configured values
    /// and keeps delivering after the stream is re-established.
    #[tokio::test(start_paused = true)]
    async fn test_receive_component_telemetry_stream() {
        let handle = new_client_handle();

        let started_at = Instant::now();
        let mut telemetry_rx = handle
            .receive_electrical_component_telemetry_stream(2)
            .await
            .unwrap();

        let mut received_values = Vec::new();
        let mut received_at_millis = Vec::new();
        for _ in 0..10 {
            let telemetry = telemetry_rx.recv().await.unwrap();

            // Only the first metric sample of each message is inspected.
            let variant = telemetry.metric_samples[0]
                .value
                .as_ref()
                .unwrap()
                .metric_value_variant
                .as_ref()
                .unwrap();
            let value = match variant {
                metric_value_variant::MetricValueVariant::SimpleMetric(SimpleMetricValue {
                    value,
                }) => *value,
                _ => panic!("Unexpected metric value variant for live data"),
            };

            received_values.push(value);
            received_at_millis.push(started_at.elapsed().as_millis());
        }

        // The values arrive in the configured order.
        assert_eq!(
            received_values,
            vec![
                4.0, 5.0, 6.0, 7.0, 7.0, 7.0,
                // the sequence starts over because the client stream closes
                // and the actor reconnects
                4.0, 5.0, 6.0, 7.0
            ]
        );

        // The arrival times show the sample spacing and the reconnect delay.
        assert_eq!(
            received_at_millis,
            vec![
                0, 200, 400, 600, 800, 1000,
                // 3000 ms pass while reconnecting, before more samples arrive
                4000, 4200, 4400, 4600,
            ]
        );
    }
}