1use crate::error::IotError;
2use crate::message::IotHubMessage;
3use crate::transport::TransportProvider;
4use crate::{
5 ConnectionStatus, ConnectionStatusReason, IoTHubClientConfirmationResult,
6 IoTHubClientPropertyPayloadType, IoTHubMessageDispositionResult, IotHub,
7 IotHubDeviceTwinUpdateState,
8};
9use azure_iot_rs_sys::*;
10use futures::channel::oneshot;
11use log::info;
12use std::convert::TryInto;
13use std::ffi::{CStr, CString, c_void};
14use std::future::poll_fn;
15use std::os::raw::c_int;
16use std::ptr;
17use std::result::Result;
18use std::str;
19use std::task::Poll;
20use std::time::{Duration, SystemTime, UNIX_EPOCH};
21use std::{thread, time};
22
// Binding to the C runtime's `free`. This file uses it to release the response
// buffers that the SDK's method-invoke calls hand back through out-pointers.
unsafe extern "C" {
    fn free(ptr: *mut c_void);
}
26
/// Typed client options accepted by `IotHubModuleClient::set_option`.
///
/// Each variant corresponds to one SDK option key (given on the variant) and
/// carries the value to apply. The enum is plain data, so it derives the usual
/// value-type traits to make it loggable, copyable and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModuleClientOption<'a> {
    /// SDK option key `logtrace`.
    LogTrace(bool),
    /// SDK option key `messageTimeout`.
    MessageTimeout(u32),
    /// SDK option key `product_info`.
    ProductInfo(&'a str),
    /// SDK option key `retry_interval_sec`.
    RetryIntervalSec(u32),
    /// SDK option key `retry_max_delay_secs`.
    RetryMaxDelaySecs(u32),
    /// SDK option key `sas_token_lifetime`.
    SasTokenLifetime(u32),
    /// SDK option key `do_work_freq_ms`.
    DoWorkFreqMs(u32),
    /// SDK option key `auto_url_encode_decode`.
    AutoUrlEncodeDecode(bool),
    /// SDK option key `keepalive`.
    KeepAlive(u32),
    /// SDK option key `model_id`.
    ModelId(&'a str),
}
39
40impl TryInto<&'static CStr> for &ModuleClientOption<'_> {
41 type Error = IotError;
42
43 fn try_into(self) -> Result<&'static CStr, Self::Error> {
44 let name = match self {
45 ModuleClientOption::LogTrace(_) => c"logtrace",
46 ModuleClientOption::MessageTimeout(_) => c"messageTimeout",
47 ModuleClientOption::ProductInfo(_) => c"product_info",
48 ModuleClientOption::RetryIntervalSec(_) => c"retry_interval_sec",
49 ModuleClientOption::RetryMaxDelaySecs(_) => c"retry_max_delay_secs",
50 ModuleClientOption::SasTokenLifetime(_) => c"sas_token_lifetime",
51 ModuleClientOption::DoWorkFreqMs(_) => c"do_work_freq_ms",
52 ModuleClientOption::AutoUrlEncodeDecode(_) => c"auto_url_encode_decode",
53 ModuleClientOption::KeepAlive(_) => c"keepalive",
54 ModuleClientOption::ModelId(_) => c"model_id",
55 };
56 Ok(name)
57 }
58}
59
60pub trait ModuleEventCallback {
61 fn on_message(&mut self, message: IotHubMessage) -> IoTHubMessageDispositionResult {
62 info!("Received message: {:?}", message);
63 IoTHubMessageDispositionResult::Accepted
64 }
65
66 fn on_module_twin(&mut self, state: IotHubDeviceTwinUpdateState, data: &[u8]) {
67 info!(
68 "Received module twin update: {state} {}",
69 String::from_utf8_lossy(data)
70 );
71 }
72
73 fn on_module_method(&mut self, method_name: &str, payload: &[u8]) -> Result<Vec<u8>, IotError> {
74 info!(
75 "Received module method: {} with payload: {}",
76 method_name,
77 String::from_utf8_lossy(payload)
78 );
79 Ok(Vec::new())
80 }
81
82 fn on_input_message(
83 &mut self,
84 input_name: &str,
85 message: IotHubMessage,
86 ) -> IoTHubMessageDispositionResult {
87 info!("Received message on input {}: {:?}", input_name, message);
88 IoTHubMessageDispositionResult::Accepted
89 }
90
91 fn on_connection_status(&mut self, status: ConnectionStatus, reason: ConnectionStatusReason) {
92 info!("Connection status changed: {:?} ({:?})", status, reason);
93 }
94
95 fn on_confirmation(&mut self, result: Result<(), IotError>) {
96 if let Err(error) = result {
97 info!("Telemetry confirmation failed: {:?}", error);
98 }
99 }
100}
101
/// Result of invoking a direct method on another device or module.
#[derive(Debug)]
pub struct MethodInvokeResponse {
    /// Status code written by the SDK for the invocation.
    pub status: c_int,
    /// Response body bytes; empty when the SDK returned no payload.
    pub payload: Vec<u8>,
}
107
/// Module client built on the SDK's low-level (`_LL`) module client handle,
/// paired with the user's event callback.
///
/// NOTE(review): `initialize_callbacks` registers a raw pointer to this struct
/// as the SDK callback context, and the constructors then return the struct by
/// value. Any move of the value after registration leaves the SDK holding a
/// stale address. Confirm how callers keep the client at a fixed location, or
/// consider heap-allocating the state the callbacks need.
pub struct IotHubModuleClient<C: ModuleEventCallback> {
    // Handle obtained from one of the SDK create functions; released in `Drop`.
    handle: IOTHUB_MODULE_CLIENT_LL_HANDLE,
    // Receiver of the events dispatched by the C callback trampolines.
    callback: C,
}
112
// SAFETY (NOTE(review) — to be confirmed): the raw SDK handle is what prevents
// the auto `Send` impl. This asserts the handle may be used from another thread
// as long as the callback type itself is `Send`; verify against the SDK's
// threading rules for the low-level client.
unsafe impl<C: ModuleEventCallback + Send> Send for IotHubModuleClient<C> {}
114
115impl<C: ModuleEventCallback> IotHubModuleClient<C> {
117 unsafe extern "C" fn c_message_callback(
118 handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
119 ctx: *mut c_void,
120 ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
121 info!("Received message from hub");
122 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
123 let message = IotHubMessage::from_handle(handle);
124 client.callback.on_message(message).as_raw()
125 }
126
127 unsafe extern "C" fn c_input_message_callback(
128 handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
129 ctx: *mut c_void,
130 ) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
131 info!("Received message from hub");
132 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
133 let message = IotHubMessage::from_handle(handle);
134 client
135 .callback
136 .on_input_message("default", message)
137 .as_raw()
138 }
139
140 unsafe extern "C" fn c_connection_status_callback(
141 status: IOTHUB_CLIENT_CONNECTION_STATUS,
142 reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON,
143 ctx: *mut c_void,
144 ) {
145 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
146 client
147 .callback
148 .on_connection_status(status.into(), reason.into());
149 }
150
151 unsafe extern "C" fn c_module_twin_callback(
152 state: DEVICE_TWIN_UPDATE_STATE,
153 payload: *const u8,
154 size: usize,
155 ctx: *mut c_void,
156 ) {
157 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
158 let data = unsafe { std::slice::from_raw_parts(payload, size) };
159 client.callback.on_module_twin(state.into(), data);
160 }
161
162 unsafe extern "C" fn c_module_method_callback(
163 method_name: *const i8,
164 payload: *const u8,
165 size: usize,
166 response_payload: *mut *mut u8,
167 response_payload_size: *mut usize,
168 ctx: *mut c_void,
169 ) -> ::std::os::raw::c_int {
170 let client = unsafe { &mut *(ctx as *mut IotHubModuleClient<C>) };
171 let command_name = unsafe { CStr::from_ptr(method_name) }
172 .to_str()
173 .unwrap_or_default();
174 let data = unsafe { std::slice::from_raw_parts(payload, size) };
175 let response = client.callback.on_module_method(command_name, data);
176 match response {
177 Ok(resp_data) => {
178 unsafe {
179 *response_payload = resp_data.as_ptr() as *mut u8;
180 *response_payload_size = resp_data.len();
181 }
182 200
183 }
184 Err(_) => {
185 unsafe {
186 *response_payload = ptr::null_mut();
187 *response_payload_size = 0;
188 }
189 500
190 }
191 }
192 }
193
194 fn initialize_callbacks(&mut self) -> Result<(), IotError> {
195 let handle = self.handle;
196 let context = self.context_ptr();
197 unsafe {
198 let input = c"input";
199 let result = IoTHubModuleClient_LL_SetInputMessageCallback(
200 handle,
201 input.as_ptr(),
202 Some(Self::c_input_message_callback),
203 context,
204 );
205 IotError::check_sdk_result(result)?;
206
207 let result = IoTHubModuleClient_LL_SetMessageCallback(
208 handle,
209 Some(Self::c_message_callback),
210 context,
211 );
212 IotError::check_sdk_result(result)?;
213
214 let result = IoTHubModuleClient_LL_SetModuleMethodCallback(
215 handle,
216 Some(Self::c_module_method_callback),
217 context,
218 );
219 IotError::check_sdk_result(result)?;
220
221 let result = IoTHubModuleClient_LL_SetModuleTwinCallback(
222 handle,
223 Some(Self::c_module_twin_callback),
224 context,
225 );
226 IotError::check_sdk_result(result)?;
227
228 let result = IoTHubModuleClient_LL_SetConnectionStatusCallback(
229 handle,
230 Some(Self::c_connection_status_callback),
231 context,
232 );
233 IotError::check_sdk_result(result)
234 }
235 }
236}
237
238impl<C: ModuleEventCallback> IotHubModuleClient<C> {
240 unsafe extern "C" fn c_async_confirmation_callback(
241 status: IOTHUB_CLIENT_RESULT,
242 ctx: *mut c_void,
243 ) {
244 let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<Result<(), IotError>>) };
245 let _ = sender.send(IotError::check_sdk_result(status));
246 }
247 unsafe extern "C" fn c_async_confirmation_result_callback(
248 result: IOTHUB_CLIENT_CONFIRMATION_RESULT,
249 ctx: *mut c_void,
250 ) {
251 let sender =
252 unsafe { Box::from_raw(ctx as *mut oneshot::Sender<IoTHubClientConfirmationResult>) };
253 let _ = sender.send(result.into());
254 }
255
256 unsafe extern "C" fn c_async_property_ack_callback(status_code: c_int, ctx: *mut c_void) {
257 let sender = unsafe { Box::from_raw(ctx as *mut oneshot::Sender<c_int>) };
258 let _ = sender.send(status_code);
259 }
260
261 unsafe extern "C" fn c_async_properties_received_callback(
262 payload_type: IOTHUB_CLIENT_PROPERTY_PAYLOAD_TYPE,
263 payload: *const u8,
264 payload_length: usize,
265 ctx: *mut c_void,
266 ) {
267 let sender = unsafe {
268 Box::from_raw(ctx as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>)
269 };
270 let data = unsafe { std::slice::from_raw_parts(payload, payload_length) }.to_vec();
271 let _ = sender.send((payload_type.into(), data));
272 }
273
274 unsafe extern "C" fn c_get_twin_async_callback(
275 state: DEVICE_TWIN_UPDATE_STATE,
276 payload: *const u8,
277 size: usize,
278 ctx: *mut c_void,
279 ) {
280 let sender = unsafe {
281 Box::from_raw(ctx as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>)
282 };
283 let data = unsafe { std::slice::from_raw_parts(payload, size) }.to_vec();
284 let _ = sender.send((state.into(), data));
285 }
286}
287
288impl<C: ModuleEventCallback> IotHubModuleClient<C> {
289 fn context_ptr(&mut self) -> *mut c_void {
290 self as *mut IotHubModuleClient<C> as *mut c_void
291 }
292
293 pub fn create_from_environment(
294 protocol: TransportProvider,
295 callback: C,
296 ) -> Result<Self, IotError> {
297 IotHub::ensure_initialized()?;
298 let handle = unsafe { IoTHubModuleClient_LL_CreateFromEnvironment(protocol.to_sdk()) };
299 if handle.is_null() {
300 return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
301 }
302
303 let mut client = Self { handle, callback };
304
305 client.initialize_callbacks()?;
306 Ok(client)
307 }
308
309 pub fn create_from_connection_string(
310 connection_string: &str,
311 transport: TransportProvider,
312 callback: C,
313 ) -> Result<Self, IotError> {
314 IotHub::ensure_initialized()?;
315 let connection = CString::new(connection_string)?;
316 let handle = unsafe {
317 IoTHubModuleClient_LL_CreateFromConnectionString(
318 connection.as_ptr(),
319 transport.to_sdk(),
320 )
321 };
322 if handle.is_null() {
323 return Err(IotError::Sdk(IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR));
324 }
325 let mut client = Self { handle, callback };
326 client.initialize_callbacks()?;
327 Ok(client)
328 }
329
330 pub fn try_new(callback: C) -> Result<Self, IotError> {
331 Self::create_from_environment(TransportProvider::Mqtt, callback)
332 }
333
334 pub async fn send_event_async(&mut self, message: &IotHubMessage) -> Result<(), IotError> {
335 let (sender, mut receiver) = oneshot::channel::<Result<(), IotError>>();
336 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
337
338 let result = unsafe {
339 IoTHubModuleClient_LL_SendEventAsync(
340 self.handle,
341 message.handle,
342 Some(Self::c_async_confirmation_callback),
343 context,
344 )
345 };
346
347 if let Err(error) = IotError::check_sdk_result(result) {
348 unsafe {
349 drop(Box::from_raw(
350 context as *mut oneshot::Sender<Result<(), IotError>>,
351 ));
352 }
353 return Err(error);
354 }
355
356 poll_fn(|cx| {
357 self.do_work_once()?;
358
359 match receiver.try_recv() {
360 Ok(Some(send_result)) => Poll::Ready(send_result),
361 Ok(None) => {
362 cx.waker().wake_by_ref();
363 Poll::Pending
364 }
365 Err(_) => Poll::Ready(Err(IotError::Sdk(
366 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
367 ))),
368 }
369 })
370 .await
371 }
372
373 pub async fn send_message_result(
374 &mut self,
375 message: &IotHubMessage,
376 ) -> Result<IoTHubClientConfirmationResult, IotError> {
377 let output = c"output";
378 self.send_event_to_output_async(message, output).await
379 }
380
381 pub async fn send_message(
382 &mut self,
383 message: &IotHubMessage,
384 ) -> Result<IoTHubClientConfirmationResult, IotError> {
385 self.send_message_result(message).await
386 }
387
388 pub fn get_send_status(&self) -> Result<(), IotError> {
389 let handle = self.handle;
390 let mut status: IOTHUB_CLIENT_STATUS = 0;
391 let result = unsafe { IoTHubModuleClient_LL_GetSendStatus(handle, &mut status) };
392 IotError::check_sdk_result(result)?;
393 IotError::check_sdk_result(status)
394 }
395
396 pub fn set_retry_policy(
397 &self,
398 retry_policy: IOTHUB_CLIENT_RETRY_POLICY,
399 retry_timeout_limit_in_seconds: usize,
400 ) -> Result<(), IotError> {
401 let handle = self.handle;
402 let result = unsafe {
403 IoTHubModuleClient_LL_SetRetryPolicy(
404 handle,
405 retry_policy,
406 retry_timeout_limit_in_seconds,
407 )
408 };
409 IotError::check_sdk_result(result)?;
410 Ok(())
411 }
412
413 pub fn get_retry_policy(&self) -> Result<(IOTHUB_CLIENT_RETRY_POLICY, usize), IotError> {
414 let handle = self.handle;
415 let mut retry_policy: IOTHUB_CLIENT_RETRY_POLICY = 0;
416 let mut timeout_limit: usize = 0;
417 let result = unsafe {
418 IoTHubModuleClient_LL_GetRetryPolicy(handle, &mut retry_policy, &mut timeout_limit)
419 };
420 IotError::check_sdk_result(result)?;
421 Ok((retry_policy, timeout_limit))
422 }
423
424 pub fn get_last_message_receive_time(&self) -> Result<SystemTime, IotError> {
425 let handle = self.handle;
426 let mut receive_time: time_t = 0;
427 let result =
428 unsafe { IoTHubModuleClient_LL_GetLastMessageReceiveTime(handle, &mut receive_time) };
429 IotError::check_sdk_result(result)?;
430 let time = UNIX_EPOCH + Duration::from_secs(receive_time as u64);
431 Ok(time)
432 }
433
434 pub fn do_work_once(&self) -> Result<(), IotError> {
435 let handle = self.handle;
436 unsafe {
437 IoTHubModuleClient_LL_DoWork(handle);
438 }
439 Ok(())
440 }
441
442 pub fn do_work(&mut self) {
443 loop {
444 let _ = self.do_work_once();
445 thread::sleep(time::Duration::from_millis(100));
446 }
447 }
448
449 pub fn set_option_value<T>(&self, option_name: &str, value: &T) -> Result<(), IotError> {
450 let handle = self.handle;
451 let name = CString::new(option_name)?;
452 let result = unsafe {
453 IoTHubModuleClient_LL_SetOption(
454 handle,
455 name.as_ptr(),
456 value as *const T as *const c_void,
457 )
458 };
459 IotError::check_sdk_result(result)
460 }
461
462 fn set_bool_option(&self, name: &CStr, value: bool) -> Result<(), IotError> {
463 let value = if value { 1 } else { 0 };
464 let result = unsafe {
465 IoTHubModuleClient_LL_SetOption(
466 self.handle,
467 name.as_ptr(),
468 &value as *const i32 as *const c_void,
469 )
470 };
471 IotError::check_sdk_result(result)?;
472 Ok(())
473 }
474
475 fn set_int_option(&self, name: &CStr, value: u32) -> Result<(), IotError> {
476 let result = unsafe {
477 IoTHubModuleClient_LL_SetOption(
478 self.handle,
479 name.as_ptr(),
480 &value as *const u32 as *const c_void,
481 )
482 };
483 IotError::check_sdk_result(result)
484 }
485
486 fn set_str_option(&self, name: &CStr, value: &str) -> Result<(), IotError> {
487 let cvalue = CString::new(value)?;
488 let result = unsafe {
489 IoTHubModuleClient_LL_SetOption(
490 self.handle,
491 name.as_ptr(),
492 cvalue.as_ptr() as *const c_void,
493 )
494 };
495 IotError::check_sdk_result(result)
496 }
497
498 pub fn send_reported_state(
499 &mut self,
500 reported_state: &[u8],
501 callback: IOTHUB_CLIENT_REPORTED_STATE_CALLBACK,
502 ) -> Result<(), IotError> {
503 let handle = self.handle;
504 let result = unsafe {
505 IoTHubModuleClient_LL_SendReportedState(
506 handle,
507 reported_state.as_ptr(),
508 reported_state.len(),
509 callback,
510 self.context_ptr(),
511 )
512 };
513 IotError::check_sdk_result(result)
514 }
515
516 pub async fn get_twin_async(
517 &mut self,
518 ) -> Result<(IotHubDeviceTwinUpdateState, Vec<u8>), IotError> {
519 let (sender, mut receiver) = oneshot::channel::<(IotHubDeviceTwinUpdateState, Vec<u8>)>();
520 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
521
522 let result = unsafe {
523 IoTHubModuleClient_LL_GetTwinAsync(
524 self.handle,
525 Some(Self::c_get_twin_async_callback),
526 context,
527 )
528 };
529
530 if let Err(error) = IotError::check_sdk_result(result) {
531 unsafe {
532 drop(Box::from_raw(
533 context as *mut oneshot::Sender<(IotHubDeviceTwinUpdateState, Vec<u8>)>,
534 ));
535 }
536 return Err(error);
537 }
538
539 poll_fn(|cx| {
540 self.do_work_once()?;
541
542 match receiver.try_recv() {
543 Ok(Some(twin_update)) => Poll::Ready(Ok(twin_update)),
544 Ok(None) => {
545 cx.waker().wake_by_ref();
546 Poll::Pending
547 }
548 Err(_) => Poll::Ready(Err(IotError::Sdk(
549 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
550 ))),
551 }
552 })
553 .await
554 }
555
556 pub async fn send_event_to_output_async(
557 &mut self,
558 message: &IotHubMessage,
559 output_name: &CStr,
560 ) -> Result<IoTHubClientConfirmationResult, IotError> {
561 let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
562 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
563
564 let result = unsafe {
565 IoTHubModuleClient_LL_SendEventToOutputAsync(
566 self.handle,
567 message.handle,
568 output_name.as_ptr(),
569 Some(Self::c_async_confirmation_result_callback),
570 context,
571 )
572 };
573
574 if let Err(error) = IotError::check_sdk_result(result) {
575 unsafe {
576 drop(Box::from_raw(
577 context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
578 ));
579 }
580 return Err(error);
581 }
582
583 poll_fn(|cx| {
584 self.do_work_once()?;
585
586 match receiver.try_recv() {
587 Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
588 Ok(None) => {
589 cx.waker().wake_by_ref();
590 Poll::Pending
591 }
592 Err(_) => Poll::Ready(Err(IotError::Sdk(
593 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
594 ))),
595 }
596 })
597 .await
598 }
599
600 pub fn device_method_invoke(
601 &self,
602 device_id: &str,
603 method_name: &str,
604 method_payload: &str,
605 timeout: Duration,
606 ) -> Result<MethodInvokeResponse, IotError> {
607 let handle = self.handle;
608 let device_id = CString::new(device_id)?;
609 let method_name = CString::new(method_name)?;
610 let method_payload = CString::new(method_payload)?;
611
612 let mut response_status: ::std::os::raw::c_int = 0;
613 let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
614 let mut response_payload_size: usize = 0;
615
616 let result = unsafe {
617 IoTHubModuleClient_LL_DeviceMethodInvoke(
618 handle,
619 device_id.as_ptr(),
620 method_name.as_ptr(),
621 method_payload.as_ptr(),
622 timeout.as_secs() as ::std::os::raw::c_uint,
623 &mut response_status,
624 &mut response_payload,
625 &mut response_payload_size,
626 )
627 };
628
629 if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
630 unsafe { free(response_payload as *mut c_void) };
631 }
632 IotError::check_sdk_result(result)?;
633
634 let payload = if response_payload.is_null() || response_payload_size == 0 {
635 Vec::new()
636 } else {
637 let bytes =
638 unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
639 .to_vec();
640 unsafe { free(response_payload as *mut c_void) };
641 bytes
642 };
643
644 Ok(MethodInvokeResponse {
645 status: response_status,
646 payload,
647 })
648 }
649
650 pub fn module_method_invoke(
651 &self,
652 device_id: &str,
653 module_id: &str,
654 method_name: &str,
655 method_payload: &str,
656 timeout: ::std::os::raw::c_uint,
657 ) -> Result<MethodInvokeResponse, IotError> {
658 let handle = self.handle;
659 let device_id = CString::new(device_id)?;
660 let module_id = CString::new(module_id)?;
661 let method_name = CString::new(method_name)?;
662 let method_payload = CString::new(method_payload)?;
663
664 let mut response_status: ::std::os::raw::c_int = 0;
665 let mut response_payload: *mut ::std::os::raw::c_uchar = ptr::null_mut();
666 let mut response_payload_size: usize = 0;
667
668 let result = unsafe {
669 IoTHubModuleClient_LL_ModuleMethodInvoke(
670 handle,
671 device_id.as_ptr(),
672 module_id.as_ptr(),
673 method_name.as_ptr(),
674 method_payload.as_ptr(),
675 timeout,
676 &mut response_status,
677 &mut response_payload,
678 &mut response_payload_size,
679 )
680 };
681
682 if result != IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_OK && !response_payload.is_null() {
683 unsafe { free(response_payload as *mut c_void) };
684 }
685 IotError::check_sdk_result(result)?;
686
687 let payload = if response_payload.is_null() || response_payload_size == 0 {
688 Vec::new()
689 } else {
690 let bytes =
691 unsafe { std::slice::from_raw_parts(response_payload, response_payload_size) }
692 .to_vec();
693 unsafe { free(response_payload as *mut c_void) };
694 bytes
695 };
696
697 Ok(MethodInvokeResponse {
698 status: response_status,
699 payload,
700 })
701 }
702
703 pub fn send_message_disposition(
704 &self,
705 message: &IotHubMessage,
706 disposition: IOTHUBMESSAGE_DISPOSITION_RESULT,
707 ) -> Result<(), IotError> {
708 let handle = self.handle;
709 let result = unsafe {
710 IoTHubModuleClient_LL_SendMessageDisposition(handle, message.handle, disposition)
711 };
712 IotError::check_sdk_result(result)?;
713 Ok(())
714 }
715
716 pub async fn send_telemetry_async(
717 &mut self,
718 message: &IotHubMessage,
719 ) -> Result<IoTHubClientConfirmationResult, IotError> {
720 let (sender, mut receiver) = oneshot::channel::<IoTHubClientConfirmationResult>();
721 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
722
723 let result = unsafe {
724 IoTHubModuleClient_LL_SendTelemetryAsync(
725 self.handle,
726 message.handle,
727 Some(Self::c_async_confirmation_result_callback),
728 context,
729 )
730 };
731
732 if let Err(error) = IotError::check_sdk_result(result) {
733 unsafe {
734 drop(Box::from_raw(
735 context as *mut oneshot::Sender<IoTHubClientConfirmationResult>,
736 ));
737 }
738 return Err(error);
739 }
740
741 poll_fn(|cx| {
742 self.do_work_once()?;
743
744 match receiver.try_recv() {
745 Ok(Some(confirmation)) => Poll::Ready(Ok(confirmation)),
746 Ok(None) => {
747 cx.waker().wake_by_ref();
748 Poll::Pending
749 }
750 Err(_) => Poll::Ready(Err(IotError::Sdk(
751 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
752 ))),
753 }
754 })
755 .await
756 }
757
758 pub fn subscribe_to_commands(
759 &mut self,
760 callback: IOTHUB_CLIENT_COMMAND_CALLBACK_ASYNC,
761 ) -> Result<(), IotError> {
762 let handle = self.handle;
763 let result = unsafe {
764 IoTHubModuleClient_LL_SubscribeToCommands(handle, callback, self.context_ptr())
765 };
766 IotError::check_sdk_result(result)
767 }
768
769 pub async fn send_properties_async(&mut self, properties: &[u8]) -> Result<c_int, IotError> {
770 let (sender, mut receiver) = oneshot::channel::<c_int>();
771 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
772
773 let result = unsafe {
774 IoTHubModuleClient_LL_SendPropertiesAsync(
775 self.handle,
776 properties.as_ptr(),
777 properties.len(),
778 Some(Self::c_async_property_ack_callback),
779 context,
780 )
781 };
782
783 if let Err(error) = IotError::check_sdk_result(result) {
784 unsafe {
785 drop(Box::from_raw(context as *mut oneshot::Sender<c_int>));
786 }
787 return Err(error);
788 }
789
790 poll_fn(|cx| {
791 self.do_work_once()?;
792
793 match receiver.try_recv() {
794 Ok(Some(status_code)) => Poll::Ready(Ok(status_code)),
795 Ok(None) => {
796 cx.waker().wake_by_ref();
797 Poll::Pending
798 }
799 Err(_) => Poll::Ready(Err(IotError::Sdk(
800 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
801 ))),
802 }
803 })
804 .await
805 }
806
807 pub async fn get_properties_async(
808 &mut self,
809 ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
810 let (sender, mut receiver) =
811 oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
812 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
813
814 let result = unsafe {
815 IoTHubModuleClient_LL_GetPropertiesAsync(
816 self.handle,
817 Some(Self::c_async_properties_received_callback),
818 context,
819 )
820 };
821
822 if let Err(error) = IotError::check_sdk_result(result) {
823 unsafe {
824 drop(Box::from_raw(
825 context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
826 ));
827 }
828 return Err(error);
829 }
830
831 poll_fn(|cx| {
832 self.do_work_once()?;
833
834 match receiver.try_recv() {
835 Ok(Some(properties)) => Poll::Ready(Ok(properties)),
836 Ok(None) => {
837 cx.waker().wake_by_ref();
838 Poll::Pending
839 }
840 Err(_) => Poll::Ready(Err(IotError::Sdk(
841 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
842 ))),
843 }
844 })
845 .await
846 }
847
848 pub async fn get_properties_and_subscribe_to_updates_async(
849 &mut self,
850 ) -> Result<(IoTHubClientPropertyPayloadType, Vec<u8>), IotError> {
851 let (sender, mut receiver) =
852 oneshot::channel::<(IoTHubClientPropertyPayloadType, Vec<u8>)>();
853 let context = Box::into_raw(Box::new(sender)) as *mut c_void;
854
855 let result = unsafe {
856 IoTHubModuleClient_LL_GetPropertiesAndSubscribeToUpdatesAsync(
857 self.handle,
858 Some(Self::c_async_properties_received_callback),
859 context,
860 )
861 };
862
863 if let Err(error) = IotError::check_sdk_result(result) {
864 unsafe {
865 drop(Box::from_raw(
866 context as *mut oneshot::Sender<(IoTHubClientPropertyPayloadType, Vec<u8>)>,
867 ));
868 }
869 return Err(error);
870 }
871
872 poll_fn(|cx| {
873 self.do_work_once()?;
874
875 match receiver.try_recv() {
876 Ok(Some(properties)) => Poll::Ready(Ok(properties)),
877 Ok(None) => {
878 cx.waker().wake_by_ref();
879 Poll::Pending
880 }
881 Err(_) => Poll::Ready(Err(IotError::Sdk(
882 IOTHUB_CLIENT_RESULT_TAG_IOTHUB_CLIENT_ERROR,
883 ))),
884 }
885 })
886 .await
887 }
888
889 pub fn set_option(&self, option: ModuleClientOption<'_>) -> Result<(), IotError> {
890 let name: &CStr = (&option).try_into()?;
891 match option {
892 ModuleClientOption::LogTrace(value)
893 | ModuleClientOption::AutoUrlEncodeDecode(value) => {
894 self.set_bool_option(name, value)?
895 }
896 ModuleClientOption::MessageTimeout(value)
897 | ModuleClientOption::RetryIntervalSec(value)
898 | ModuleClientOption::RetryMaxDelaySecs(value)
899 | ModuleClientOption::SasTokenLifetime(value)
900 | ModuleClientOption::DoWorkFreqMs(value)
901 | ModuleClientOption::KeepAlive(value) => self.set_int_option(name, value)?,
902 ModuleClientOption::ProductInfo(value) | ModuleClientOption::ModelId(value) => {
903 self.set_str_option(name, value)?
904 }
905 }
906
907 Ok(())
908 }
909}
910
/// Destroys the underlying SDK client handle when the wrapper goes out of
/// scope.
impl<C: ModuleEventCallback> Drop for IotHubModuleClient<C> {
    fn drop(&mut self) {
        // The constructors only build a client from a non-null handle, so the
        // handle passed here is the one returned by the SDK create call.
        unsafe {
            IoTHubModuleClient_LL_Destroy(self.handle);
        }
    }
}