Skip to main content

azure_iot_rs/
moduleclient.rs

1use crate::error::IotError;
2use crate::message::IotHubMessage;
3use crate::transport::TransportProvider;
4use crate::{
5    ConnectionStatus, ConnectionStatusReason, IoTHubClientConfirmationResult,
6    IoTHubClientPropertyPayloadType, IoTHubMessageDispositionResult, IotHubDeviceTwinUpdateState,
7};
8use azure_iot_rs_sys::*;
9use futures::channel::oneshot;
10use log::info;
11use std::convert::TryInto;
12use std::ffi::{CStr, CString, c_void};
13use std::future::poll_fn;
14use std::os::raw::c_int;
15use std::ptr;
16use std::result::Result;
17use std::str;
18use std::sync::Once;
19use std::task::Poll;
20use std::time::{Duration, SystemTime, UNIX_EPOCH};
21use std::{thread, time};
22
23static IOTHUB: Once = Once::new();
24
25unsafe extern "C" {
26    fn free(ptr: *mut c_void);
27}
28
29pub enum ModuleClientOption<'a> {
30    LogTrace(bool),
31    MessageTimeout(u32),
32    ProductInfo(&'a str),
33    RetryIntervalSec(u32),
34    RetryMaxDelaySecs(u32),
35    SasTokenLifetime(u32),
36    DoWorkFreqMs(u32),
37    AutoUrlEncodeDecode(bool),
38    KeepAlive(u32),
39    ModelId(&'a str),
40}
41
42impl TryInto<&'static CStr> for &ModuleClientOption<'_> {
43    type Error = IotError;
44
45    fn try_into(self) -> Result<&'static CStr, Self::Error> {
46        let name = match self {
47            ModuleClientOption::LogTrace(_) => c"logtrace",
48            ModuleClientOption::MessageTimeout(_) => c"messageTimeout",
49            ModuleClientOption::ProductInfo(_) => c"product_info",
50            ModuleClientOption::RetryIntervalSec(_) => c"retry_interval_sec",
51            ModuleClientOption::RetryMaxDelaySecs(_) => c"retry_max_delay_secs",
52            ModuleClientOption::SasTokenLifetime(_) => c"sas_token_lifetime",
53            ModuleClientOption::DoWorkFreqMs(_) => c"do_work_freq_ms",
54            ModuleClientOption::AutoUrlEncodeDecode(_) => c"auto_url_encode_decode",
55            ModuleClientOption::KeepAlive(_) => c"keepalive",
56            ModuleClientOption::ModelId(_) => c"model_id",
57        };
58        Ok(name)
59    }
60}
61
62pub trait ModuleEventCallback {
63    fn on_message(&mut self, message: IotHubMessage) -> IoTHubMessageDispositionResult {
64        info!("Received message: {:?}", message);
65        IoTHubMessageDispositionResult::Accepted
66    }
67
68    fn on_module_twin(&mut self, state: IotHubDeviceTwinUpdateState, data: &[u8]) {
69        info!(
70            "Received module twin update: {state} {}",
71            String::from_utf8_lossy(data)
72        );
73    }
74
75    fn on_module_method(&mut self, method_name: &str, payload: &[u8]) -> Result<Vec<u8>, IotError> {
76        info!(
77            "Received module method: {} with payload: {}",
78            method_name,
79            String::from_utf8_lossy(payload)
80        );
81        Ok(Vec::new())
82    }
83
84    fn on_input_message(
85        &mut self,
86        input_name: &str,
87        message: IotHubMessage,
88    ) -> IoTHubMessageDispositionResult {
89        info!("Received message on input {}: {:?}", input_name, message);
90        IoTHubMessageDispositionResult::Accepted
91    }
92
93    fn on_connection_status(&mut self, status: ConnectionStatus, reason: ConnectionStatusReason) {
94        info!("Connection status changed: {:?} ({:?})", status, reason);
95    }
96
97    fn on_confirmation(&mut self, result: Result<(), IotError>) {
98        if let Err(error) = result {
99            info!("Telemetry confirmation failed: {:?}", error);
100        }
101    }
102}
103
104#[derive(Debug)]
105pub struct MethodInvokeResponse {
106    pub status: ::std::os::raw::c_int,
107    pub payload: Vec<u8>,
108}
109
110pub struct IotHubModuleClient<C: ModuleEventCallback> {
111    handle: IOTHUB_MODULE_CLIENT_LL_HANDLE,
112    callback: C,
113}
114
115unsafe impl<C: ModuleEventCallback + Send> Send for IotHubModuleClient<C> {}
116
117// C callbacks.
118impl<C: ModuleEventCallback> IotHubModuleClient<C> {
119    unsafe extern "C" fn c_message_callback(
120        handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
121        ctx: *mut c_void,
122    ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
123        info!("Received message from hub");
124        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
125        let message = IotHubMessage::from_handle(handle);
126        client.callback.on_message(message).as_raw()
127    }
128
129    unsafe extern "C" fn c_input_message_callback(
130        handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
131        ctx: *mut c_void,
132    ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
133        info!("Received message from hub");
134        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
135        let message = IotHubMessage::from_handle(handle);
136        client
137            .callback
138            .on_input_message("default", message)
139            .as_raw()
140    }
141
142    unsafe extern "C" fn c_connection_status_callback(
143        status: IOTHUB_CLIENT_CONNECTION_STATUS,
144        reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON,
145        ctx: *mut c_void,
146    ) {
147        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
148        client
149            .callback
150            .on_connection_status(status.into(), reason.into());
151    }
152
153    unsafe extern "C" fn c_module_twin_callback(
154        state: DEVICE_TWIN_UPDATE_STATE,
155        payload: *const u8,
156        size: usize,
157        ctx: *mut c_void,
158    ) {
159        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
160        let data = unsafe { std::slice::from_raw_parts(payload, size) };
161        client.callback.on_module_twin(state.into(), data);
162    }
163
164    unsafe extern "C" fn c_module_method_callback(
165        method_name: *const i8,
166        payload: *const u8,
167        size: usize,
168        response_payload: *mut *mut u8,
169        response_payload_size: *mut usize,
170        ctx: *mut c_void,
171    ) -> ::std::os::raw::c_int {
172        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
173        let command_name = unsafe { CStr::from_ptr(method_name) }
174            .to_str()
175            .unwrap_or_default();
176        let data = unsafe { std::slice::from_raw_parts(payload, size) };
177        let response = client.callback.on_module_method(command_name, data);
178        match response {
179            Ok(resp_data) => {
180                unsafe {
181                    *response_payload = resp_data.as_ptr() as *mut u8;
182                    *response_payload_size = resp_data.len();
183                }
184                200
185            }
186            Err(_) => {
187                unsafe {
188                    *response_payload = ptr::null_mut();
189                    *response_payload_size = 0;
190                }
191                500
192            }
193        }
194    }
195
196    fn initialize_callbacks(&mut self) -> Result<(), IotError> {
197        let handle = self.handle;
198        let context = self.context_ptr();
199        unsafe {
200            let input = c"input";
201            let result = IoTHubModuleClient_LL_SetInputMessageCallback(
202                handle,
203                input.as_ptr(),
204                Some(Self::c_input_message_callback),
205                context,
206            );
207            IotError::check_sdk_result(result)?;
208
209            let result = IoTHubModuleClient_LL_SetMessageCallback(
210                handle,
211                Some(Self::c_message_callback),
212                context,
213            );
214            IotError::check_sdk_result(result)?;
215
216            let result = IoTHubModuleClient_LL_SetModuleMethodCallback(
217                handle,
218                Some(Self::c_module_method_callback),
219                context,
220            );
221            IotError::check_sdk_result(result)?;
222
223            let result = IoTHubModuleClient_LL_SetModuleTwinCallback(
224                handle,
225                Some(Self::c_module_twin_callback),
226                context,
227            );
228            IotError::check_sdk_result(result)?;
229
230            let result = IoTHubModuleClient_LL_SetConnectionStatusCallback(
231                handle,
232                Some(Self::c_connection_status_callback),
233                context,
234            );
235            IotError::check_sdk_result(result)
236        }
237    }
238}
239
240// Async callbacks.
241impl<C: ModuleEventCallback> IotHubModuleClient<C> {
242    unsafe extern "C" fn c_async_confirmation_callback(
243        status: IOTHUB_CLIENT_RESULT,
244        ctx: *mut c_void,
245    ) {
246        let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<Result<(), IotError>>) };
247        let _ = sender.send(IotError::check_sdk_result(status));
248    }
249    unsafe extern "C" fn c_async_confirmation_result_callback(
250        result: IOTHUB_CLIENT_CONFIRMATION_RESULT,
251        ctx: *mut c_void,
252    ) {
253        let sender =
254            unsafe { Box::from_raw(ctx as *mut oneshot::Sender<IoTHubClientConfirmationResult>) };
255        let _ = sender.send(result.into());
256    }
257
258    unsafe extern "C" fn c_async_property_ack_callback(status_code: c_int, ctx: *mut c_void) {
259        let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<c_int>) };
260        let _ = sender.send(status_code);
261    }
262
263    unsafe extern "C" fn c_async_properties_received_callback(
264        payload_type: IOTHUB_CLIENT_PROPERTY_PAYLOAD_TYPE,
265        payload: *const u8,
266        payload_length: usize,
267        ctx: *mut c_void,
268    ) {
269        let sender = unsafe {
270            Box::from_raw(ctx as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>)
271        };
272        let data = unsafe { std::slice::from_raw_parts(payload, payload_length) }.to_vec();
273        let _ = sender.send((payload_type.into(), data));
274    }
275
276    unsafe extern "C" fn c_get_twin_async_callback(
277        state: DEVICE_TWIN_UPDATE_STATE,
278        payload: *const u8,
279        size: usize,
280        ctx: *mut c_void,
281    ) {
282        let sender = unsafe {
283            Box::from_raw(ctx as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>)
284        };
285        let data = unsafe { std::slice::from_raw_parts(payload, size) }.to_vec();
286        let _ = sender.send((state.into(), data));
287    }
288}
289
290impl<C: ModuleEventCallback> IotHubModuleClient<C> {
291    fn ensure_initialized() {
292        IOTHUB.call_once(|| unsafe {
293            IoTHub_Init();
294        });
295    }
296
297    fn context_ptr(&mut self) -> *mut c_void {
298        self as *mut IotHubModuleClient<C> as *mut c_void
299    }
300
301    pub fn create_from_environment(
302        protocol: TransportProvider,
303        callback: C,
304    ) -> Result<Self, IotError> {
305        Self::ensure_initialized();
306        let handle = unsafe { IoTHubModuleClient_LL_CreateFromEnvironment(protocol.to_sdk()) };
307        if handle.is_null() {
308            return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
309        }
310
311        let mut client = Self { handle, callback };
312
313        client.initialize_callbacks()?;
314        Ok(client)
315    }
316
317    pub fn create_from_connection_string(
318        connection_string: &str,
319        transport: TransportProvider,
320        callback: C,
321    ) -> Result<Self, IotError> {
322        Self::ensure_initialized();
323        let connection = CString::new(connection_string)?;
324        let handle = unsafe {
325            IoTHubModuleClient_LL_CreateFromConnectionString(
326                connection.as_ptr(),
327                transport.to_sdk(),
328            )
329        };
330        if handle.is_null() {
331            return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
332        }
333        let mut client = Self { handle, callback };
334        client.initialize_callbacks()?;
335        Ok(client)
336    }
337
338    pub fn try_new(callback: C) -> Result<Self, IotError> {
339        Self::create_from_environment(TransportProvider::Mqtt, callback)
340    }
341
342    pub async fn send_event_async(&mut self, message: &IotHubMessage) -> Result<(), IotError> {
343        let (sender, mut receiver) = oneshot::channel::<Result<(), IotError>>();
344        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
345
346        let result = unsafe {
347            IoTHubModuleClient_LL_SendEventAsync(
348                self.handle,
349                message.handle,
350                Some(Self::c_async_confirmation_callback),
351                context,
352            )
353        };
354
355        if let Err(error) = IotError::check_sdk_result(result) {
356            unsafe {
357                drop(Box::from_raw(
358                    context as *mut oneshot::Sender<Result<(), IotError>>,
359                ));
360            }
361            return Err(error);
362        }
363
364        poll_fn(|cx| {
365            self.do_work_once()?;
366
367            match receiver.try_recv() {
368                Ok(Some(send_result)) => Poll::Ready(send_result),
369                Ok(None) => {
370                    cx.waker().wake_by_ref();
371                    Poll::Pending
372                }
373                Err(_) => Poll::Ready(Err(IotError::Sdk(
374                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
375                ))),
376            }
377        })
378        .await
379    }
380
381    pub async fn send_message_result(
382        &mut self,
383        message: &IotHubMessage,
384    ) -> Result<IoTHubClientConfirmationResult, IotError> {
385        let output = c"output";
386        self.send_event_to_output_async(&message, output).await
387    }
388
389    pub async fn send_message(
390        &mut self,
391        message: &IotHubMessage,
392    ) -> Result<IoTHubClientConfirmationResult, IotError> {
393        self.send_message_result(message).await
394    }
395
396    pub fn get_send_status(&self) -> Result<(), IotError> {
397        let handle = self.handle;
398        let mut status: IOTHUB_CLIENT_STATUS = 0;
399        let result = unsafe { IoTHubModuleClient_LL_GetSendStatus(handle, &mut status) };
400        IotError::check_sdk_result(result)?;
401        IotError::check_sdk_result(status)
402    }
403
404    pub fn set_retry_policy(
405        &self,
406        retry_policy: IOTHUB_CLIENT_RETRY_POLICY,
407        retry_timeout_limit_in_seconds: usize,
408    ) -> Result<(), IotError> {
409        let handle = self.handle;
410        let result = unsafe {
411            IoTHubModuleClient_LL_SetRetryPolicy(
412                handle,
413                retry_policy,
414                retry_timeout_limit_in_seconds,
415            )
416        };
417        IotError::check_sdk_result(result)?;
418        Ok(())
419    }
420
421    pub fn get_retry_policy(&self) -> Result<(IOTHUB_CLIENT_RETRY_POLICY, usize), IotError> {
422        let handle = self.handle;
423        let mut retry_policy: IOTHUB_CLIENT_RETRY_POLICY = 0;
424        let mut timeout_limit: usize = 0;
425        let result = unsafe {
426            IoTHubModuleClient_LL_GetRetryPolicy(handle, &mut retry_policy, &mut timeout_limit)
427        };
428        IotError::check_sdk_result(result)?;
429        Ok((retry_policy, timeout_limit))
430    }
431
432    pub fn get_last_message_receive_time(&self) -> Result<SystemTime, IotError> {
433        let handle = self.handle;
434        let mut receive_time: time_t = 0;
435        let result =
436            unsafe { IoTHubModuleClient_LL_GetLastMessageReceiveTime(handle, &mut receive_time) };
437        IotError::check_sdk_result(result)?;
438        let time = UNIX_EPOCH + Duration::from_secs(receive_time as u64);
439        Ok(time)
440    }
441
442    pub fn do_work_once(&self) -> Result<(), IotError> {
443        let handle = self.handle;
444        unsafe {
445            IoTHubModuleClient_LL_DoWork(handle);
446        }
447        Ok(())
448    }
449
450    pub fn do_work(&mut self) {
451        loop {
452            let _ = self.do_work_once();
453            thread::sleep(time::Duration::from_millis(100));
454        }
455    }
456
457    pub fn set_option_value<T>(&self, option_name: &str, value: &T) -> Result<(), IotError> {
458        let handle = self.handle;
459        let name = CString::new(option_name)?;
460        let result = unsafe {
461            IoTHubModuleClient_LL_SetOption(
462                handle,
463                name.as_ptr(),
464                value as *const T as *const c_void,
465            )
466        };
467        IotError::check_sdk_result(result)
468    }
469
470    fn set_bool_option(&self, name: &CStr, value: bool) -> Result<(), IotError> {
471        let value = if value { 1 } else { 0 };
472        let result = unsafe {
473            IoTHubModuleClient_LL_SetOption(
474                self.handle,
475                name.as_ptr(),
476                &value as *const i32 as *const c_void,
477            )
478        };
479        IotError::check_sdk_result(result)?;
480        Ok(())
481    }
482
483    fn set_int_option(&self, name: &CStr, value: u32) -> Result<(), IotError> {
484        let result = unsafe {
485            IoTHubModuleClient_LL_SetOption(
486                self.handle,
487                name.as_ptr(),
488                &value as *const u32 as *const c_void,
489            )
490        };
491        IotError::check_sdk_result(result)
492    }
493
494    fn set_str_option(&self, name: &CStr, value: &str) -> Result<(), IotError> {
495        let cvalue = CString::new(value)?;
496        let result = unsafe {
497            IoTHubModuleClient_LL_SetOption(
498                self.handle,
499                name.as_ptr(),
500                cvalue.as_ptr() as *const c_void,
501            )
502        };
503        IotError::check_sdk_result(result)
504    }
505
506    pub fn send_reported_state(
507        &mut self,
508        reported_state: &[u8],
509        callback: IOTHUB_CLIENT_REPORTED_STATE_CALLBACK,
510    ) -> Result<(), IotError> {
511        let handle = self.handle;
512        let result = unsafe {
513            IoTHubModuleClient_LL_SendReportedState(
514                handle,
515                reported_state.as_ptr(),
516                reported_state.len(),
517                callback,
518                self.context_ptr(),
519            )
520        };
521        IotError::check_sdk_result(result)
522    }
523
524    pub async fn get_twin_async(
525        &mut self,
526    ) -> Result<(IotHubDeviceTwinUpdateState, Vec<u8>), IotError> {
527        let (sender, mut receiver) = oneshot::channel::<(IotHubDeviceTwinUpdateState, Vec<u8>)>();
528        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
529
530        let result = unsafe {
531            IoTHubModuleClient_LL_GetTwinAsync(
532                self.handle,
533                Some(Self::c_get_twin_async_callback),
534                context,
535            )
536        };
537
538        if let Err(error) = IotError::check_sdk_result(result) {
539            unsafe {
540                drop(Box::from_raw(
541                    context as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>,
542                ));
543            }
544            return Err(error);
545        }
546
547        poll_fn(|cx| {
548            self.do_work_once()?;
549
550            match receiver.try_recv() {
551                Ok(Some(twin_update)) => Poll::Ready(Ok(twin_update)),
552                Ok(None) => {
553                    cx.waker().wake_by_ref();
554                    Poll::Pending
555                }
556                Err(_) => Poll::Ready(Err(IotError::Sdk(
557                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
558                ))),
559            }
560        })
561        .await
562    }
563
564    pub async fn send_event_to_output_async(
565        &mut self,
566        message: &IotHubMessage,
567        output_name: &CStr,
568    ) -> Result<IoTHubClientConfirmationResult, IotError> {
569        let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
570        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
571
572        let result = unsafe {
573            IoTHubModuleClient_LL_SendEventToOutputAsync(
574                self.handle,
575                message.handle,
576                output_name.as_ptr(),
577                Some(Self::c_async_confirmation_result_callback),
578                context,
579            )
580        };
581
582        if let Err(error) = IotError::check_sdk_result(result) {
583            unsafe {
584                drop(Box::from_raw(
585                    context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
586                ));
587            }
588            return Err(error);
589        }
590
591        poll_fn(|cx| {
592            self.do_work_once()?;
593
594            match receiver.try_recv() {
595                Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
596                Ok(None) => {
597                    cx.waker().wake_by_ref();
598                    Poll::Pending
599                }
600                Err(_) => Poll::Ready(Err(IotError::Sdk(
601                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
602                ))),
603            }
604        })
605        .await
606    }
607
608    pub fn device_method_invoke(
609        &self,
610        device_id: &str,
611        method_name: &str,
612        method_payload: &str,
613        timeout: Duration,
614    ) -> Result<MethodInvokeResponse, IotError> {
615        let handle = self.handle;
616        let device_id = CString::new(device_id)?;
617        let method_name = CString::new(method_name)?;
618        let method_payload = CString::new(method_payload)?;
619
620        let mut response_status: ::std::os::raw::c_int = 0;
621        let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
622        let mut response_payload_size: usize = 0;
623
624        let result = unsafe {
625            IoTHubModuleClient_LL_DeviceMethodInvoke(
626                handle,
627                device_id.as_ptr(),
628                method_name.as_ptr(),
629                method_payload.as_ptr(),
630                timeout.as_secs() as ::std::os::raw::c_uint,
631                &mut response_status,
632                &mut response_payload,
633                &mut response_payload_size,
634            )
635        };
636
637        if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
638            unsafe { free(response_payload as *mut c_void) };
639        }
640        IotError::check_sdk_result(result)?;
641
642        let payload = if response_payload.is_null() || response_payload_size == 0 {
643            Vec::new()
644        } else {
645            let bytes =
646                unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
647                    .to_vec();
648            unsafe { free(response_payload as *mut c_void) };
649            bytes
650        };
651
652        Ok(MethodInvokeResponse {
653            status: response_status,
654            payload,
655        })
656    }
657
658    pub fn module_method_invoke(
659        &self,
660        device_id: &str,
661        module_id: &str,
662        method_name: &str,
663        method_payload: &str,
664        timeout: ::std::os::raw::c_uint,
665    ) -> Result<MethodInvokeResponse, IotError> {
666        let handle = self.handle;
667        let device_id = CString::new(device_id)?;
668        let module_id = CString::new(module_id)?;
669        let method_name = CString::new(method_name)?;
670        let method_payload = CString::new(method_payload)?;
671
672        let mut response_status: ::std::os::raw::c_int = 0;
673        let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
674        let mut response_payload_size: usize = 0;
675
676        let result = unsafe {
677            IoTHubModuleClient_LL_ModuleMethodInvoke(
678                handle,
679                device_id.as_ptr(),
680                module_id.as_ptr(),
681                method_name.as_ptr(),
682                method_payload.as_ptr(),
683                timeout,
684                &mut response_status,
685                &mut response_payload,
686                &mut response_payload_size,
687            )
688        };
689
690        if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
691            unsafe { free(response_payload as *mut c_void) };
692        }
693        IotError::check_sdk_result(result)?;
694
695        let payload = if response_payload.is_null() || response_payload_size == 0 {
696            Vec::new()
697        } else {
698            let bytes =
699                unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
700                    .to_vec();
701            unsafe { free(response_payload as *mut c_void) };
702            bytes
703        };
704
705        Ok(MethodInvokeResponse {
706            status: response_status,
707            payload,
708        })
709    }
710
711    pub fn send_message_disposition(
712        &self,
713        message: &IotHubMessage,
714        disposition: IOTHUBMESSAGE_DISPOSITION_RESULT,
715    ) -> Result<(), IotError> {
716        let handle = self.handle;
717        let result = unsafe {
718            IoTHubModuleClient_LL_SendMessageDisposition(handle, message.handle, disposition)
719        };
720        IotError::check_sdk_result(result)?;
721        Ok(())
722    }
723
724    pub async fn send_telemetry_async(
725        &mut self,
726        message: &IotHubMessage,
727    ) -> Result<IoTHubClientConfirmationResult, IotError> {
728        let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
729        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
730
731        let result = unsafe {
732            IoTHubModuleClient_LL_SendTelemetryAsync(
733                self.handle,
734                message.handle,
735                Some(Self::c_async_confirmation_result_callback),
736                context,
737            )
738        };
739
740        if let Err(error) = IotError::check_sdk_result(result) {
741            unsafe {
742                drop(Box::from_raw(
743                    context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
744                ));
745            }
746            return Err(error);
747        }
748
749        poll_fn(|cx| {
750            self.do_work_once()?;
751
752            match receiver.try_recv() {
753                Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
754                Ok(None) => {
755                    cx.waker().wake_by_ref();
756                    Poll::Pending
757                }
758                Err(_) => Poll::Ready(Err(IotError::Sdk(
759                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
760                ))),
761            }
762        })
763        .await
764    }
765
766    pub fn subscribe_to_commands(
767        &mut self,
768        callback: IOTHUB_CLIENT_COMMAND_CALLBACK_ASYNC,
769    ) -> Result<(), IotError> {
770        let handle = self.handle;
771        let result = unsafe {
772            IoTHubModuleClient_LL_SubscribeToCommands(handle, callback, self.context_ptr())
773        };
774        IotError::check_sdk_result(result)
775    }
776
777    pub async fn send_properties_async(&mut self, properties: &[u8]) -> Result<c_int, IotError> {
778        let (sender, mut receiver) = oneshot::channel::<c_int>();
779        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
780
781        let result = unsafe {
782            IoTHubModuleClient_LL_SendPropertiesAsync(
783                self.handle,
784                properties.as_ptr(),
785                properties.len(),
786                Some(Self::c_async_property_ack_callback),
787                context,
788            )
789        };
790
791        if let Err(error) = IotError::check_sdk_result(result) {
792            unsafe {
793                drop(Box::from_raw(context as *mut oneshot::Sender<c_int>));
794            }
795            return Err(error);
796        }
797
798        poll_fn(|cx| {
799            self.do_work_once()?;
800
801            match receiver.try_recv() {
802                Ok(Some(status_code)) => Poll::Ready(Ok(status_code)),
803                Ok(None) => {
804                    cx.waker().wake_by_ref();
805                    Poll::Pending
806                }
807                Err(_) => Poll::Ready(Err(IotError::Sdk(
808                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
809                ))),
810            }
811        })
812        .await
813    }
814
815    pub async fn get_properties_async(
816        &mut self,
817    ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
818        let (sender, mut receiver) =
819            oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
820        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
821
822        let result = unsafe {
823            IoTHubModuleClient_LL_GetPropertiesAsync(
824                self.handle,
825                Some(Self::c_async_properties_received_callback),
826                context,
827            )
828        };
829
830        if let Err(error) = IotError::check_sdk_result(result) {
831            unsafe {
832                drop(Box::from_raw(
833                    context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
834                ));
835            }
836            return Err(error);
837        }
838
839        poll_fn(|cx| {
840            self.do_work_once()?;
841
842            match receiver.try_recv() {
843                Ok(Some(properties)) => Poll::Ready(Ok(properties)),
844                Ok(None) => {
845                    cx.waker().wake_by_ref();
846                    Poll::Pending
847                }
848                Err(_) => Poll::Ready(Err(IotError::Sdk(
849                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
850                ))),
851            }
852        })
853        .await
854    }
855
856    pub async fn get_properties_and_subscribe_to_updates_async(
857        &mut self,
858    ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
859        let (sender, mut receiver) =
860            oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
861        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
862
863        let result = unsafe {
864            IoTHubModuleClient_LL_GetPropertiesAndSubscribeToUpdatesAsync(
865                self.handle,
866                Some(Self::c_async_properties_received_callback),
867                context,
868            )
869        };
870
871        if let Err(error) = IotError::check_sdk_result(result) {
872            unsafe {
873                drop(Box::from_raw(
874                    context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
875                ));
876            }
877            return Err(error);
878        }
879
880        poll_fn(|cx| {
881            self.do_work_once()?;
882
883            match receiver.try_recv() {
884                Ok(Some(properties)) => Poll::Ready(Ok(properties)),
885                Ok(None) => {
886                    cx.waker().wake_by_ref();
887                    Poll::Pending
888                }
889                Err(_) => Poll::Ready(Err(IotError::Sdk(
890                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
891                ))),
892            }
893        })
894        .await
895    }
896
897    pub fn set_option(&self, option: ModuleClientOption<'_>) -> Result<(), IotError> {
898        let name: &CStr = (&option).try_into()?;
899        match option {
900            ModuleClientOption::LogTrace(value)
901            | ModuleClientOption::AutoUrlEncodeDecode(value) => {
902                self.set_bool_option(name, value)?
903            }
904            ModuleClientOption::MessageTimeout(value)
905            | ModuleClientOption::RetryIntervalSec(value)
906            | ModuleClientOption::RetryMaxDelaySecs(value)
907            | ModuleClientOption::SasTokenLifetime(value)
908            | ModuleClientOption::DoWorkFreqMs(value)
909            | ModuleClientOption::KeepAlive(value) => self.set_int_option(name, value)?,
910            ModuleClientOption::ProductInfo(value) | ModuleClientOption::ModelId(value) => {
911                self.set_str_option(name, value)?
912            }
913        }
914
915        Ok(())
916    }
917}
918
919impl<C: ModuleEventCallback> Drop for IotHubModuleClient<C> {
920    fn drop(&mut self) {
921        unsafe {
922            IoTHubModuleClient_LL_Destroy(self.handle);
923        }
924    }
925}