Skip to main content

gg_sdk/
ipc.rs

1// aws-greengrass-component-sdk - Lightweight AWS IoT Greengrass SDK
2// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3// SPDX-License-Identifier: Apache-2.0
4
5use core::{
6    ffi,
7    marker::PhantomData,
8    mem::MaybeUninit,
9    ptr, result, slice, str,
10    sync::atomic::{AtomicBool, Ordering},
11};
12
13use crate::{
14    c,
15    error::{Error, Result},
16    object::{Kv, Map, Object},
17};
18
19static INIT: AtomicBool = AtomicBool::new(false);
20static CONNECTED: AtomicBool = AtomicBool::new(false);
21
22/// AWS IoT Greengrass IPC SDK client.
23#[non_exhaustive]
24#[derive(Debug, Clone, Copy)]
25pub struct Sdk {}
26
27#[derive(Debug, Clone, Copy)]
28pub struct IpcError<'a> {
29    pub error_code: &'a str,
30    pub message: &'a str,
31}
32
33/// MQTT Quality of Service level.
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35#[repr(u8)]
36pub enum Qos {
37    /// At most once delivery (QoS 0)
38    AtMostOnce = 0,
39    /// At least once delivery (QoS 1)
40    AtLeastOnce = 1,
41}
42
43/// Component lifecycle state.
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum ComponentState {
46    /// Component is running
47    Running,
48    /// Component encountered an error
49    Errored,
50}
51
52/// Payload received from a topic subscription.
53#[derive(Debug, Clone, Copy)]
54pub enum SubscribeToTopicPayload<'a> {
55    /// JSON payload
56    Json(Map<'a>),
57    /// Binary payload
58    Binary(&'a [u8]),
59}
60
61impl Sdk {
62    /// Initialize the SDK.
63    ///
64    /// Must be called before using any IPC operations.
65    ///
66    /// # Panics
67    /// Panics if called more than once.
68    pub fn init() -> Self {
69        let already_init = INIT.swap(true, Ordering::AcqRel);
70        assert!(!already_init, "Sdk::init() called more than once");
71        unsafe { c::gg_sdk_init() };
72        Self {}
73    }
74
75    /// Connect to the AWS IoT Greengrass Core IPC service.
76    ///
77    /// Uses `SVCUID` and `AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT`
78    /// environment variables set by the Greengrass nucleus.
79    ///
80    /// # Errors
81    /// Returns error if environment variables are missing, connected or connecting, or connection fails.
82    pub fn connect(&self) -> Result<()> {
83        let already_connected = CONNECTED.swap(true, Ordering::AcqRel);
84        if already_connected {
85            return Err(Error::Failure);
86        }
87        Result::from(unsafe { c::ggipc_connect() })
88    }
89
90    /// Connect to the AWS IoT Greengrass Core IPC service with explicit credentials.
91    ///
92    /// # Errors
93    /// Returns error if connected or connecting, or if connection fails.
94    pub fn connect_with_token(
95        &self,
96        socket_path: &str,
97        auth_token: &str,
98    ) -> Result<()> {
99        let already_connected = CONNECTED.swap(true, Ordering::AcqRel);
100        if already_connected {
101            return Err(Error::Failure);
102        }
103
104        Result::from(unsafe {
105            c::ggipc_connect_with_token(socket_path.into(), auth_token.into())
106        })?;
107
108        Ok(())
109    }
110
111    /// Publish a JSON message to a local pub/sub topic.
112    ///
113    /// Sends messages to other Greengrass components subscribed to the topic.
114    /// Requires `aws.greengrass#PublishToTopic` authorization.
115    ///
116    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html#ipc-operation-publishtotopic>
117    ///
118    /// # Examples
119    ///
120    /// ```no_run
121    /// use gg_sdk::{Sdk, Kv, Object};
122    ///
123    /// let sdk = Sdk::init();
124    /// sdk.connect()?;
125    ///
126    /// let payload = [
127    ///     Kv::new("temperature", Object::f64(72.5)),
128    ///     Kv::new("humidity", Object::i64(45)),
129    /// ];
130    /// sdk.publish_to_topic_json("sensor/data", &payload[..])?;
131    /// # Ok::<(), gg_sdk::Error>(())
132    /// ```
133    ///
134    /// # Errors
135    /// Returns error if publish fails.
136    pub fn publish_to_topic_json(
137        &self,
138        topic: &str,
139        payload: &[Kv<'_>],
140    ) -> Result<()> {
141        Result::from(unsafe {
142            c::ggipc_publish_to_topic_json(topic.into(), payload.into())
143        })
144    }
145
146    /// Publish a binary message to a local pub/sub topic.
147    ///
148    /// Sends messages to other Greengrass components subscribed to the topic.
149    /// Requires `aws.greengrass#PublishToTopic` authorization.
150    ///
151    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html#ipc-operation-publishtotopic>
152    ///
153    /// # Examples
154    ///
155    /// ```no_run
156    /// use gg_sdk::Sdk;
157    ///
158    /// let sdk = Sdk::init();
159    /// sdk.connect()?;
160    ///
161    /// let data = b"binary payload data";
162    /// sdk.publish_to_topic_binary("sensor/raw", data)?;
163    /// # Ok::<(), gg_sdk::Error>(())
164    /// ```
165    ///
166    /// # Errors
167    /// Returns error if publish fails.
168    pub fn publish_to_topic_binary(
169        &self,
170        topic: &str,
171        payload: &[u8],
172    ) -> Result<()> {
173        Result::from(unsafe {
174            c::ggipc_publish_to_topic_binary(topic.into(), payload.into())
175        })
176    }
177
178    /// Subscribe to messages on a local pub/sub topic.
179    ///
180    /// Receives messages from other Greengrass components publishing to the topic.
181    /// Requires `aws.greengrass#SubscribeToTopic` authorization.
182    ///
183    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html#ipc-operation-subscribetotopic>
184    ///
185    /// # Errors
186    /// Returns error if subscription fails.
187    pub fn subscribe_to_topic<'a, F: Fn(&str, SubscribeToTopicPayload)>(
188        &self,
189        topic: &str,
190        callback: &'a F,
191    ) -> Result<Subscription<'a, F>> {
192        extern "C" fn trampoline<F: Fn(&str, SubscribeToTopicPayload)>(
193            ctx: *mut ffi::c_void,
194            topic: c::GgBuffer,
195            payload: c::GgObject,
196            _handle: c::GgIpcSubscriptionHandle,
197        ) {
198            let cb = unsafe { &*ctx.cast::<F>() };
199            let topic_str = unsafe {
200                str::from_utf8_unchecked(slice::from_raw_parts(
201                    topic.data, topic.len,
202                ))
203            };
204
205            let unpacked = match unsafe { c::gg_obj_type(payload) } {
206                c::GgObjectType::GG_TYPE_MAP => {
207                    let map = unsafe { c::gg_obj_into_map(payload) };
208                    SubscribeToTopicPayload::Json(Map(unsafe {
209                        slice::from_raw_parts(map.pairs as *const Kv, map.len)
210                    }))
211                }
212                c::GgObjectType::GG_TYPE_BUF => {
213                    let buf = unsafe { c::gg_obj_into_buf(payload) };
214                    SubscribeToTopicPayload::Binary(unsafe {
215                        slice::from_raw_parts(buf.data, buf.len)
216                    })
217                }
218                _ => return,
219            };
220
221            cb(topic_str, unpacked);
222        }
223
224        let ctx = callback as *const F;
225        let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
226
227        Result::from(unsafe {
228            c::ggipc_subscribe_to_topic(
229                topic.into(),
230                Some(trampoline::<F>),
231                ctx.cast::<ffi::c_void>().cast_mut(),
232                &raw mut handle,
233            )
234        })?;
235
236        debug_assert!(handle.val != 0);
237        Ok(Subscription {
238            handle,
239            phantom: PhantomData,
240        })
241    }
242
243    /// Publish an MQTT message to AWS IoT Core.
244    ///
245    /// Sends messages to AWS IoT Core MQTT broker with specified QoS.
246    /// Requires `aws.greengrass#PublishToIoTCore` authorization.
247    ///
248    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-iot-core-mqtt.html#ipc-operation-publishtoiotcore>
249    ///
250    /// # Examples
251    ///
252    /// ```no_run
253    /// use gg_sdk::{Sdk, Qos};
254    ///
255    /// let sdk = Sdk::init();
256    /// sdk.connect()?;
257    ///
258    /// let payload = b"telemetry data";
259    /// sdk.publish_to_iot_core("device/telemetry", payload, Qos::AtMostOnce)?;
260    /// # Ok::<(), gg_sdk::Error>(())
261    /// ```
262    ///
263    /// # Errors
264    /// Returns error if publish fails.
265    pub fn publish_to_iot_core(
266        &self,
267        topic: &str,
268        payload: &[u8],
269        qos: Qos,
270    ) -> Result<()> {
271        Result::from(unsafe {
272            c::ggipc_publish_to_iot_core(topic.into(), payload.into(), qos as u8)
273        })
274    }
275
276    /// Subscribe to MQTT messages from AWS IoT Core.
277    ///
278    /// Receives messages from AWS IoT Core MQTT broker on matching topics.
279    /// Requires `aws.greengrass#SubscribeToIoTCore` authorization.
280    ///
281    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-iot-core-mqtt.html#ipc-operation-subscribetoiotcore>
282    ///
283    /// # Errors
284    /// Returns error if subscription fails.
285    pub fn subscribe_to_iot_core<'a, F: Fn(&str, &[u8])>(
286        &self,
287        topic_filter: &str,
288        qos: Qos,
289        callback: &'a F,
290    ) -> Result<Subscription<'a, F>> {
291        extern "C" fn trampoline<F: Fn(&str, &[u8])>(
292            ctx: *mut ffi::c_void,
293            topic: c::GgBuffer,
294            payload: c::GgBuffer,
295            _handle: c::GgIpcSubscriptionHandle,
296        ) {
297            let cb = unsafe { &*ctx.cast::<F>() };
298            let topic_str = unsafe {
299                str::from_utf8_unchecked(slice::from_raw_parts(
300                    topic.data, topic.len,
301                ))
302            };
303            let payload_bytes =
304                unsafe { slice::from_raw_parts(payload.data, payload.len) };
305            cb(topic_str, payload_bytes);
306        }
307
308        let ctx = callback as *const F;
309        let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
310
311        Result::from(unsafe {
312            c::ggipc_subscribe_to_iot_core(
313                topic_filter.into(),
314                qos as u8,
315                Some(trampoline::<F>),
316                ctx.cast::<ffi::c_void>().cast_mut(),
317                &raw mut handle,
318            )
319        })?;
320
321        debug_assert!(handle.val != 0);
322        Ok(Subscription {
323            handle,
324            phantom: PhantomData,
325        })
326    }
327
328    /// Update component state.
329    ///
330    /// Reports component state to the Greengrass nucleus.
331    ///
332    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-component-lifecycle.html#ipc-operation-updatestate>
333    ///
334    /// # Errors
335    /// Returns error if state update fails.
336    pub fn update_state(&self, state: ComponentState) -> Result<()> {
337        let c_state = match state {
338            ComponentState::Running => {
339                c::GgComponentState::GG_COMPONENT_STATE_RUNNING
340            }
341            ComponentState::Errored => {
342                c::GgComponentState::GG_COMPONENT_STATE_ERRORED
343            }
344        };
345        Result::from(unsafe { c::ggipc_update_state(c_state) })
346    }
347
348    /// Restart a Greengrass component.
349    ///
350    /// Requests the nucleus to restart the specified component.
351    ///
352    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-local-deployments-components.html#ipc-operation-restartcomponent>
353    ///
354    /// # Examples
355    ///
356    /// ```no_run
357    /// use gg_sdk::Sdk;
358    ///
359    /// let sdk = Sdk::init();
360    /// sdk.connect()?;
361    ///
362    /// sdk.restart_component("com.example.MyComponent")?;
363    /// # Ok::<(), gg_sdk::Error>(())
364    /// ```
365    ///
366    /// # Errors
367    /// Returns error if restart fails.
368    pub fn restart_component(&self, component_name: &str) -> Result<()> {
369        Result::from(unsafe { c::ggipc_restart_component(component_name.into()) })
370    }
371
372    /// Get component configuration value.
373    ///
374    /// Retrieves configuration for the specified key path. Pass empty slice for complete config.
375    /// Requires `aws.greengrass#GetConfiguration` authorization.
376    ///
377    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-component-configuration.html#ipc-operation-getconfiguration>
378    ///
379    /// # Errors
380    /// Returns error if config retrieval fails.
381    pub fn get_config<'a>(
382        &self,
383        key_path: &[&str],
384        component_name: Option<&str>,
385        result_mem: &'a mut [MaybeUninit<u8>],
386    ) -> Result<Object<'a>> {
387        let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
388        let c_key_path = key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
389
390        let component_buf = component_name.map(c::GgBuffer::from);
391
392        let mem = c::GgBuffer {
393            data: result_mem.as_mut_ptr().cast::<u8>(),
394            len: result_mem.len(),
395        };
396
397        let mut obj = c::GgObject::default();
398
399        Result::from(unsafe {
400            c::ggipc_get_config(
401                c_key_path,
402                component_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
403                mem,
404                &raw mut obj,
405            )
406        })?;
407
408        Ok(unsafe { ptr::read((&raw const obj).cast()) })
409    }
410
411    /// Get component configuration value as a string.
412    ///
413    /// Alternative API to [`Sdk::get_config`] for string type values.
414    /// Requires `aws.greengrass#GetConfiguration` authorization.
415    ///
416    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-component-configuration.html#ipc-operation-getconfiguration>
417    ///
418    /// # Errors
419    /// Returns error if config retrieval fails.
420    pub fn get_config_str<'a>(
421        &self,
422        key_path: &[&str],
423        component_name: Option<&str>,
424        result_mem: &'a mut [MaybeUninit<u8>],
425    ) -> Result<&'a str> {
426        let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
427        let c_key_path = key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
428
429        let component_buf = component_name.map(c::GgBuffer::from);
430
431        let mut value = c::GgBuffer {
432            data: result_mem.as_mut_ptr().cast::<u8>(),
433            len: result_mem.len(),
434        };
435
436        Result::from(unsafe {
437            c::ggipc_get_config_str(
438                c_key_path,
439                component_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
440                &raw mut value,
441            )
442        })?;
443
444        Ok(unsafe {
445            str::from_utf8_unchecked(slice::from_raw_parts(
446                value.data, value.len,
447            ))
448        })
449    }
450
451    /// Update component configuration.
452    ///
453    /// Merges the provided value into the component's configuration at the key path.
454    /// Requires `aws.greengrass#UpdateConfiguration` authorization.
455    ///
456    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-component-configuration.html#ipc-operation-updateconfiguration>
457    ///
458    /// # Examples
459    ///
460    /// ```no_run
461    /// use gg_sdk::Sdk;
462    ///
463    /// let sdk = Sdk::init();
464    /// sdk.connect()?;
465    ///
466    /// sdk.update_config(&["maxRetries"], None, 100_i64)?;
467    /// # Ok::<(), gg_sdk::Error>(())
468    /// ```
469    ///
470    /// # Errors
471    /// Returns error if config update fails.
472    pub fn update_config<'a>(
473        &self,
474        key_path: &[&str],
475        timestamp: Option<core::time::Duration>,
476        value_to_merge: impl Into<Object<'a>>,
477    ) -> Result<()> {
478        fn inner(
479            key_path: &[&str],
480            timestamp: Option<core::time::Duration>,
481            value_to_merge: Object,
482        ) -> Result<()> {
483            let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
484            let c_key_path =
485                key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
486
487            #[expect(clippy::cast_possible_wrap)]
488            #[allow(clippy::cast_lossless, clippy::needless_update)]
489            let timespec = timestamp.map(|d| c::timespec {
490                tv_sec: d.as_secs() as _,
491                tv_nsec: d.subsec_nanos() as _,
492                ..c::timespec::default()
493            });
494
495            Result::from(unsafe {
496                c::ggipc_update_config(
497                    c_key_path,
498                    timespec.as_ref().map_or(ptr::null(), ptr::from_ref),
499                    *ptr::from_ref(&value_to_merge).cast::<c::GgObject>(),
500                )
501            })
502        }
503        inner(key_path, timestamp, value_to_merge.into())
504    }
505
506    /// Subscribe to component configuration updates.
507    ///
508    /// Receives notifications when configuration changes for the specified key path.
509    /// Requires `aws.greengrass#SubscribeToConfigurationUpdate` authorization.
510    ///
511    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-component-configuration.html#ipc-operation-subscribetoconfigurationupdate>
512    ///
513    /// # Errors
514    /// Returns error if subscription fails.
515    pub fn subscribe_to_configuration_update<'a, F: Fn(&str, &[&str])>(
516        &self,
517        component_name: Option<&str>,
518        key_path: &[&str],
519        callback: &'a F,
520    ) -> Result<Subscription<'a, F>> {
521        extern "C" fn trampoline<F: Fn(&str, &[&str])>(
522            ctx: *mut ffi::c_void,
523            component_name: c::GgBuffer,
524            key_path: c::GgList,
525            _handle: c::GgIpcSubscriptionHandle,
526        ) {
527            let cb = unsafe { &*ctx.cast::<F>() };
528            let component_str = unsafe {
529                str::from_utf8_unchecked(slice::from_raw_parts(
530                    component_name.data,
531                    component_name.len,
532                ))
533            };
534            let path_objs =
535                unsafe { slice::from_raw_parts(key_path.items, key_path.len) };
536
537            let mut path_strs_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
538            for (i, obj) in path_objs.iter().enumerate() {
539                let buf = unsafe { c::gg_obj_into_buf(*obj) };
540                let s = unsafe {
541                    str::from_utf8_unchecked(slice::from_raw_parts(
542                        buf.data, buf.len,
543                    ))
544                };
545                path_strs_mem[i].write(s);
546            }
547            let path_strs = unsafe {
548                slice::from_raw_parts(
549                    path_strs_mem.as_ptr().cast::<&str>(),
550                    path_objs.len(),
551                )
552            };
553
554            cb(component_str, path_strs);
555        }
556
557        let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
558        let c_key_path = key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
559
560        let component_buf = component_name.map(c::GgBuffer::from);
561
562        let ctx = callback as *const F;
563        let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
564
565        Result::from(unsafe {
566            c::ggipc_subscribe_to_configuration_update(
567                component_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
568                c_key_path,
569                Some(trampoline::<F>),
570                ctx.cast::<ffi::c_void>().cast_mut(),
571                &raw mut handle,
572            )
573        })?;
574
575        debug_assert!(handle.val != 0);
576        Ok(Subscription {
577            handle,
578            phantom: PhantomData,
579        })
580    }
581
582    /// Get the shadow for a thing.
583    ///
584    /// Retrieves the shadow document for the specified thing and shadow name.
585    /// Pass `None` for `shadow_name` to use the classic shadow.
586    /// `result_mem` must be large enough to hold the decoded shadow document.
587    /// Requires `aws.greengrass#GetThingShadow` authorization.
588    ///
589    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-local-shadows.html#ipc-operation-getthingshadow>
590    ///
591    /// # Errors
592    /// Returns error if the shadow retrieval fails.
593    pub fn get_thing_shadow<'a>(
594        &self,
595        thing_name: &str,
596        shadow_name: Option<&str>,
597        result_mem: &'a mut [MaybeUninit<u8>],
598    ) -> Result<&'a [u8]> {
599        let shadow_buf = shadow_name.map(c::GgBuffer::from);
600
601        let mut payload = c::GgBuffer {
602            data: result_mem.as_mut_ptr().cast::<u8>(),
603            len: result_mem.len(),
604        };
605
606        Result::from(unsafe {
607            c::ggipc_get_thing_shadow(
608                thing_name.into(),
609                shadow_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
610                &raw mut payload,
611            )
612        })?;
613
614        Ok(unsafe { slice::from_raw_parts(payload.data, payload.len) })
615    }
616
617    /// Update the shadow for a thing.
618    ///
619    /// Updates the shadow document for the specified thing and shadow name.
620    /// Pass `None` for `shadow_name` to use the classic shadow.
621    /// Pass `Some` buffer for `response_mem` to receive the response payload,
622    /// or `None` to ignore it.
623    /// Requires `aws.greengrass#UpdateThingShadow` authorization.
624    ///
625    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-local-shadows.html#ipc-operation-updatethingshadow>
626    ///
627    /// # Errors
628    /// Returns error if the shadow update fails.
629    #[expect(clippy::needless_pass_by_value)]
630    pub fn update_thing_shadow<'a>(
631        &self,
632        thing_name: &str,
633        shadow_name: Option<&str>,
634        payload: &[u8],
635        response_mem: Option<&'a mut [MaybeUninit<u8>]>,
636    ) -> Result<Option<&'a [u8]>> {
637        let shadow_buf = shadow_name.map(c::GgBuffer::from);
638
639        let mut response = response_mem.as_ref().map(|mem| c::GgBuffer {
640            data: mem.as_ptr() as *mut u8,
641            len: mem.len(),
642        });
643
644        Result::from(unsafe {
645            c::ggipc_update_thing_shadow(
646                thing_name.into(),
647                shadow_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
648                payload.into(),
649                response
650                    .as_mut()
651                    .map_or(ptr::null_mut(), ptr::from_mut),
652            )
653        })?;
654
655        Ok(response
656            .map(|r| unsafe { slice::from_raw_parts(r.data, r.len) }))
657    }
658
659    /// Delete the shadow for a thing.
660    ///
661    /// Deletes the shadow document for the specified thing and shadow name.
662    /// Pass `None` for `shadow_name` to use the classic shadow.
663    /// Requires `aws.greengrass#DeleteThingShadow` authorization.
664    ///
665    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-local-shadows.html#ipc-operation-deletethingshadow>
666    ///
667    /// # Errors
668    /// Returns error if the shadow deletion fails.
669    pub fn delete_thing_shadow(
670        &self,
671        thing_name: &str,
672        shadow_name: Option<&str>,
673    ) -> Result<()> {
674        let shadow_buf = shadow_name.map(c::GgBuffer::from);
675
676        Result::from(unsafe {
677            c::ggipc_delete_thing_shadow(
678                thing_name.into(),
679                shadow_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
680            )
681        })
682    }
683
684    /// List named shadows for a thing.
685    ///
686    /// Lists all named shadows for the specified thing, handling pagination
687    /// internally. The callback is invoked once per shadow name.
688    /// Requires `aws.greengrass#ListNamedShadowsForThing` authorization.
689    ///
690    /// See: <https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-local-shadows.html#ipc-operation-listnamedshadowsforthing>
691    ///
692    /// # Errors
693    /// Returns error if listing fails.
694    pub fn list_named_shadows_for_thing<F: FnMut(&str)>(
695        &self,
696        thing_name: &str,
697        callback: &mut F,
698    ) -> Result<()> {
699        extern "C" fn trampoline<F: FnMut(&str)>(
700            ctx: *mut ffi::c_void,
701            shadow_name: c::GgBuffer,
702        ) {
703            let cb = unsafe { &mut *ctx.cast::<F>() };
704            let name = unsafe {
705                str::from_utf8_unchecked(slice::from_raw_parts(
706                    shadow_name.data,
707                    shadow_name.len,
708                ))
709            };
710            cb(name);
711        }
712
713        let ctx = callback as *mut F;
714
715        Result::from(unsafe {
716            c::ggipc_list_named_shadows_for_thing(
717                thing_name.into(),
718                Some(trampoline::<F>),
719                ctx.cast::<ffi::c_void>(),
720            )
721        })
722    }
723
724    /// Make a generic IPC call.
725    ///
726    /// Low-level interface for invoking IPC operations not covered by specific methods.
727    ///
728    /// # Errors
729    /// Returns error if IPC call fails.
730    pub fn call<
731        'a,
732        'b,
733        F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
734    >(
735        &self,
736        operation: &str,
737        service_model_type: &str,
738        params: &[Kv<'a>],
739        mut callback: F,
740    ) -> Result<()> {
741        extern "C" fn result_trampoline<
742            'b,
743            F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
744        >(
745            ctx: *mut ffi::c_void,
746            result: c::GgMap,
747        ) -> c::GgError {
748            let cb = unsafe { ctx.cast::<F>().read() };
749            let result_slice = unsafe {
750                slice::from_raw_parts(result.pairs as *const Kv, result.len)
751            };
752            cb(Ok(result_slice)).into()
753        }
754
755        extern "C" fn error_trampoline<
756            'b,
757            F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
758        >(
759            ctx: *mut ffi::c_void,
760            error_code: c::GgBuffer,
761            message: c::GgBuffer,
762        ) -> c::GgError {
763            let cb = unsafe { ctx.cast::<F>().read() };
764            let code = unsafe {
765                str::from_utf8_unchecked(slice::from_raw_parts(
766                    error_code.data,
767                    error_code.len,
768                ))
769            };
770            let msg = unsafe {
771                str::from_utf8_unchecked(slice::from_raw_parts(
772                    message.data,
773                    message.len,
774                ))
775            };
776            cb(Err(IpcError {
777                error_code: code,
778                message: msg,
779            }))
780            .into()
781        }
782
783        Result::from(unsafe {
784            c::ggipc_call(
785                operation.into(),
786                service_model_type.into(),
787                params.into(),
788                Some(result_trampoline::<F>),
789                Some(error_trampoline::<F>),
790                (&raw mut callback).cast::<ffi::c_void>(),
791            )
792        })
793    }
794
795    /// Subscribe to a generic IPC stream.
796    ///
797    /// Low-level interface for subscribing to IPC operations not covered by specific methods.
798    ///
799    /// # Errors
800    /// Returns error if subscription fails.
801    pub fn subscribe<
802        'a,
803        'b,
804        'c,
805        F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
806        G: Fn(usize, &str, &'b [Kv<'b>]) -> Result<()>,
807    >(
808        &self,
809        operation: &str,
810        service_model_type: &str,
811        params: &[Kv<'a>],
812        mut response_callback: F,
813        sub_callback: &'c G,
814        aux_ctx: usize,
815    ) -> Result<Subscription<'c, G>> {
816        extern "C" fn result_trampoline<
817            'b,
818            F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
819        >(
820            ctx: *mut ffi::c_void,
821            result: c::GgMap,
822        ) -> c::GgError {
823            let cb = unsafe { ctx.cast::<F>().read() };
824            let result_slice = unsafe {
825                slice::from_raw_parts(result.pairs as *const Kv, result.len)
826            };
827            cb(Ok(result_slice)).into()
828        }
829
830        extern "C" fn error_trampoline<
831            'b,
832            F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
833        >(
834            ctx: *mut ffi::c_void,
835            error_code: c::GgBuffer,
836            message: c::GgBuffer,
837        ) -> c::GgError {
838            let cb = unsafe { ctx.cast::<F>().read() };
839            let code = unsafe {
840                str::from_utf8_unchecked(slice::from_raw_parts(
841                    error_code.data,
842                    error_code.len,
843                ))
844            };
845            let msg = unsafe {
846                str::from_utf8_unchecked(slice::from_raw_parts(
847                    message.data,
848                    message.len,
849                ))
850            };
851            cb(Err(IpcError {
852                error_code: code,
853                message: msg,
854            }))
855            .into()
856        }
857
858        extern "C" fn sub_trampoline<
859            'b,
860            G: Fn(usize, &str, &'b [Kv<'b>]) -> Result<()>,
861        >(
862            ctx: *mut ffi::c_void,
863            aux_ctx: *mut ffi::c_void,
864            _handle: c::GgIpcSubscriptionHandle,
865            service_model_type: c::GgBuffer,
866            data: c::GgMap,
867        ) -> c::GgError {
868            let cb = unsafe { &*ctx.cast::<G>() };
869            let aux = aux_ctx as usize;
870            let smt = unsafe {
871                str::from_utf8_unchecked(slice::from_raw_parts(
872                    service_model_type.data,
873                    service_model_type.len,
874                ))
875            };
876            let map = unsafe {
877                slice::from_raw_parts(data.pairs.cast::<Kv>(), data.len)
878            };
879            cb(aux, smt, map).into()
880        }
881
882        let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
883        let ctx = sub_callback as *const G;
884
885        Result::from(unsafe {
886            c::ggipc_subscribe(
887                operation.into(),
888                service_model_type.into(),
889                params.into(),
890                Some(result_trampoline::<F>),
891                Some(error_trampoline::<F>),
892                (&raw mut response_callback).cast::<ffi::c_void>(),
893                Some(sub_trampoline::<'b, G>),
894                ctx.cast::<ffi::c_void>().cast_mut(),
895                aux_ctx as *mut ffi::c_void,
896                &raw mut handle,
897            )
898        })?;
899
900        Ok(Subscription {
901            handle,
902            phantom: PhantomData,
903        })
904    }
905}
906
907/// Handle for an active IPC subscription.
908#[derive(Debug)]
909pub struct Subscription<'a, T> {
910    handle: c::GgIpcSubscriptionHandle,
911    phantom: PhantomData<&'a T>,
912}
913
914impl<T> Drop for Subscription<'_, T> {
915    fn drop(&mut self) {
916        if self.handle.val != 0 {
917            unsafe { c::ggipc_close_subscription(self.handle) };
918        }
919    }
920}
921
922impl<T> Default for Subscription<'_, T> {
923    fn default() -> Self {
924        Self {
925            handle: c::GgIpcSubscriptionHandle { val: 0 },
926            phantom: PhantomData,
927        }
928    }
929}
930
931const MAX_KEY_PATH_LEN: usize = (c::GG_MAX_OBJECT_DEPTH - 1) as usize;
932
933fn key_path_to_buf_list(
934    key_path: &[&str],
935    bufs: &mut [MaybeUninit<c::GgBuffer>; MAX_KEY_PATH_LEN],
936) -> Result<c::GgBufList> {
937    if key_path.len() > MAX_KEY_PATH_LEN {
938        return Err(Error::Range);
939    }
940    for (i, k) in key_path.iter().enumerate() {
941      bufs[i].write((*k).into());
942    }
943    Ok(c::GgBufList {
944        bufs: bufs.as_mut_ptr().cast(),
945        len: key_path.len(),
946    })
947}
948
949#[cfg(test)]
950mod test {
951    use std::sync::{Condvar, Mutex};
952
953    use super::{Qos, Sdk};
954    use crate::c;
955    use crate::error::*;
956
957    static TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
958
959    pub(crate) fn run_ipc_handshake_test<F: FnOnce() -> Result<()>>(
960        test_body: F,
961    ) -> Result<()> {
962        unsafe {
963            Result::from(c::gg_test_setup_ipc(
964                c"/tmp/gg-test".as_ptr(),
965                0o666,
966                c"1234567890ABCDEF".as_ptr(),
967            ))?;
968
969            let pid = libc::fork();
970            assert!(pid >= 0, "fork failed");
971
972            if pid == 0 {
973                test_body().unwrap();
974                libc::exit(0);
975            }
976
977            Result::from(c::gg_test_accept_client_handshake(5))?;
978
979            Result::from(c::gg_test_wait_for_client_disconnect(30))?;
980
981            c::gg_test_close();
982
983            let mut status = 0;
984            libc::waitpid(pid, &mut status, 0);
985
986            assert_eq!(libc::WIFEXITED(status), true);
987            assert_eq!(libc::WEXITSTATUS(status), 0);
988        };
989
990        Ok(())
991    }
992
993    pub(crate) fn run_ipc_sequence_test<F: FnOnce() -> Result<()>>(
994        packet_sequence: c::GgipcPacketSequence,
995        test_body: F,
996    ) -> Result<()> {
997        unsafe {
998            Result::from(c::gg_test_setup_ipc(
999                c"/tmp/gg-test".as_ptr(),
1000                0o666,
1001                c"1234567890ABCDEF".as_ptr(),
1002            ))?;
1003
1004            let pid = libc::fork();
1005            assert!(pid >= 0, "fork failed");
1006
1007            if pid == 0 {
1008                test_body().unwrap();
1009                libc::exit(0);
1010            }
1011
1012            Result::from(c::gg_test_connect_request_disconnect_sequence(
1013                packet_sequence,
1014                10,
1015            ))?;
1016
1017            c::gg_test_close();
1018
1019            let mut status = 0;
1020            libc::waitpid(pid, &mut status, 0);
1021
1022            assert_eq!(libc::WIFEXITED(status), true);
1023            assert_eq!(libc::WEXITSTATUS(status), 0);
1024        };
1025
1026        Ok(())
1027    }
1028
1029    fn get_test_socket_path() -> &'static str {
1030        unsafe {
1031            let buf = c::gg_test_get_socket_path();
1032            core::str::from_utf8_unchecked(core::slice::from_raw_parts(
1033                buf.data, buf.len,
1034            ))
1035        }
1036    }
1037
1038    fn get_test_auth_token() -> &'static str {
1039        unsafe {
1040            let buf = c::gg_test_get_auth_token();
1041            core::str::from_utf8_unchecked(core::slice::from_raw_parts(
1042                buf.data, buf.len,
1043            ))
1044        }
1045    }
1046
1047    #[test]
1048    fn test_connect_okay() -> Result<()> {
1049        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1050        run_ipc_handshake_test(|| {
1051            let sdk = Sdk::init();
1052            sdk.connect()
1053        })
1054    }
1055
1056    #[test]
1057    fn test_connect_with_token_okay() -> Result<()> {
1058        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1059        run_ipc_handshake_test(|| unsafe {
1060            // Unset env vars to force explicit token usage
1061            libc::unsetenv(c"SVCUID".as_ptr());
1062            libc::unsetenv(
1063                c"AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT".as_ptr(),
1064            );
1065
1066            let sdk = Sdk::init();
1067            sdk.connect_with_token(
1068                get_test_socket_path(),
1069                get_test_auth_token(),
1070            )
1071        })
1072    }
1073
1074    #[test]
1075    fn test_publish_to_iot_core_okay() -> Result<()> {
1076        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1077        let topic = "my/topic";
1078        let payload_base64 = "SGVsbG8gd29ybGQh";
1079        let qos = "0";
1080        let seq = unsafe {
1081            c::gg_test_mqtt_publish_accepted_sequence(
1082                1,
1083                topic.into(),
1084                payload_base64.into(),
1085                qos.into(),
1086            )
1087        };
1088        run_ipc_sequence_test(seq, || {
1089            let sdk = Sdk::init();
1090            sdk.connect()?;
1091            sdk.publish_to_iot_core(
1092                "my/topic",
1093                b"Hello world!",
1094                Qos::AtMostOnce,
1095            )
1096        })
1097    }
1098
1099    #[test]
1100    fn test_publish_to_iot_core_bad_alloc() -> Result<()> {
1101        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1102        run_ipc_handshake_test(|| {
1103            let sdk = Sdk::init();
1104            sdk.connect()?;
1105            let payload = [0u8; 0x20000];
1106            assert_eq!(
1107                sdk.publish_to_iot_core("my/topic", &payload, Qos::AtMostOnce),
1108                Err(Error::Nomem),
1109            );
1110            Ok(())
1111        })
1112    }
1113
1114    #[test]
1115    fn test_publish_to_iot_core_rejected() -> Result<()> {
1116        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1117        let topic = "my/topic";
1118        let payload_base64 = "SGVsbG8gd29ybGQh";
1119        let qos = "0";
1120        let seq = unsafe {
1121            c::gg_test_mqtt_publish_error_sequence(
1122                1,
1123                topic.into(),
1124                payload_base64.into(),
1125                qos.into(),
1126            )
1127        };
1128        run_ipc_sequence_test(seq, || {
1129            let sdk = Sdk::init();
1130            sdk.connect()?;
1131            assert!(
1132                sdk.publish_to_iot_core(
1133                    "my/topic",
1134                    b"Hello world!",
1135                    Qos::AtMostOnce
1136                )
1137                .is_err()
1138            );
1139            Ok(())
1140        })
1141    }
1142
1143    // NOTE: publish_to_iot_core_invalid_qos cannot be ported; the Rust Qos
1144    // enum prevents invalid values at compile time.
1145
1146    #[test]
1147    fn test_get_thing_shadow_okay() -> Result<()> {
1148        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1149        let thing_name = "MyThing";
1150        let shadow_name = "myShadow";
1151        let payload = "hello";
1152        let payload_b64 = "aGVsbG8=";
1153        let seq = unsafe {
1154            c::gg_test_shadow_get_accepted_sequence(
1155                1,
1156                thing_name.into(),
1157                shadow_name.into(),
1158                payload_b64.into(),
1159            )
1160        };
1161        run_ipc_sequence_test(seq, || {
1162            let sdk = Sdk::init();
1163            sdk.connect()?;
1164            let mut buf = [std::mem::MaybeUninit::uninit(); 64];
1165            let result =
1166                sdk.get_thing_shadow(thing_name, Some(shadow_name), &mut buf)?;
1167            assert_eq!(result, payload.as_bytes());
1168            Ok(())
1169        })
1170    }
1171
1172    #[test]
1173    fn test_get_thing_shadow_rejected() -> Result<()> {
1174        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1175        let thing_name = "MyThing";
1176        let shadow_name = "myShadow";
1177        let seq = unsafe {
1178            c::gg_test_shadow_get_error_sequence(
1179                1,
1180                thing_name.into(),
1181                shadow_name.into(),
1182            )
1183        };
1184        run_ipc_sequence_test(seq, || {
1185            let sdk = Sdk::init();
1186            sdk.connect()?;
1187            let mut buf = [std::mem::MaybeUninit::uninit(); 64];
1188            assert!(sdk
1189                .get_thing_shadow(thing_name, Some(shadow_name), &mut buf)
1190                .is_err());
1191            Ok(())
1192        })
1193    }
1194
1195    #[test]
1196    fn test_update_thing_shadow_okay() -> Result<()> {
1197        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1198        let thing_name = "MyThing";
1199        let shadow_name = "myShadow";
1200        let payload = b"hello";
1201        let payload_b64 = "aGVsbG8=";
1202        let seq = unsafe {
1203            c::gg_test_shadow_update_accepted_sequence(
1204                1,
1205                thing_name.into(),
1206                shadow_name.into(),
1207                payload_b64.into(),
1208                payload_b64.into(),
1209            )
1210        };
1211        run_ipc_sequence_test(seq, || {
1212            let sdk = Sdk::init();
1213            sdk.connect()?;
1214            sdk.update_thing_shadow(thing_name, Some(shadow_name), payload, None)?;
1215            Ok(())
1216        })
1217    }
1218
1219    #[test]
1220    fn test_update_thing_shadow_rejected() -> Result<()> {
1221        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1222        let thing_name = "MyThing";
1223        let shadow_name = "myShadow";
1224        let payload = b"hello";
1225        let payload_b64 = "aGVsbG8=";
1226        let seq = unsafe {
1227            c::gg_test_shadow_update_error_sequence(
1228                1,
1229                thing_name.into(),
1230                shadow_name.into(),
1231                payload_b64.into(),
1232            )
1233        };
1234        run_ipc_sequence_test(seq, || {
1235            let sdk = Sdk::init();
1236            sdk.connect()?;
1237            assert!(sdk
1238                .update_thing_shadow(thing_name, Some(shadow_name), payload, None)
1239                .is_err());
1240            Ok(())
1241        })
1242    }
1243
1244    #[test]
1245    fn test_delete_thing_shadow_okay() -> Result<()> {
1246        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1247        let thing_name = "MyThing";
1248        let shadow_name = "myShadow";
1249        let seq = unsafe {
1250            c::gg_test_shadow_delete_accepted_sequence(
1251                1,
1252                thing_name.into(),
1253                shadow_name.into(),
1254                "".into(),
1255            )
1256        };
1257        run_ipc_sequence_test(seq, || {
1258            let sdk = Sdk::init();
1259            sdk.connect()?;
1260            sdk.delete_thing_shadow(thing_name, Some(shadow_name))?;
1261            Ok(())
1262        })
1263    }
1264
1265    #[test]
1266    fn test_delete_thing_shadow_rejected() -> Result<()> {
1267        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1268        let thing_name = "MyThing";
1269        let shadow_name = "myShadow";
1270        let seq = unsafe {
1271            c::gg_test_shadow_delete_error_sequence(
1272                1,
1273                thing_name.into(),
1274                shadow_name.into(),
1275            )
1276        };
1277        run_ipc_sequence_test(seq, || {
1278            let sdk = Sdk::init();
1279            sdk.connect()?;
1280            assert!(sdk
1281                .delete_thing_shadow(thing_name, Some(shadow_name))
1282                .is_err());
1283            Ok(())
1284        })
1285    }
1286
1287    #[test]
1288    fn test_list_named_shadows_okay() -> Result<()> {
1289        let thing_name = "MyThing";
1290        let shadow_name = "myShadow";
1291        let timestamp: f64 = 1773436831.0;
1292
1293        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1294        unsafe {
1295            Result::from(c::gg_test_setup_ipc(
1296                c"/tmp/gg-test".as_ptr(),
1297                0o666,
1298                c"1234567890ABCDEF".as_ptr(),
1299            ))?;
1300
1301            let pid = libc::fork();
1302            assert!(pid >= 0, "fork failed");
1303
1304            if pid == 0 {
1305                let sdk = Sdk::init();
1306                sdk.connect().unwrap();
1307                let mut count = 0usize;
1308                sdk.list_named_shadows_for_thing(
1309                    thing_name,
1310                    &mut |name: &str| {
1311                        assert_eq!(name, "myShadow");
1312                        count += 1;
1313                    },
1314                )
1315                .unwrap();
1316                assert_eq!(count, 1);
1317                libc::exit(0);
1318            }
1319
1320            let mut result_item = c::gg_obj_buf(shadow_name.into());
1321            let results = c::GgList {
1322                items: &mut result_item,
1323                len: 1,
1324            };
1325
1326            Result::from(c::gg_test_accept_client_handshake(5))?;
1327
1328            Result::from(c::gg_test_expect_packet_sequence(
1329                c::gg_test_shadow_list_accepted_sequence(
1330                    1,
1331                    thing_name.into(),
1332                    core::ptr::null_mut(),
1333                    results,
1334                    timestamp,
1335                    core::ptr::null_mut(),
1336                ),
1337                5,
1338            ))?;
1339
1340            Result::from(c::gg_test_wait_for_client_disconnect(30))?;
1341
1342            c::gg_test_close();
1343
1344            let mut status = 0;
1345            libc::waitpid(pid, &mut status, 0);
1346            assert!(libc::WIFEXITED(status));
1347            assert_eq!(libc::WEXITSTATUS(status), 0);
1348        }
1349
1350        Ok(())
1351    }
1352
1353    #[test]
1354    fn test_list_named_shadows_rejected() -> Result<()> {
1355        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1356        let thing_name = "MyThing";
1357        let seq = unsafe {
1358            c::gg_test_shadow_list_error_sequence(1, thing_name.into())
1359        };
1360        run_ipc_sequence_test(seq, || {
1361            let sdk = Sdk::init();
1362            sdk.connect()?;
1363            assert!(sdk
1364                .list_named_shadows_for_thing(thing_name, &mut |_: &str| {})
1365                .is_err());
1366            Ok(())
1367        })
1368    }
1369
1370    #[test]
1371    fn test_list_named_shadows_paginated_okay() -> Result<()> {
1372        let thing_name = "MyThing";
1373        let timestamp: f64 = 1773436831.0;
1374
1375        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1376        unsafe {
1377            Result::from(c::gg_test_setup_ipc(
1378                c"/tmp/gg-test".as_ptr(),
1379                0o666,
1380                c"1234567890ABCDEF".as_ptr(),
1381            ))?;
1382
1383            let pid = libc::fork();
1384            assert!(pid >= 0, "fork failed");
1385
1386            if pid == 0 {
1387                let sdk = Sdk::init();
1388                sdk.connect().unwrap();
1389                let mut count = 0usize;
1390                let expected = ["shadow1", "shadow2"];
1391                sdk.list_named_shadows_for_thing(
1392                    thing_name,
1393                    &mut |name: &str| {
1394                        assert_eq!(name, expected[count]);
1395                        count += 1;
1396                    },
1397                )
1398                .unwrap();
1399                assert_eq!(count, 2);
1400                libc::exit(0);
1401            }
1402
1403            let mut next_token1: c::GgBuffer = "token123".into();
1404            let mut next_token2: c::GgBuffer = "token456".into();
1405
1406            let mut page1_item = c::gg_obj_buf("shadow1".into());
1407            let page1 = c::GgList {
1408                items: &mut page1_item,
1409                len: 1,
1410            };
1411
1412            let mut page2_item = c::gg_obj_buf("shadow2".into());
1413            let page2 = c::GgList {
1414                items: &mut page2_item,
1415                len: 1,
1416            };
1417
1418            let empty = c::GgList {
1419                items: core::ptr::null_mut(),
1420                len: 0,
1421            };
1422
1423            Result::from(c::gg_test_accept_client_handshake(5))?;
1424
1425            Result::from(c::gg_test_expect_packet_sequence(
1426                c::gg_test_shadow_list_accepted_sequence(
1427                    1,
1428                    thing_name.into(),
1429                    core::ptr::null_mut(),
1430                    page1,
1431                    timestamp,
1432                    &mut next_token1,
1433                ),
1434                5,
1435            ))?;
1436
1437            Result::from(c::gg_test_expect_packet_sequence(
1438                c::gg_test_shadow_list_accepted_sequence(
1439                    2,
1440                    thing_name.into(),
1441                    &mut next_token1,
1442                    page2,
1443                    timestamp,
1444                    &mut next_token2,
1445                ),
1446                5,
1447            ))?;
1448
1449            Result::from(c::gg_test_expect_packet_sequence(
1450                c::gg_test_shadow_list_accepted_sequence(
1451                    3,
1452                    thing_name.into(),
1453                    &mut next_token2,
1454                    empty,
1455                    timestamp,
1456                    core::ptr::null_mut(),
1457                ),
1458                5,
1459            ))?;
1460
1461            Result::from(c::gg_test_wait_for_client_disconnect(30))?;
1462
1463            c::gg_test_close();
1464
1465            let mut status = 0;
1466            libc::waitpid(pid, &mut status, 0);
1467            assert!(libc::WIFEXITED(status));
1468            assert_eq!(libc::WEXITSTATUS(status), 0);
1469        }
1470
1471        Ok(())
1472    }
1473
1474    #[test]
1475    fn test_subscribe_to_iot_core_okay() -> Result<()> {
1476        let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1477        let topic = "my/topic";
1478        let payload_base64 = "SGVsbG8gd29ybGQh";
1479        let qos = "0";
1480        let expected_calls: usize = 3;
1481        let seq = unsafe {
1482            c::gg_test_mqtt_subscribe_accepted_sequence(
1483                1,
1484                topic.into(),
1485                payload_base64.into(),
1486                qos.into(),
1487                expected_calls,
1488            )
1489        };
1490        run_ipc_sequence_test(seq, || {
1491            let sdk = Sdk::init();
1492            sdk.connect()?;
1493
1494            let pair = (Mutex::new(0usize), Condvar::new());
1495            let cb = |t: &str, p: &[u8]| {
1496                assert_eq!(t, "my/topic");
1497                assert_eq!(p, b"Hello world!");
1498                let mut count = pair.0.lock().unwrap();
1499                *count += 1;
1500                if *count >= expected_calls {
1501                    pair.1.notify_one();
1502                }
1503            };
1504            let sub =
1505                sdk.subscribe_to_iot_core("my/topic", Qos::AtMostOnce, &cb)?;
1506
1507            let guard = pair.0.lock().unwrap();
1508            let (guard, timeout) = pair
1509                .1
1510                .wait_timeout_while(
1511                    guard,
1512                    std::time::Duration::from_secs(5),
1513                    |count| *count < expected_calls,
1514                )
1515                .unwrap();
1516            assert!(
1517                !timeout.timed_out(),
1518                "Timed out waiting for subscription responses"
1519            );
1520            assert_eq!(*guard, expected_calls);
1521            std::mem::forget(sub);
1522            Ok(())
1523        })
1524    }
1525}