1use core::{
6 ffi,
7 marker::PhantomData,
8 mem::MaybeUninit,
9 ptr, result, slice, str,
10 sync::atomic::{AtomicBool, Ordering},
11};
12
13use crate::{
14 c,
15 error::{Error, Result},
16 object::{Kv, Map, Object},
17};
18
19static INIT: AtomicBool = AtomicBool::new(false);
20static CONNECTED: AtomicBool = AtomicBool::new(false);
21
22#[non_exhaustive]
24#[derive(Debug, Clone, Copy)]
25pub struct Sdk {}
26
27#[derive(Debug, Clone, Copy)]
28pub struct IpcError<'a> {
29 pub error_code: &'a str,
30 pub message: &'a str,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35#[repr(u8)]
36pub enum Qos {
37 AtMostOnce = 0,
39 AtLeastOnce = 1,
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum ComponentState {
46 Running,
48 Errored,
50}
51
52#[derive(Debug, Clone, Copy)]
54pub enum SubscribeToTopicPayload<'a> {
55 Json(Map<'a>),
57 Binary(&'a [u8]),
59}
60
61impl Sdk {
62 pub fn init() -> Self {
69 let already_init = INIT.swap(true, Ordering::AcqRel);
70 assert!(!already_init, "Sdk::init() called more than once");
71 unsafe { c::gg_sdk_init() };
72 Self {}
73 }
74
75 pub fn connect(&self) -> Result<()> {
83 let already_connected = CONNECTED.swap(true, Ordering::AcqRel);
84 if already_connected {
85 return Err(Error::Failure);
86 }
87 Result::from(unsafe { c::ggipc_connect() })
88 }
89
90 pub fn connect_with_token(
95 &self,
96 socket_path: &str,
97 auth_token: &str,
98 ) -> Result<()> {
99 let already_connected = CONNECTED.swap(true, Ordering::AcqRel);
100 if already_connected {
101 return Err(Error::Failure);
102 }
103
104 Result::from(unsafe {
105 c::ggipc_connect_with_token(socket_path.into(), auth_token.into())
106 })?;
107
108 Ok(())
109 }
110
111 pub fn publish_to_topic_json(
137 &self,
138 topic: &str,
139 payload: &[Kv<'_>],
140 ) -> Result<()> {
141 Result::from(unsafe {
142 c::ggipc_publish_to_topic_json(topic.into(), payload.into())
143 })
144 }
145
146 pub fn publish_to_topic_binary(
169 &self,
170 topic: &str,
171 payload: &[u8],
172 ) -> Result<()> {
173 Result::from(unsafe {
174 c::ggipc_publish_to_topic_binary(topic.into(), payload.into())
175 })
176 }
177
178 pub fn subscribe_to_topic<'a, F: Fn(&str, SubscribeToTopicPayload)>(
188 &self,
189 topic: &str,
190 callback: &'a F,
191 ) -> Result<Subscription<'a, F>> {
192 extern "C" fn trampoline<F: Fn(&str, SubscribeToTopicPayload)>(
193 ctx: *mut ffi::c_void,
194 topic: c::GgBuffer,
195 payload: c::GgObject,
196 _handle: c::GgIpcSubscriptionHandle,
197 ) {
198 let cb = unsafe { &*ctx.cast::<F>() };
199 let topic_str = unsafe {
200 str::from_utf8_unchecked(slice::from_raw_parts(
201 topic.data, topic.len,
202 ))
203 };
204
205 let unpacked = match unsafe { c::gg_obj_type(payload) } {
206 c::GgObjectType::GG_TYPE_MAP => {
207 let map = unsafe { c::gg_obj_into_map(payload) };
208 SubscribeToTopicPayload::Json(Map(unsafe {
209 slice::from_raw_parts(map.pairs as *const Kv, map.len)
210 }))
211 }
212 c::GgObjectType::GG_TYPE_BUF => {
213 let buf = unsafe { c::gg_obj_into_buf(payload) };
214 SubscribeToTopicPayload::Binary(unsafe {
215 slice::from_raw_parts(buf.data, buf.len)
216 })
217 }
218 _ => return,
219 };
220
221 cb(topic_str, unpacked);
222 }
223
224 let ctx = callback as *const F;
225 let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
226
227 Result::from(unsafe {
228 c::ggipc_subscribe_to_topic(
229 topic.into(),
230 Some(trampoline::<F>),
231 ctx.cast::<ffi::c_void>().cast_mut(),
232 &raw mut handle,
233 )
234 })?;
235
236 debug_assert!(handle.val != 0);
237 Ok(Subscription {
238 handle,
239 phantom: PhantomData,
240 })
241 }
242
243 pub fn publish_to_iot_core(
266 &self,
267 topic: &str,
268 payload: &[u8],
269 qos: Qos,
270 ) -> Result<()> {
271 Result::from(unsafe {
272 c::ggipc_publish_to_iot_core(topic.into(), payload.into(), qos as u8)
273 })
274 }
275
276 pub fn subscribe_to_iot_core<'a, F: Fn(&str, &[u8])>(
286 &self,
287 topic_filter: &str,
288 qos: Qos,
289 callback: &'a F,
290 ) -> Result<Subscription<'a, F>> {
291 extern "C" fn trampoline<F: Fn(&str, &[u8])>(
292 ctx: *mut ffi::c_void,
293 topic: c::GgBuffer,
294 payload: c::GgBuffer,
295 _handle: c::GgIpcSubscriptionHandle,
296 ) {
297 let cb = unsafe { &*ctx.cast::<F>() };
298 let topic_str = unsafe {
299 str::from_utf8_unchecked(slice::from_raw_parts(
300 topic.data, topic.len,
301 ))
302 };
303 let payload_bytes =
304 unsafe { slice::from_raw_parts(payload.data, payload.len) };
305 cb(topic_str, payload_bytes);
306 }
307
308 let ctx = callback as *const F;
309 let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
310
311 Result::from(unsafe {
312 c::ggipc_subscribe_to_iot_core(
313 topic_filter.into(),
314 qos as u8,
315 Some(trampoline::<F>),
316 ctx.cast::<ffi::c_void>().cast_mut(),
317 &raw mut handle,
318 )
319 })?;
320
321 debug_assert!(handle.val != 0);
322 Ok(Subscription {
323 handle,
324 phantom: PhantomData,
325 })
326 }
327
328 pub fn update_state(&self, state: ComponentState) -> Result<()> {
337 let c_state = match state {
338 ComponentState::Running => {
339 c::GgComponentState::GG_COMPONENT_STATE_RUNNING
340 }
341 ComponentState::Errored => {
342 c::GgComponentState::GG_COMPONENT_STATE_ERRORED
343 }
344 };
345 Result::from(unsafe { c::ggipc_update_state(c_state) })
346 }
347
348 pub fn restart_component(&self, component_name: &str) -> Result<()> {
369 Result::from(unsafe { c::ggipc_restart_component(component_name.into()) })
370 }
371
372 pub fn get_config<'a>(
382 &self,
383 key_path: &[&str],
384 component_name: Option<&str>,
385 result_mem: &'a mut [MaybeUninit<u8>],
386 ) -> Result<Object<'a>> {
387 let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
388 let c_key_path = key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
389
390 let component_buf = component_name.map(c::GgBuffer::from);
391
392 let mem = c::GgBuffer {
393 data: result_mem.as_mut_ptr().cast::<u8>(),
394 len: result_mem.len(),
395 };
396
397 let mut obj = c::GgObject::default();
398
399 Result::from(unsafe {
400 c::ggipc_get_config(
401 c_key_path,
402 component_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
403 mem,
404 &raw mut obj,
405 )
406 })?;
407
408 Ok(unsafe { ptr::read((&raw const obj).cast()) })
409 }
410
411 pub fn get_config_str<'a>(
421 &self,
422 key_path: &[&str],
423 component_name: Option<&str>,
424 result_mem: &'a mut [MaybeUninit<u8>],
425 ) -> Result<&'a str> {
426 let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
427 let c_key_path = key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
428
429 let component_buf = component_name.map(c::GgBuffer::from);
430
431 let mut value = c::GgBuffer {
432 data: result_mem.as_mut_ptr().cast::<u8>(),
433 len: result_mem.len(),
434 };
435
436 Result::from(unsafe {
437 c::ggipc_get_config_str(
438 c_key_path,
439 component_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
440 &raw mut value,
441 )
442 })?;
443
444 Ok(unsafe {
445 str::from_utf8_unchecked(slice::from_raw_parts(
446 value.data, value.len,
447 ))
448 })
449 }
450
451 pub fn update_config<'a>(
473 &self,
474 key_path: &[&str],
475 timestamp: Option<core::time::Duration>,
476 value_to_merge: impl Into<Object<'a>>,
477 ) -> Result<()> {
478 fn inner(
479 key_path: &[&str],
480 timestamp: Option<core::time::Duration>,
481 value_to_merge: Object,
482 ) -> Result<()> {
483 let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
484 let c_key_path =
485 key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
486
487 #[expect(clippy::cast_possible_wrap)]
488 #[allow(clippy::cast_lossless, clippy::needless_update)]
489 let timespec = timestamp.map(|d| c::timespec {
490 tv_sec: d.as_secs() as _,
491 tv_nsec: d.subsec_nanos() as _,
492 ..c::timespec::default()
493 });
494
495 Result::from(unsafe {
496 c::ggipc_update_config(
497 c_key_path,
498 timespec.as_ref().map_or(ptr::null(), ptr::from_ref),
499 *ptr::from_ref(&value_to_merge).cast::<c::GgObject>(),
500 )
501 })
502 }
503 inner(key_path, timestamp, value_to_merge.into())
504 }
505
506 pub fn subscribe_to_configuration_update<'a, F: Fn(&str, &[&str])>(
516 &self,
517 component_name: Option<&str>,
518 key_path: &[&str],
519 callback: &'a F,
520 ) -> Result<Subscription<'a, F>> {
521 extern "C" fn trampoline<F: Fn(&str, &[&str])>(
522 ctx: *mut ffi::c_void,
523 component_name: c::GgBuffer,
524 key_path: c::GgList,
525 _handle: c::GgIpcSubscriptionHandle,
526 ) {
527 let cb = unsafe { &*ctx.cast::<F>() };
528 let component_str = unsafe {
529 str::from_utf8_unchecked(slice::from_raw_parts(
530 component_name.data,
531 component_name.len,
532 ))
533 };
534 let path_objs =
535 unsafe { slice::from_raw_parts(key_path.items, key_path.len) };
536
537 let mut path_strs_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
538 for (i, obj) in path_objs.iter().enumerate() {
539 let buf = unsafe { c::gg_obj_into_buf(*obj) };
540 let s = unsafe {
541 str::from_utf8_unchecked(slice::from_raw_parts(
542 buf.data, buf.len,
543 ))
544 };
545 path_strs_mem[i].write(s);
546 }
547 let path_strs = unsafe {
548 slice::from_raw_parts(
549 path_strs_mem.as_ptr().cast::<&str>(),
550 path_objs.len(),
551 )
552 };
553
554 cb(component_str, path_strs);
555 }
556
557 let mut c_key_path_mem = [MaybeUninit::uninit(); MAX_KEY_PATH_LEN];
558 let c_key_path = key_path_to_buf_list(key_path, &mut c_key_path_mem)?;
559
560 let component_buf = component_name.map(c::GgBuffer::from);
561
562 let ctx = callback as *const F;
563 let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
564
565 Result::from(unsafe {
566 c::ggipc_subscribe_to_configuration_update(
567 component_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
568 c_key_path,
569 Some(trampoline::<F>),
570 ctx.cast::<ffi::c_void>().cast_mut(),
571 &raw mut handle,
572 )
573 })?;
574
575 debug_assert!(handle.val != 0);
576 Ok(Subscription {
577 handle,
578 phantom: PhantomData,
579 })
580 }
581
582 pub fn get_thing_shadow<'a>(
594 &self,
595 thing_name: &str,
596 shadow_name: Option<&str>,
597 result_mem: &'a mut [MaybeUninit<u8>],
598 ) -> Result<&'a [u8]> {
599 let shadow_buf = shadow_name.map(c::GgBuffer::from);
600
601 let mut payload = c::GgBuffer {
602 data: result_mem.as_mut_ptr().cast::<u8>(),
603 len: result_mem.len(),
604 };
605
606 Result::from(unsafe {
607 c::ggipc_get_thing_shadow(
608 thing_name.into(),
609 shadow_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
610 &raw mut payload,
611 )
612 })?;
613
614 Ok(unsafe { slice::from_raw_parts(payload.data, payload.len) })
615 }
616
617 #[expect(clippy::needless_pass_by_value)]
630 pub fn update_thing_shadow<'a>(
631 &self,
632 thing_name: &str,
633 shadow_name: Option<&str>,
634 payload: &[u8],
635 response_mem: Option<&'a mut [MaybeUninit<u8>]>,
636 ) -> Result<Option<&'a [u8]>> {
637 let shadow_buf = shadow_name.map(c::GgBuffer::from);
638
639 let mut response = response_mem.as_ref().map(|mem| c::GgBuffer {
640 data: mem.as_ptr() as *mut u8,
641 len: mem.len(),
642 });
643
644 Result::from(unsafe {
645 c::ggipc_update_thing_shadow(
646 thing_name.into(),
647 shadow_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
648 payload.into(),
649 response
650 .as_mut()
651 .map_or(ptr::null_mut(), ptr::from_mut),
652 )
653 })?;
654
655 Ok(response
656 .map(|r| unsafe { slice::from_raw_parts(r.data, r.len) }))
657 }
658
659 pub fn delete_thing_shadow(
670 &self,
671 thing_name: &str,
672 shadow_name: Option<&str>,
673 ) -> Result<()> {
674 let shadow_buf = shadow_name.map(c::GgBuffer::from);
675
676 Result::from(unsafe {
677 c::ggipc_delete_thing_shadow(
678 thing_name.into(),
679 shadow_buf.as_ref().map_or(ptr::null(), ptr::from_ref),
680 )
681 })
682 }
683
684 pub fn list_named_shadows_for_thing<F: FnMut(&str)>(
695 &self,
696 thing_name: &str,
697 callback: &mut F,
698 ) -> Result<()> {
699 extern "C" fn trampoline<F: FnMut(&str)>(
700 ctx: *mut ffi::c_void,
701 shadow_name: c::GgBuffer,
702 ) {
703 let cb = unsafe { &mut *ctx.cast::<F>() };
704 let name = unsafe {
705 str::from_utf8_unchecked(slice::from_raw_parts(
706 shadow_name.data,
707 shadow_name.len,
708 ))
709 };
710 cb(name);
711 }
712
713 let ctx = callback as *mut F;
714
715 Result::from(unsafe {
716 c::ggipc_list_named_shadows_for_thing(
717 thing_name.into(),
718 Some(trampoline::<F>),
719 ctx.cast::<ffi::c_void>(),
720 )
721 })
722 }
723
724 pub fn call<
731 'a,
732 'b,
733 F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
734 >(
735 &self,
736 operation: &str,
737 service_model_type: &str,
738 params: &[Kv<'a>],
739 mut callback: F,
740 ) -> Result<()> {
741 extern "C" fn result_trampoline<
742 'b,
743 F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
744 >(
745 ctx: *mut ffi::c_void,
746 result: c::GgMap,
747 ) -> c::GgError {
748 let cb = unsafe { ctx.cast::<F>().read() };
749 let result_slice = unsafe {
750 slice::from_raw_parts(result.pairs as *const Kv, result.len)
751 };
752 cb(Ok(result_slice)).into()
753 }
754
755 extern "C" fn error_trampoline<
756 'b,
757 F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
758 >(
759 ctx: *mut ffi::c_void,
760 error_code: c::GgBuffer,
761 message: c::GgBuffer,
762 ) -> c::GgError {
763 let cb = unsafe { ctx.cast::<F>().read() };
764 let code = unsafe {
765 str::from_utf8_unchecked(slice::from_raw_parts(
766 error_code.data,
767 error_code.len,
768 ))
769 };
770 let msg = unsafe {
771 str::from_utf8_unchecked(slice::from_raw_parts(
772 message.data,
773 message.len,
774 ))
775 };
776 cb(Err(IpcError {
777 error_code: code,
778 message: msg,
779 }))
780 .into()
781 }
782
783 Result::from(unsafe {
784 c::ggipc_call(
785 operation.into(),
786 service_model_type.into(),
787 params.into(),
788 Some(result_trampoline::<F>),
789 Some(error_trampoline::<F>),
790 (&raw mut callback).cast::<ffi::c_void>(),
791 )
792 })
793 }
794
795 pub fn subscribe<
802 'a,
803 'b,
804 'c,
805 F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
806 G: Fn(usize, &str, &'b [Kv<'b>]) -> Result<()>,
807 >(
808 &self,
809 operation: &str,
810 service_model_type: &str,
811 params: &[Kv<'a>],
812 mut response_callback: F,
813 sub_callback: &'c G,
814 aux_ctx: usize,
815 ) -> Result<Subscription<'c, G>> {
816 extern "C" fn result_trampoline<
817 'b,
818 F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
819 >(
820 ctx: *mut ffi::c_void,
821 result: c::GgMap,
822 ) -> c::GgError {
823 let cb = unsafe { ctx.cast::<F>().read() };
824 let result_slice = unsafe {
825 slice::from_raw_parts(result.pairs as *const Kv, result.len)
826 };
827 cb(Ok(result_slice)).into()
828 }
829
830 extern "C" fn error_trampoline<
831 'b,
832 F: FnOnce(result::Result<&'b [Kv<'b>], IpcError<'b>>) -> Result<()>,
833 >(
834 ctx: *mut ffi::c_void,
835 error_code: c::GgBuffer,
836 message: c::GgBuffer,
837 ) -> c::GgError {
838 let cb = unsafe { ctx.cast::<F>().read() };
839 let code = unsafe {
840 str::from_utf8_unchecked(slice::from_raw_parts(
841 error_code.data,
842 error_code.len,
843 ))
844 };
845 let msg = unsafe {
846 str::from_utf8_unchecked(slice::from_raw_parts(
847 message.data,
848 message.len,
849 ))
850 };
851 cb(Err(IpcError {
852 error_code: code,
853 message: msg,
854 }))
855 .into()
856 }
857
858 extern "C" fn sub_trampoline<
859 'b,
860 G: Fn(usize, &str, &'b [Kv<'b>]) -> Result<()>,
861 >(
862 ctx: *mut ffi::c_void,
863 aux_ctx: *mut ffi::c_void,
864 _handle: c::GgIpcSubscriptionHandle,
865 service_model_type: c::GgBuffer,
866 data: c::GgMap,
867 ) -> c::GgError {
868 let cb = unsafe { &*ctx.cast::<G>() };
869 let aux = aux_ctx as usize;
870 let smt = unsafe {
871 str::from_utf8_unchecked(slice::from_raw_parts(
872 service_model_type.data,
873 service_model_type.len,
874 ))
875 };
876 let map = unsafe {
877 slice::from_raw_parts(data.pairs.cast::<Kv>(), data.len)
878 };
879 cb(aux, smt, map).into()
880 }
881
882 let mut handle = c::GgIpcSubscriptionHandle { val: 0 };
883 let ctx = sub_callback as *const G;
884
885 Result::from(unsafe {
886 c::ggipc_subscribe(
887 operation.into(),
888 service_model_type.into(),
889 params.into(),
890 Some(result_trampoline::<F>),
891 Some(error_trampoline::<F>),
892 (&raw mut response_callback).cast::<ffi::c_void>(),
893 Some(sub_trampoline::<'b, G>),
894 ctx.cast::<ffi::c_void>().cast_mut(),
895 aux_ctx as *mut ffi::c_void,
896 &raw mut handle,
897 )
898 })?;
899
900 Ok(Subscription {
901 handle,
902 phantom: PhantomData,
903 })
904 }
905}
906
907#[derive(Debug)]
909pub struct Subscription<'a, T> {
910 handle: c::GgIpcSubscriptionHandle,
911 phantom: PhantomData<&'a T>,
912}
913
914impl<T> Drop for Subscription<'_, T> {
915 fn drop(&mut self) {
916 if self.handle.val != 0 {
917 unsafe { c::ggipc_close_subscription(self.handle) };
918 }
919 }
920}
921
922impl<T> Default for Subscription<'_, T> {
923 fn default() -> Self {
924 Self {
925 handle: c::GgIpcSubscriptionHandle { val: 0 },
926 phantom: PhantomData,
927 }
928 }
929}
930
931const MAX_KEY_PATH_LEN: usize = (c::GG_MAX_OBJECT_DEPTH - 1) as usize;
932
933fn key_path_to_buf_list(
934 key_path: &[&str],
935 bufs: &mut [MaybeUninit<c::GgBuffer>; MAX_KEY_PATH_LEN],
936) -> Result<c::GgBufList> {
937 if key_path.len() > MAX_KEY_PATH_LEN {
938 return Err(Error::Range);
939 }
940 for (i, k) in key_path.iter().enumerate() {
941 bufs[i].write((*k).into());
942 }
943 Ok(c::GgBufList {
944 bufs: bufs.as_mut_ptr().cast(),
945 len: key_path.len(),
946 })
947}
948
949#[cfg(test)]
950mod test {
951 use std::sync::{Condvar, Mutex};
952
953 use super::{Qos, Sdk};
954 use crate::c;
955 use crate::error::*;
956
957 static TEST_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
958
959 pub(crate) fn run_ipc_handshake_test<F: FnOnce() -> Result<()>>(
960 test_body: F,
961 ) -> Result<()> {
962 unsafe {
963 Result::from(c::gg_test_setup_ipc(
964 c"/tmp/gg-test".as_ptr(),
965 0o666,
966 c"1234567890ABCDEF".as_ptr(),
967 ))?;
968
969 let pid = libc::fork();
970 assert!(pid >= 0, "fork failed");
971
972 if pid == 0 {
973 test_body().unwrap();
974 libc::exit(0);
975 }
976
977 Result::from(c::gg_test_accept_client_handshake(5))?;
978
979 Result::from(c::gg_test_wait_for_client_disconnect(30))?;
980
981 c::gg_test_close();
982
983 let mut status = 0;
984 libc::waitpid(pid, &mut status, 0);
985
986 assert_eq!(libc::WIFEXITED(status), true);
987 assert_eq!(libc::WEXITSTATUS(status), 0);
988 };
989
990 Ok(())
991 }
992
993 pub(crate) fn run_ipc_sequence_test<F: FnOnce() -> Result<()>>(
994 packet_sequence: c::GgipcPacketSequence,
995 test_body: F,
996 ) -> Result<()> {
997 unsafe {
998 Result::from(c::gg_test_setup_ipc(
999 c"/tmp/gg-test".as_ptr(),
1000 0o666,
1001 c"1234567890ABCDEF".as_ptr(),
1002 ))?;
1003
1004 let pid = libc::fork();
1005 assert!(pid >= 0, "fork failed");
1006
1007 if pid == 0 {
1008 test_body().unwrap();
1009 libc::exit(0);
1010 }
1011
1012 Result::from(c::gg_test_connect_request_disconnect_sequence(
1013 packet_sequence,
1014 10,
1015 ))?;
1016
1017 c::gg_test_close();
1018
1019 let mut status = 0;
1020 libc::waitpid(pid, &mut status, 0);
1021
1022 assert_eq!(libc::WIFEXITED(status), true);
1023 assert_eq!(libc::WEXITSTATUS(status), 0);
1024 };
1025
1026 Ok(())
1027 }
1028
1029 fn get_test_socket_path() -> &'static str {
1030 unsafe {
1031 let buf = c::gg_test_get_socket_path();
1032 core::str::from_utf8_unchecked(core::slice::from_raw_parts(
1033 buf.data, buf.len,
1034 ))
1035 }
1036 }
1037
1038 fn get_test_auth_token() -> &'static str {
1039 unsafe {
1040 let buf = c::gg_test_get_auth_token();
1041 core::str::from_utf8_unchecked(core::slice::from_raw_parts(
1042 buf.data, buf.len,
1043 ))
1044 }
1045 }
1046
1047 #[test]
1048 fn test_connect_okay() -> Result<()> {
1049 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1050 run_ipc_handshake_test(|| {
1051 let sdk = Sdk::init();
1052 sdk.connect()
1053 })
1054 }
1055
1056 #[test]
1057 fn test_connect_with_token_okay() -> Result<()> {
1058 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1059 run_ipc_handshake_test(|| unsafe {
1060 libc::unsetenv(c"SVCUID".as_ptr());
1062 libc::unsetenv(
1063 c"AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT".as_ptr(),
1064 );
1065
1066 let sdk = Sdk::init();
1067 sdk.connect_with_token(
1068 get_test_socket_path(),
1069 get_test_auth_token(),
1070 )
1071 })
1072 }
1073
1074 #[test]
1075 fn test_publish_to_iot_core_okay() -> Result<()> {
1076 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1077 let topic = "my/topic";
1078 let payload_base64 = "SGVsbG8gd29ybGQh";
1079 let qos = "0";
1080 let seq = unsafe {
1081 c::gg_test_mqtt_publish_accepted_sequence(
1082 1,
1083 topic.into(),
1084 payload_base64.into(),
1085 qos.into(),
1086 )
1087 };
1088 run_ipc_sequence_test(seq, || {
1089 let sdk = Sdk::init();
1090 sdk.connect()?;
1091 sdk.publish_to_iot_core(
1092 "my/topic",
1093 b"Hello world!",
1094 Qos::AtMostOnce,
1095 )
1096 })
1097 }
1098
1099 #[test]
1100 fn test_publish_to_iot_core_bad_alloc() -> Result<()> {
1101 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1102 run_ipc_handshake_test(|| {
1103 let sdk = Sdk::init();
1104 sdk.connect()?;
1105 let payload = [0u8; 0x20000];
1106 assert_eq!(
1107 sdk.publish_to_iot_core("my/topic", &payload, Qos::AtMostOnce),
1108 Err(Error::Nomem),
1109 );
1110 Ok(())
1111 })
1112 }
1113
1114 #[test]
1115 fn test_publish_to_iot_core_rejected() -> Result<()> {
1116 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1117 let topic = "my/topic";
1118 let payload_base64 = "SGVsbG8gd29ybGQh";
1119 let qos = "0";
1120 let seq = unsafe {
1121 c::gg_test_mqtt_publish_error_sequence(
1122 1,
1123 topic.into(),
1124 payload_base64.into(),
1125 qos.into(),
1126 )
1127 };
1128 run_ipc_sequence_test(seq, || {
1129 let sdk = Sdk::init();
1130 sdk.connect()?;
1131 assert!(
1132 sdk.publish_to_iot_core(
1133 "my/topic",
1134 b"Hello world!",
1135 Qos::AtMostOnce
1136 )
1137 .is_err()
1138 );
1139 Ok(())
1140 })
1141 }
1142
1143 #[test]
1147 fn test_get_thing_shadow_okay() -> Result<()> {
1148 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1149 let thing_name = "MyThing";
1150 let shadow_name = "myShadow";
1151 let payload = "hello";
1152 let payload_b64 = "aGVsbG8=";
1153 let seq = unsafe {
1154 c::gg_test_shadow_get_accepted_sequence(
1155 1,
1156 thing_name.into(),
1157 shadow_name.into(),
1158 payload_b64.into(),
1159 )
1160 };
1161 run_ipc_sequence_test(seq, || {
1162 let sdk = Sdk::init();
1163 sdk.connect()?;
1164 let mut buf = [std::mem::MaybeUninit::uninit(); 64];
1165 let result =
1166 sdk.get_thing_shadow(thing_name, Some(shadow_name), &mut buf)?;
1167 assert_eq!(result, payload.as_bytes());
1168 Ok(())
1169 })
1170 }
1171
1172 #[test]
1173 fn test_get_thing_shadow_rejected() -> Result<()> {
1174 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1175 let thing_name = "MyThing";
1176 let shadow_name = "myShadow";
1177 let seq = unsafe {
1178 c::gg_test_shadow_get_error_sequence(
1179 1,
1180 thing_name.into(),
1181 shadow_name.into(),
1182 )
1183 };
1184 run_ipc_sequence_test(seq, || {
1185 let sdk = Sdk::init();
1186 sdk.connect()?;
1187 let mut buf = [std::mem::MaybeUninit::uninit(); 64];
1188 assert!(sdk
1189 .get_thing_shadow(thing_name, Some(shadow_name), &mut buf)
1190 .is_err());
1191 Ok(())
1192 })
1193 }
1194
1195 #[test]
1196 fn test_update_thing_shadow_okay() -> Result<()> {
1197 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1198 let thing_name = "MyThing";
1199 let shadow_name = "myShadow";
1200 let payload = b"hello";
1201 let payload_b64 = "aGVsbG8=";
1202 let seq = unsafe {
1203 c::gg_test_shadow_update_accepted_sequence(
1204 1,
1205 thing_name.into(),
1206 shadow_name.into(),
1207 payload_b64.into(),
1208 payload_b64.into(),
1209 )
1210 };
1211 run_ipc_sequence_test(seq, || {
1212 let sdk = Sdk::init();
1213 sdk.connect()?;
1214 sdk.update_thing_shadow(thing_name, Some(shadow_name), payload, None)?;
1215 Ok(())
1216 })
1217 }
1218
1219 #[test]
1220 fn test_update_thing_shadow_rejected() -> Result<()> {
1221 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1222 let thing_name = "MyThing";
1223 let shadow_name = "myShadow";
1224 let payload = b"hello";
1225 let payload_b64 = "aGVsbG8=";
1226 let seq = unsafe {
1227 c::gg_test_shadow_update_error_sequence(
1228 1,
1229 thing_name.into(),
1230 shadow_name.into(),
1231 payload_b64.into(),
1232 )
1233 };
1234 run_ipc_sequence_test(seq, || {
1235 let sdk = Sdk::init();
1236 sdk.connect()?;
1237 assert!(sdk
1238 .update_thing_shadow(thing_name, Some(shadow_name), payload, None)
1239 .is_err());
1240 Ok(())
1241 })
1242 }
1243
1244 #[test]
1245 fn test_delete_thing_shadow_okay() -> Result<()> {
1246 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1247 let thing_name = "MyThing";
1248 let shadow_name = "myShadow";
1249 let seq = unsafe {
1250 c::gg_test_shadow_delete_accepted_sequence(
1251 1,
1252 thing_name.into(),
1253 shadow_name.into(),
1254 "".into(),
1255 )
1256 };
1257 run_ipc_sequence_test(seq, || {
1258 let sdk = Sdk::init();
1259 sdk.connect()?;
1260 sdk.delete_thing_shadow(thing_name, Some(shadow_name))?;
1261 Ok(())
1262 })
1263 }
1264
1265 #[test]
1266 fn test_delete_thing_shadow_rejected() -> Result<()> {
1267 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1268 let thing_name = "MyThing";
1269 let shadow_name = "myShadow";
1270 let seq = unsafe {
1271 c::gg_test_shadow_delete_error_sequence(
1272 1,
1273 thing_name.into(),
1274 shadow_name.into(),
1275 )
1276 };
1277 run_ipc_sequence_test(seq, || {
1278 let sdk = Sdk::init();
1279 sdk.connect()?;
1280 assert!(sdk
1281 .delete_thing_shadow(thing_name, Some(shadow_name))
1282 .is_err());
1283 Ok(())
1284 })
1285 }
1286
1287 #[test]
1288 fn test_list_named_shadows_okay() -> Result<()> {
1289 let thing_name = "MyThing";
1290 let shadow_name = "myShadow";
1291 let timestamp: f64 = 1773436831.0;
1292
1293 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1294 unsafe {
1295 Result::from(c::gg_test_setup_ipc(
1296 c"/tmp/gg-test".as_ptr(),
1297 0o666,
1298 c"1234567890ABCDEF".as_ptr(),
1299 ))?;
1300
1301 let pid = libc::fork();
1302 assert!(pid >= 0, "fork failed");
1303
1304 if pid == 0 {
1305 let sdk = Sdk::init();
1306 sdk.connect().unwrap();
1307 let mut count = 0usize;
1308 sdk.list_named_shadows_for_thing(
1309 thing_name,
1310 &mut |name: &str| {
1311 assert_eq!(name, "myShadow");
1312 count += 1;
1313 },
1314 )
1315 .unwrap();
1316 assert_eq!(count, 1);
1317 libc::exit(0);
1318 }
1319
1320 let mut result_item = c::gg_obj_buf(shadow_name.into());
1321 let results = c::GgList {
1322 items: &mut result_item,
1323 len: 1,
1324 };
1325
1326 Result::from(c::gg_test_accept_client_handshake(5))?;
1327
1328 Result::from(c::gg_test_expect_packet_sequence(
1329 c::gg_test_shadow_list_accepted_sequence(
1330 1,
1331 thing_name.into(),
1332 core::ptr::null_mut(),
1333 results,
1334 timestamp,
1335 core::ptr::null_mut(),
1336 ),
1337 5,
1338 ))?;
1339
1340 Result::from(c::gg_test_wait_for_client_disconnect(30))?;
1341
1342 c::gg_test_close();
1343
1344 let mut status = 0;
1345 libc::waitpid(pid, &mut status, 0);
1346 assert!(libc::WIFEXITED(status));
1347 assert_eq!(libc::WEXITSTATUS(status), 0);
1348 }
1349
1350 Ok(())
1351 }
1352
1353 #[test]
1354 fn test_list_named_shadows_rejected() -> Result<()> {
1355 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1356 let thing_name = "MyThing";
1357 let seq = unsafe {
1358 c::gg_test_shadow_list_error_sequence(1, thing_name.into())
1359 };
1360 run_ipc_sequence_test(seq, || {
1361 let sdk = Sdk::init();
1362 sdk.connect()?;
1363 assert!(sdk
1364 .list_named_shadows_for_thing(thing_name, &mut |_: &str| {})
1365 .is_err());
1366 Ok(())
1367 })
1368 }
1369
1370 #[test]
1371 fn test_list_named_shadows_paginated_okay() -> Result<()> {
1372 let thing_name = "MyThing";
1373 let timestamp: f64 = 1773436831.0;
1374
1375 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1376 unsafe {
1377 Result::from(c::gg_test_setup_ipc(
1378 c"/tmp/gg-test".as_ptr(),
1379 0o666,
1380 c"1234567890ABCDEF".as_ptr(),
1381 ))?;
1382
1383 let pid = libc::fork();
1384 assert!(pid >= 0, "fork failed");
1385
1386 if pid == 0 {
1387 let sdk = Sdk::init();
1388 sdk.connect().unwrap();
1389 let mut count = 0usize;
1390 let expected = ["shadow1", "shadow2"];
1391 sdk.list_named_shadows_for_thing(
1392 thing_name,
1393 &mut |name: &str| {
1394 assert_eq!(name, expected[count]);
1395 count += 1;
1396 },
1397 )
1398 .unwrap();
1399 assert_eq!(count, 2);
1400 libc::exit(0);
1401 }
1402
1403 let mut next_token1: c::GgBuffer = "token123".into();
1404 let mut next_token2: c::GgBuffer = "token456".into();
1405
1406 let mut page1_item = c::gg_obj_buf("shadow1".into());
1407 let page1 = c::GgList {
1408 items: &mut page1_item,
1409 len: 1,
1410 };
1411
1412 let mut page2_item = c::gg_obj_buf("shadow2".into());
1413 let page2 = c::GgList {
1414 items: &mut page2_item,
1415 len: 1,
1416 };
1417
1418 let empty = c::GgList {
1419 items: core::ptr::null_mut(),
1420 len: 0,
1421 };
1422
1423 Result::from(c::gg_test_accept_client_handshake(5))?;
1424
1425 Result::from(c::gg_test_expect_packet_sequence(
1426 c::gg_test_shadow_list_accepted_sequence(
1427 1,
1428 thing_name.into(),
1429 core::ptr::null_mut(),
1430 page1,
1431 timestamp,
1432 &mut next_token1,
1433 ),
1434 5,
1435 ))?;
1436
1437 Result::from(c::gg_test_expect_packet_sequence(
1438 c::gg_test_shadow_list_accepted_sequence(
1439 2,
1440 thing_name.into(),
1441 &mut next_token1,
1442 page2,
1443 timestamp,
1444 &mut next_token2,
1445 ),
1446 5,
1447 ))?;
1448
1449 Result::from(c::gg_test_expect_packet_sequence(
1450 c::gg_test_shadow_list_accepted_sequence(
1451 3,
1452 thing_name.into(),
1453 &mut next_token2,
1454 empty,
1455 timestamp,
1456 core::ptr::null_mut(),
1457 ),
1458 5,
1459 ))?;
1460
1461 Result::from(c::gg_test_wait_for_client_disconnect(30))?;
1462
1463 c::gg_test_close();
1464
1465 let mut status = 0;
1466 libc::waitpid(pid, &mut status, 0);
1467 assert!(libc::WIFEXITED(status));
1468 assert_eq!(libc::WEXITSTATUS(status), 0);
1469 }
1470
1471 Ok(())
1472 }
1473
1474 #[test]
1475 fn test_subscribe_to_iot_core_okay() -> Result<()> {
1476 let _guard = TEST_MUTEX.lock().unwrap_or_else(|e| e.into_inner());
1477 let topic = "my/topic";
1478 let payload_base64 = "SGVsbG8gd29ybGQh";
1479 let qos = "0";
1480 let expected_calls: usize = 3;
1481 let seq = unsafe {
1482 c::gg_test_mqtt_subscribe_accepted_sequence(
1483 1,
1484 topic.into(),
1485 payload_base64.into(),
1486 qos.into(),
1487 expected_calls,
1488 )
1489 };
1490 run_ipc_sequence_test(seq, || {
1491 let sdk = Sdk::init();
1492 sdk.connect()?;
1493
1494 let pair = (Mutex::new(0usize), Condvar::new());
1495 let cb = |t: &str, p: &[u8]| {
1496 assert_eq!(t, "my/topic");
1497 assert_eq!(p, b"Hello world!");
1498 let mut count = pair.0.lock().unwrap();
1499 *count += 1;
1500 if *count >= expected_calls {
1501 pair.1.notify_one();
1502 }
1503 };
1504 let sub =
1505 sdk.subscribe_to_iot_core("my/topic", Qos::AtMostOnce, &cb)?;
1506
1507 let guard = pair.0.lock().unwrap();
1508 let (guard, timeout) = pair
1509 .1
1510 .wait_timeout_while(
1511 guard,
1512 std::time::Duration::from_secs(5),
1513 |count| *count < expected_calls,
1514 )
1515 .unwrap();
1516 assert!(
1517 !timeout.timed_out(),
1518 "Timed out waiting for subscription responses"
1519 );
1520 assert_eq!(*guard, expected_calls);
1521 std::mem::forget(sub);
1522 Ok(())
1523 })
1524 }
1525}