1use crate::error::IotError;
2use crate::message::IotHubMessage;
3use crate::transport::TransportProvider;
4use crate::{
5 ConnectionStatus, ConnectionStatusReason, IoTHubClientConfirmationResult,
6 IoTHubClientPropertyPayloadType, IoTHubMessageDispositionResult, IotHubDeviceTwinUpdateState,
7};
8use azure_iot_rs_sys::*;
9use futures::channel::oneshot;
10use log::info;
11use std::convert::TryInto;
12use std::ffi::{CStr, CString, c_void};
13use std::future::poll_fn;
14use std::os::raw::c_int;
15use std::ptr;
16use std::result::Result;
17use std::str;
18use std::sync::Once;
19use std::task::Poll;
20use std::time::{Duration, SystemTime, UNIX_EPOCH};
21use std::{thread, time};
22
23static IOTHUB: Once = Once::new();
24
25unsafe extern "C" {
26 fn free(ptr: *mut c_void);
27}
28
29pub enum ModuleClientOption<'a> {
30 LogTrace(bool),
31 MessageTimeout(u32),
32 ProductInfo(&'a str),
33 RetryIntervalSec(u32),
34 RetryMaxDelaySecs(u32),
35 SasTokenLifetime(u32),
36 DoWorkFreqMs(u32),
37 AutoUrlEncodeDecode(bool),
38 KeepAlive(u32),
39 ModelId(&'a str),
40}
41
42impl TryInto<&'static CStr> for &ModuleClientOption<'_> {
43 type Error = IotError;
44
45 fn try_into(self) -> Result<&'static CStr, Self::Error> {
46 let name = match self {
47 ModuleClientOption::LogTrace(_) => c"logtrace",
48 ModuleClientOption::MessageTimeout(_) => c"messageTimeout",
49 ModuleClientOption::ProductInfo(_) => c"product_info",
50 ModuleClientOption::RetryIntervalSec(_) => c"retry_interval_sec",
51 ModuleClientOption::RetryMaxDelaySecs(_) => c"retry_max_delay_secs",
52 ModuleClientOption::SasTokenLifetime(_) => c"sas_token_lifetime",
53 ModuleClientOption::DoWorkFreqMs(_) => c"do_work_freq_ms",
54 ModuleClientOption::AutoUrlEncodeDecode(_) => c"auto_url_encode_decode",
55 ModuleClientOption::KeepAlive(_) => c"keepalive",
56 ModuleClientOption::ModelId(_) => c"model_id",
57 };
58 Ok(name)
59 }
60}
61
62pub trait ModuleEventCallback {
63 fn on_message(&mut self, message: IotHubMessage) -> IoTHubMessageDispositionResult {
64 info!("Received message: {:?}", message);
65 IoTHubMessageDispositionResult::Accepted
66 }
67
68 fn on_module_twin(&mut self, state: IotHubDeviceTwinUpdateState, data: &[u8]) {
69 info!(
70 "Received module twin update: {state} {}",
71 String::from_utf8_lossy(data)
72 );
73 }
74
75 fn on_module_method(&mut self, method_name: &str, payload: &[u8]) -> Result<Vec<u8>, IotError> {
76 info!(
77 "Received module method: {} with payload: {}",
78 method_name,
79 String::from_utf8_lossy(payload)
80 );
81 Ok(Vec::new())
82 }
83
84 fn on_input_message(
85 &mut self,
86 input_name: &str,
87 message: IotHubMessage,
88 ) -> IoTHubMessageDispositionResult {
89 info!("Received message on input {}: {:?}", input_name, message);
90 IoTHubMessageDispositionResult::Accepted
91 }
92
93 fn on_connection_status(&mut self, status: ConnectionStatus, reason: ConnectionStatusReason) {
94 info!("Connection status changed: {:?} ({:?})", status, reason);
95 }
96
97 fn on_confirmation(&mut self, result: Result<(), IotError>) {
98 if let Err(error) = result {
99 info!("Telemetry confirmation failed: {:?}", error);
100 }
101 }
102}
103
104#[derive(Debug)]
105pub struct MethodInvokeResponse {
106 pub status: ::std::os::raw::c_int,
107 pub payload: Vec<u8>,
108}
109
110pub struct IotHubModuleClient<C: ModuleEventCallback> {
111 handle: IOTHUB_MODULE_CLIENT_LL_HANDLE,
112 callback: C,
113}
114
115unsafe impl<C: ModuleEventCallback + Send> Send for IotHubModuleClient<C> {}
116
117impl<C: ModuleEventCallback> IotHubModuleClient<C> {
119 unsafe extern "C" fn c_message_callback(
120 handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
121 ctx: *mut c_void,
122 ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
123 info!("Received message from hub");
124 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
125 let message = IotHubMessage::from_handle(handle);
126 client.callback.on_message(message).as_raw()
127 }
128
129 unsafe extern "C" fn c_input_message_callback(
130 handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
131 ctx: *mut c_void,
132 ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
133 info!("Received message from hub");
134 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
135 let message = IotHubMessage::from_handle(handle);
136 client
137 .callback
138 .on_input_message("default", message)
139 .as_raw()
140 }
141
142 unsafe extern "C" fn c_connection_status_callback(
143 status: IOTHUB_CLIENT_CONNECTION_STATUS,
144 reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON,
145 ctx: *mut c_void,
146 ) {
147 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
148 client
149 .callback
150 .on_connection_status(status.into(), reason.into());
151 }
152
153 unsafe extern "C" fn c_module_twin_callback(
154 state: DEVICE_TWIN_UPDATE_STATE,
155 payload: *const u8,
156 size: usize,
157 ctx: *mut c_void,
158 ) {
159 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
160 let data = unsafe { std::slice::from_raw_parts(payload, size) };
161 client.callback.on_module_twin(state.into(), data);
162 }
163
164 unsafe extern "C" fn c_module_method_callback(
165 method_name: *const i8,
166 payload: *const u8,
167 size: usize,
168 response_payload: *mut *mut u8,
169 response_payload_size: *mut usize,
170 ctx: *mut c_void,
171 ) -> ::std::os::raw::c_int {
172 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
173 let command_name = unsafe { CStr::from_ptr(method_name) }
174 .to_str()
175 .unwrap_or_default();
176 let data = unsafe { std::slice::from_raw_parts(payload, size) };
177 let response = client.callback.on_module_method(command_name, data);
178 match response {
179 Ok(resp_data) => {
180 unsafe {
181 *response_payload = resp_data.as_ptr() as *mut u8;
182 *response_payload_size = resp_data.len();
183 }
184 200
185 }
186 Err(_) => {
187 unsafe {
188 *response_payload = ptr::null_mut();
189 *response_payload_size = 0;
190 }
191 500
192 }
193 }
194 }
195
196 fn initialize_callbacks(&mut self) -> Result<(), IotError> {
197 let handle = self.handle;
198 let context = self.context_ptr();
199 unsafe {
200 let input = c"input";
201 let result = IoTHubModuleClient_LL_SetInputMessageCallback(
202 handle,
203 input.as_ptr(),
204 Some(Self::c_input_message_callback),
205 context,
206 );
207 IotError::check_sdk_result(result)?;
208
209 let result = IoTHubModuleClient_LL_SetMessageCallback(
210 handle,
211 Some(Self::c_message_callback),
212 context,
213 );
214 IotError::check_sdk_result(result)?;
215
216 let result = IoTHubModuleClient_LL_SetModuleMethodCallback(
217 handle,
218 Some(Self::c_module_method_callback),
219 context,
220 );
221 IotError::check_sdk_result(result)?;
222
223 let result = IoTHubModuleClient_LL_SetModuleTwinCallback(
224 handle,
225 Some(Self::c_module_twin_callback),
226 context,
227 );
228 IotError::check_sdk_result(result)?;
229
230 let result = IoTHubModuleClient_LL_SetConnectionStatusCallback(
231 handle,
232 Some(Self::c_connection_status_callback),
233 context,
234 );
235 IotError::check_sdk_result(result)
236 }
237 }
238}
239
240impl<C: ModuleEventCallback> IotHubModuleClient<C> {
242 unsafe extern "C" fn c_async_confirmation_callback(
243 status: IOTHUB_CLIENT_RESULT,
244 ctx: *mut c_void,
245 ) {
246 let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<Result<(), IotError>>) };
247 let _ = sender.send(IotError::check_sdk_result(status));
248 }
249 unsafe extern "C" fn c_async_confirmation_result_callback(
250 result: IOTHUB_CLIENT_CONFIRMATION_RESULT,
251 ctx: *mut c_void,
252 ) {
253 let sender =
254 unsafe { Box::from_raw(ctx as *mut oneshot::Sender<IoTHubClientConfirmationResult>) };
255 let _ = sender.send(result.into());
256 }
257
258 unsafe extern "C" fn c_async_property_ack_callback(status_code: c_int, ctx: *mut c_void) {
259 let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<c_int>) };
260 let _ = sender.send(status_code);
261 }
262
263 unsafe extern "C" fn c_async_properties_received_callback(
264 payload_type: IOTHUB_CLIENT_PROPERTY_PAYLOAD_TYPE,
265 payload: *const u8,
266 payload_length: usize,
267 ctx: *mut c_void,
268 ) {
269 let sender = unsafe {
270 Box::from_raw(ctx as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>)
271 };
272 let data = unsafe { std::slice::from_raw_parts(payload, payload_length) }.to_vec();
273 let _ = sender.send((payload_type.into(), data));
274 }
275
276 unsafe extern "C" fn c_get_twin_async_callback(
277 state: DEVICE_TWIN_UPDATE_STATE,
278 payload: *const u8,
279 size: usize,
280 ctx: *mut c_void,
281 ) {
282 let sender = unsafe {
283 Box::from_raw(ctx as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>)
284 };
285 let data = unsafe { std::slice::from_raw_parts(payload, size) }.to_vec();
286 let _ = sender.send((state.into(), data));
287 }
288}
289
290impl<C: ModuleEventCallback> IotHubModuleClient<C> {
291 fn ensure_initialized() {
292 IOTHUB.call_once(|| unsafe {
293 IoTHub_Init();
294 });
295 }
296
297 fn context_ptr(&mut self) -> *mut c_void {
298 self as *mut IotHubModuleClient<C> as *mut c_void
299 }
300
301 pub fn create_from_environment(
302 protocol: TransportProvider,
303 callback: C,
304 ) -> Result<Self, IotError> {
305 Self::ensure_initialized();
306 let handle = unsafe { IoTHubModuleClient_LL_CreateFromEnvironment(protocol.to_sdk()) };
307 if handle.is_null() {
308 return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
309 }
310
311 let mut client = Self { handle, callback };
312
313 client.initialize_callbacks()?;
314 Ok(client)
315 }
316
317 pub fn create_from_connection_string(
318 connection_string: &str,
319 transport: TransportProvider,
320 callback: C,
321 ) -> Result<Self, IotError> {
322 Self::ensure_initialized();
323 let connection = CString::new(connection_string)?;
324 let handle = unsafe {
325 IoTHubModuleClient_LL_CreateFromConnectionString(
326 connection.as_ptr(),
327 transport.to_sdk(),
328 )
329 };
330 if handle.is_null() {
331 return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
332 }
333 let mut client = Self { handle, callback };
334 client.initialize_callbacks()?;
335 Ok(client)
336 }
337
338 pub fn try_new(callback: C) -> Result<Self, IotError> {
339 Self::create_from_environment(TransportProvider::Mqtt, callback)
340 }
341
342 pub async fn send_event_async(&mut self, message: &IotHubMessage) -> Result<(), IotError> {
343 let (sender, mut receiver) = oneshot::channel::<Result<(), IotError>>();
344 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
345
346 let result = unsafe {
347 IoTHubModuleClient_LL_SendEventAsync(
348 self.handle,
349 message.handle,
350 Some(Self::c_async_confirmation_callback),
351 context,
352 )
353 };
354
355 if let Err(error) = IotError::check_sdk_result(result) {
356 unsafe {
357 drop(Box::from_raw(
358 context as *mut oneshot::Sender<Result<(), IotError>>,
359 ));
360 }
361 return Err(error);
362 }
363
364 poll_fn(|cx| {
365 self.do_work_once()?;
366
367 match receiver.try_recv() {
368 Ok(Some(send_result)) => Poll::Ready(send_result),
369 Ok(None) => {
370 cx.waker().wake_by_ref();
371 Poll::Pending
372 }
373 Err(_) => Poll::Ready(Err(IotError::Sdk(
374 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
375 ))),
376 }
377 })
378 .await
379 }
380
381 pub async fn send_message_result(
382 &mut self,
383 message: &IotHubMessage,
384 ) -> Result<IoTHubClientConfirmationResult, IotError> {
385 let output = c"output";
386 self.send_event_to_output_async(&message, output).await
387 }
388
389 pub async fn send_message(
390 &mut self,
391 message: &IotHubMessage,
392 ) -> Result<IoTHubClientConfirmationResult, IotError> {
393 self.send_message_result(message).await
394 }
395
396 pub fn get_send_status(&self) -> Result<(), IotError> {
397 let handle = self.handle;
398 let mut status: IOTHUB_CLIENT_STATUS = 0;
399 let result = unsafe { IoTHubModuleClient_LL_GetSendStatus(handle, &mut status) };
400 IotError::check_sdk_result(result)?;
401 IotError::check_sdk_result(status)
402 }
403
404 pub fn set_retry_policy(
405 &self,
406 retry_policy: IOTHUB_CLIENT_RETRY_POLICY,
407 retry_timeout_limit_in_seconds: usize,
408 ) -> Result<(), IotError> {
409 let handle = self.handle;
410 let result = unsafe {
411 IoTHubModuleClient_LL_SetRetryPolicy(
412 handle,
413 retry_policy,
414 retry_timeout_limit_in_seconds,
415 )
416 };
417 IotError::check_sdk_result(result)?;
418 Ok(())
419 }
420
421 pub fn get_retry_policy(&self) -> Result<(IOTHUB_CLIENT_RETRY_POLICY, usize), IotError> {
422 let handle = self.handle;
423 let mut retry_policy: IOTHUB_CLIENT_RETRY_POLICY = 0;
424 let mut timeout_limit: usize = 0;
425 let result = unsafe {
426 IoTHubModuleClient_LL_GetRetryPolicy(handle, &mut retry_policy, &mut timeout_limit)
427 };
428 IotError::check_sdk_result(result)?;
429 Ok((retry_policy, timeout_limit))
430 }
431
432 pub fn get_last_message_receive_time(&self) -> Result<SystemTime, IotError> {
433 let handle = self.handle;
434 let mut receive_time: time_t = 0;
435 let result =
436 unsafe { IoTHubModuleClient_LL_GetLastMessageReceiveTime(handle, &mut receive_time) };
437 IotError::check_sdk_result(result)?;
438 let time = UNIX_EPOCH + Duration::from_secs(receive_time as u64);
439 Ok(time)
440 }
441
442 pub fn do_work_once(&self) -> Result<(), IotError> {
443 let handle = self.handle;
444 unsafe {
445 IoTHubModuleClient_LL_DoWork(handle);
446 }
447 Ok(())
448 }
449
450 pub fn do_work(&mut self) {
451 loop {
452 let _ = self.do_work_once();
453 thread::sleep(time::Duration::from_millis(100));
454 }
455 }
456
457 pub fn set_option_value<T>(&self, option_name: &str, value: &T) -> Result<(), IotError> {
458 let handle = self.handle;
459 let name = CString::new(option_name)?;
460 let result = unsafe {
461 IoTHubModuleClient_LL_SetOption(
462 handle,
463 name.as_ptr(),
464 value as *const T as *const c_void,
465 )
466 };
467 IotError::check_sdk_result(result)
468 }
469
470 fn set_bool_option(&self, name: &CStr, value: bool) -> Result<(), IotError> {
471 let value = if value { 1 } else { 0 };
472 let result = unsafe {
473 IoTHubModuleClient_LL_SetOption(
474 self.handle,
475 name.as_ptr(),
476 &value as *const i32 as *const c_void,
477 )
478 };
479 IotError::check_sdk_result(result)?;
480 Ok(())
481 }
482
483 fn set_int_option(&self, name: &CStr, value: u32) -> Result<(), IotError> {
484 let result = unsafe {
485 IoTHubModuleClient_LL_SetOption(
486 self.handle,
487 name.as_ptr(),
488 &value as *const u32 as *const c_void,
489 )
490 };
491 IotError::check_sdk_result(result)
492 }
493
494 fn set_str_option(&self, name: &CStr, value: &str) -> Result<(), IotError> {
495 let cvalue = CString::new(value)?;
496 let result = unsafe {
497 IoTHubModuleClient_LL_SetOption(
498 self.handle,
499 name.as_ptr(),
500 cvalue.as_ptr() as *const c_void,
501 )
502 };
503 IotError::check_sdk_result(result)
504 }
505
506 pub fn send_reported_state(
507 &mut self,
508 reported_state: &[u8],
509 callback: IOTHUB_CLIENT_REPORTED_STATE_CALLBACK,
510 ) -> Result<(), IotError> {
511 let handle = self.handle;
512 let result = unsafe {
513 IoTHubModuleClient_LL_SendReportedState(
514 handle,
515 reported_state.as_ptr(),
516 reported_state.len(),
517 callback,
518 self.context_ptr(),
519 )
520 };
521 IotError::check_sdk_result(result)
522 }
523
524 pub async fn get_twin_async(
525 &mut self,
526 ) -> Result<(IotHubDeviceTwinUpdateState, Vec<u8>), IotError> {
527 let (sender, mut receiver) = oneshot::channel::<(IotHubDeviceTwinUpdateState, Vec<u8>)>();
528 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
529
530 let result = unsafe {
531 IoTHubModuleClient_LL_GetTwinAsync(
532 self.handle,
533 Some(Self::c_get_twin_async_callback),
534 context,
535 )
536 };
537
538 if let Err(error) = IotError::check_sdk_result(result) {
539 unsafe {
540 drop(Box::from_raw(
541 context as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>,
542 ));
543 }
544 return Err(error);
545 }
546
547 poll_fn(|cx| {
548 self.do_work_once()?;
549
550 match receiver.try_recv() {
551 Ok(Some(twin_update)) => Poll::Ready(Ok(twin_update)),
552 Ok(None) => {
553 cx.waker().wake_by_ref();
554 Poll::Pending
555 }
556 Err(_) => Poll::Ready(Err(IotError::Sdk(
557 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
558 ))),
559 }
560 })
561 .await
562 }
563
564 pub async fn send_event_to_output_async(
565 &mut self,
566 message: &IotHubMessage,
567 output_name: &CStr,
568 ) -> Result<IoTHubClientConfirmationResult, IotError> {
569 let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
570 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
571
572 let result = unsafe {
573 IoTHubModuleClient_LL_SendEventToOutputAsync(
574 self.handle,
575 message.handle,
576 output_name.as_ptr(),
577 Some(Self::c_async_confirmation_result_callback),
578 context,
579 )
580 };
581
582 if let Err(error) = IotError::check_sdk_result(result) {
583 unsafe {
584 drop(Box::from_raw(
585 context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
586 ));
587 }
588 return Err(error);
589 }
590
591 poll_fn(|cx| {
592 self.do_work_once()?;
593
594 match receiver.try_recv() {
595 Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
596 Ok(None) => {
597 cx.waker().wake_by_ref();
598 Poll::Pending
599 }
600 Err(_) => Poll::Ready(Err(IotError::Sdk(
601 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
602 ))),
603 }
604 })
605 .await
606 }
607
608 pub fn device_method_invoke(
609 &self,
610 device_id: &str,
611 method_name: &str,
612 method_payload: &str,
613 timeout: Duration,
614 ) -> Result<MethodInvokeResponse, IotError> {
615 let handle = self.handle;
616 let device_id = CString::new(device_id)?;
617 let method_name = CString::new(method_name)?;
618 let method_payload = CString::new(method_payload)?;
619
620 let mut response_status: ::std::os::raw::c_int = 0;
621 let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
622 let mut response_payload_size: usize = 0;
623
624 let result = unsafe {
625 IoTHubModuleClient_LL_DeviceMethodInvoke(
626 handle,
627 device_id.as_ptr(),
628 method_name.as_ptr(),
629 method_payload.as_ptr(),
630 timeout.as_secs() as ::std::os::raw::c_uint,
631 &mut response_status,
632 &mut response_payload,
633 &mut response_payload_size,
634 )
635 };
636
637 if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
638 unsafe { free(response_payload as *mut c_void) };
639 }
640 IotError::check_sdk_result(result)?;
641
642 let payload = if response_payload.is_null() || response_payload_size == 0 {
643 Vec::new()
644 } else {
645 let bytes =
646 unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
647 .to_vec();
648 unsafe { free(response_payload as *mut c_void) };
649 bytes
650 };
651
652 Ok(MethodInvokeResponse {
653 status: response_status,
654 payload,
655 })
656 }
657
658 pub fn module_method_invoke(
659 &self,
660 device_id: &str,
661 module_id: &str,
662 method_name: &str,
663 method_payload: &str,
664 timeout: ::std::os::raw::c_uint,
665 ) -> Result<MethodInvokeResponse, IotError> {
666 let handle = self.handle;
667 let device_id = CString::new(device_id)?;
668 let module_id = CString::new(module_id)?;
669 let method_name = CString::new(method_name)?;
670 let method_payload = CString::new(method_payload)?;
671
672 let mut response_status: ::std::os::raw::c_int = 0;
673 let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
674 let mut response_payload_size: usize = 0;
675
676 let result = unsafe {
677 IoTHubModuleClient_LL_ModuleMethodInvoke(
678 handle,
679 device_id.as_ptr(),
680 module_id.as_ptr(),
681 method_name.as_ptr(),
682 method_payload.as_ptr(),
683 timeout,
684 &mut response_status,
685 &mut response_payload,
686 &mut response_payload_size,
687 )
688 };
689
690 if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
691 unsafe { free(response_payload as *mut c_void) };
692 }
693 IotError::check_sdk_result(result)?;
694
695 let payload = if response_payload.is_null() || response_payload_size == 0 {
696 Vec::new()
697 } else {
698 let bytes =
699 unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
700 .to_vec();
701 unsafe { free(response_payload as *mut c_void) };
702 bytes
703 };
704
705 Ok(MethodInvokeResponse {
706 status: response_status,
707 payload,
708 })
709 }
710
711 pub fn send_message_disposition(
712 &self,
713 message: &IotHubMessage,
714 disposition: IOTHUBMESSAGE_DISPOSITION_RESULT,
715 ) -> Result<(), IotError> {
716 let handle = self.handle;
717 let result = unsafe {
718 IoTHubModuleClient_LL_SendMessageDisposition(handle, message.handle, disposition)
719 };
720 IotError::check_sdk_result(result)?;
721 Ok(())
722 }
723
724 pub async fn send_telemetry_async(
725 &mut self,
726 message: &IotHubMessage,
727 ) -> Result<IoTHubClientConfirmationResult, IotError> {
728 let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
729 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
730
731 let result = unsafe {
732 IoTHubModuleClient_LL_SendTelemetryAsync(
733 self.handle,
734 message.handle,
735 Some(Self::c_async_confirmation_result_callback),
736 context,
737 )
738 };
739
740 if let Err(error) = IotError::check_sdk_result(result) {
741 unsafe {
742 drop(Box::from_raw(
743 context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
744 ));
745 }
746 return Err(error);
747 }
748
749 poll_fn(|cx| {
750 self.do_work_once()?;
751
752 match receiver.try_recv() {
753 Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
754 Ok(None) => {
755 cx.waker().wake_by_ref();
756 Poll::Pending
757 }
758 Err(_) => Poll::Ready(Err(IotError::Sdk(
759 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
760 ))),
761 }
762 })
763 .await
764 }
765
766 pub fn subscribe_to_commands(
767 &mut self,
768 callback: IOTHUB_CLIENT_COMMAND_CALLBACK_ASYNC,
769 ) -> Result<(), IotError> {
770 let handle = self.handle;
771 let result = unsafe {
772 IoTHubModuleClient_LL_SubscribeToCommands(handle, callback, self.context_ptr())
773 };
774 IotError::check_sdk_result(result)
775 }
776
777 pub async fn send_properties_async(&mut self, properties: &[u8]) -> Result<c_int, IotError> {
778 let (sender, mut receiver) = oneshot::channel::<c_int>();
779 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
780
781 let result = unsafe {
782 IoTHubModuleClient_LL_SendPropertiesAsync(
783 self.handle,
784 properties.as_ptr(),
785 properties.len(),
786 Some(Self::c_async_property_ack_callback),
787 context,
788 )
789 };
790
791 if let Err(error) = IotError::check_sdk_result(result) {
792 unsafe {
793 drop(Box::from_raw(context as *mut oneshot::Sender<c_int>));
794 }
795 return Err(error);
796 }
797
798 poll_fn(|cx| {
799 self.do_work_once()?;
800
801 match receiver.try_recv() {
802 Ok(Some(status_code)) => Poll::Ready(Ok(status_code)),
803 Ok(None) => {
804 cx.waker().wake_by_ref();
805 Poll::Pending
806 }
807 Err(_) => Poll::Ready(Err(IotError::Sdk(
808 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
809 ))),
810 }
811 })
812 .await
813 }
814
815 pub async fn get_properties_async(
816 &mut self,
817 ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
818 let (sender, mut receiver) =
819 oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
820 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
821
822 let result = unsafe {
823 IoTHubModuleClient_LL_GetPropertiesAsync(
824 self.handle,
825 Some(Self::c_async_properties_received_callback),
826 context,
827 )
828 };
829
830 if let Err(error) = IotError::check_sdk_result(result) {
831 unsafe {
832 drop(Box::from_raw(
833 context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
834 ));
835 }
836 return Err(error);
837 }
838
839 poll_fn(|cx| {
840 self.do_work_once()?;
841
842 match receiver.try_recv() {
843 Ok(Some(properties)) => Poll::Ready(Ok(properties)),
844 Ok(None) => {
845 cx.waker().wake_by_ref();
846 Poll::Pending
847 }
848 Err(_) => Poll::Ready(Err(IotError::Sdk(
849 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
850 ))),
851 }
852 })
853 .await
854 }
855
856 pub async fn get_properties_and_subscribe_to_updates_async(
857 &mut self,
858 ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
859 let (sender, mut receiver) =
860 oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
861 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
862
863 let result = unsafe {
864 IoTHubModuleClient_LL_GetPropertiesAndSubscribeToUpdatesAsync(
865 self.handle,
866 Some(Self::c_async_properties_received_callback),
867 context,
868 )
869 };
870
871 if let Err(error) = IotError::check_sdk_result(result) {
872 unsafe {
873 drop(Box::from_raw(
874 context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
875 ));
876 }
877 return Err(error);
878 }
879
880 poll_fn(|cx| {
881 self.do_work_once()?;
882
883 match receiver.try_recv() {
884 Ok(Some(properties)) => Poll::Ready(Ok(properties)),
885 Ok(None) => {
886 cx.waker().wake_by_ref();
887 Poll::Pending
888 }
889 Err(_) => Poll::Ready(Err(IotError::Sdk(
890 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
891 ))),
892 }
893 })
894 .await
895 }
896
897 pub fn set_option(&self, option: ModuleClientOption<'_>) -> Result<(), IotError> {
898 let name: &CStr = (&option).try_into()?;
899 match option {
900 ModuleClientOption::LogTrace(value)
901 | ModuleClientOption::AutoUrlEncodeDecode(value) => {
902 self.set_bool_option(name, value)?
903 }
904 ModuleClientOption::MessageTimeout(value)
905 | ModuleClientOption::RetryIntervalSec(value)
906 | ModuleClientOption::RetryMaxDelaySecs(value)
907 | ModuleClientOption::SasTokenLifetime(value)
908 | ModuleClientOption::DoWorkFreqMs(value)
909 | ModuleClientOption::KeepAlive(value) => self.set_int_option(name, value)?,
910 ModuleClientOption::ProductInfo(value) | ModuleClientOption::ModelId(value) => {
911 self.set_str_option(name, value)?
912 }
913 }
914
915 Ok(())
916 }
917}
918
919impl<C: ModuleEventCallback> Drop for IotHubModuleClient<C> {
920 fn drop(&mut self) {
921 unsafe {
922 IoTHubModuleClient_LL_Destroy(self.handle);
923 }
924 }
925}