// azure_iot_rs/moduleclient.rs

1use crate::error::IotError;
2use crate::message::IotHubMessage;
3use crate::transport::TransportProvider;
4use crate::{
5    ConnectionStatus, ConnectionStatusReason, IoTHubClientConfirmationResult,
6    IoTHubClientPropertyPayloadType, IoTHubMessageDispositionResult, IotHub,
7    IotHubDeviceTwinUpdateState,
8};
9use azure_iot_rs_sys::*;
10use futures::channel::oneshot;
11use log::info;
12use std::convert::TryInto;
13use std::ffi::{CStr, CString, c_void};
14use std::future::poll_fn;
15use std::os::raw::c_int;
16use std::ptr;
17use std::result::Result;
18use std::str;
19use std::task::Poll;
20use std::time::{Duration, SystemTime, UNIX_EPOCH};
21use std::{thread, time};
22
// C `free`, declared here so that response buffers handed back by the SDK
// (see the method-invoke wrappers below) can be released from Rust.
unsafe extern "C" {
    fn free(ptr: *mut c_void);
}
26
/// A typed option that can be applied to a module client through `set_option`.
///
/// Each variant carries the value to apply. The wire name of every option is
/// given by the `TryInto<&CStr>` conversion implemented for this type.
/// Units are implied by the variant names only (`Sec`, `Secs`, `Ms`);
/// NOTE(review): confirm units and accepted ranges against the SDK option docs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModuleClientOption<'a> {
    /// `"logtrace"` - boolean flag.
    LogTrace(bool),
    /// `"messageTimeout"` - integer value.
    MessageTimeout(u32),
    /// `"product_info"` - string value.
    ProductInfo(&'a str),
    /// `"retry_interval_sec"` - integer value.
    RetryIntervalSec(u32),
    /// `"retry_max_delay_secs"` - integer value.
    RetryMaxDelaySecs(u32),
    /// `"sas_token_lifetime"` - integer value.
    SasTokenLifetime(u32),
    /// `"do_work_freq_ms"` - integer value.
    DoWorkFreqMs(u32),
    /// `"auto_url_encode_decode"` - boolean flag.
    AutoUrlEncodeDecode(bool),
    /// `"keepalive"` - integer value.
    KeepAlive(u32),
    /// `"model_id"` - string value.
    ModelId(&'a str),
}
39
40impl TryInto<&'static CStr> for &ModuleClientOption<'_> {
41    type Error = IotError;
42
43    fn try_into(self) -> Result<&'static CStr, Self::Error> {
44        let name = match self {
45            ModuleClientOption::LogTrace(_) => c"logtrace",
46            ModuleClientOption::MessageTimeout(_) => c"messageTimeout",
47            ModuleClientOption::ProductInfo(_) => c"product_info",
48            ModuleClientOption::RetryIntervalSec(_) => c"retry_interval_sec",
49            ModuleClientOption::RetryMaxDelaySecs(_) => c"retry_max_delay_secs",
50            ModuleClientOption::SasTokenLifetime(_) => c"sas_token_lifetime",
51            ModuleClientOption::DoWorkFreqMs(_) => c"do_work_freq_ms",
52            ModuleClientOption::AutoUrlEncodeDecode(_) => c"auto_url_encode_decode",
53            ModuleClientOption::KeepAlive(_) => c"keepalive",
54            ModuleClientOption::ModelId(_) => c"model_id",
55        };
56        Ok(name)
57    }
58}
59
60pub trait ModuleEventCallback {
61    fn on_message(&mut self, message: IotHubMessage) -> IoTHubMessageDispositionResult {
62        info!("Received message: {:?}", message);
63        IoTHubMessageDispositionResult::Accepted
64    }
65
66    fn on_module_twin(&mut self, state: IotHubDeviceTwinUpdateState, data: &[u8]) {
67        info!(
68            "Received module twin update: {state} {}",
69            String::from_utf8_lossy(data)
70        );
71    }
72
73    fn on_module_method(&mut self, method_name: &str, payload: &[u8]) -> Result<Vec<u8>, IotError> {
74        info!(
75            "Received module method: {} with payload: {}",
76            method_name,
77            String::from_utf8_lossy(payload)
78        );
79        Ok(Vec::new())
80    }
81
82    fn on_input_message(
83        &mut self,
84        input_name: &str,
85        message: IotHubMessage,
86    ) -> IoTHubMessageDispositionResult {
87        info!("Received message on input {}: {:?}", input_name, message);
88        IoTHubMessageDispositionResult::Accepted
89    }
90
91    fn on_connection_status(&mut self, status: ConnectionStatus, reason: ConnectionStatusReason) {
92        info!("Connection status changed: {:?} ({:?})", status, reason);
93    }
94
95    fn on_confirmation(&mut self, result: Result<(), IotError>) {
96        if let Err(error) = result {
97            info!("Telemetry confirmation failed: {:?}", error);
98        }
99    }
100}
101
/// Result of invoking a direct method on another device or module.
#[derive(Debug)]
pub struct MethodInvokeResponse {
    /// Status code reported back for the invocation.
    pub status: c_int,
    /// Raw response body; empty when no payload was returned.
    pub payload: Vec<u8>,
}
107
/// Wrapper around a low-level (`_LL_`) module client handle that routes SDK
/// events to a user-supplied [`ModuleEventCallback`].
pub struct IotHubModuleClient<C: ModuleEventCallback> {
    // Raw SDK handle; destroyed in `Drop`.
    handle: IOTHUB_MODULE_CLIENT_LL_HANDLE,
    // Receiver of every event delivered by the C callback trampolines.
    callback: C,
}
112
// The raw handle field suppresses the automatic `Send` impl, so it is asserted manually.
// NOTE(review): this presumes the SDK handle may be used from a thread other than the
// one that created it as long as access is not concurrent - confirm against the SDK docs.
unsafe impl<C: ModuleEventCallback + Send> Send for IotHubModuleClient<C> {}
114
115// C callbacks.
116impl<C: ModuleEventCallback> IotHubModuleClient<C> {
117    unsafe extern "C" fn c_message_callback(
118        handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
119        ctx: *mut c_void,
120    ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
121        info!("Received message from hub");
122        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
123        let message = IotHubMessage::from_handle(handle);
124        client.callback.on_message(message).as_raw()
125    }
126
127    unsafe extern "C" fn c_input_message_callback(
128        handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
129        ctx: *mut c_void,
130    ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
131        info!("Received message from hub");
132        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
133        let message = IotHubMessage::from_handle(handle);
134        client
135            .callback
136            .on_input_message("default", message)
137            .as_raw()
138    }
139
140    unsafe extern "C" fn c_connection_status_callback(
141        status: IOTHUB_CLIENT_CONNECTION_STATUS,
142        reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON,
143        ctx: *mut c_void,
144    ) {
145        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
146        client
147            .callback
148            .on_connection_status(status.into(), reason.into());
149    }
150
151    unsafe extern "C" fn c_module_twin_callback(
152        state: DEVICE_TWIN_UPDATE_STATE,
153        payload: *const u8,
154        size: usize,
155        ctx: *mut c_void,
156    ) {
157        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
158        let data = unsafe { std::slice::from_raw_parts(payload, size) };
159        client.callback.on_module_twin(state.into(), data);
160    }
161
162    unsafe extern "C" fn c_module_method_callback(
163        method_name: *const i8,
164        payload: *const u8,
165        size: usize,
166        response_payload: *mut *mut u8,
167        response_payload_size: *mut usize,
168        ctx: *mut c_void,
169    ) -> ::std::os::raw::c_int {
170        let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
171        let command_name = unsafe { CStr::from_ptr(method_name) }
172            .to_str()
173            .unwrap_or_default();
174        let data = unsafe { std::slice::from_raw_parts(payload, size) };
175        let response = client.callback.on_module_method(command_name, data);
176        match response {
177            Ok(resp_data) => {
178                unsafe {
179                    *response_payload = resp_data.as_ptr() as *mut u8;
180                    *response_payload_size = resp_data.len();
181                }
182                200
183            }
184            Err(_) => {
185                unsafe {
186                    *response_payload = ptr::null_mut();
187                    *response_payload_size = 0;
188                }
189                500
190            }
191        }
192    }
193
194    fn initialize_callbacks(&mut self) -> Result<(), IotError> {
195        let handle = self.handle;
196        let context = self.context_ptr();
197        unsafe {
198            let input = c"input";
199            let result = IoTHubModuleClient_LL_SetInputMessageCallback(
200                handle,
201                input.as_ptr(),
202                Some(Self::c_input_message_callback),
203                context,
204            );
205            IotError::check_sdk_result(result)?;
206
207            let result = IoTHubModuleClient_LL_SetMessageCallback(
208                handle,
209                Some(Self::c_message_callback),
210                context,
211            );
212            IotError::check_sdk_result(result)?;
213
214            let result = IoTHubModuleClient_LL_SetModuleMethodCallback(
215                handle,
216                Some(Self::c_module_method_callback),
217                context,
218            );
219            IotError::check_sdk_result(result)?;
220
221            let result = IoTHubModuleClient_LL_SetModuleTwinCallback(
222                handle,
223                Some(Self::c_module_twin_callback),
224                context,
225            );
226            IotError::check_sdk_result(result)?;
227
228            let result = IoTHubModuleClient_LL_SetConnectionStatusCallback(
229                handle,
230                Some(Self::c_connection_status_callback),
231                context,
232            );
233            IotError::check_sdk_result(result)
234        }
235    }
236}
237
238// Async callbacks.
239impl<C: ModuleEventCallback> IotHubModuleClient<C> {
240    unsafe extern "C" fn c_async_confirmation_callback(
241        status: IOTHUB_CLIENT_RESULT,
242        ctx: *mut c_void,
243    ) {
244        let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<Result<(), IotError>>) };
245        let _ = sender.send(IotError::check_sdk_result(status));
246    }
247    unsafe extern "C" fn c_async_confirmation_result_callback(
248        result: IOTHUB_CLIENT_CONFIRMATION_RESULT,
249        ctx: *mut c_void,
250    ) {
251        let sender =
252            unsafe { Box::from_raw(ctx as *mut oneshot::Sender<IoTHubClientConfirmationResult>) };
253        let _ = sender.send(result.into());
254    }
255
256    unsafe extern "C" fn c_async_property_ack_callback(status_code: c_int, ctx: *mut c_void) {
257        let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<c_int>) };
258        let _ = sender.send(status_code);
259    }
260
261    unsafe extern "C" fn c_async_properties_received_callback(
262        payload_type: IOTHUB_CLIENT_PROPERTY_PAYLOAD_TYPE,
263        payload: *const u8,
264        payload_length: usize,
265        ctx: *mut c_void,
266    ) {
267        let sender = unsafe {
268            Box::from_raw(ctx as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>)
269        };
270        let data = unsafe { std::slice::from_raw_parts(payload, payload_length) }.to_vec();
271        let _ = sender.send((payload_type.into(), data));
272    }
273
274    unsafe extern "C" fn c_get_twin_async_callback(
275        state: DEVICE_TWIN_UPDATE_STATE,
276        payload: *const u8,
277        size: usize,
278        ctx: *mut c_void,
279    ) {
280        let sender = unsafe {
281            Box::from_raw(ctx as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>)
282        };
283        let data = unsafe { std::slice::from_raw_parts(payload, size) }.to_vec();
284        let _ = sender.send((state.into(), data));
285    }
286}
287
288impl<C: ModuleEventCallback> IotHubModuleClient<C> {
289    fn context_ptr(&mut self) -> *mut c_void {
290        self as *mut IotHubModuleClient<C> as *mut c_void
291    }
292
293    pub fn create_from_environment(
294        protocol: TransportProvider,
295        callback: C,
296    ) -> Result<Self, IotError> {
297        IotHub::ensure_initialized()?;
298        let handle = unsafe { IoTHubModuleClient_LL_CreateFromEnvironment(protocol.to_sdk()) };
299        if handle.is_null() {
300            return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
301        }
302
303        let mut client = Self { handle, callback };
304
305        client.initialize_callbacks()?;
306        Ok(client)
307    }
308
309    pub fn create_from_connection_string(
310        connection_string: &str,
311        transport: TransportProvider,
312        callback: C,
313    ) -> Result<Self, IotError> {
314        IotHub::ensure_initialized()?;
315        let connection = CString::new(connection_string)?;
316        let handle = unsafe {
317            IoTHubModuleClient_LL_CreateFromConnectionString(
318                connection.as_ptr(),
319                transport.to_sdk(),
320            )
321        };
322        if handle.is_null() {
323            return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
324        }
325        let mut client = Self { handle, callback };
326        client.initialize_callbacks()?;
327        Ok(client)
328    }
329
330    pub fn try_new(callback: C) -> Result<Self, IotError> {
331        Self::create_from_environment(TransportProvider::Mqtt, callback)
332    }
333
334    pub async fn send_event_async(&mut self, message: &IotHubMessage) -> Result<(), IotError> {
335        let (sender, mut receiver) = oneshot::channel::<Result<(), IotError>>();
336        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
337
338        let result = unsafe {
339            IoTHubModuleClient_LL_SendEventAsync(
340                self.handle,
341                message.handle,
342                Some(Self::c_async_confirmation_callback),
343                context,
344            )
345        };
346
347        if let Err(error) = IotError::check_sdk_result(result) {
348            unsafe {
349                drop(Box::from_raw(
350                    context as *mut oneshot::Sender<Result<(), IotError>>,
351                ));
352            }
353            return Err(error);
354        }
355
356        poll_fn(|cx| {
357            self.do_work_once()?;
358
359            match receiver.try_recv() {
360                Ok(Some(send_result)) => Poll::Ready(send_result),
361                Ok(None) => {
362                    cx.waker().wake_by_ref();
363                    Poll::Pending
364                }
365                Err(_) => Poll::Ready(Err(IotError::Sdk(
366                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
367                ))),
368            }
369        })
370        .await
371    }
372
373    pub async fn send_message_result(
374        &mut self,
375        message: &IotHubMessage,
376    ) -> Result<IoTHubClientConfirmationResult, IotError> {
377        let output = c"output";
378        self.send_event_to_output_async(message, output).await
379    }
380
381    pub async fn send_message(
382        &mut self,
383        message: &IotHubMessage,
384    ) -> Result<IoTHubClientConfirmationResult, IotError> {
385        self.send_message_result(message).await
386    }
387
388    pub fn get_send_status(&self) -> Result<(), IotError> {
389        let handle = self.handle;
390        let mut status: IOTHUB_CLIENT_STATUS = 0;
391        let result = unsafe { IoTHubModuleClient_LL_GetSendStatus(handle, &mut status) };
392        IotError::check_sdk_result(result)?;
393        IotError::check_sdk_result(status)
394    }
395
396    pub fn set_retry_policy(
397        &self,
398        retry_policy: IOTHUB_CLIENT_RETRY_POLICY,
399        retry_timeout_limit_in_seconds: usize,
400    ) -> Result<(), IotError> {
401        let handle = self.handle;
402        let result = unsafe {
403            IoTHubModuleClient_LL_SetRetryPolicy(
404                handle,
405                retry_policy,
406                retry_timeout_limit_in_seconds,
407            )
408        };
409        IotError::check_sdk_result(result)?;
410        Ok(())
411    }
412
413    pub fn get_retry_policy(&self) -> Result<(IOTHUB_CLIENT_RETRY_POLICY, usize), IotError> {
414        let handle = self.handle;
415        let mut retry_policy: IOTHUB_CLIENT_RETRY_POLICY = 0;
416        let mut timeout_limit: usize = 0;
417        let result = unsafe {
418            IoTHubModuleClient_LL_GetRetryPolicy(handle, &mut retry_policy, &mut timeout_limit)
419        };
420        IotError::check_sdk_result(result)?;
421        Ok((retry_policy, timeout_limit))
422    }
423
424    pub fn get_last_message_receive_time(&self) -> Result<SystemTime, IotError> {
425        let handle = self.handle;
426        let mut receive_time: time_t = 0;
427        let result =
428            unsafe { IoTHubModuleClient_LL_GetLastMessageReceiveTime(handle, &mut receive_time) };
429        IotError::check_sdk_result(result)?;
430        let time = UNIX_EPOCH + Duration::from_secs(receive_time as u64);
431        Ok(time)
432    }
433
434    pub fn do_work_once(&self) -> Result<(), IotError> {
435        let handle = self.handle;
436        unsafe {
437            IoTHubModuleClient_LL_DoWork(handle);
438        }
439        Ok(())
440    }
441
442    pub fn do_work(&mut self) {
443        loop {
444            let _ = self.do_work_once();
445            thread::sleep(time::Duration::from_millis(100));
446        }
447    }
448
449    pub fn set_option_value<T>(&self, option_name: &str, value: &T) -> Result<(), IotError> {
450        let handle = self.handle;
451        let name = CString::new(option_name)?;
452        let result = unsafe {
453            IoTHubModuleClient_LL_SetOption(
454                handle,
455                name.as_ptr(),
456                value as *const T as *const c_void,
457            )
458        };
459        IotError::check_sdk_result(result)
460    }
461
462    fn set_bool_option(&self, name: &CStr, value: bool) -> Result<(), IotError> {
463        let value = if value { 1 } else { 0 };
464        let result = unsafe {
465            IoTHubModuleClient_LL_SetOption(
466                self.handle,
467                name.as_ptr(),
468                &value as *const i32 as *const c_void,
469            )
470        };
471        IotError::check_sdk_result(result)?;
472        Ok(())
473    }
474
475    fn set_int_option(&self, name: &CStr, value: u32) -> Result<(), IotError> {
476        let result = unsafe {
477            IoTHubModuleClient_LL_SetOption(
478                self.handle,
479                name.as_ptr(),
480                &value as *const u32 as *const c_void,
481            )
482        };
483        IotError::check_sdk_result(result)
484    }
485
486    fn set_str_option(&self, name: &CStr, value: &str) -> Result<(), IotError> {
487        let cvalue = CString::new(value)?;
488        let result = unsafe {
489            IoTHubModuleClient_LL_SetOption(
490                self.handle,
491                name.as_ptr(),
492                cvalue.as_ptr() as *const c_void,
493            )
494        };
495        IotError::check_sdk_result(result)
496    }
497
498    pub fn send_reported_state(
499        &mut self,
500        reported_state: &[u8],
501        callback: IOTHUB_CLIENT_REPORTED_STATE_CALLBACK,
502    ) -> Result<(), IotError> {
503        let handle = self.handle;
504        let result = unsafe {
505            IoTHubModuleClient_LL_SendReportedState(
506                handle,
507                reported_state.as_ptr(),
508                reported_state.len(),
509                callback,
510                self.context_ptr(),
511            )
512        };
513        IotError::check_sdk_result(result)
514    }
515
516    pub async fn get_twin_async(
517        &mut self,
518    ) -> Result<(IotHubDeviceTwinUpdateState, Vec<u8>), IotError> {
519        let (sender, mut receiver) = oneshot::channel::<(IotHubDeviceTwinUpdateState, Vec<u8>)>();
520        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
521
522        let result = unsafe {
523            IoTHubModuleClient_LL_GetTwinAsync(
524                self.handle,
525                Some(Self::c_get_twin_async_callback),
526                context,
527            )
528        };
529
530        if let Err(error) = IotError::check_sdk_result(result) {
531            unsafe {
532                drop(Box::from_raw(
533                    context as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>,
534                ));
535            }
536            return Err(error);
537        }
538
539        poll_fn(|cx| {
540            self.do_work_once()?;
541
542            match receiver.try_recv() {
543                Ok(Some(twin_update)) => Poll::Ready(Ok(twin_update)),
544                Ok(None) => {
545                    cx.waker().wake_by_ref();
546                    Poll::Pending
547                }
548                Err(_) => Poll::Ready(Err(IotError::Sdk(
549                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
550                ))),
551            }
552        })
553        .await
554    }
555
556    pub async fn send_event_to_output_async(
557        &mut self,
558        message: &IotHubMessage,
559        output_name: &CStr,
560    ) -> Result<IoTHubClientConfirmationResult, IotError> {
561        let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
562        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
563
564        let result = unsafe {
565            IoTHubModuleClient_LL_SendEventToOutputAsync(
566                self.handle,
567                message.handle,
568                output_name.as_ptr(),
569                Some(Self::c_async_confirmation_result_callback),
570                context,
571            )
572        };
573
574        if let Err(error) = IotError::check_sdk_result(result) {
575            unsafe {
576                drop(Box::from_raw(
577                    context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
578                ));
579            }
580            return Err(error);
581        }
582
583        poll_fn(|cx| {
584            self.do_work_once()?;
585
586            match receiver.try_recv() {
587                Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
588                Ok(None) => {
589                    cx.waker().wake_by_ref();
590                    Poll::Pending
591                }
592                Err(_) => Poll::Ready(Err(IotError::Sdk(
593                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
594                ))),
595            }
596        })
597        .await
598    }
599
600    pub fn device_method_invoke(
601        &self,
602        device_id: &str,
603        method_name: &str,
604        method_payload: &str,
605        timeout: Duration,
606    ) -> Result<MethodInvokeResponse, IotError> {
607        let handle = self.handle;
608        let device_id = CString::new(device_id)?;
609        let method_name = CString::new(method_name)?;
610        let method_payload = CString::new(method_payload)?;
611
612        let mut response_status: ::std::os::raw::c_int = 0;
613        let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
614        let mut response_payload_size: usize = 0;
615
616        let result = unsafe {
617            IoTHubModuleClient_LL_DeviceMethodInvoke(
618                handle,
619                device_id.as_ptr(),
620                method_name.as_ptr(),
621                method_payload.as_ptr(),
622                timeout.as_secs() as ::std::os::raw::c_uint,
623                &mut response_status,
624                &mut response_payload,
625                &mut response_payload_size,
626            )
627        };
628
629        if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
630            unsafe { free(response_payload as *mut c_void) };
631        }
632        IotError::check_sdk_result(result)?;
633
634        let payload = if response_payload.is_null() || response_payload_size == 0 {
635            Vec::new()
636        } else {
637            let bytes =
638                unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
639                    .to_vec();
640            unsafe { free(response_payload as *mut c_void) };
641            bytes
642        };
643
644        Ok(MethodInvokeResponse {
645            status: response_status,
646            payload,
647        })
648    }
649
650    pub fn module_method_invoke(
651        &self,
652        device_id: &str,
653        module_id: &str,
654        method_name: &str,
655        method_payload: &str,
656        timeout: ::std::os::raw::c_uint,
657    ) -> Result<MethodInvokeResponse, IotError> {
658        let handle = self.handle;
659        let device_id = CString::new(device_id)?;
660        let module_id = CString::new(module_id)?;
661        let method_name = CString::new(method_name)?;
662        let method_payload = CString::new(method_payload)?;
663
664        let mut response_status: ::std::os::raw::c_int = 0;
665        let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
666        let mut response_payload_size: usize = 0;
667
668        let result = unsafe {
669            IoTHubModuleClient_LL_ModuleMethodInvoke(
670                handle,
671                device_id.as_ptr(),
672                module_id.as_ptr(),
673                method_name.as_ptr(),
674                method_payload.as_ptr(),
675                timeout,
676                &mut response_status,
677                &mut response_payload,
678                &mut response_payload_size,
679            )
680        };
681
682        if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
683            unsafe { free(response_payload as *mut c_void) };
684        }
685        IotError::check_sdk_result(result)?;
686
687        let payload = if response_payload.is_null() || response_payload_size == 0 {
688            Vec::new()
689        } else {
690            let bytes =
691                unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
692                    .to_vec();
693            unsafe { free(response_payload as *mut c_void) };
694            bytes
695        };
696
697        Ok(MethodInvokeResponse {
698            status: response_status,
699            payload,
700        })
701    }
702
703    pub fn send_message_disposition(
704        &self,
705        message: &IotHubMessage,
706        disposition: IOTHUBMESSAGE_DISPOSITION_RESULT,
707    ) -> Result<(), IotError> {
708        let handle = self.handle;
709        let result = unsafe {
710            IoTHubModuleClient_LL_SendMessageDisposition(handle, message.handle, disposition)
711        };
712        IotError::check_sdk_result(result)?;
713        Ok(())
714    }
715
716    pub async fn send_telemetry_async(
717        &mut self,
718        message: &IotHubMessage,
719    ) -> Result<IoTHubClientConfirmationResult, IotError> {
720        let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
721        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
722
723        let result = unsafe {
724            IoTHubModuleClient_LL_SendTelemetryAsync(
725                self.handle,
726                message.handle,
727                Some(Self::c_async_confirmation_result_callback),
728                context,
729            )
730        };
731
732        if let Err(error) = IotError::check_sdk_result(result) {
733            unsafe {
734                drop(Box::from_raw(
735                    context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
736                ));
737            }
738            return Err(error);
739        }
740
741        poll_fn(|cx| {
742            self.do_work_once()?;
743
744            match receiver.try_recv() {
745                Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
746                Ok(None) => {
747                    cx.waker().wake_by_ref();
748                    Poll::Pending
749                }
750                Err(_) => Poll::Ready(Err(IotError::Sdk(
751                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
752                ))),
753            }
754        })
755        .await
756    }
757
758    pub fn subscribe_to_commands(
759        &mut self,
760        callback: IOTHUB_CLIENT_COMMAND_CALLBACK_ASYNC,
761    ) -> Result<(), IotError> {
762        let handle = self.handle;
763        let result = unsafe {
764            IoTHubModuleClient_LL_SubscribeToCommands(handle, callback, self.context_ptr())
765        };
766        IotError::check_sdk_result(result)
767    }
768
769    pub async fn send_properties_async(&mut self, properties: &[u8]) -> Result<c_int, IotError> {
770        let (sender, mut receiver) = oneshot::channel::<c_int>();
771        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
772
773        let result = unsafe {
774            IoTHubModuleClient_LL_SendPropertiesAsync(
775                self.handle,
776                properties.as_ptr(),
777                properties.len(),
778                Some(Self::c_async_property_ack_callback),
779                context,
780            )
781        };
782
783        if let Err(error) = IotError::check_sdk_result(result) {
784            unsafe {
785                drop(Box::from_raw(context as *mut oneshot::Sender<c_int>));
786            }
787            return Err(error);
788        }
789
790        poll_fn(|cx| {
791            self.do_work_once()?;
792
793            match receiver.try_recv() {
794                Ok(Some(status_code)) => Poll::Ready(Ok(status_code)),
795                Ok(None) => {
796                    cx.waker().wake_by_ref();
797                    Poll::Pending
798                }
799                Err(_) => Poll::Ready(Err(IotError::Sdk(
800                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
801                ))),
802            }
803        })
804        .await
805    }
806
807    pub async fn get_properties_async(
808        &mut self,
809    ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
810        let (sender, mut receiver) =
811            oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
812        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
813
814        let result = unsafe {
815            IoTHubModuleClient_LL_GetPropertiesAsync(
816                self.handle,
817                Some(Self::c_async_properties_received_callback),
818                context,
819            )
820        };
821
822        if let Err(error) = IotError::check_sdk_result(result) {
823            unsafe {
824                drop(Box::from_raw(
825                    context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
826                ));
827            }
828            return Err(error);
829        }
830
831        poll_fn(|cx| {
832            self.do_work_once()?;
833
834            match receiver.try_recv() {
835                Ok(Some(properties)) => Poll::Ready(Ok(properties)),
836                Ok(None) => {
837                    cx.waker().wake_by_ref();
838                    Poll::Pending
839                }
840                Err(_) => Poll::Ready(Err(IotError::Sdk(
841                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
842                ))),
843            }
844        })
845        .await
846    }
847
848    pub async fn get_properties_and_subscribe_to_updates_async(
849        &mut self,
850    ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
851        let (sender, mut receiver) =
852            oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
853        let context = Box::into_raw(Box::new(sender)) as *mut c_void;
854
855        let result = unsafe {
856            IoTHubModuleClient_LL_GetPropertiesAndSubscribeToUpdatesAsync(
857                self.handle,
858                Some(Self::c_async_properties_received_callback),
859                context,
860            )
861        };
862
863        if let Err(error) = IotError::check_sdk_result(result) {
864            unsafe {
865                drop(Box::from_raw(
866                    context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
867                ));
868            }
869            return Err(error);
870        }
871
872        poll_fn(|cx| {
873            self.do_work_once()?;
874
875            match receiver.try_recv() {
876                Ok(Some(properties)) => Poll::Ready(Ok(properties)),
877                Ok(None) => {
878                    cx.waker().wake_by_ref();
879                    Poll::Pending
880                }
881                Err(_) => Poll::Ready(Err(IotError::Sdk(
882                    IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
883                ))),
884            }
885        })
886        .await
887    }
888
889    pub fn set_option(&self, option: ModuleClientOption<'_>) -> Result<(), IotError> {
890        let name: &CStr = (&option).try_into()?;
891        match option {
892            ModuleClientOption::LogTrace(value)
893            | ModuleClientOption::AutoUrlEncodeDecode(value) => {
894                self.set_bool_option(name, value)?
895            }
896            ModuleClientOption::MessageTimeout(value)
897            | ModuleClientOption::RetryIntervalSec(value)
898            | ModuleClientOption::RetryMaxDelaySecs(value)
899            | ModuleClientOption::SasTokenLifetime(value)
900            | ModuleClientOption::DoWorkFreqMs(value)
901            | ModuleClientOption::KeepAlive(value) => self.set_int_option(name, value)?,
902            ModuleClientOption::ProductInfo(value) | ModuleClientOption::ModelId(value) => {
903                self.set_str_option(name, value)?
904            }
905        }
906
907        Ok(())
908    }
909}
910
911impl<C: ModuleEventCallback> Drop for IotHubModuleClient<C> {
912    fn drop(&mut self) {
913        unsafe {
914            IoTHubModuleClient_LL_Destroy(self.handle);
915        }
916    }
917}