1use std::collections::{BTreeMap, VecDeque};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::Duration;
5use futures::{SinkExt, StreamExt, TryStreamExt};
6use futures::stream::{SplitSink};
7use rust_ocpp::v1_6::messages::authorize::{AuthorizeRequest, AuthorizeResponse};
8use rust_ocpp::v1_6::messages::boot_notification::{BootNotificationRequest, BootNotificationResponse};
9use rust_ocpp::v1_6::messages::cancel_reservation::{CancelReservationRequest, CancelReservationResponse};
10use rust_ocpp::v1_6::messages::change_availability::{ChangeAvailabilityRequest, ChangeAvailabilityResponse};
11use rust_ocpp::v1_6::messages::change_configuration::{ChangeConfigurationRequest, ChangeConfigurationResponse};
12use rust_ocpp::v1_6::messages::clear_cache::{ClearCacheRequest, ClearCacheResponse};
13use rust_ocpp::v1_6::messages::clear_charging_profile::{ClearChargingProfileRequest, ClearChargingProfileResponse};
14use rust_ocpp::v1_6::messages::data_transfer::{DataTransferRequest, DataTransferResponse};
15use rust_ocpp::v1_6::messages::diagnostics_status_notification::{DiagnosticsStatusNotificationRequest, DiagnosticsStatusNotificationResponse};
16use rust_ocpp::v1_6::messages::firmware_status_notification::{FirmwareStatusNotificationRequest, FirmwareStatusNotificationResponse};
17use rust_ocpp::v1_6::messages::get_composite_schedule::{GetCompositeScheduleRequest, GetCompositeScheduleResponse};
18use rust_ocpp::v1_6::messages::get_configuration::{GetConfigurationRequest, GetConfigurationResponse};
19use rust_ocpp::v1_6::messages::get_diagnostics::{GetDiagnosticsRequest, GetDiagnosticsResponse};
20use rust_ocpp::v1_6::messages::get_local_list_version::{GetLocalListVersionRequest, GetLocalListVersionResponse};
21use rust_ocpp::v1_6::messages::heart_beat::{HeartbeatRequest, HeartbeatResponse};
22use rust_ocpp::v1_6::messages::meter_values::{MeterValuesRequest, MeterValuesResponse};
23use rust_ocpp::v1_6::messages::remote_start_transaction::{RemoteStartTransactionRequest, RemoteStartTransactionResponse};
24use rust_ocpp::v1_6::messages::remote_stop_transaction::{RemoteStopTransactionRequest, RemoteStopTransactionResponse};
25use rust_ocpp::v1_6::messages::reserve_now::{ReserveNowRequest, ReserveNowResponse};
26use rust_ocpp::v1_6::messages::reset::{ResetRequest, ResetResponse};
27use rust_ocpp::v1_6::messages::send_local_list::{SendLocalListRequest, SendLocalListResponse};
28use rust_ocpp::v1_6::messages::set_charging_profile::{SetChargingProfileRequest, SetChargingProfileResponse};
29use rust_ocpp::v1_6::messages::start_transaction::{StartTransactionRequest, StartTransactionResponse};
30use rust_ocpp::v1_6::messages::status_notification::{StatusNotificationRequest, StatusNotificationResponse};
31use rust_ocpp::v1_6::messages::stop_transaction::{StopTransactionRequest, StopTransactionResponse};
32use rust_ocpp::v1_6::messages::trigger_message::{TriggerMessageRequest, TriggerMessageResponse};
33use rust_ocpp::v1_6::messages::unlock_connector::{UnlockConnectorRequest, UnlockConnectorResponse};
34use rust_ocpp::v1_6::messages::update_firmware::{UpdateFirmwareRequest, UpdateFirmwareResponse};
35use serde::de::DeserializeOwned;
36use serde::Serialize;
37use serde_json::{Value};
38use tokio::net::TcpStream;
39use tokio::sync::{Mutex, oneshot, broadcast, mpsc};
40use tokio::sync::broadcast::Sender;
41use tokio::time::timeout;
42use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
43use tokio_tungstenite::tungstenite::Message;
44use uuid::Uuid;
45use crate::ocpp_1_6::ocpp_1_6_error::OCPP1_6Error;
46use crate::ocpp_1_6::raw_ocpp_1_6_call::RawOcpp1_6Call;
47use crate::ocpp_1_6::raw_ocpp_1_6_error::RawOcpp1_6Error;
48use crate::ocpp_1_6::raw_ocpp_1_6_result::RawOcpp1_6Result;
49
50#[derive(Clone)]
52pub struct OCPP1_6Client {
53 sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
54 response_channels: Arc<Mutex<BTreeMap<Uuid, oneshot::Sender<Result<Value, OCPP1_6Error>>>>>,
55 request_senders: Arc<Mutex<BTreeMap<String, mpsc::Sender<RawOcpp1_6Call>>>>,
56 pong_channels: Arc<Mutex<VecDeque<oneshot::Sender<()>>>>,
57 ping_sender: Sender<()>,
58 timeout: Duration
59}
60
61impl OCPP1_6Client {
62 pub(crate) fn new(stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
63 let (sink, stream) = stream.split();
64 let sink = Arc::new(Mutex::new(sink));
65
66 let response_channels = Arc::new(Mutex::new(BTreeMap::<Uuid, oneshot::Sender<Result<Value, OCPP1_6Error>>>::new()));
67 let response_channels2 = Arc::clone(&response_channels);
68
69 let pong_channels = Arc::new(Mutex::new(VecDeque::<oneshot::Sender<()>>::new()));
70 let pong_channels2 = Arc::clone(&pong_channels);
71
72 let request_senders: Arc<Mutex<BTreeMap<String, mpsc::Sender<RawOcpp1_6Call>>>> = Arc::new(Mutex::new(BTreeMap::new()));
73
74 let request_senders2 = request_senders.clone();
75 let sink2 = sink.clone();
76
77 let (ping_sender, _) = tokio::sync::broadcast::channel(10);
78 let ping_sender2 = ping_sender.clone();
79
80 tokio::spawn(async move {
81 stream
82 .map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e))
83 .try_for_each(|message| {
84 let response_channels2 = response_channels2.clone();
85 let ping_sender = ping_sender2.clone();
86 let pong_channels2 = pong_channels2.clone();
87 let request_senders = request_senders2.clone();
88 let sink = sink2.clone();
89 async move {
90 match message {
91 Message::Text(raw_payload) => {
92 let raw_value = serde_json::from_str(&raw_payload)?;
93
94 match raw_value {
95 Value::Array(list) => {
96 if let Some(message_type_item) = list.get(0) {
97 if let Value::Number(message_type_raw) = message_type_item {
98 if let Some(message_type) = message_type_raw.as_u64() {
99 match message_type {
100 2 => {
102 let call: RawOcpp1_6Call =
103 serde_json::from_str(&raw_payload).unwrap();
104 let action = &call.2;
105 let sender_opt = {
106 let lock = request_senders.lock().await;
107 lock.get(action).cloned()
108 };
109 match sender_opt {
110 None => {
111 let error = OCPP1_6Error::new_not_implemented(&format!("Action '{}' is not implemented", action));
112 let payload = serde_json::to_string(&RawOcpp1_6Error(4, call.1.to_string(), error.code().to_string(), error.description().to_string(), error.details().to_owned())).unwrap();
113 let mut lock = sink.lock().await;
114 if let Err(err) = lock.send(Message::Text(payload)).await {
115 println!("Failed to send response: {:?}", err)
116 }
117 }
118 Some(sender) => {
119 if let Err(err) = sender.send(call).await {
120 println!("Error sending request: {:?}", err);
121 };
122 }
123 }
124 },
125 3 => {
127 let result: RawOcpp1_6Result =
128 serde_json::from_str(&raw_payload).unwrap();
129 let mut lock = response_channels2.lock().await;
130 if let Some(sender) = lock.remove(&Uuid::parse_str(&result.1)?) {
131 sender.send(Ok(result.2)).unwrap();
132 }
133 },
134 4 => {
136 let error: RawOcpp1_6Error =
137 serde_json::from_str(&raw_payload)?;
138 let mut lock = response_channels2.lock().await;
139 if let Some(sender) = lock.remove(&Uuid::parse_str(&error.1)?) {
140 sender.send(Err(error.into())).unwrap();
141 }
142 },
143 _ => println!("Unknown message type"),
144 }
145 } else {
146 println!("The message type has to be an integer, it cant have decimals")
147 }
148 } else {
149 println!("The first item in the array was not a number")
150 }
151 } else {
152 println!("The root list was empty")
153 }
154 }
155 _ => println!("A message should be an array of items"),
156 }
157
158 }
159 Message::Ping(_) => {
160 if ping_sender.receiver_count() > 0 {
161 if let Err(err) = ping_sender.send(()) {
162 println!("Error sending websocket ping: {:?}", err);
163 };
164 }
165 }
166 Message::Pong(_) => {
167 let mut lock = pong_channels2.lock().await;
168 if let Some(sender) = lock.pop_back() {
169 sender.send(()).unwrap();
170 }
171 }
172 _ => {}
173 }
174 Ok(())
175 }
176
177 }).await?;
178 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
179 });
180
181 Self {
182 sink,
183 response_channels,
184 request_senders,
185 pong_channels,
186 ping_sender,
187 timeout: Duration::from_secs(5)
188 }
189 }
190
191 pub async fn disconnect(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
193 let mut lock = self.sink.lock().await;
194 lock.close().await?;
195 Ok(())
196 }
197
198 pub async fn send_authorize(&self, request: AuthorizeRequest) -> Result<Result<AuthorizeResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
199 self.do_send_request(request, "Authorize").await
200 }
201
202 pub async fn send_boot_notification(&self, request: BootNotificationRequest) -> Result<Result<BootNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
203 self.do_send_request(request, "BootNotification").await
204 }
205
206 pub async fn send_data_transfer(&self, request: DataTransferRequest) -> Result<Result<DataTransferResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
207 self.do_send_request(request, "DataTransfer").await
208 }
209
210 pub async fn send_diagnostics_status_notification(&self, request: DiagnosticsStatusNotificationRequest) -> Result<Result<DiagnosticsStatusNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
211 self.do_send_request(request, "DiagnosticsStatusNotification").await
212 }
213
214 pub async fn send_firmware_status_notification(&self, request: FirmwareStatusNotificationRequest) -> Result<Result<FirmwareStatusNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
215 self.do_send_request(request, "FirmwareStatusNotification").await
216 }
217
218 pub async fn send_heartbeat(&self, request: HeartbeatRequest) -> Result<Result<HeartbeatResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
219 self.do_send_request(request, "Heartbeat").await
220 }
221
222 pub async fn send_meter_values(&self, request: MeterValuesRequest) -> Result<Result<MeterValuesResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
223 self.do_send_request(request, "MeterValues").await
224 }
225
226 pub async fn send_start_transaction(&self, request: StartTransactionRequest) -> Result<Result<StartTransactionResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
227 self.do_send_request(request, "StartTransaction").await
228 }
229
230 pub async fn send_status_notification(&self, request: StatusNotificationRequest) -> Result<Result<StatusNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
231 self.do_send_request(request, "StatusNotification").await
232 }
233
234 pub async fn send_stop_transaction(&self, request: StopTransactionRequest) -> Result<Result<StopTransactionResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
235 self.do_send_request(request, "StopTransaction").await
236 }
237
238 pub async fn send_ping(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
239 {
240 let mut lock = self.sink.lock().await;
241 lock.send(Message::Ping(vec![])).await?;
242 }
243
244 let (s, r) = oneshot::channel();
245 {
246 let mut pong_channels = self.pong_channels.lock().await;
247 pong_channels.push_front(s);
248 }
249
250 r.await?;
251 Ok(())
252 }
253
254 pub async fn on_cancel_reservation<F: FnMut(CancelReservationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<CancelReservationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
255 self.handle_on_request(callback, "CancelReservation").await
256 }
257
258 pub async fn on_change_availability<F: FnMut(ChangeAvailabilityRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ChangeAvailabilityResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
259 self.handle_on_request(callback, "ChangeAvailability").await
260 }
261
262 pub async fn on_change_configuration<F: FnMut(ChangeConfigurationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ChangeConfigurationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
263 self.handle_on_request(callback, "ChangeConfiguration").await
264 }
265
266 pub async fn on_clear_cache<F: FnMut(ClearCacheRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ClearCacheResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
267 self.handle_on_request(callback, "ClearCache").await
268 }
269
270 pub async fn on_clear_charging_profile<F: FnMut(ClearChargingProfileRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ClearChargingProfileResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
271 self.handle_on_request(callback, "ClearChargingProfile").await
272 }
273
274 pub async fn on_data_transfer<F: FnMut(DataTransferRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<DataTransferResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
275 self.handle_on_request(callback, "DataTransfer").await
276 }
277
278 pub async fn on_get_composite_schedule<F: FnMut(GetCompositeScheduleRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetCompositeScheduleResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
279 self.handle_on_request(callback, "GetCompositeSchedule").await
280 }
281
282 pub async fn on_get_configuration<F: FnMut(GetConfigurationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetConfigurationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
283 self.handle_on_request(callback, "GetConfiguration").await
284 }
285
286 pub async fn on_get_diagnostics<F: FnMut(GetDiagnosticsRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetDiagnosticsResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
287 self.handle_on_request(callback, "GetDiagnostics").await
288 }
289
290 pub async fn on_get_local_list_version<F: FnMut(GetLocalListVersionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetLocalListVersionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
291 self.handle_on_request(callback, "GetLocalListVersion").await
292 }
293
294 pub async fn on_remote_start_transaction<F: FnMut(RemoteStartTransactionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<RemoteStartTransactionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
295 self.handle_on_request(callback, "RemoteStartTransaction").await
296 }
297
298 pub async fn on_remote_stop_transaction<F: FnMut(RemoteStopTransactionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<RemoteStopTransactionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
299 self.handle_on_request(callback, "RemoteStopTransaction").await
300 }
301
302 pub async fn on_reserve_now<F: FnMut(ReserveNowRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ReserveNowResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
303 self.handle_on_request(callback, "ReserveNow").await
304 }
305
306 pub async fn on_reset<F: FnMut(ResetRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ResetResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
307 self.handle_on_request(callback, "Reset").await
308 }
309
310 pub async fn on_send_local_list<F: FnMut(SendLocalListRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<SendLocalListResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
311 self.handle_on_request(callback, "SendLocalList").await
312 }
313
314 pub async fn on_set_charging_profile<F: FnMut(SetChargingProfileRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<SetChargingProfileResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
315 self.handle_on_request(callback, "SetChargingProfile").await
316 }
317
318 pub async fn on_trigger_message<F: FnMut(TriggerMessageRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<TriggerMessageResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
319 self.handle_on_request(callback, "TriggerMessage").await
320 }
321
322 pub async fn on_unlock_connector<F: FnMut(UnlockConnectorRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<UnlockConnectorResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
323 self.handle_on_request(callback, "UnlockConnector").await
324 }
325
326 pub async fn on_update_firmware<F: FnMut(UpdateFirmwareRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<UpdateFirmwareResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
327 self.handle_on_request(callback, "UpdateFirmware").await
328 }
329
/// Waits for one incoming `CancelReservation` call, answers it with `callback`, and returns
/// the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_cancel_reservation<F, FF>(
    &self,
    callback: F,
) -> Result<CancelReservationRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(CancelReservationRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<CancelReservationResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "CancelReservation").await
}
334
/// Waits for one incoming `ChangeAvailability` call, answers it with `callback`, and returns
/// the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_change_availability<F, FF>(
    &self,
    callback: F,
) -> Result<ChangeAvailabilityRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(ChangeAvailabilityRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<ChangeAvailabilityResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "ChangeAvailability").await
}
339
/// Waits for one incoming `ChangeConfiguration` call, answers it with `callback`, and returns
/// the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_change_configuration<F, FF>(
    &self,
    callback: F,
) -> Result<ChangeConfigurationRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(ChangeConfigurationRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<ChangeConfigurationResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "ChangeConfiguration").await
}
344
/// Waits for one incoming `ClearCache` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_clear_cache<F, FF>(
    &self,
    callback: F,
) -> Result<ClearCacheRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(ClearCacheRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<ClearCacheResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "ClearCache").await
}
349
/// Waits for one incoming `ClearChargingProfile` call, answers it with `callback`, and
/// returns the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_clear_charging_profile<F, FF>(
    &self,
    callback: F,
) -> Result<ClearChargingProfileRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(ClearChargingProfileRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<ClearChargingProfileResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "ClearChargingProfile").await
}
354
/// Waits for one incoming `DataTransfer` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_data_transfer<F, FF>(
    &self,
    callback: F,
) -> Result<DataTransferRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(DataTransferRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<DataTransferResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "DataTransfer").await
}
359
/// Waits for one incoming `GetCompositeSchedule` call, answers it with `callback`, and
/// returns the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_get_composite_schedule<F, FF>(
    &self,
    callback: F,
) -> Result<GetCompositeScheduleRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(GetCompositeScheduleRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<GetCompositeScheduleResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "GetCompositeSchedule").await
}
364
/// Waits for one incoming `GetConfiguration` call, answers it with `callback`, and returns
/// the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_get_configuration<F, FF>(
    &self,
    callback: F,
) -> Result<GetConfigurationRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(GetConfigurationRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<GetConfigurationResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "GetConfiguration").await
}
369
/// Waits for one incoming `GetDiagnostics` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_get_diagnostics<F, FF>(
    &self,
    callback: F,
) -> Result<GetDiagnosticsRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(GetDiagnosticsRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<GetDiagnosticsResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "GetDiagnostics").await
}
374
/// Waits for one incoming `GetLocalListVersion` call, answers it with `callback`, and returns
/// the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_get_local_list_version<F, FF>(
    &self,
    callback: F,
) -> Result<GetLocalListVersionRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(GetLocalListVersionRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<GetLocalListVersionResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "GetLocalListVersion").await
}
379
/// Waits for one incoming `RemoteStartTransaction` call, answers it with `callback`, and
/// returns the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_remote_start_transaction<F, FF>(
    &self,
    callback: F,
) -> Result<RemoteStartTransactionRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(RemoteStartTransactionRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<RemoteStartTransactionResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "RemoteStartTransaction").await
}
384
/// Waits for one incoming `RemoteStopTransaction` call, answers it with `callback`, and
/// returns the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_remote_stop_transaction<F, FF>(
    &self,
    callback: F,
) -> Result<RemoteStopTransactionRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(RemoteStopTransactionRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<RemoteStopTransactionResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "RemoteStopTransaction").await
}
389
/// Waits for one incoming `ReserveNow` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_reserve_now<F, FF>(
    &self,
    callback: F,
) -> Result<ReserveNowRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(ReserveNowRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<ReserveNowResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "ReserveNow").await
}
394
/// Waits for one incoming `Reset` call, answers it with `callback`, and returns the received
/// request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_reset<F, FF>(
    &self,
    callback: F,
) -> Result<ResetRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(ResetRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<ResetResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "Reset").await
}
399
/// Waits for one incoming `SendLocalList` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_send_local_list<F, FF>(
    &self,
    callback: F,
) -> Result<SendLocalListRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(SendLocalListRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<SendLocalListResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "SendLocalList").await
}
404
/// Waits for one incoming `SetChargingProfile` call, answers it with `callback`, and returns
/// the received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_set_charging_profile<F, FF>(
    &self,
    callback: F,
) -> Result<SetChargingProfileRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(SetChargingProfileRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<SetChargingProfileResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "SetChargingProfile").await
}
409
/// Waits for one incoming `TriggerMessage` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_trigger_message<F, FF>(
    &self,
    callback: F,
) -> Result<TriggerMessageRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(TriggerMessageRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<TriggerMessageResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "TriggerMessage").await
}
414
/// Waits for one incoming `UnlockConnector` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_unlock_connector<F, FF>(
    &self,
    callback: F,
) -> Result<UnlockConnectorRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(UnlockConnectorRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<UnlockConnectorResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "UnlockConnector").await
}
419
/// Waits for one incoming `UpdateFirmware` call, answers it with `callback`, and returns the
/// received request.
///
/// Delegates to `handle_wait_for_request`; only compiled with the `test` feature.
#[cfg(feature = "test")]
pub async fn wait_for_update_firmware<F, FF>(
    &self,
    callback: F,
) -> Result<UpdateFirmwareRequest, Box<dyn std::error::Error + Send + Sync>>
where
    F: FnMut(UpdateFirmwareRequest, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<UpdateFirmwareResponse, OCPP1_6Error>> + Send + Sync,
{
    self.handle_wait_for_request(callback, "UpdateFirmware").await
}
424
425 pub async fn on_ping<F: FnMut(Self) -> FF + Send + Sync + 'static, FF: Future<Output=()> + Send + Sync>(&self, mut callback: F) {
426 let mut recv = self.ping_sender.subscribe();
427
428 let s = self.clone();
429 tokio::spawn(async move {
430 while let Ok(()) = recv.recv().await {
431 callback(s.clone()).await;
432 }
433 });
434 }
435
436 async fn handle_on_request<P: DeserializeOwned + Send + Sync, R: Serialize + Send + Sync, F: FnMut(P, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<R, OCPP1_6Error>> + Send + Sync>(&self, mut callback: F, action: &'static str) {
437 let (sender, mut recv) = mpsc::channel(1000);
438 {
439 let mut lock = self.request_senders.lock().await;
440 lock.insert(action.to_string(), sender);
441
442 }
443
444 let s = self.clone();
445 tokio::spawn(async move {
446 while let Some(call) = recv.recv().await {
447 match serde_json::from_value(call.3) {
448 Ok(payload) => {
449 let response = callback(payload, s.clone()).await;
450 s.do_send_response(response, &call.1).await
451 }
452 Err(err) => {
453 println!("Failed to parse payload: {:?}", err)
454 }
455 }
456 }
457 });
458 }
459
/// Waits for a single incoming call named `action`, answers it with `callback`, and returns
/// the request payload.
///
/// A channel is stored under `action` in `request_senders`, replacing any earlier entry.
/// The payload is deserialized twice, once for the callback and once for the return value,
/// because the callback takes ownership of it.
///
/// # Errors
/// * `"Timeout"` if no call arrives within `self.timeout`;
/// * `"No call received"` if the channel closes first;
/// * `"Failed to parse payload"` if the payload does not deserialize into `P`.
#[cfg(feature = "test")]
async fn handle_wait_for_request<P, R, F, FF>(
    &self,
    mut callback: F,
    action: &'static str,
) -> Result<P, Box<dyn std::error::Error + Send + Sync>>
where
    P: DeserializeOwned + Send + Sync,
    R: Serialize + Send + Sync,
    F: FnMut(P, Self) -> FF + Send + Sync + 'static,
    FF: Future<Output = Result<R, OCPP1_6Error>> + Send + Sync,
{
    let (call_tx, mut call_rx) = mpsc::channel(1000);
    self.request_senders.lock().await.insert(action.to_string(), call_tx);

    let client = self.clone();

    let call = match timeout(self.timeout, call_rx.recv()).await {
        Err(_) => return Err("Timeout".into()),
        Ok(None) => return Err("No call received".into()),
        Ok(Some(call)) => call,
    };

    let payload = match serde_json::from_value(call.3.clone()) {
        Ok(payload) => payload,
        Err(err) => {
            println!("Failed to parse payload: {:?}", err);
            return Err("Failed to parse payload".into());
        }
    };

    let response = callback(payload, client.clone()).await;
    self.do_send_response(response, &call.1).await;

    // The same value deserialized successfully just above.
    Ok(serde_json::from_value(call.3).unwrap())
}
496
497 async fn do_send_response<R: Serialize>(&self, response: Result<R, OCPP1_6Error>, message_id: &str) {
498 let payload = match response {
499 Ok(r) => {
500 serde_json::to_string(&RawOcpp1_6Result(3, message_id.to_string(), serde_json::to_value(r).unwrap())).unwrap()
501 }
502 Err(e) => {
503 serde_json::to_string(&RawOcpp1_6Error(4, message_id.to_string(), e.code().to_string(), e.description().to_string(), e.details().to_owned())).unwrap()
504 }
505 };
506
507 let mut lock = self.sink.lock().await;
508 if let Err(err) = lock.send(Message::Text(payload)).await {
509 println!("Failed to send response: {:?}", err)
510 }
511 }
512
513 async fn do_send_request<P: Serialize, R: DeserializeOwned>(&self, request: P, action: &str) -> Result<Result<R, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
514 let message_id = Uuid::new_v4();
515
516 let call = RawOcpp1_6Call(2, message_id.to_string(), action.to_string(), serde_json::to_value(&request)?);
517
518 {
519 let mut lock = self.sink.lock().await;
520 lock.send(Message::Text(serde_json::to_string(&call)?)).await?;
521 }
522
523 let (s, r) = oneshot::channel();
524 {
525 let mut response_channels = self.response_channels.lock().await;
526 response_channels.insert(message_id, s);
527 }
528
529 match timeout(self.timeout, r).await? {
530 Ok(res) => {
531 match res {
532 Ok(value) => {
533 Ok(Ok(serde_json::from_value(value)?))
534 }
535 Err(e) => Ok(Err(e))
536 }
537 }
538 Err(_) => {
539 Err("Timeout".into())
540 }
541 }
542 }
543}