1use bincode::{Decode, Encode};
2use cu29::clock::RobotClock;
3use cu29::prelude::*;
4use iceoryx2::node::NodeBuilder;
5use iceoryx2::port::publisher::Publisher;
6use iceoryx2::prelude::*;
7use iceoryx2::service::port_factory::publish_subscribe::PortFactory;
8
9#[derive(Clone, Debug, Default, Decode, Encode)]
10pub struct IceorixCuMsg<P: CuMsgPayload>(CuMsg<P>);
11
12unsafe impl<P: CuMsgPayload> ZeroCopySend for IceorixCuMsg<P> {}
13pub struct IceoryxSink<P>
17where
18 P: CuMsgPayload + 'static, {
20 service_name: ServiceName,
21 node: iceoryx2::prelude::Node<ipc::Service>,
22 service: Option<PortFactory<ipc::Service, IceorixCuMsg<P>, ()>>,
23 publisher: Option<Publisher<ipc::Service, IceorixCuMsg<P>, ()>>,
24}
25
26impl<P> Freezable for IceoryxSink<P> where P: CuMsgPayload {}
27
28impl<P> CuSinkTask for IceoryxSink<P>
29where
30 P: CuMsgPayload + 'static,
31{
32 type Resources<'r> = ();
33 type Input<'m> = input_msg!(P);
34
35 fn new(config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self>
36 where
37 Self: Sized,
38 {
39 let config = config.ok_or_else(|| CuError::from("IceoryxSink: Missing configuration."))?;
40 let service_name_str = config.get::<String>("service").ok_or_else(|| {
41 CuError::from("IceoryxSink: Configuration requires 'service' key (string).")
42 })?;
43
44 debug!(
45 "IceoryxSink: Configuring service name: {}",
46 service_name_str.as_str()
47 );
48
49 let service_name = ServiceName::new(service_name_str.as_str()).map_err(|e| {
50 CuError::new_with_cause("IceoryxSink: Failed to create service name.", e)
51 })?;
52 let node: iceoryx2::prelude::Node<ipc::Service> =
53 NodeBuilder::new().create::<ipc::Service>().map_err(|e| {
54 CuError::new_with_cause(
55 format!("IceoryxSink({service_name_str}): Failed to create node.").as_str(),
56 e,
57 )
58 })?;
59
60 Ok(Self {
61 service_name,
62 node,
63 service: None,
64 publisher: None,
65 })
66 }
67
68 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
69 let service = self
70 .node
71 .service_builder(&self.service_name)
72 .publish_subscribe::<IceorixCuMsg<P>>()
73 .open_or_create()
74 .map_err(|e| {
75 CuError::new_with_cause(
76 format!(
77 "IceoryxSink({}): Failed to create service.",
78 self.service_name
79 )
80 .as_str(),
81 e,
82 )
83 })?;
84
85 let publisher = service.publisher_builder().create().map_err(|e| {
86 CuError::new_with_cause(
87 format!(
88 "IceoryxSink({}): Failed to create publisher.",
89 self.service_name
90 )
91 .as_str(),
92 e,
93 )
94 })?;
95
96 self.service = Some(service);
97 self.publisher = Some(publisher);
98 Ok(())
99 }
100
101 fn process(&mut self, _clock: &RobotClock, input: &Self::Input<'_>) -> CuResult<()> {
102 let publisher = self.publisher.as_mut().ok_or_else(|| {
103 CuError::from(
104 format!("IceoryxSink({}): Publisher not found.", self.service_name).as_str(),
105 )
106 })?;
107
108 let dst = publisher.loan_uninit().map_err(|e| {
109 CuError::new_with_cause(
110 format!("IceoryxSink({}): Failed to loan uninit.", self.service_name).as_str(),
111 e,
112 )
113 })?;
114
115 let dst = dst.write_payload(IceorixCuMsg(input.clone()));
116
117 dst.send().map_err(|e| {
118 CuError::new_with_cause(
119 format!(
120 "IceoryxSink({}): Failed to send message.",
121 self.service_name
122 )
123 .as_str(),
124 e,
125 )
126 })?;
127
128 Ok(())
129 }
130
131 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
132 self.service = None;
133 self.publisher = None;
134 debug!("IceoryxSink({}): Stopped.", self.service_name.as_str());
135 Ok(())
136 }
137}