frequenz_microgrid/client/
microgrid_client_handle.rs1use tokio::sync::{broadcast, mpsc, oneshot};
10use tonic::transport::Channel;
11
12use crate::{
13 Error,
14 client::MicrogridApiClient,
15 proto::{
16 common::microgrid::electrical_components::{
17 ElectricalComponent, ElectricalComponentConnection, ElectricalComponentTelemetry,
18 },
19 microgrid::microgrid_client::MicrogridClient,
20 },
21};
22
23use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor};
24
25#[derive(Clone)]
30pub struct MicrogridClientHandle {
31 instructions_tx: mpsc::Sender<Instruction>,
32}
33
34impl MicrogridClientHandle {
35 pub async fn try_new(url: impl Into<String>) -> Result<Self, Error> {
38 let client = match MicrogridClient::<Channel>::connect(url.into()).await {
39 Ok(t) => t,
40 Err(e) => {
41 tracing::error!("Could not connect to server: {e}");
42 return Err(Error::connection_failure(format!(
43 "Could not connect to server: {e}"
44 )));
45 }
46 };
47
48 Ok(Self::new_from_client(client))
49 }
50
51 pub fn new_from_client(client: impl MicrogridApiClient) -> Self {
52 let (instructions_tx, instructions_rx) = mpsc::channel(100);
53 tokio::spawn(MicrogridClientActor::new_from_client(client, instructions_rx).run());
54 Self { instructions_tx }
55 }
56
57 pub async fn receive_electrical_component_telemetry_stream(
63 &self,
64 electrical_component_id: u64,
65 ) -> Result<broadcast::Receiver<ElectricalComponentTelemetry>, Error> {
66 let (response_tx, response_rx) = oneshot::channel();
67
68 self.instructions_tx
69 .send(Instruction::ReceiveElectricalComponentTelemetryStream {
70 electrical_component_id,
71 response_tx,
72 })
73 .await
74 .map_err(|_| Error::internal("failed to send instruction"))?;
75
76 response_rx
77 .await
78 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))
79 }
80
81 pub async fn list_electrical_components(
100 &self,
101 electrical_component_ids: Vec<u64>,
102 electrical_component_categories: Vec<i32>,
103 ) -> Result<Vec<ElectricalComponent>, Error> {
104 let (response_tx, response_rx) = oneshot::channel();
105
106 self.instructions_tx
107 .send(Instruction::ListElectricalComponents {
108 response_tx,
109 electrical_component_ids,
110 electrical_component_categories,
111 })
112 .await
113 .map_err(|_| Error::internal("failed to send instruction"))?;
114
115 response_rx
116 .await
117 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
118 }
119
120 pub async fn list_electrical_component_connections(
138 &self,
139 source_electrical_component_ids: Vec<u64>,
140 destination_electrical_component_ids: Vec<u64>,
141 ) -> Result<Vec<ElectricalComponentConnection>, Error> {
142 let (response_tx, response_rx) = oneshot::channel();
143
144 self.instructions_tx
145 .send(Instruction::ListElectricalComponentConnections {
146 response_tx,
147 source_electrical_component_ids,
148 destination_electrical_component_ids,
149 })
150 .await
151 .map_err(|_| Error::internal("failed to send instruction"))?;
152
153 response_rx
154 .await
155 .map_err(|e| Error::internal(format!("failed to receive response: {e}")))?
156 }
157}
158
159#[cfg(test)]
160mod tests {
161
162 use tokio::time::Instant;
163
164 use crate::{
165 MicrogridClientHandle,
166 client::test_utils::{MockComponent, MockMicrogridApiClient},
167 proto::common::metrics::{SimpleMetricValue, metric_value_variant},
168 };
169
170 fn new_client_handle() -> MicrogridClientHandle {
171 let api_client = MockMicrogridApiClient::new(
172 MockComponent::grid(1).with_children(vec![
174 MockComponent::meter(2)
176 .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
177 .with_children(vec![
178 MockComponent::meter(3).with_children(vec![
180 MockComponent::pv_inverter(4),
182 ]),
183 MockComponent::meter(5).with_children(vec![
185 MockComponent::battery_inverter(6).with_children(vec![
187 MockComponent::battery(7),
189 ]),
190 ]),
191 ]),
192 ]),
193 );
194
195 MicrogridClientHandle::new_from_client(api_client)
196 }
197
198 #[tokio::test]
199 async fn test_list_electrical_components() {
200 let handle = new_client_handle();
201
202 let components = handle
203 .list_electrical_components(vec![], vec![])
204 .await
205 .unwrap();
206 let component_ids: Vec<u64> = components.iter().map(|c| c.id).collect();
207 assert_eq!(component_ids, vec![1, 2, 3, 4, 5, 6, 7]);
208 }
209
210 #[tokio::test]
211 async fn test_list_electrical_component_connections() {
212 let handle = new_client_handle();
213
214 let connections = handle
215 .list_electrical_component_connections(vec![], vec![])
216 .await
217 .unwrap();
218
219 let connection_tuples: Vec<(u64, u64)> = connections
220 .iter()
221 .map(|c| {
222 (
223 c.source_electrical_component_id,
224 c.destination_electrical_component_id,
225 )
226 })
227 .collect();
228
229 assert_eq!(
230 connection_tuples,
231 vec![(1, 2), (2, 3), (3, 4), (2, 5), (5, 6), (6, 7)]
232 );
233 }
234
235 #[tokio::test(start_paused = true)]
236 async fn test_receive_component_telemetry_stream() {
237 let handle = new_client_handle();
238
239 let start = Instant::now();
240 let mut telemetry_rx = handle
241 .receive_electrical_component_telemetry_stream(2)
242 .await
243 .unwrap();
244
245 let mut values = vec![];
246 let mut elapsed_millis = vec![];
247 for _ in 0..10 {
248 let telemetry = telemetry_rx.recv().await.unwrap();
249 values.push(
250 if let metric_value_variant::MetricValueVariant::SimpleMetric(SimpleMetricValue {
251 value,
252 }) = telemetry.metric_samples[0]
253 .value
254 .as_ref()
255 .unwrap()
256 .metric_value_variant
257 .as_ref()
258 .unwrap()
259 .clone()
260 {
261 value
262 } else {
263 panic!("Unexpected metric value variant for live data");
264 },
265 );
266 elapsed_millis.push(start.elapsed().as_millis());
267 }
268
269 assert_eq!(
271 values,
272 vec![
273 4.0, 5.0, 6.0, 7.0, 7.0, 7.0,
274 4.0, 5.0, 6.0, 7.0
276 ]
277 );
278
279 assert_eq!(
281 elapsed_millis,
282 vec![
283 0, 200, 400, 600, 800, 1000,
284 4000, 4200, 4400, 4600,
286 ]
287 );
288 }
289}