cu_iceoryx2_sink/
lib.rs

1use bincode::{Decode, Encode};
2use cu29::clock::RobotClock;
3use cu29::prelude::*;
4use iceoryx2::node::NodeBuilder;
5use iceoryx2::port::publisher::Publisher;
6use iceoryx2::prelude::*;
7use iceoryx2::service::port_factory::publish_subscribe::PortFactory;
8
/// Newtype wrapper around a Copper `CuMsg<P>`.
///
/// This is the payload type that `IceoryxSink` registers with its iceoryx2
/// publish-subscribe service and writes into each loaned sample. The
/// `ZeroCopySend` marker impl in this file is attached to this wrapper.
///
/// The wrapped message is a private field; within this file it is only
/// constructed in `IceoryxSink::process`.
#[derive(Clone, Debug, Default, Decode, Encode)]
pub struct IceorixCuMsg<P: CuMsgPayload>(CuMsg<P>);
11
12unsafe impl<P: CuMsgPayload> ZeroCopySend for IceorixCuMsg<P> {}
13/// This is a sink task that sends messages to an iceoryx2 service.
14/// P is the payload type of the messages.
15/// Copper messages and Iceoryx2 payloads are compatible.
16pub struct IceoryxSink<P>
17where
18    P: CuMsgPayload + 'static, // TODO: Maybe with something a little more generic we can be ROS2 compatible
19{
20    service_name: ServiceName,
21    node: iceoryx2::prelude::Node<ipc::Service>,
22    service: Option<PortFactory<ipc::Service, IceorixCuMsg<P>, ()>>,
23    publisher: Option<Publisher<ipc::Service, IceorixCuMsg<P>, ()>>,
24}
25
26impl<P> Freezable for IceoryxSink<P> where P: CuMsgPayload {}
27
28impl<P> CuSinkTask for IceoryxSink<P>
29where
30    P: CuMsgPayload + 'static,
31{
32    type Resources<'r> = ();
33    type Input<'m> = input_msg!(P);
34
35    fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
36    where
37        Self: Sized,
38    {
39        let config = config.ok_or_else(|| CuError::from("IceoryxSink: Missing configuration."))?;
40        let service_name_str = config.get::<String>("service").ok_or_else(|| {
41            CuError::from("IceoryxSink: Configuration requires 'service' key (string).")
42        })?;
43
44        debug!(
45            "IceoryxSink: Configuring service name: {}",
46            service_name_str.as_str()
47        );
48
49        let service_name = ServiceName::new(service_name_str.as_str()).map_err(|e| {
50            CuError::new_with_cause("IceoryxSink: Failed to create service name.", e)
51        })?;
52        let node: iceoryx2::prelude::Node<ipc::Service> =
53            NodeBuilder::new().create::<ipc::Service>().map_err(|e| {
54                CuError::new_with_cause(
55                    format!("IceoryxSink({service_name_str}): Failed to create node.").as_str(),
56                    e,
57                )
58            })?;
59
60        Ok(Self {
61            service_name,
62            node,
63            service: None,
64            publisher: None,
65        })
66    }
67
68    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
69        let service = self
70            .node
71            .service_builder(&self.service_name)
72            .publish_subscribe::<IceorixCuMsg<P>>()
73            .open_or_create()
74            .map_err(|e| {
75                CuError::new_with_cause(
76                    format!(
77                        "IceoryxSink({}): Failed to create service.",
78                        self.service_name
79                    )
80                    .as_str(),
81                    e,
82                )
83            })?;
84
85        let publisher = service.publisher_builder().create().map_err(|e| {
86            CuError::new_with_cause(
87                format!(
88                    "IceoryxSink({}): Failed to create publisher.",
89                    self.service_name
90                )
91                .as_str(),
92                e,
93            )
94        })?;
95
96        self.service = Some(service);
97        self.publisher = Some(publisher);
98        Ok(())
99    }
100
101    fn process(&mut self, _clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
102        let publisher = self.publisher.as_mut().ok_or_else(|| {
103            CuError::from(
104                format!("IceoryxSink({}): Publisher not found.", self.service_name).as_str(),
105            )
106        })?;
107
108        let dst = publisher.loan_uninit().map_err(|e| {
109            CuError::new_with_cause(
110                format!("IceoryxSink({}): Failed to loan uninit.", self.service_name).as_str(),
111                e,
112            )
113        })?;
114
115        let dst = dst.write_payload(IceorixCuMsg(input.clone()));
116
117        dst.send().map_err(|e| {
118            CuError::new_with_cause(
119                format!(
120                    "IceoryxSink({}): Failed to send message.",
121                    self.service_name
122                )
123                .as_str(),
124                e,
125            )
126        })?;
127
128        Ok(())
129    }
130
131    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
132        self.service = None;
133        self.publisher = None;
134        debug!("IceoryxSink({}): Stopped.", self.service_name.as_str());
135        Ok(())
136    }
137}