cu_iceoryx2_src/
lib.rs

1use bincode::{Decode, Encode};
2use cu29::prelude::*;
3use iceoryx2::node::NodeBuilder;
4use iceoryx2::port::subscriber::Subscriber;
5use iceoryx2::prelude::*;
6use iceoryx2::service::port_factory::publish_subscribe::PortFactory;
7
8#[derive(Clone, Debug, Default, Decode, Encode)]
9pub struct IceorixCuMsg<P: CuMsgPayload>(CuMsg<P>);
10
11unsafe impl<P: CuMsgPayload> ZeroCopySend for IceorixCuMsg<P> {}
12
13/// This is a source task that receives messages from an iceoryx2 service.
14/// P is the payload type of the messages.
15pub struct IceoryxSrc<P>
16where
17    P: CuMsgPayload + 'static,
18    IceorixCuMsg<P>: iceoryx2::prelude::ZeroCopySend,
19{
20    service_name: ServiceName,
21    node: iceoryx2::prelude::Node<ipc::Service>,
22    service: Option<PortFactory<ipc::Service, IceorixCuMsg<P>, ()>>,
23    subscriber: Option<Subscriber<ipc::Service, IceorixCuMsg<P>, ()>>,
24}
25
26impl<P> Freezable for IceoryxSrc<P>
27where
28    P: CuMsgPayload,
29    IceorixCuMsg<P>: iceoryx2::prelude::ZeroCopySend,
30{
31}
32
33impl<P> CuSrcTask for IceoryxSrc<P>
34where
35    P: CuMsgPayload + 'static,
36    IceorixCuMsg<P>: iceoryx2::prelude::ZeroCopySend,
37{
38    type Output<'m> = output_msg!(P);
39
40    fn new(config: Option<&ComponentConfig>) -> CuResult<Self>
41    where
42        Self: Sized,
43    {
44        let config =
45            config.ok_or_else(|| CuError::from("IceoryxSource: Missing configuration."))?;
46        let service_name_str = config.get::<String>("service").ok_or_else(|| {
47            CuError::from("IceoryxSource: Configuration requires 'service' key (string).")
48        })?;
49
50        debug!(
51            "IceoryxSource: Configuring service name: {}",
52            service_name_str.as_str()
53        );
54
55        let service_name = ServiceName::new(service_name_str.as_str()).map_err(|e| {
56            CuError::new_with_cause("IceoryxSource: Failed to create service name.", e)
57        })?;
58
59        let node: iceoryx2::prelude::Node<ipc::Service> =
60            NodeBuilder::new().create::<ipc::Service>().map_err(|e| {
61                CuError::new_with_cause(
62                    format!("IceoryxSource({service_name_str}): Failed to create node.").as_str(),
63                    e,
64                )
65            })?;
66
67        Ok(Self {
68            service_name,
69            node,
70            service: None,
71            subscriber: None,
72        })
73    }
74
75    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
76        let service = self
77            .node
78            .service_builder(&self.service_name)
79            .publish_subscribe::<IceorixCuMsg<P>>()
80            .open_or_create()
81            .map_err(|e| {
82                CuError::new_with_cause(
83                    format!(
84                        "IceoryxSource({}): Failed to create service.",
85                        self.service_name
86                    )
87                    .as_str(),
88                    e,
89                )
90            })?;
91
92        let subscriber = service.subscriber_builder().create().map_err(|e| {
93            CuError::new_with_cause(
94                format!(
95                    "IceoryxSource({}): Failed to create subscriber.",
96                    self.service_name
97                )
98                .as_str(),
99                e,
100            )
101        })?;
102
103        self.subscriber = Some(subscriber);
104        self.service = Some(service);
105        Ok(())
106    }
107
108    fn process(&mut self, _clock: &RobotClock, new_msg: &mut Self::Output<'_>) -> CuResult<()> {
109        let sub = self.subscriber.as_ref().ok_or_else(|| {
110            CuError::from(
111                format!(
112                    "IceoryxSource({}): Subscriber not found.",
113                    self.service_name
114                )
115                .as_str(),
116            )
117        })?;
118
119        if let Some(icemsg) = sub.receive().map_err(|e| {
120            CuError::new_with_cause(
121                format!(
122                    "IceoryxSource({}): Error receiving message.",
123                    self.service_name
124                )
125                .as_str(),
126                e,
127            )
128        })? {
129            new_msg.set_payload(
130                icemsg
131                    .payload()
132                    .0
133                    .payload()
134                    .ok_or(CuError::from(
135                        format!(
136                            "IceoryxSource({}): Failed to get payload.",
137                            self.service_name
138                        )
139                        .as_str(),
140                    ))?
141                    .clone(),
142            );
143
144            new_msg.tov = icemsg.payload().0.tov;
145        } else {
146            debug!(
147                "IceoryxSource({}): No message received.",
148                self.service_name.as_str()
149            );
150        }
151
152        Ok(())
153    }
154    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
155        self.service = None;
156        self.subscriber = None;
157        debug!("IceoryxSource({}): Stopped.", self.service_name.as_str());
158        Ok(())
159    }
160}