cu_iceoryx2_src/
lib.rs

1use bincode::{Decode, Encode};
2use cu29::prelude::*;
3use iceoryx2::node::NodeBuilder;
4use iceoryx2::port::subscriber::Subscriber;
5use iceoryx2::prelude::*;
6use iceoryx2::service::port_factory::publish_subscribe::PortFactory;
7
8#[derive(Clone, Debug, Default, Decode, Encode)]
9pub struct IceorixCuMsg<P: CuMsgPayload>(CuMsg<P>);
10
11unsafe impl<P: CuMsgPayload> ZeroCopySend for IceorixCuMsg<P> {}
12
13/// This is a source task that receives messages from an iceoryx2 service.
14/// P is the payload type of the messages.
15pub struct IceoryxSrc<P>
16where
17    P: CuMsgPayload + 'static,
18    IceorixCuMsg<P>: iceoryx2::prelude::ZeroCopySend,
19{
20    service_name: ServiceName,
21    node: iceoryx2::prelude::Node<ipc::Service>,
22    service: Option<PortFactory<ipc::Service, IceorixCuMsg<P>, ()>>,
23    subscriber: Option<Subscriber<ipc::Service, IceorixCuMsg<P>, ()>>,
24}
25
26impl<P> Freezable for IceoryxSrc<P>
27where
28    P: CuMsgPayload,
29    IceorixCuMsg<P>: iceoryx2::prelude::ZeroCopySend,
30{
31}
32
33impl<P> CuSrcTask for IceoryxSrc<P>
34where
35    P: CuMsgPayload + 'static,
36    IceorixCuMsg<P>: iceoryx2::prelude::ZeroCopySend,
37{
38    type Resources<'r> = ();
39    type Output<'m> = output_msg!(P);
40
41    fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
42    where
43        Self: Sized,
44    {
45        let config =
46            config.ok_or_else(|| CuError::from("IceoryxSource: Missing configuration."))?;
47        let service_name_str = config.get::<String>("service").ok_or_else(|| {
48            CuError::from("IceoryxSource: Configuration requires 'service' key (string).")
49        })?;
50
51        debug!(
52            "IceoryxSource: Configuring service name: {}",
53            service_name_str.as_str()
54        );
55
56        let service_name = ServiceName::new(service_name_str.as_str()).map_err(|e| {
57            CuError::new_with_cause("IceoryxSource: Failed to create service name.", e)
58        })?;
59
60        let node: iceoryx2::prelude::Node<ipc::Service> =
61            NodeBuilder::new().create::<ipc::Service>().map_err(|e| {
62                CuError::new_with_cause(
63                    format!("IceoryxSource({service_name_str}): Failed to create node.").as_str(),
64                    e,
65                )
66            })?;
67
68        Ok(Self {
69            service_name,
70            node,
71            service: None,
72            subscriber: None,
73        })
74    }
75
76    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
77        let service = self
78            .node
79            .service_builder(&self.service_name)
80            .publish_subscribe::<IceorixCuMsg<P>>()
81            .open_or_create()
82            .map_err(|e| {
83                CuError::new_with_cause(
84                    format!(
85                        "IceoryxSource({}): Failed to create service.",
86                        self.service_name
87                    )
88                    .as_str(),
89                    e,
90                )
91            })?;
92
93        let subscriber = service.subscriber_builder().create().map_err(|e| {
94            CuError::new_with_cause(
95                format!(
96                    "IceoryxSource({}): Failed to create subscriber.",
97                    self.service_name
98                )
99                .as_str(),
100                e,
101            )
102        })?;
103
104        self.subscriber = Some(subscriber);
105        self.service = Some(service);
106        Ok(())
107    }
108
109    fn process(&mut self, _clock: &RobotClock, new_msg: &mut Self::Output<'_>) -> CuResult<()> {
110        let sub = self.subscriber.as_ref().ok_or_else(|| {
111            CuError::from(
112                format!(
113                    "IceoryxSource({}): Subscriber not found.",
114                    self.service_name
115                )
116                .as_str(),
117            )
118        })?;
119
120        if let Some(icemsg) = sub.receive().map_err(|e| {
121            CuError::new_with_cause(
122                format!(
123                    "IceoryxSource({}): Error receiving message.",
124                    self.service_name
125                )
126                .as_str(),
127                e,
128            )
129        })? {
130            new_msg.set_payload(
131                icemsg
132                    .payload()
133                    .0
134                    .payload()
135                    .ok_or(CuError::from(
136                        format!(
137                            "IceoryxSource({}): Failed to get payload.",
138                            self.service_name
139                        )
140                        .as_str(),
141                    ))?
142                    .clone(),
143            );
144
145            new_msg.tov = icemsg.payload().0.tov;
146        } else {
147            debug!(
148                "IceoryxSource({}): No message received.",
149                self.service_name.as_str()
150            );
151        }
152
153        Ok(())
154    }
155    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
156        self.service = None;
157        self.subscriber = None;
158        debug!("IceoryxSource({}): Stopped.", self.service_name.as_str());
159        Ok(())
160    }
161}