frequenz_microgrid/client/
microgrid_client_handle.rs1use chrono::TimeDelta;
10use tokio::sync::{broadcast, mpsc, oneshot};
11use tonic::transport::Channel;
12
13use crate::{
14 Bounds, Error,
15 client::MicrogridApiClient,
16 metric::Metric,
17 proto::{
18 common::metrics::Bounds as PbBounds,
19 common::microgrid::electrical_components::{
20 ElectricalComponent, ElectricalComponentCategory, ElectricalComponentConnection,
21 ElectricalComponentTelemetry,
22 },
23 microgrid::microgrid_client::MicrogridClient,
24 },
25};
26
27use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor};
28
29#[derive(Clone)]
34pub struct MicrogridClientHandle {
35 instructions_tx: mpsc::Sender<Instruction>,
36}
37
38impl MicrogridClientHandle {
39 pub async fn try_new(url: impl Into<String>) -> Result<Self, Error> {
42 let client = match MicrogridClient::<Channel>::connect(url.into()).await {
43 Ok(t) => t,
44 Err(e) => {
45 tracing::error!("Could not connect to server: {e}");
46 return Err(Error::connection_failure(format!(
47 "Could not connect to server: {e}"
48 )));
49 }
50 };
51
52 Ok(Self::new_from_client(client))
53 }
54
55 pub fn new_from_client(client: impl MicrogridApiClient) -> Self {
56 let (instructions_tx, instructions_rx) = mpsc::channel(100);
57 tokio::spawn(MicrogridClientActor::new_from_client(client, instructions_rx).run());
58 Self { instructions_tx }
59 }
60
61 pub async fn receive_electrical_component_telemetry_stream(
67 &self,
68 electrical_component_id: u64,
69 ) -> Result<broadcast::Receiver<ElectricalComponentTelemetry>, Error> {
70 let (response_tx, response_rx) = oneshot::channel();
71
72 self.instructions_tx
73 .send(Instruction::ReceiveElectricalComponentTelemetryStream {
74 electrical_component_id,
75 response_tx,
76 })
77 .await
78 .map_err(|_| Error::internal("failed to send instruction"))?;
79
80 response_rx
81 .await
82 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))
83 }
84
85 pub async fn list_electrical_components(
104 &self,
105 electrical_component_ids: Vec<u64>,
106 electrical_component_categories: Vec<ElectricalComponentCategory>,
107 ) -> Result<Vec<ElectricalComponent>, Error> {
108 let (response_tx, response_rx) = oneshot::channel();
109
110 self.instructions_tx
111 .send(Instruction::ListElectricalComponents {
112 response_tx,
113 electrical_component_ids,
114 electrical_component_categories,
115 })
116 .await
117 .map_err(|_| Error::internal("failed to send instruction"))?;
118
119 response_rx
120 .await
121 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
122 }
123
124 pub async fn list_electrical_component_connections(
142 &self,
143 source_electrical_component_ids: Vec<u64>,
144 destination_electrical_component_ids: Vec<u64>,
145 ) -> Result<Vec<ElectricalComponentConnection>, Error> {
146 let (response_tx, response_rx) = oneshot::channel();
147
148 self.instructions_tx
149 .send(Instruction::ListElectricalComponentConnections {
150 response_tx,
151 source_electrical_component_ids,
152 destination_electrical_component_ids,
153 })
154 .await
155 .map_err(|_| Error::internal("failed to send instruction"))?;
156
157 response_rx
158 .await
159 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
160 }
161
162 pub async fn augment_electrical_component_bounds<M, I>(
202 &self,
203 electrical_component_id: u64,
204 #[allow(unused_variables)] target_metric: M,
205 bounds: Vec<I>,
206 request_lifetime: Option<TimeDelta>,
207 ) -> Result<Option<chrono::DateTime<chrono::Utc>>, Error>
208 where
209 M: Metric,
210 Bounds<M::QuantityType>: Into<PbBounds>,
211 I: Into<Bounds<M::QuantityType>>,
212 {
213 let (response_tx, response_rx) = oneshot::channel();
214
215 self.instructions_tx
216 .send(Instruction::AugmentElectricalComponentBounds {
217 response_tx,
218 electrical_component_id,
219 target_metric: M::METRIC,
220 bounds: bounds.into_iter().map(|x| x.into().into()).collect(),
221 request_lifetime,
222 })
223 .await
224 .map_err(|_| Error::internal("failed to send instruction"))?;
225
226 response_rx
227 .await
228 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
229 }
230}
231
232#[cfg(test)]
233mod tests {
234
235 use tokio::time::Instant;
236
237 use crate::{
238 MicrogridClientHandle,
239 client::test_utils::{MockComponent, MockMicrogridApiClient},
240 proto::common::{
241 metrics::{SimpleMetricValue, metric_value_variant},
242 microgrid::electrical_components::ElectricalComponentCategory,
243 },
244 };
245
246 fn new_client_handle() -> MicrogridClientHandle {
247 let api_client = MockMicrogridApiClient::new(
248 MockComponent::grid(1).with_children(vec![
250 MockComponent::meter(2)
252 .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
253 .with_children(vec![
254 MockComponent::meter(3).with_children(vec![
256 MockComponent::pv_inverter(4),
258 ]),
259 MockComponent::meter(5).with_children(vec![
261 MockComponent::battery_inverter(6).with_children(vec![
263 MockComponent::battery(7),
265 ]),
266 ]),
267 ]),
268 ]),
269 );
270
271 MicrogridClientHandle::new_from_client(api_client)
272 }
273
274 #[tokio::test]
275 async fn test_list_electrical_components() {
276 let handle = new_client_handle();
277
278 let components = handle
279 .list_electrical_components(vec![], vec![])
280 .await
281 .unwrap();
282 let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
283 assert_eq!(component_ids, vec![1, 2, 3, 4, 5, 6, 7]);
284 }
285
286 #[tokio::test]
287 async fn test_list_electrical_components_with_filters() {
288 let handle = new_client_handle();
289
290 let components = handle
291 .list_electrical_components(vec![1, 2], vec![])
292 .await
293 .unwrap();
294 let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
295 assert_eq!(component_ids, vec![1, 2]);
296
297 let components = handle
298 .list_electrical_components(
299 vec![],
300 vec![
301 ElectricalComponentCategory::Meter,
302 ElectricalComponentCategory::Battery,
303 ],
304 )
305 .await
306 .unwrap();
307 let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
308 assert_eq!(component_ids, vec![2, 3, 5, 7]);
309
310 let components = handle
311 .list_electrical_components(
312 vec![2, 3, 4],
313 vec![
314 ElectricalComponentCategory::Meter,
315 ElectricalComponentCategory::Battery,
316 ],
317 )
318 .await
319 .unwrap();
320 let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
321 assert_eq!(component_ids, vec![2, 3]);
322 }
323
324 #[tokio::test]
325 async fn test_list_electrical_component_connections() {
326 let handle = new_client_handle();
327
328 let connections = handle
329 .list_electrical_component_connections(vec![], vec![])
330 .await
331 .unwrap();
332
333 let connection_tuples: Vec<(u64, u64)> = connections
334 .iter()
335 .map(|c| {
336 (
337 c.source_electrical_component_id,
338 c.destination_electrical_component_id,
339 )
340 })
341 .collect();
342
343 assert_eq!(
344 connection_tuples,
345 vec![(1, 2), (2, 3), (3, 4), (2, 5), (5, 6), (6, 7)]
346 );
347 }
348
349 #[tokio::test(start_paused = true)]
350 async fn test_receive_component_telemetry_stream() {
351 let handle = new_client_handle();
352
353 let start = Instant::now();
354 let mut telemetry_rx = handle
355 .receive_electrical_component_telemetry_stream(2)
356 .await
357 .unwrap();
358
359 let mut values = vec![];
360 let mut elapsed_millis = vec![];
361 for _ in 0..10 {
362 let telemetry = telemetry_rx.recv().await.unwrap();
363 values.push(
364 if let metric_value_variant::MetricValueVariant::SimpleMetric(SimpleMetricValue {
365 value,
366 }) = telemetry.metric_samples[0]
367 .value
368 .as_ref()
369 .unwrap()
370 .metric_value_variant
371 .as_ref()
372 .unwrap()
373 .clone()
374 {
375 value
376 } else {
377 panic!("Unexpected metric value variant for live data");
378 },
379 );
380 elapsed_millis.push(start.elapsed().as_millis());
381 }
382
383 assert_eq!(
385 values,
386 vec![
387 4.0, 5.0, 6.0, 7.0, 7.0, 7.0,
388 4.0, 5.0, 6.0, 7.0
390 ]
391 );
392
393 assert_eq!(
395 elapsed_millis,
396 vec![
397 0, 200, 400, 600, 800, 1000,
398 4000, 4200, 4400, 4600,
400 ]
401 );
402 }
403}