1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4
5use alloc::boxed::Box;
6use alloc::format;
7use alloc::string::String;
8use alloc::vec::Vec;
9use core::any::Any;
10use core::cell::UnsafeCell;
11use core::marker::PhantomData;
12
13use cu29::cubridge::{BridgeChannel, BridgeChannelConfig, BridgeChannelSet, CuBridge};
14use cu29::prelude::*;
15use iceoryx2::node::node_name::NodeName;
16use iceoryx2::node::{Node, NodeBuilder};
17use iceoryx2::port::publisher::Publisher;
18use iceoryx2::port::subscriber::Subscriber;
19use iceoryx2::service::service_name::ServiceName;
20
21#[cfg(feature = "std")]
22use iceoryx2::service::ipc;
23#[cfg(not(feature = "std"))]
24use iceoryx2::service::local;
25
/// iceoryx2 service flavour used by every node, publisher and subscriber in this file.
/// Resolves to `ipc::Service` when the `std` feature is enabled.
#[cfg(feature = "std")]
type IceoryxService = ipc::Service;
/// iceoryx2 service flavour used by every node, publisher and subscriber in this file.
/// Resolves to `local::Service` when the `std` feature is disabled.
#[cfg(not(feature = "std"))]
type IceoryxService = local::Service;
30
31fn encode_message<Payload: CuMsgPayload>(msg: &CuMsg<Payload>) -> CuResult<Vec<u8>> {
32 bincode::encode_to_vec(msg, bincode::config::standard())
33 .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: bincode encode failed", e))
34}
35
36fn decode_message<Payload: CuMsgPayload>(bytes: &[u8]) -> CuResult<CuMsg<Payload>> {
37 let (decoded, _): (CuMsg<Payload>, usize) =
38 bincode::decode_from_slice(bytes, bincode::config::standard())
39 .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: bincode decode failed", e))?;
40 Ok(decoded)
41}
42
/// Per-channel settings resolved once in `CuBridge::new` and kept for the lifetime
/// of the bridge.
#[derive(Clone, Debug)]
struct IceoryxChannelConfig<Id: Copy> {
    /// Identifier of the bridge channel these settings belong to.
    id: Id,
    /// Route of the channel; later turned into an iceoryx2 `ServiceName` when the
    /// channel's publisher or subscriber is created.
    service: String,
    /// Payload limit in bytes. Tx channels use it for the publisher's initial slice
    /// length and to reject oversized messages; it is stored for Rx channels too,
    /// but the visible Rx code never reads it.
    max_payload_bytes: usize,
}
49
/// Live sending endpoint for one Tx channel, typed by the payload it carries.
struct IceoryxTxChannel<Payload>
where
    Payload: CuMsgPayload + 'static,
{
    /// Name of the iceoryx2 service; used in error messages.
    service_name: ServiceName,
    /// Byte-slice publisher that carries the bincode-encoded messages.
    publisher: Publisher<IceoryxService, [u8], ()>,
    /// Largest encoded message, in bytes, that `send` accepts.
    max_payload_bytes: usize,
    /// Ties the channel to its payload type without storing a value; this is what
    /// lets the type-erased entry be downcast back to the right `Payload`.
    _payload: PhantomData<Payload>,
}
59
/// Live receiving endpoint for one Rx channel, typed by the payload it carries.
struct IceoryxRxChannel<Payload>
where
    Payload: CuMsgPayload + 'static,
{
    /// Name of the iceoryx2 service; used in error messages.
    service_name: ServiceName,
    /// Byte-slice subscriber from which bincode-encoded messages are read.
    subscriber: Subscriber<IceoryxService, [u8], ()>,
    /// Ties the channel to its payload type without storing a value; this is what
    /// lets the type-erased entry be downcast back to the right `Payload`.
    _payload: PhantomData<Payload>,
}
68
/// Cache slot for an already-created Tx channel.
struct IceoryxTxChannelEntry<Id: Copy> {
    /// Identifier of the bridge channel this entry serves.
    id: Id,
    /// Type-erased `IceoryxTxChannel<Payload>`; recovered with `downcast_mut` in
    /// `find_tx_channel_mut`.
    channel: Box<dyn Any>,
}
73
/// Cache slot for an already-created Rx channel.
struct IceoryxRxChannelEntry<Id: Copy> {
    /// Identifier of the bridge channel this entry serves.
    id: Id,
    /// Type-erased `IceoryxRxChannel<Payload>`; recovered with `downcast_mut` in
    /// `find_rx_channel_mut`.
    channel: Box<dyn Any>,
}
78
/// Runtime state that exists only between `start` and `stop`.
struct IceoryxContext<TxId: Copy, RxId: Copy> {
    /// iceoryx2 node from which all services, publishers and subscribers are built.
    node: Node<IceoryxService>,
    /// Tx channels created so far; filled lazily on the first `send` per channel.
    tx_channels: Vec<IceoryxTxChannelEntry<TxId>>,
    /// Rx channels created so far; filled lazily on the first `receive` per channel.
    rx_channels: Vec<IceoryxRxChannelEntry<RxId>>,
}
84
/// Wrapper around `IceoryxContext` that carries the manual `Send`/`Sync` impls.
///
/// The wrapped value sits in an `UnsafeCell`, but the only accessor defined in this
/// file is `get_mut`, which requires `&mut self`.
struct RuntimeContext<TxId: Copy, RxId: Copy> {
    /// The wrapped runtime state.
    inner: UnsafeCell<IceoryxContext<TxId, RxId>>,
}
88
89impl<TxId: Copy, RxId: Copy> RuntimeContext<TxId, RxId> {
90 fn new(inner: IceoryxContext<TxId, RxId>) -> Self {
91 Self {
92 inner: UnsafeCell::new(inner),
93 }
94 }
95
96 fn get_mut(&mut self) -> &mut IceoryxContext<TxId, RxId> {
97 self.inner.get_mut()
98 }
99}
100
// SAFETY: NOTE(review) — `RuntimeContext` holds `Box<dyn Any>` entries and iceoryx2
// handles, which are not `Send` on their own. This impl asserts that the whole
// context is only ever moved between threads as one unit, together with the bridge
// that owns it. TODO confirm that the iceoryx2 service type in use tolerates being
// moved to another thread.
unsafe impl<TxId: Copy, RxId: Copy> Send for RuntimeContext<TxId, RxId> {}
// SAFETY: NOTE(review) — the only accessor in this file is `get_mut(&mut self)`, so a
// shared `&RuntimeContext` never reads or writes the inner context. Any future
// `&self` accessor to the `UnsafeCell` would make this impl unsound.
unsafe impl<TxId: Copy, RxId: Copy> Sync for RuntimeContext<TxId, RxId> {}
108
/// Copper bridge that moves `CuMsg` values over iceoryx2 publish/subscribe services.
///
/// `Tx` and `Rx` describe the outgoing and incoming channel sets. Configuration is
/// parsed in `CuBridge::new`; the iceoryx2 node is created in `start`, and each
/// publisher or subscriber is created lazily on the first `send`/`receive` of its
/// channel.
#[derive(Reflect)]
#[reflect(from_reflect = false, no_field_bounds, type_path = false)]
pub struct Iceoryx2Bridge<Tx, Rx>
where
    Tx: BridgeChannelSet + 'static,
    Rx: BridgeChannelSet + 'static,
    Tx::Id: Send + Sync + 'static,
    Rx::Id: Send + Sync + 'static,
{
    /// Optional node name read from the `node_name` configuration key.
    #[reflect(ignore)]
    node_name: Option<NodeName>,
    /// Resolved settings for every configured Tx channel.
    #[reflect(ignore)]
    tx_channels: Vec<IceoryxChannelConfig<Tx::Id>>,
    /// Resolved settings for every configured Rx channel.
    #[reflect(ignore)]
    rx_channels: Vec<IceoryxChannelConfig<Rx::Id>>,
    /// Runtime state; `None` before `start` and again after `stop`.
    #[reflect(ignore)]
    ctx: Option<Box<RuntimeContext<Tx::Id, Rx::Id>>>,
}
127
// Empty impl: the bridge relies entirely on the default method bodies of `Freezable`.
// NOTE(review): presumably this means no bridge state is captured when the runtime
// freezes its components — confirm against the trait's defaults.
impl<Tx, Rx> Freezable for Iceoryx2Bridge<Tx, Rx>
where
    Tx: BridgeChannelSet + 'static,
    Rx: BridgeChannelSet + 'static,
    Tx::Id: Send + Sync + 'static,
    Rx::Id: Send + Sync + 'static,
{
}
136
// Hand-written `TypePath`: the `Reflect` derive on `Iceoryx2Bridge` opts out of
// generating one (`type_path = false`). Every function returns a fixed string that
// leaves out the `Tx`/`Rx` parameters, so all instantiations report the same path.
impl<Tx, Rx> cu29::reflect::TypePath for Iceoryx2Bridge<Tx, Rx>
where
    Tx: BridgeChannelSet + 'static,
    Rx: BridgeChannelSet + 'static,
    Tx::Id: Send + Sync + 'static,
    Rx::Id: Send + Sync + 'static,
{
    /// Fully qualified path: crate name followed by the type name.
    fn type_path() -> &'static str {
        "cu_iceoryx2_bridge::Iceoryx2Bridge"
    }

    /// Type name without any module prefix.
    fn short_type_path() -> &'static str {
        "Iceoryx2Bridge"
    }

    /// Bare identifier of the type.
    fn type_ident() -> Option<&'static str> {
        Some("Iceoryx2Bridge")
    }

    /// Name of the crate reported for this type.
    fn crate_name() -> Option<&'static str> {
        Some("cu_iceoryx2_bridge")
    }

    /// Module path reported for this type; identical to the crate name here.
    fn module_path() -> Option<&'static str> {
        Some("cu_iceoryx2_bridge")
    }
}
164
165impl<Tx, Rx> Iceoryx2Bridge<Tx, Rx>
166where
167 Tx: BridgeChannelSet + 'static,
168 Rx: BridgeChannelSet + 'static,
169 Tx::Id: Send + Sync + 'static,
170 Rx::Id: Send + Sync + 'static,
171{
172 fn ctx_mut(&mut self) -> CuResult<&mut IceoryxContext<Tx::Id, Rx::Id>> {
173 let Some(ctx) = self.ctx.as_deref_mut() else {
174 return Err(CuError::from("Iceoryx2Bridge: Context not initialized"));
175 };
176 Ok(ctx.get_mut())
177 }
178
179 fn parse_default_max_payload(config: Option<&ComponentConfig>) -> CuResult<usize> {
180 if let Some(config) = config
181 && let Some(value) = config.get::<u64>("max_payload_bytes")?
182 {
183 return usize::try_from(value).map_err(|_| {
184 CuError::from("Iceoryx2Bridge: max_payload_bytes does not fit in usize")
185 });
186 }
187 Ok(64 * 1024)
188 }
189
190 fn parse_node_name(config: Option<&ComponentConfig>) -> CuResult<Option<NodeName>> {
191 if let Some(config) = config
192 && let Some(raw) = config.get::<String>("node_name")?
193 {
194 let node_name = NodeName::new(raw.as_str())
195 .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: Invalid node_name", e))?;
196 return Ok(Some(node_name));
197 }
198 Ok(None)
199 }
200
201 fn channel_route<Id: Copy + core::fmt::Debug>(
202 channel: &BridgeChannelConfig<Id>,
203 ) -> CuResult<String> {
204 channel
205 .effective_route()
206 .map(|route| route.into_owned())
207 .ok_or_else(|| {
208 let id = channel.channel.id;
209 CuError::from(format!(
210 "Iceoryx2Bridge: Missing service name for channel {:?}",
211 id
212 ))
213 })
214 }
215
216 fn channel_max_payload<Id: Copy>(
217 channel: &BridgeChannelConfig<Id>,
218 default: usize,
219 ) -> CuResult<usize> {
220 if let Some(config) = channel.config.as_ref()
221 && let Some(value) = config.get::<u64>("max_payload_bytes")?
222 {
223 return usize::try_from(value).map_err(|_| {
224 CuError::from("Iceoryx2Bridge: max_payload_bytes does not fit in usize")
225 });
226 }
227 Ok(default)
228 }
229
230 fn find_tx_config(&self, id: Tx::Id) -> Option<&IceoryxChannelConfig<Tx::Id>> {
231 self.tx_channels.iter().find(|channel| channel.id == id)
232 }
233
234 fn find_rx_config(&self, id: Rx::Id) -> Option<&IceoryxChannelConfig<Rx::Id>> {
235 self.rx_channels.iter().find(|channel| channel.id == id)
236 }
237
238 fn find_tx_channel_mut<Payload: CuMsgPayload + 'static>(
239 channels: &mut [IceoryxTxChannelEntry<Tx::Id>],
240 id: Tx::Id,
241 ) -> CuResult<Option<&mut IceoryxTxChannel<Payload>>> {
242 let entry = channels.iter_mut().find(|channel| channel.id == id);
243 if let Some(entry) = entry {
244 return entry
245 .channel
246 .downcast_mut::<IceoryxTxChannel<Payload>>()
247 .ok_or_else(|| CuError::from("Iceoryx2Bridge: Tx channel payload mismatch"))
248 .map(Some);
249 }
250 Ok(None)
251 }
252
253 fn find_rx_channel_mut<Payload: CuMsgPayload + 'static>(
254 channels: &mut [IceoryxRxChannelEntry<Rx::Id>],
255 id: Rx::Id,
256 ) -> CuResult<Option<&mut IceoryxRxChannel<Payload>>> {
257 let entry = channels.iter_mut().find(|channel| channel.id == id);
258 if let Some(entry) = entry {
259 return entry
260 .channel
261 .downcast_mut::<IceoryxRxChannel<Payload>>()
262 .ok_or_else(|| CuError::from("Iceoryx2Bridge: Rx channel payload mismatch"))
263 .map(Some);
264 }
265 Ok(None)
266 }
267}
268
269impl<Payload> IceoryxTxChannel<Payload>
270where
271 Payload: CuMsgPayload + 'static,
272{
273 fn new(
274 node: &mut Node<IceoryxService>,
275 service_str: &str,
276 max_payload_bytes: usize,
277 ) -> CuResult<Self> {
278 let service_name = ServiceName::new(service_str).map_err(|e| {
279 CuError::new_with_cause("Iceoryx2Bridge: Failed to create service name", e)
280 })?;
281
282 let service = node
283 .service_builder(&service_name)
284 .publish_subscribe::<[u8]>()
285 .open_or_create()
286 .map_err(|e| {
287 CuError::new_with_cause(
288 format!(
289 "Iceoryx2Bridge({}): Failed to create service",
290 service_name.as_str()
291 )
292 .as_str(),
293 e,
294 )
295 })?;
296
297 let publisher = service
298 .publisher_builder()
299 .initial_max_slice_len(max_payload_bytes)
300 .create()
301 .map_err(|e| {
302 CuError::new_with_cause(
303 format!(
304 "Iceoryx2Bridge({}): Failed to create publisher",
305 service_name.as_str()
306 )
307 .as_str(),
308 e,
309 )
310 })?;
311
312 Ok(Self {
313 service_name,
314 publisher,
315 max_payload_bytes,
316 _payload: PhantomData,
317 })
318 }
319
320 fn send(&mut self, msg: &CuMsg<Payload>) -> CuResult<()> {
321 let encoded = encode_message(msg)?;
322 if encoded.len() > self.max_payload_bytes {
323 return Err(CuError::from(format!(
324 "Iceoryx2Bridge({}): payload size {} exceeds max_payload_bytes {}",
325 self.service_name,
326 encoded.len(),
327 self.max_payload_bytes
328 )));
329 }
330
331 let sample = self
332 .publisher
333 .loan_slice_uninit(encoded.len())
334 .map_err(|e| {
335 CuError::new_with_cause(
336 format!(
337 "Iceoryx2Bridge({}): Failed to loan sample",
338 self.service_name
339 )
340 .as_str(),
341 e,
342 )
343 })?;
344 let sample = sample.write_from_fn(|idx| encoded[idx]);
345 sample.send().map_err(|e| {
346 CuError::new_with_cause(
347 format!(
348 "Iceoryx2Bridge({}): Failed to send sample",
349 self.service_name
350 )
351 .as_str(),
352 e,
353 )
354 })?;
355 Ok(())
356 }
357}
358
359impl<Payload> IceoryxRxChannel<Payload>
360where
361 Payload: CuMsgPayload + 'static,
362{
363 fn new(node: &mut Node<IceoryxService>, service_str: &str) -> CuResult<Self> {
364 let service_name = ServiceName::new(service_str).map_err(|e| {
365 CuError::new_with_cause("Iceoryx2Bridge: Failed to create service name", e)
366 })?;
367
368 let service = node
369 .service_builder(&service_name)
370 .publish_subscribe::<[u8]>()
371 .open_or_create()
372 .map_err(|e| {
373 CuError::new_with_cause(
374 format!(
375 "Iceoryx2Bridge({}): Failed to create service",
376 service_name.as_str()
377 )
378 .as_str(),
379 e,
380 )
381 })?;
382
383 let subscriber = service.subscriber_builder().create().map_err(|e| {
384 CuError::new_with_cause(
385 format!(
386 "Iceoryx2Bridge({}): Failed to create subscriber",
387 service_name.as_str()
388 )
389 .as_str(),
390 e,
391 )
392 })?;
393
394 Ok(Self {
395 service_name,
396 subscriber,
397 _payload: PhantomData,
398 })
399 }
400
401 fn receive(&mut self, ctx: &CuContext, msg: &mut CuMsg<Payload>) -> CuResult<()> {
402 msg.tov = Tov::Time(ctx.now());
403 let sample = self.subscriber.receive().map_err(|e| {
404 CuError::new_with_cause(
405 format!("Iceoryx2Bridge({}): Receive failed", self.service_name).as_str(),
406 e,
407 )
408 })?;
409
410 if let Some(sample) = sample {
411 let payload = sample.payload();
412 let decoded = decode_message(payload)?;
413 *msg = decoded;
414 } else {
415 msg.clear_payload();
416 }
417 Ok(())
418 }
419}
420
421impl<Tx, Rx> CuBridge for Iceoryx2Bridge<Tx, Rx>
422where
423 Tx: BridgeChannelSet + 'static,
424 Rx: BridgeChannelSet + 'static,
425 Tx::Id: core::fmt::Debug + Send + Sync + 'static,
426 Rx::Id: core::fmt::Debug + Send + Sync + 'static,
427{
428 type Tx = Tx;
429 type Rx = Rx;
430 type Resources<'r> = ();
431
432 fn new(
433 config: Option<&ComponentConfig>,
434 tx_channels: &[BridgeChannelConfig<<Self::Tx as BridgeChannelSet>::Id>],
435 rx_channels: &[BridgeChannelConfig<<Self::Rx as BridgeChannelSet>::Id>],
436 _resources: Self::Resources<'_>,
437 ) -> CuResult<Self>
438 where
439 Self: Sized,
440 {
441 let node_name = Self::parse_node_name(config)?;
442 let default_max_payload = Self::parse_default_max_payload(config)?;
443
444 let mut tx_cfgs = Vec::with_capacity(tx_channels.len());
445 for channel in tx_channels {
446 let service = Self::channel_route(channel)?;
447 let max_payload = Self::channel_max_payload(channel, default_max_payload)?;
448 tx_cfgs.push(IceoryxChannelConfig {
449 id: channel.channel.id,
450 service,
451 max_payload_bytes: max_payload,
452 });
453 }
454
455 let mut rx_cfgs = Vec::with_capacity(rx_channels.len());
456 for channel in rx_channels {
457 let service = Self::channel_route(channel)?;
458 let max_payload = Self::channel_max_payload(channel, default_max_payload)?;
459 rx_cfgs.push(IceoryxChannelConfig {
460 id: channel.channel.id,
461 service,
462 max_payload_bytes: max_payload,
463 });
464 }
465
466 Ok(Self {
467 node_name,
468 tx_channels: tx_cfgs,
469 rx_channels: rx_cfgs,
470 ctx: None,
471 })
472 }
473
474 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
475 let mut builder = NodeBuilder::new();
476 if let Some(name) = &self.node_name {
477 builder = builder.name(name);
478 }
479 let node = builder
480 .create::<IceoryxService>()
481 .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: Failed to create node", e))?;
482
483 let ctx = IceoryxContext::<Tx::Id, Rx::Id> {
484 node,
485 tx_channels: Vec::new(),
486 rx_channels: Vec::new(),
487 };
488 self.ctx = Some(Box::new(RuntimeContext::new(ctx)));
489 Ok(())
490 }
491
492 fn send<'a, Payload>(
493 &mut self,
494 _ctx: &CuContext,
495 channel: &'static BridgeChannel<<Self::Tx as BridgeChannelSet>::Id, Payload>,
496 msg: &CuMsg<Payload>,
497 ) -> CuResult<()>
498 where
499 Payload: CuMsgPayload + 'a + 'static,
500 {
501 let cfg = self.find_tx_config(channel.id()).ok_or_else(|| {
502 CuError::from(format!(
503 "Iceoryx2Bridge: Unknown Tx channel {:?}",
504 channel.id()
505 ))
506 })?;
507 let service = cfg.service.clone();
508 let max_payload_bytes = cfg.max_payload_bytes;
509
510 let ctx = self.ctx_mut()?;
511
512 if let Some(tx_channel) =
513 Self::find_tx_channel_mut::<Payload>(&mut ctx.tx_channels, channel.id())?
514 {
515 return tx_channel.send(msg);
516 }
517
518 let mut new_channel =
519 IceoryxTxChannel::<Payload>::new(&mut ctx.node, &service, max_payload_bytes)?;
520 new_channel.send(msg)?;
521 ctx.tx_channels.push(IceoryxTxChannelEntry {
522 id: channel.id(),
523 channel: Box::new(new_channel),
524 });
525 Ok(())
526 }
527
528 fn receive<'a, Payload>(
529 &mut self,
530 ctx: &CuContext,
531 channel: &'static BridgeChannel<<Self::Rx as BridgeChannelSet>::Id, Payload>,
532 msg: &mut CuMsg<Payload>,
533 ) -> CuResult<()>
534 where
535 Payload: CuMsgPayload + 'a + 'static,
536 {
537 let cfg = self.find_rx_config(channel.id()).ok_or_else(|| {
538 CuError::from(format!(
539 "Iceoryx2Bridge: Unknown Rx channel {:?}",
540 channel.id()
541 ))
542 })?;
543 let service = cfg.service.clone();
544
545 let runtime_ctx = self.ctx_mut()?;
546
547 if let Some(rx_channel) =
548 Self::find_rx_channel_mut::<Payload>(&mut runtime_ctx.rx_channels, channel.id())?
549 {
550 return rx_channel.receive(ctx, msg);
551 }
552
553 let mut new_channel = IceoryxRxChannel::<Payload>::new(&mut runtime_ctx.node, &service)?;
554 new_channel.receive(ctx, msg)?;
555 runtime_ctx.rx_channels.push(IceoryxRxChannelEntry {
556 id: channel.id(),
557 channel: Box::new(new_channel),
558 });
559 Ok(())
560 }
561
562 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
563 self.ctx = None;
564 Ok(())
565 }
566}