ocpp_client/ocpp_1_6/
ocpp_1_6_client.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::future::Future;
3use std::sync::Arc;
4use std::time::Duration;
5use futures::{SinkExt, StreamExt, TryStreamExt};
6use futures::stream::{SplitSink};
7use rust_ocpp::v1_6::messages::authorize::{AuthorizeRequest, AuthorizeResponse};
8use rust_ocpp::v1_6::messages::boot_notification::{BootNotificationRequest, BootNotificationResponse};
9use rust_ocpp::v1_6::messages::cancel_reservation::{CancelReservationRequest, CancelReservationResponse};
10use rust_ocpp::v1_6::messages::change_availability::{ChangeAvailabilityRequest, ChangeAvailabilityResponse};
11use rust_ocpp::v1_6::messages::change_configuration::{ChangeConfigurationRequest, ChangeConfigurationResponse};
12use rust_ocpp::v1_6::messages::clear_cache::{ClearCacheRequest, ClearCacheResponse};
13use rust_ocpp::v1_6::messages::clear_charging_profile::{ClearChargingProfileRequest, ClearChargingProfileResponse};
14use rust_ocpp::v1_6::messages::data_transfer::{DataTransferRequest, DataTransferResponse};
15use rust_ocpp::v1_6::messages::diagnostics_status_notification::{DiagnosticsStatusNotificationRequest, DiagnosticsStatusNotificationResponse};
16use rust_ocpp::v1_6::messages::firmware_status_notification::{FirmwareStatusNotificationRequest, FirmwareStatusNotificationResponse};
17use rust_ocpp::v1_6::messages::get_composite_schedule::{GetCompositeScheduleRequest, GetCompositeScheduleResponse};
18use rust_ocpp::v1_6::messages::get_configuration::{GetConfigurationRequest, GetConfigurationResponse};
19use rust_ocpp::v1_6::messages::get_diagnostics::{GetDiagnosticsRequest, GetDiagnosticsResponse};
20use rust_ocpp::v1_6::messages::get_local_list_version::{GetLocalListVersionRequest, GetLocalListVersionResponse};
21use rust_ocpp::v1_6::messages::heart_beat::{HeartbeatRequest, HeartbeatResponse};
22use rust_ocpp::v1_6::messages::meter_values::{MeterValuesRequest, MeterValuesResponse};
23use rust_ocpp::v1_6::messages::remote_start_transaction::{RemoteStartTransactionRequest, RemoteStartTransactionResponse};
24use rust_ocpp::v1_6::messages::remote_stop_transaction::{RemoteStopTransactionRequest, RemoteStopTransactionResponse};
25use rust_ocpp::v1_6::messages::reserve_now::{ReserveNowRequest, ReserveNowResponse};
26use rust_ocpp::v1_6::messages::reset::{ResetRequest, ResetResponse};
27use rust_ocpp::v1_6::messages::send_local_list::{SendLocalListRequest, SendLocalListResponse};
28use rust_ocpp::v1_6::messages::set_charging_profile::{SetChargingProfileRequest, SetChargingProfileResponse};
29use rust_ocpp::v1_6::messages::start_transaction::{StartTransactionRequest, StartTransactionResponse};
30use rust_ocpp::v1_6::messages::status_notification::{StatusNotificationRequest, StatusNotificationResponse};
31use rust_ocpp::v1_6::messages::stop_transaction::{StopTransactionRequest, StopTransactionResponse};
32use rust_ocpp::v1_6::messages::trigger_message::{TriggerMessageRequest, TriggerMessageResponse};
33use rust_ocpp::v1_6::messages::unlock_connector::{UnlockConnectorRequest, UnlockConnectorResponse};
34use rust_ocpp::v1_6::messages::update_firmware::{UpdateFirmwareRequest, UpdateFirmwareResponse};
35use serde::de::DeserializeOwned;
36use serde::Serialize;
37use serde_json::{Value};
38use tokio::net::TcpStream;
39use tokio::sync::{Mutex, oneshot, broadcast, mpsc};
40use tokio::sync::broadcast::Sender;
41use tokio::time::timeout;
42use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
43use tokio_tungstenite::tungstenite::Message;
44use uuid::Uuid;
45use crate::ocpp_1_6::ocpp_1_6_error::OCPP1_6Error;
46use crate::ocpp_1_6::raw_ocpp_1_6_call::RawOcpp1_6Call;
47use crate::ocpp_1_6::raw_ocpp_1_6_error::RawOcpp1_6Error;
48use crate::ocpp_1_6::raw_ocpp_1_6_result::RawOcpp1_6Result;
49
50/// OCPP 1.6 client
51#[derive(Clone)]
52pub struct OCPP1_6Client {
53    sink: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>,
54    response_channels: Arc<Mutex<BTreeMap<Uuid, oneshot::Sender<Result<Value, OCPP1_6Error>>>>>,
55    request_senders: Arc<Mutex<BTreeMap<String, mpsc::Sender<RawOcpp1_6Call>>>>,
56    pong_channels: Arc<Mutex<VecDeque<oneshot::Sender<()>>>>,
57    ping_sender: Sender<()>,
58    timeout: Duration
59}
60
61impl OCPP1_6Client {
62    pub(crate) fn new(stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
63        let (sink, stream) = stream.split();
64        let sink = Arc::new(Mutex::new(sink));
65
66        let response_channels = Arc::new(Mutex::new(BTreeMap::<Uuid, oneshot::Sender<Result<Value, OCPP1_6Error>>>::new()));
67        let response_channels2 = Arc::clone(&response_channels);
68
69        let pong_channels = Arc::new(Mutex::new(VecDeque::<oneshot::Sender<()>>::new()));
70        let pong_channels2 = Arc::clone(&pong_channels);
71
72        let request_senders: Arc<Mutex<BTreeMap<String, mpsc::Sender<RawOcpp1_6Call>>>> = Arc::new(Mutex::new(BTreeMap::new()));
73
74        let request_senders2 = request_senders.clone();
75        let sink2 = sink.clone();
76
77        let (ping_sender, _) = tokio::sync::broadcast::channel(10);
78        let ping_sender2 = ping_sender.clone();
79
80        tokio::spawn(async move {
81            stream
82                .map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e))
83                .try_for_each(|message| {
84                    let response_channels2 = response_channels2.clone();
85                    let ping_sender = ping_sender2.clone();
86                    let pong_channels2 = pong_channels2.clone();
87                    let request_senders = request_senders2.clone();
88                    let sink = sink2.clone();
89                    async move {
90                        match message {
91                            Message::Text(raw_payload) => {
92                                let raw_value = serde_json::from_str(&raw_payload)?;
93
94                                match raw_value {
95                                    Value::Array(list) => {
96                                        if let Some(message_type_item) = list.get(0) {
97                                            if let Value::Number(message_type_raw) = message_type_item {
98                                                if let Some(message_type) = message_type_raw.as_u64() {
99                                                    match message_type {
100                                                        // CALL
101                                                        2 => {
102                                                            let call: RawOcpp1_6Call =
103                                                                serde_json::from_str(&raw_payload).unwrap();
104                                                            let action  = &call.2;
105                                                            let sender_opt = {
106                                                                let lock = request_senders.lock().await;
107                                                                lock.get(action).cloned()
108                                                            };
109                                                            match sender_opt {
110                                                                None => {
111                                                                    let error = OCPP1_6Error::new_not_implemented(&format!("Action '{}' is not implemented", action));
112                                                                    let payload = serde_json::to_string(&RawOcpp1_6Error(4, call.1.to_string(), error.code().to_string(), error.description().to_string(), error.details().to_owned())).unwrap();
113                                                                    let mut lock = sink.lock().await;
114                                                                    if let Err(err) = lock.send(Message::Text(payload)).await {
115                                                                        println!("Failed to send response: {:?}", err)
116                                                                    }
117                                                                }
118                                                                Some(sender) => {
119                                                                    if let Err(err) = sender.send(call).await {
120                                                                        println!("Error sending request: {:?}", err);
121                                                                    };
122                                                                }
123                                                            }
124                                                        },
125                                                        // RESPONSE
126                                                        3 => {
127                                                            let result: RawOcpp1_6Result =
128                                                                serde_json::from_str(&raw_payload).unwrap();
129                                                            let mut lock = response_channels2.lock().await;
130                                                            if let Some(sender) = lock.remove(&Uuid::parse_str(&result.1)?) {
131                                                                sender.send(Ok(result.2)).unwrap();
132                                                            }
133                                                        },
134                                                        // ERROR
135                                                        4 => {
136                                                            let error: RawOcpp1_6Error =
137                                                                serde_json::from_str(&raw_payload)?;
138                                                            let mut lock = response_channels2.lock().await;
139                                                            if let Some(sender) = lock.remove(&Uuid::parse_str(&error.1)?) {
140                                                                sender.send(Err(error.into())).unwrap();
141                                                            }
142                                                        },
143                                                        _ => println!("Unknown message type"),
144                                                    }
145                                                } else {
146                                                    println!("The message type has to be an integer, it cant have decimals")
147                                                }
148                                            } else {
149                                                println!("The first item in the array was not a number")
150                                            }
151                                        } else {
152                                            println!("The root list was empty")
153                                        }
154                                    }
155                                    _ => println!("A message should be an array of items"),
156                                }
157
158                            }
159                            Message::Ping(_) => {
160                                if ping_sender.receiver_count() > 0 {
161                                    if let Err(err) = ping_sender.send(()) {
162                                        println!("Error sending websocket ping: {:?}", err);
163                                    };
164                                }
165                            }
166                            Message::Pong(_) => {
167                                let mut lock = pong_channels2.lock().await;
168                                if let Some(sender) = lock.pop_back() {
169                                    sender.send(()).unwrap();
170                                }
171                            }
172                            _ => {}
173                        }
174                        Ok(())
175                    }
176
177                }).await?;
178            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
179        });
180
181        Self {
182            sink,
183            response_channels,
184            request_senders,
185            pong_channels,
186            ping_sender,
187            timeout: Duration::from_secs(5)
188        }
189    }
190
191    /// Disconnect from the server
192    pub async fn disconnect(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
193        let mut lock = self.sink.lock().await;
194        lock.close().await?;
195        Ok(())
196    }
197
198    pub async fn send_authorize(&self, request: AuthorizeRequest) -> Result<Result<AuthorizeResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
199        self.do_send_request(request, "Authorize").await
200    }
201
202    pub async fn send_boot_notification(&self, request: BootNotificationRequest) -> Result<Result<BootNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
203        self.do_send_request(request, "BootNotification").await
204    }
205
206    pub async fn send_data_transfer(&self, request: DataTransferRequest) -> Result<Result<DataTransferResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
207        self.do_send_request(request, "DataTransfer").await
208    }
209
210    pub async fn send_diagnostics_status_notification(&self, request: DiagnosticsStatusNotificationRequest) -> Result<Result<DiagnosticsStatusNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
211        self.do_send_request(request, "DiagnosticsStatusNotification").await
212    }
213
214    pub async fn send_firmware_status_notification(&self, request: FirmwareStatusNotificationRequest) -> Result<Result<FirmwareStatusNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
215        self.do_send_request(request, "FirmwareStatusNotification").await
216    }
217
218    pub async fn send_heartbeat(&self, request: HeartbeatRequest) -> Result<Result<HeartbeatResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
219        self.do_send_request(request, "Heartbeat").await
220    }
221
222    pub async fn send_meter_values(&self, request: MeterValuesRequest) -> Result<Result<MeterValuesResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
223        self.do_send_request(request, "MeterValues").await
224    }
225
226    pub async fn send_start_transaction(&self, request: StartTransactionRequest) -> Result<Result<StartTransactionResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
227        self.do_send_request(request, "StartTransaction").await
228    }
229
230    pub async fn send_status_notification(&self, request: StatusNotificationRequest) -> Result<Result<StatusNotificationResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
231        self.do_send_request(request, "StatusNotification").await
232    }
233
234    pub async fn send_stop_transaction(&self, request: StopTransactionRequest) -> Result<Result<StopTransactionResponse, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
235        self.do_send_request(request, "StopTransaction").await
236    }
237
238    pub async fn send_ping(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
239        {
240            let mut lock = self.sink.lock().await;
241            lock.send(Message::Ping(vec![])).await?;
242        }
243
244        let (s, r) = oneshot::channel();
245        {
246            let mut pong_channels = self.pong_channels.lock().await;
247            pong_channels.push_front(s);
248        }
249
250        r.await?;
251        Ok(())
252    }
253
254    pub async fn on_cancel_reservation<F: FnMut(CancelReservationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<CancelReservationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
255        self.handle_on_request(callback, "CancelReservation").await
256    }
257
258    pub async fn on_change_availability<F: FnMut(ChangeAvailabilityRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ChangeAvailabilityResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
259        self.handle_on_request(callback, "ChangeAvailability").await
260    }
261
262    pub async fn on_change_configuration<F: FnMut(ChangeConfigurationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ChangeConfigurationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
263        self.handle_on_request(callback, "ChangeConfiguration").await
264    }
265
266    pub async fn on_clear_cache<F: FnMut(ClearCacheRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ClearCacheResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
267        self.handle_on_request(callback, "ClearCache").await
268    }
269
270    pub async fn on_clear_charging_profile<F: FnMut(ClearChargingProfileRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ClearChargingProfileResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
271        self.handle_on_request(callback, "ClearChargingProfile").await
272    }
273
274    pub async fn on_data_transfer<F: FnMut(DataTransferRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<DataTransferResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
275        self.handle_on_request(callback, "DataTransfer").await
276    }
277
278    pub async fn on_get_composite_schedule<F: FnMut(GetCompositeScheduleRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetCompositeScheduleResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
279        self.handle_on_request(callback, "GetCompositeSchedule").await
280    }
281
282    pub async fn on_get_configuration<F: FnMut(GetConfigurationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetConfigurationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
283        self.handle_on_request(callback, "GetConfiguration").await
284    }
285
286    pub async fn on_get_diagnostics<F: FnMut(GetDiagnosticsRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetDiagnosticsResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
287        self.handle_on_request(callback, "GetDiagnostics").await
288    }
289
290    pub async fn on_get_local_list_version<F: FnMut(GetLocalListVersionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetLocalListVersionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
291        self.handle_on_request(callback, "GetLocalListVersion").await
292    }
293
294    pub async fn on_remote_start_transaction<F: FnMut(RemoteStartTransactionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<RemoteStartTransactionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
295        self.handle_on_request(callback, "RemoteStartTransaction").await
296    }
297
298    pub async fn on_remote_stop_transaction<F: FnMut(RemoteStopTransactionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<RemoteStopTransactionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
299        self.handle_on_request(callback, "RemoteStopTransaction").await
300    }
301
302    pub async fn on_reserve_now<F: FnMut(ReserveNowRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ReserveNowResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
303        self.handle_on_request(callback, "ReserveNow").await
304    }
305
306    pub async fn on_reset<F: FnMut(ResetRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ResetResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
307        self.handle_on_request(callback, "Reset").await
308    }
309
310    pub async fn on_send_local_list<F: FnMut(SendLocalListRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<SendLocalListResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
311        self.handle_on_request(callback, "SendLocalList").await
312    }
313
314    pub async fn on_set_charging_profile<F: FnMut(SetChargingProfileRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<SetChargingProfileResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
315        self.handle_on_request(callback, "SetChargingProfile").await
316    }
317
318    pub async fn on_trigger_message<F: FnMut(TriggerMessageRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<TriggerMessageResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
319        self.handle_on_request(callback, "TriggerMessage").await
320    }
321
322    pub async fn on_unlock_connector<F: FnMut(UnlockConnectorRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<UnlockConnectorResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
323        self.handle_on_request(callback, "UnlockConnector").await
324    }
325
326    pub async fn on_update_firmware<F: FnMut(UpdateFirmwareRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<UpdateFirmwareResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) {
327        self.handle_on_request(callback, "UpdateFirmware").await
328    }
329
330    #[cfg(feature = "test")]
331    pub async fn wait_for_cancel_reservation<F: FnMut(CancelReservationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<CancelReservationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<CancelReservationRequest, Box<dyn std::error::Error + Send + Sync>> {
332        self.handle_wait_for_request(callback, "CancelReservation").await
333    }
334
335    #[cfg(feature = "test")]
336    pub async fn wait_for_change_availability<F: FnMut(ChangeAvailabilityRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ChangeAvailabilityResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<ChangeAvailabilityRequest, Box<dyn std::error::Error + Send + Sync>> {
337        self.handle_wait_for_request(callback, "ChangeAvailability").await
338    }
339
340    #[cfg(feature = "test")]
341    pub async fn wait_for_change_configuration<F: FnMut(ChangeConfigurationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ChangeConfigurationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<ChangeConfigurationRequest, Box<dyn std::error::Error + Send + Sync>> {
342        self.handle_wait_for_request(callback, "ChangeConfiguration").await
343    }
344
345    #[cfg(feature = "test")]
346    pub async fn wait_for_clear_cache<F: FnMut(ClearCacheRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ClearCacheResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<ClearCacheRequest, Box<dyn std::error::Error + Send + Sync>> {
347        self.handle_wait_for_request(callback, "ClearCache").await
348    }
349
350    #[cfg(feature = "test")]
351    pub async fn wait_for_clear_charging_profile<F: FnMut(ClearChargingProfileRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ClearChargingProfileResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<ClearChargingProfileRequest, Box<dyn std::error::Error + Send + Sync>> {
352        self.handle_wait_for_request(callback, "ClearChargingProfile").await
353    }
354
355    #[cfg(feature = "test")]
356    pub async fn wait_for_data_transfer<F: FnMut(DataTransferRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<DataTransferResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<DataTransferRequest, Box<dyn std::error::Error + Send + Sync>> {
357        self.handle_wait_for_request(callback, "DataTransfer").await
358    }
359
360    #[cfg(feature = "test")]
361    pub async fn wait_for_get_composite_schedule<F: FnMut(GetCompositeScheduleRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetCompositeScheduleResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<GetCompositeScheduleRequest, Box<dyn std::error::Error + Send + Sync>> {
362        self.handle_wait_for_request(callback, "GetCompositeSchedule").await
363    }
364
365    #[cfg(feature = "test")]
366    pub async fn wait_for_get_configuration<F: FnMut(GetConfigurationRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetConfigurationResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<GetConfigurationRequest, Box<dyn std::error::Error + Send + Sync>> {
367        self.handle_wait_for_request(callback, "GetConfiguration").await
368    }
369
370    #[cfg(feature = "test")]
371    pub async fn wait_for_get_diagnostics<F: FnMut(GetDiagnosticsRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetDiagnosticsResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<GetDiagnosticsRequest, Box<dyn std::error::Error + Send + Sync>> {
372        self.handle_wait_for_request(callback, "GetDiagnostics").await
373    }
374
375    #[cfg(feature = "test")]
376    pub async fn wait_for_get_local_list_version<F: FnMut(GetLocalListVersionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<GetLocalListVersionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<GetLocalListVersionRequest, Box<dyn std::error::Error + Send + Sync>> {
377        self.handle_wait_for_request(callback, "GetLocalListVersion").await
378    }
379
380    #[cfg(feature = "test")]
381    pub async fn wait_for_remote_start_transaction<F: FnMut(RemoteStartTransactionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<RemoteStartTransactionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<RemoteStartTransactionRequest, Box<dyn std::error::Error + Send + Sync>> {
382        self.handle_wait_for_request(callback, "RemoteStartTransaction").await
383    }
384
385    #[cfg(feature = "test")]
386    pub async fn wait_for_remote_stop_transaction<F: FnMut(RemoteStopTransactionRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<RemoteStopTransactionResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<RemoteStopTransactionRequest, Box<dyn std::error::Error + Send + Sync>> {
387        self.handle_wait_for_request(callback, "RemoteStopTransaction").await
388    }
389
390    #[cfg(feature = "test")]
391    pub async fn wait_for_reserve_now<F: FnMut(ReserveNowRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ReserveNowResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<ReserveNowRequest, Box<dyn std::error::Error + Send + Sync>> {
392        self.handle_wait_for_request(callback, "ReserveNow").await
393    }
394
395    #[cfg(feature = "test")]
396    pub async fn wait_for_reset<F: FnMut(ResetRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<ResetResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<ResetRequest, Box<dyn std::error::Error + Send + Sync>> {
397        self.handle_wait_for_request(callback, "Reset").await
398    }
399
400    #[cfg(feature = "test")]
401    pub async fn wait_for_send_local_list<F: FnMut(SendLocalListRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<SendLocalListResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<SendLocalListRequest, Box<dyn std::error::Error + Send + Sync>> {
402        self.handle_wait_for_request(callback, "SendLocalList").await
403    }
404
405    #[cfg(feature = "test")]
406    pub async fn wait_for_set_charging_profile<F: FnMut(SetChargingProfileRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<SetChargingProfileResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<SetChargingProfileRequest, Box<dyn std::error::Error + Send + Sync>> {
407        self.handle_wait_for_request(callback, "SetChargingProfile").await
408    }
409
410    #[cfg(feature = "test")]
411    pub async fn wait_for_trigger_message<F: FnMut(TriggerMessageRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<TriggerMessageResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<TriggerMessageRequest, Box<dyn std::error::Error + Send + Sync>> {
412        self.handle_wait_for_request(callback, "TriggerMessage").await
413    }
414
415    #[cfg(feature = "test")]
416    pub async fn wait_for_unlock_connector<F: FnMut(UnlockConnectorRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<UnlockConnectorResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<UnlockConnectorRequest, Box<dyn std::error::Error + Send + Sync>> {
417        self.handle_wait_for_request(callback, "UnlockConnector").await
418    }
419
420    #[cfg(feature = "test")]
421    pub async fn wait_for_update_firmware<F: FnMut(UpdateFirmwareRequest, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<UpdateFirmwareResponse, OCPP1_6Error>> + Send + Sync>(&self, callback: F) -> Result<UpdateFirmwareRequest, Box<dyn std::error::Error + Send + Sync>> {
422        self.handle_wait_for_request(callback, "UpdateFirmware").await
423    }
424
425    pub async fn on_ping<F: FnMut(Self) -> FF + Send + Sync + 'static, FF: Future<Output=()> + Send + Sync>(&self, mut callback: F) {
426        let mut recv = self.ping_sender.subscribe();
427
428        let s = self.clone();
429        tokio::spawn(async move {
430            while let Ok(()) = recv.recv().await {
431                callback(s.clone()).await;
432            }
433        });
434    }
435
436    async fn handle_on_request<P: DeserializeOwned + Send + Sync, R: Serialize + Send + Sync, F: FnMut(P, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<R, OCPP1_6Error>> + Send + Sync>(&self, mut callback: F, action: &'static str) {
437        let (sender, mut recv) = mpsc::channel(1000);
438        {
439            let mut lock = self.request_senders.lock().await;
440            lock.insert(action.to_string(), sender);
441
442        }
443
444        let s = self.clone();
445        tokio::spawn(async move {
446            while let Some(call) = recv.recv().await {
447                match serde_json::from_value(call.3) {
448                    Ok(payload) => {
449                        let response = callback(payload, s.clone()).await;
450                        s.do_send_response(response, &call.1).await
451                    }
452                    Err(err) => {
453                        println!("Failed to parse payload: {:?}", err)
454                    }
455                }
456            }
457        });
458    }
459
460    #[cfg(feature = "test")]
461    async fn handle_wait_for_request<P: DeserializeOwned + Send + Sync, R: Serialize + Send + Sync, F: FnMut(P, Self) -> FF + Send + Sync + 'static, FF: Future<Output=Result<R, OCPP1_6Error>> + Send + Sync>(&self, mut callback: F, action: &'static str) -> Result<P, Box<dyn std::error::Error + Send + Sync>> {
462        let (sender, mut recv) = mpsc::channel(1000);
463        {
464            let mut lock = self.request_senders.lock().await;
465            lock.insert(action.to_string(), sender);
466
467        }
468
469        let s = self.clone();
470        match timeout(self.timeout, recv.recv()).await {
471            Ok(opt) => {
472                match opt {
473                    None => {
474                        Err("No call received".into())
475                    }
476                    Some(call) => {
477                        match serde_json::from_value(call.3.clone()) {
478                            Ok(payload) => {
479                                let response = callback(payload, s.clone()).await;
480                                self.do_send_response(response, &call.1).await;
481                                Ok(serde_json::from_value(call.3).unwrap())
482                            }
483                            Err(err) => {
484                                println!("Failed to parse payload: {:?}", err);
485                                Err("Failed to parse payload".into())
486                            }
487                        }
488                    }
489                }
490            }
491            Err(_) => {
492                Err("Timeout".into())
493            }
494        }
495    }
496
497    async fn do_send_response<R: Serialize>(&self, response: Result<R, OCPP1_6Error>, message_id: &str) {
498        let payload = match response {
499            Ok(r) => {
500                serde_json::to_string(&RawOcpp1_6Result(3, message_id.to_string(), serde_json::to_value(r).unwrap())).unwrap()
501            }
502            Err(e) => {
503                serde_json::to_string(&RawOcpp1_6Error(4, message_id.to_string(), e.code().to_string(), e.description().to_string(), e.details().to_owned())).unwrap()
504            }
505        };
506
507        let mut lock = self.sink.lock().await;
508        if let Err(err) = lock.send(Message::Text(payload)).await {
509            println!("Failed to send response: {:?}", err)
510        }
511    }
512
513    async fn do_send_request<P: Serialize, R: DeserializeOwned>(&self, request: P, action: &str) -> Result<Result<R, OCPP1_6Error>, Box<dyn std::error::Error + Send + Sync>> {
514        let message_id = Uuid::new_v4();
515
516        let call = RawOcpp1_6Call(2, message_id.to_string(), action.to_string(), serde_json::to_value(&request)?);
517
518        {
519            let mut lock = self.sink.lock().await;
520            lock.send(Message::Text(serde_json::to_string(&call)?)).await?;
521        }
522
523        let (s, r) = oneshot::channel();
524        {
525            let mut response_channels = self.response_channels.lock().await;
526            response_channels.insert(message_id, s);
527        }
528
529         match timeout(self.timeout, r).await? {
530             Ok(res) => {
531                 match res {
532                     Ok(value) => {
533                         Ok(Ok(serde_json::from_value(value)?))
534                     }
535                     Err(e) => Ok(Err(e))
536                 }
537             }
538             Err(_) => {
539                 Err("Timeout".into())
540             }
541         }
542    }
543}