Skip to main content

cu_iceoryx2_bridge/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4
5use alloc::boxed::Box;
6use alloc::format;
7use alloc::string::String;
8use alloc::vec::Vec;
9use core::any::Any;
10use core::cell::UnsafeCell;
11use core::marker::PhantomData;
12
13use cu29::cubridge::{BridgeChannel, BridgeChannelConfig, BridgeChannelSet, CuBridge};
14use cu29::prelude::*;
15use iceoryx2::node::node_name::NodeName;
16use iceoryx2::node::{Node, NodeBuilder};
17use iceoryx2::port::publisher::Publisher;
18use iceoryx2::port::subscriber::Subscriber;
19use iceoryx2::service::service_name::ServiceName;
20
21#[cfg(feature = "std")]
22use iceoryx2::service::ipc;
23#[cfg(not(feature = "std"))]
24use iceoryx2::service::local;
25
26#[cfg(feature = "std")]
27type IceoryxService = ipc::Service;
28#[cfg(not(feature = "std"))]
29type IceoryxService = local::Service;
30
31fn encode_message<Payload: CuMsgPayload>(msg: &CuMsg<Payload>) -> CuResult<Vec<u8>> {
32    bincode::encode_to_vec(msg, bincode::config::standard())
33        .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: bincode encode failed", e))
34}
35
36fn decode_message<Payload: CuMsgPayload>(bytes: &[u8]) -> CuResult<CuMsg<Payload>> {
37    let (decoded, _): (CuMsg<Payload>, usize) =
38        bincode::decode_from_slice(bytes, bincode::config::standard())
39            .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: bincode decode failed", e))?;
40    Ok(decoded)
41}
42
43#[derive(Clone, Debug)]
44struct IceoryxChannelConfig<Id: Copy> {
45    id: Id,
46    service: String,
47    max_payload_bytes: usize,
48}
49
50struct IceoryxTxChannel<Payload>
51where
52    Payload: CuMsgPayload + 'static,
53{
54    service_name: ServiceName,
55    publisher: Publisher<IceoryxService, [u8], ()>,
56    max_payload_bytes: usize,
57    _payload: PhantomData<Payload>,
58}
59
60struct IceoryxRxChannel<Payload>
61where
62    Payload: CuMsgPayload + 'static,
63{
64    service_name: ServiceName,
65    subscriber: Subscriber<IceoryxService, [u8], ()>,
66    _payload: PhantomData<Payload>,
67}
68
69struct IceoryxTxChannelEntry<Id: Copy> {
70    id: Id,
71    channel: Box<dyn Any>,
72}
73
74struct IceoryxRxChannelEntry<Id: Copy> {
75    id: Id,
76    channel: Box<dyn Any>,
77}
78
79struct IceoryxContext<TxId: Copy, RxId: Copy> {
80    node: Node<IceoryxService>,
81    tx_channels: Vec<IceoryxTxChannelEntry<TxId>>,
82    rx_channels: Vec<IceoryxRxChannelEntry<RxId>>,
83}
84
85struct RuntimeContext<TxId: Copy, RxId: Copy> {
86    inner: UnsafeCell<IceoryxContext<TxId, RxId>>,
87}
88
89impl<TxId: Copy, RxId: Copy> RuntimeContext<TxId, RxId> {
90    fn new(inner: IceoryxContext<TxId, RxId>) -> Self {
91        Self {
92            inner: UnsafeCell::new(inner),
93        }
94    }
95
96    fn get_mut(&mut self) -> &mut IceoryxContext<TxId, RxId> {
97        self.inner.get_mut()
98    }
99}
100
101// SAFETY:
102// `RuntimeContext` is only accessed via methods that require `&mut Iceoryx2Bridge`,
103// which provides exclusive access and prevents concurrent mutation.
104unsafe impl<TxId: Copy, RxId: Copy> Send for RuntimeContext<TxId, RxId> {}
105// SAFETY:
106// See `Send` rationale above; synchronized access is guaranteed by external `&mut self`.
107unsafe impl<TxId: Copy, RxId: Copy> Sync for RuntimeContext<TxId, RxId> {}
108
109#[derive(Reflect)]
110#[reflect(from_reflect = false, no_field_bounds, type_path = false)]
111pub struct Iceoryx2Bridge<Tx, Rx>
112where
113    Tx: BridgeChannelSet + 'static,
114    Rx: BridgeChannelSet + 'static,
115    Tx::Id: Send + Sync + 'static,
116    Rx::Id: Send + Sync + 'static,
117{
118    #[reflect(ignore)]
119    node_name: Option<NodeName>,
120    #[reflect(ignore)]
121    tx_channels: Vec<IceoryxChannelConfig<Tx::Id>>,
122    #[reflect(ignore)]
123    rx_channels: Vec<IceoryxChannelConfig<Rx::Id>>,
124    #[reflect(ignore)]
125    ctx: Option<Box<RuntimeContext<Tx::Id, Rx::Id>>>,
126}
127
128impl<Tx, Rx> Freezable for Iceoryx2Bridge<Tx, Rx>
129where
130    Tx: BridgeChannelSet + 'static,
131    Rx: BridgeChannelSet + 'static,
132    Tx::Id: Send + Sync + 'static,
133    Rx::Id: Send + Sync + 'static,
134{
135}
136
137impl<Tx, Rx> cu29::reflect::TypePath for Iceoryx2Bridge<Tx, Rx>
138where
139    Tx: BridgeChannelSet + 'static,
140    Rx: BridgeChannelSet + 'static,
141    Tx::Id: Send + Sync + 'static,
142    Rx::Id: Send + Sync + 'static,
143{
144    fn type_path() -> &'static str {
145        "cu_iceoryx2_bridge::Iceoryx2Bridge"
146    }
147
148    fn short_type_path() -> &'static str {
149        "Iceoryx2Bridge"
150    }
151
152    fn type_ident() -> Option<&'static str> {
153        Some("Iceoryx2Bridge")
154    }
155
156    fn crate_name() -> Option<&'static str> {
157        Some("cu_iceoryx2_bridge")
158    }
159
160    fn module_path() -> Option<&'static str> {
161        Some("cu_iceoryx2_bridge")
162    }
163}
164
165impl<Tx, Rx> Iceoryx2Bridge<Tx, Rx>
166where
167    Tx: BridgeChannelSet + 'static,
168    Rx: BridgeChannelSet + 'static,
169    Tx::Id: Send + Sync + 'static,
170    Rx::Id: Send + Sync + 'static,
171{
172    fn ctx_mut(&mut self) -> CuResult<&mut IceoryxContext<Tx::Id, Rx::Id>> {
173        let Some(ctx) = self.ctx.as_deref_mut() else {
174            return Err(CuError::from("Iceoryx2Bridge: Context not initialized"));
175        };
176        Ok(ctx.get_mut())
177    }
178
179    fn parse_default_max_payload(config: Option<&ComponentConfig>) -> CuResult<usize> {
180        if let Some(config) = config
181            && let Some(value) = config.get::<u64>("max_payload_bytes")?
182        {
183            return usize::try_from(value).map_err(|_| {
184                CuError::from("Iceoryx2Bridge: max_payload_bytes does not fit in usize")
185            });
186        }
187        Ok(64 * 1024)
188    }
189
190    fn parse_node_name(config: Option<&ComponentConfig>) -> CuResult<Option<NodeName>> {
191        if let Some(config) = config
192            && let Some(raw) = config.get::<String>("node_name")?
193        {
194            let node_name = NodeName::new(raw.as_str())
195                .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: Invalid node_name", e))?;
196            return Ok(Some(node_name));
197        }
198        Ok(None)
199    }
200
201    fn channel_route<Id: Copy + core::fmt::Debug>(
202        channel: &BridgeChannelConfig<Id>,
203    ) -> CuResult<String> {
204        channel
205            .effective_route()
206            .map(|route| route.into_owned())
207            .ok_or_else(|| {
208                let id = channel.channel.id;
209                CuError::from(format!(
210                    "Iceoryx2Bridge: Missing service name for channel {:?}",
211                    id
212                ))
213            })
214    }
215
216    fn channel_max_payload<Id: Copy>(
217        channel: &BridgeChannelConfig<Id>,
218        default: usize,
219    ) -> CuResult<usize> {
220        if let Some(config) = channel.config.as_ref()
221            && let Some(value) = config.get::<u64>("max_payload_bytes")?
222        {
223            return usize::try_from(value).map_err(|_| {
224                CuError::from("Iceoryx2Bridge: max_payload_bytes does not fit in usize")
225            });
226        }
227        Ok(default)
228    }
229
230    fn find_tx_config(&self, id: Tx::Id) -> Option<&IceoryxChannelConfig<Tx::Id>> {
231        self.tx_channels.iter().find(|channel| channel.id == id)
232    }
233
234    fn find_rx_config(&self, id: Rx::Id) -> Option<&IceoryxChannelConfig<Rx::Id>> {
235        self.rx_channels.iter().find(|channel| channel.id == id)
236    }
237
238    fn find_tx_channel_mut<Payload: CuMsgPayload + 'static>(
239        channels: &mut [IceoryxTxChannelEntry<Tx::Id>],
240        id: Tx::Id,
241    ) -> CuResult<Option<&mut IceoryxTxChannel<Payload>>> {
242        let entry = channels.iter_mut().find(|channel| channel.id == id);
243        if let Some(entry) = entry {
244            return entry
245                .channel
246                .downcast_mut::<IceoryxTxChannel<Payload>>()
247                .ok_or_else(|| CuError::from("Iceoryx2Bridge: Tx channel payload mismatch"))
248                .map(Some);
249        }
250        Ok(None)
251    }
252
253    fn find_rx_channel_mut<Payload: CuMsgPayload + 'static>(
254        channels: &mut [IceoryxRxChannelEntry<Rx::Id>],
255        id: Rx::Id,
256    ) -> CuResult<Option<&mut IceoryxRxChannel<Payload>>> {
257        let entry = channels.iter_mut().find(|channel| channel.id == id);
258        if let Some(entry) = entry {
259            return entry
260                .channel
261                .downcast_mut::<IceoryxRxChannel<Payload>>()
262                .ok_or_else(|| CuError::from("Iceoryx2Bridge: Rx channel payload mismatch"))
263                .map(Some);
264        }
265        Ok(None)
266    }
267}
268
269impl<Payload> IceoryxTxChannel<Payload>
270where
271    Payload: CuMsgPayload + 'static,
272{
273    fn new(
274        node: &mut Node<IceoryxService>,
275        service_str: &str,
276        max_payload_bytes: usize,
277    ) -> CuResult<Self> {
278        let service_name = ServiceName::new(service_str).map_err(|e| {
279            CuError::new_with_cause("Iceoryx2Bridge: Failed to create service name", e)
280        })?;
281
282        let service = node
283            .service_builder(&service_name)
284            .publish_subscribe::<[u8]>()
285            .open_or_create()
286            .map_err(|e| {
287                CuError::new_with_cause(
288                    format!(
289                        "Iceoryx2Bridge({}): Failed to create service",
290                        service_name.as_str()
291                    )
292                    .as_str(),
293                    e,
294                )
295            })?;
296
297        let publisher = service
298            .publisher_builder()
299            .initial_max_slice_len(max_payload_bytes)
300            .create()
301            .map_err(|e| {
302                CuError::new_with_cause(
303                    format!(
304                        "Iceoryx2Bridge({}): Failed to create publisher",
305                        service_name.as_str()
306                    )
307                    .as_str(),
308                    e,
309                )
310            })?;
311
312        Ok(Self {
313            service_name,
314            publisher,
315            max_payload_bytes,
316            _payload: PhantomData,
317        })
318    }
319
320    fn send(&mut self, msg: &CuMsg<Payload>) -> CuResult<()> {
321        let encoded = encode_message(msg)?;
322        if encoded.len() > self.max_payload_bytes {
323            return Err(CuError::from(format!(
324                "Iceoryx2Bridge({}): payload size {} exceeds max_payload_bytes {}",
325                self.service_name,
326                encoded.len(),
327                self.max_payload_bytes
328            )));
329        }
330
331        let sample = self
332            .publisher
333            .loan_slice_uninit(encoded.len())
334            .map_err(|e| {
335                CuError::new_with_cause(
336                    format!(
337                        "Iceoryx2Bridge({}): Failed to loan sample",
338                        self.service_name
339                    )
340                    .as_str(),
341                    e,
342                )
343            })?;
344        let sample = sample.write_from_fn(|idx| encoded[idx]);
345        sample.send().map_err(|e| {
346            CuError::new_with_cause(
347                format!(
348                    "Iceoryx2Bridge({}): Failed to send sample",
349                    self.service_name
350                )
351                .as_str(),
352                e,
353            )
354        })?;
355        Ok(())
356    }
357}
358
359impl<Payload> IceoryxRxChannel<Payload>
360where
361    Payload: CuMsgPayload + 'static,
362{
363    fn new(node: &mut Node<IceoryxService>, service_str: &str) -> CuResult<Self> {
364        let service_name = ServiceName::new(service_str).map_err(|e| {
365            CuError::new_with_cause("Iceoryx2Bridge: Failed to create service name", e)
366        })?;
367
368        let service = node
369            .service_builder(&service_name)
370            .publish_subscribe::<[u8]>()
371            .open_or_create()
372            .map_err(|e| {
373                CuError::new_with_cause(
374                    format!(
375                        "Iceoryx2Bridge({}): Failed to create service",
376                        service_name.as_str()
377                    )
378                    .as_str(),
379                    e,
380                )
381            })?;
382
383        let subscriber = service.subscriber_builder().create().map_err(|e| {
384            CuError::new_with_cause(
385                format!(
386                    "Iceoryx2Bridge({}): Failed to create subscriber",
387                    service_name.as_str()
388                )
389                .as_str(),
390                e,
391            )
392        })?;
393
394        Ok(Self {
395            service_name,
396            subscriber,
397            _payload: PhantomData,
398        })
399    }
400
401    fn receive(&mut self, ctx: &CuContext, msg: &mut CuMsg<Payload>) -> CuResult<()> {
402        msg.tov = Tov::Time(ctx.now());
403        let sample = self.subscriber.receive().map_err(|e| {
404            CuError::new_with_cause(
405                format!("Iceoryx2Bridge({}): Receive failed", self.service_name).as_str(),
406                e,
407            )
408        })?;
409
410        if let Some(sample) = sample {
411            let payload = sample.payload();
412            let decoded = decode_message(payload)?;
413            *msg = decoded;
414        } else {
415            msg.clear_payload();
416        }
417        Ok(())
418    }
419}
420
421impl<Tx, Rx> CuBridge for Iceoryx2Bridge<Tx, Rx>
422where
423    Tx: BridgeChannelSet + 'static,
424    Rx: BridgeChannelSet + 'static,
425    Tx::Id: core::fmt::Debug + Send + Sync + 'static,
426    Rx::Id: core::fmt::Debug + Send + Sync + 'static,
427{
428    type Tx = Tx;
429    type Rx = Rx;
430    type Resources<'r> = ();
431
432    fn new(
433        config: Option<&ComponentConfig>,
434        tx_channels: &[BridgeChannelConfig<<Self::Tx as BridgeChannelSet>::Id>],
435        rx_channels: &[BridgeChannelConfig<<Self::Rx as BridgeChannelSet>::Id>],
436        _resources: Self::Resources<'_>,
437    ) -> CuResult<Self>
438    where
439        Self: Sized,
440    {
441        let node_name = Self::parse_node_name(config)?;
442        let default_max_payload = Self::parse_default_max_payload(config)?;
443
444        let mut tx_cfgs = Vec::with_capacity(tx_channels.len());
445        for channel in tx_channels {
446            let service = Self::channel_route(channel)?;
447            let max_payload = Self::channel_max_payload(channel, default_max_payload)?;
448            tx_cfgs.push(IceoryxChannelConfig {
449                id: channel.channel.id,
450                service,
451                max_payload_bytes: max_payload,
452            });
453        }
454
455        let mut rx_cfgs = Vec::with_capacity(rx_channels.len());
456        for channel in rx_channels {
457            let service = Self::channel_route(channel)?;
458            let max_payload = Self::channel_max_payload(channel, default_max_payload)?;
459            rx_cfgs.push(IceoryxChannelConfig {
460                id: channel.channel.id,
461                service,
462                max_payload_bytes: max_payload,
463            });
464        }
465
466        Ok(Self {
467            node_name,
468            tx_channels: tx_cfgs,
469            rx_channels: rx_cfgs,
470            ctx: None,
471        })
472    }
473
474    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
475        let mut builder = NodeBuilder::new();
476        if let Some(name) = &self.node_name {
477            builder = builder.name(name);
478        }
479        let node = builder
480            .create::<IceoryxService>()
481            .map_err(|e| CuError::new_with_cause("Iceoryx2Bridge: Failed to create node", e))?;
482
483        let ctx = IceoryxContext::<Tx::Id, Rx::Id> {
484            node,
485            tx_channels: Vec::new(),
486            rx_channels: Vec::new(),
487        };
488        self.ctx = Some(Box::new(RuntimeContext::new(ctx)));
489        Ok(())
490    }
491
492    fn send<'a, Payload>(
493        &mut self,
494        _ctx: &CuContext,
495        channel: &'static BridgeChannel<<Self::Tx as BridgeChannelSet>::Id, Payload>,
496        msg: &CuMsg<Payload>,
497    ) -> CuResult<()>
498    where
499        Payload: CuMsgPayload + 'a + 'static,
500    {
501        let cfg = self.find_tx_config(channel.id()).ok_or_else(|| {
502            CuError::from(format!(
503                "Iceoryx2Bridge: Unknown Tx channel {:?}",
504                channel.id()
505            ))
506        })?;
507        let service = cfg.service.clone();
508        let max_payload_bytes = cfg.max_payload_bytes;
509
510        let ctx = self.ctx_mut()?;
511
512        if let Some(tx_channel) =
513            Self::find_tx_channel_mut::<Payload>(&mut ctx.tx_channels, channel.id())?
514        {
515            return tx_channel.send(msg);
516        }
517
518        let mut new_channel =
519            IceoryxTxChannel::<Payload>::new(&mut ctx.node, &service, max_payload_bytes)?;
520        new_channel.send(msg)?;
521        ctx.tx_channels.push(IceoryxTxChannelEntry {
522            id: channel.id(),
523            channel: Box::new(new_channel),
524        });
525        Ok(())
526    }
527
528    fn receive<'a, Payload>(
529        &mut self,
530        ctx: &CuContext,
531        channel: &'static BridgeChannel<<Self::Rx as BridgeChannelSet>::Id, Payload>,
532        msg: &mut CuMsg<Payload>,
533    ) -> CuResult<()>
534    where
535        Payload: CuMsgPayload + 'a + 'static,
536    {
537        let cfg = self.find_rx_config(channel.id()).ok_or_else(|| {
538            CuError::from(format!(
539                "Iceoryx2Bridge: Unknown Rx channel {:?}",
540                channel.id()
541            ))
542        })?;
543        let service = cfg.service.clone();
544
545        let runtime_ctx = self.ctx_mut()?;
546
547        if let Some(rx_channel) =
548            Self::find_rx_channel_mut::<Payload>(&mut runtime_ctx.rx_channels, channel.id())?
549        {
550            return rx_channel.receive(ctx, msg);
551        }
552
553        let mut new_channel = IceoryxRxChannel::<Payload>::new(&mut runtime_ctx.node, &service)?;
554        new_channel.receive(ctx, msg)?;
555        runtime_ctx.rx_channels.push(IceoryxRxChannelEntry {
556            id: channel.id(),
557            channel: Box::new(new_channel),
558        });
559        Ok(())
560    }
561
562    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
563        self.ctx = None;
564        Ok(())
565    }
566}