canadensis/node/
basic.rs

1use crate::core::transport::Transmitter;
2use crate::node::{MinimalNode, NodeError};
3use crate::{Node, PublishError, ResponseToken, ServiceToken, StartSendError, TransferHandler};
4use alloc::vec::Vec;
5use canadensis_core::time::{milliseconds, MicrosecondDuration32};
6use canadensis_core::transfer::ServiceTransfer;
7use canadensis_core::transport::{Receiver, Transport};
8use canadensis_core::{nb, Priority, ServiceId, ServiceSubscribeError, SubjectId};
9use canadensis_data_types::uavcan::node::get_info_1_0::{self, GetInfoResponse};
10use canadensis_data_types::uavcan::node::health_1_0::Health;
11use canadensis_data_types::uavcan::node::heartbeat_1_0;
12use canadensis_data_types::uavcan::node::mode_1_0::Mode;
13use canadensis_data_types::uavcan::node::port::list_1_0::{self, List};
14use canadensis_data_types::uavcan::node::port::service_id_list_1_0::ServiceIDList;
15use canadensis_data_types::uavcan::node::port::subject_id_1_0;
16use canadensis_data_types::uavcan::node::port::subject_id_list_1_0::SubjectIDList;
17use canadensis_encoding::bits::BitArray;
18use canadensis_encoding::{Message, Request, Response, Serialize};
19
20/// A node that provides all basic application-layer functionality
21///
22/// This node performs the following functions:
23///
24/// * Sending a `uavcan.node.Heartbeat` every second
25/// * Responding to `uavcan.node.GetInfo` requests
26/// * Sending a `uavcan.node.port.List` message every 10 seconds
27///
28/// A BasicNode uses up two publisher slots in the underlying node.
29///
30/// The underlying node type `N` is usually a [`CoreNode`](crate::node::CoreNode).
31pub struct BasicNode<N>
32where
33    N: Node,
34{
35    node: MinimalNode<N>,
36    port_list: List,
37    node_info: GetInfoResponse,
38    seconds_since_port_list_published: u8,
39}
40
41impl<N> BasicNode<N>
42where
43    N: Node,
44{
45    /// Creates a new basic node
46    ///
47    /// * `node`: The underlying node (this is usually a [`CoreNode`](crate::node::CoreNode))
48    /// * `node_info`: The information that should be returned when handling node information requests
49    pub fn new(
50        mut node: N,
51        node_info: GetInfoResponse,
52    ) -> Result<
53        Self,
54        NodeError<
55            StartSendError<<N::Transmitter as Transmitter<N::Clock>>::Error>,
56            ServiceSubscribeError<<N::Receiver as Receiver<N::Clock>>::Error>,
57        >,
58    > {
59        // The MinimalNode takes care of heartbeats.
60        // Do node info and port list here.
61
62        node.subscribe_request(get_info_1_0::SERVICE, 0, milliseconds(1000))
63            .map_err(NodeError::Receiver)?;
64        node.start_publishing(
65            list_1_0::SUBJECT,
66            milliseconds(1000),
67            Priority::Optional.into(),
68        )
69        .map_err(NodeError::Transmitter)?;
70
71        let minimal = MinimalNode::new(node).map_err(NodeError::Transmitter)?;
72
73        // Initialize the port list with the Heartbeat publisher, GetInfo responder, and List publisher
74        let port_list = List {
75            publishers: SubjectIDList::SparseList({
76                let mut published_topics = heapless::Vec::new();
77                published_topics
78                    .push(subject_id_1_0::SubjectID {
79                        value: heartbeat_1_0::SUBJECT.into(),
80                    })
81                    .ok()
82                    .unwrap();
83                published_topics
84                    .push(subject_id_1_0::SubjectID {
85                        value: list_1_0::SUBJECT.into(),
86                    })
87                    .ok()
88                    .unwrap();
89                published_topics
90            }),
91            subscribers: SubjectIDList::SparseList(heapless::Vec::new()),
92            clients: ServiceIDList {
93                mask: BitArray::new(512),
94            },
95            servers: {
96                let mut servers = BitArray::new(512);
97                servers.set(get_info_1_0::SERVICE.into(), true);
98                ServiceIDList { mask: servers }
99            },
100        };
101
102        Ok(BasicNode {
103            node: minimal,
104            port_list,
105            node_info,
106            seconds_since_port_list_published: 0,
107        })
108    }
109
110    /// This function must be called once per second to send heartbeat and port list messages
111    pub fn run_per_second_tasks(
112        &mut self,
113    ) -> nb::Result<(), PublishError<<N::Transmitter as Transmitter<N::Clock>>::Error>> {
114        self.node.run_per_second_tasks()?;
115        if self.seconds_since_port_list_published == 10 {
116            self.seconds_since_port_list_published = 1;
117            self.publish_port_list()?;
118        } else {
119            self.seconds_since_port_list_published += 1;
120        }
121        Ok(())
122    }
123
124    fn publish_port_list(
125        &mut self,
126    ) -> nb::Result<(), PublishError<<N::Transmitter as Transmitter<N::Clock>>::Error>> {
127        self.node
128            .node_mut()
129            .publish(list_1_0::SUBJECT, &self.port_list)
130    }
131
132    /// Sets the operating mode that will be reported in the heartbeat messages
133    pub fn set_mode(&mut self, mode: Mode) {
134        self.node.set_mode(mode);
135    }
136    /// Sets the health status that will be reported in the heartbeat messages
137    pub fn set_health(&mut self, health: Health) {
138        self.node.set_health(health);
139    }
140    /// Sets the vendor-specific status code that will be reported in the heartbeat messages
141    pub fn set_status_code(&mut self, status: u8) {
142        self.node.set_status_code(status);
143    }
144
145    /// Returns a reference to the enclosed node
146    pub fn node(&self) -> &N {
147        self.node.node()
148    }
149    /// Returns a mutable reference to the enclosed node
150    pub fn node_mut(&mut self) -> &mut N {
151        self.node.node_mut()
152    }
153
154    /// Returns a reference to the enclosed minimal node
155    pub fn minimal_node(&self) -> &MinimalNode<N> {
156        &self.node
157    }
158    /// Returns a mutable reference to the enclosed minimal node
159    pub fn minimal_node_mut(&mut self) -> &mut MinimalNode<N> {
160        &mut self.node
161    }
162}
163
164impl<N> Node for BasicNode<N>
165where
166    N: Node,
167{
168    type Clock = N::Clock;
169    type Transport = N::Transport;
170    type Transmitter = N::Transmitter;
171    type Receiver = N::Receiver;
172
173    fn receive<H>(
174        &mut self,
175        handler: &mut H,
176    ) -> Result<(), <N::Receiver as Receiver<N::Clock>>::Error>
177    where
178        H: TransferHandler<Self::Transport>,
179    {
180        let mut chained_handler = NodeInfoHandler {
181            response: &self.node_info,
182        }
183        .chain(handler);
184        self.node.node_mut().receive(&mut chained_handler)
185    }
186
187    fn start_publishing(
188        &mut self,
189        subject: SubjectId,
190        timeout: MicrosecondDuration32,
191        priority: <Self::Transport as Transport>::Priority,
192    ) -> Result<(), StartSendError<<N::Transmitter as Transmitter<N::Clock>>::Error>> {
193        self.node
194            .node_mut()
195            .start_publishing(subject, timeout, priority)?;
196        // Record that this port is in use
197        insert_into_list(&mut self.port_list.publishers, subject);
198        Ok(())
199    }
200
201    fn stop_publishing(&mut self, subject: SubjectId) {
202        self.node.node_mut().stop_publishing(subject);
203        remove_from_list(&mut self.port_list.publishers, subject);
204    }
205
206    fn publish<T>(
207        &mut self,
208        subject: SubjectId,
209        payload: &T,
210    ) -> nb::Result<(), PublishError<<N::Transmitter as Transmitter<N::Clock>>::Error>>
211    where
212        T: Message + Serialize,
213    {
214        self.node.node_mut().publish(subject, payload)
215    }
216
217    fn publish_loopback<T>(
218        &mut self,
219        subject: SubjectId,
220        payload: &T,
221    ) -> nb::Result<(), PublishError<<Self::Transmitter as Transmitter<Self::Clock>>::Error>>
222    where
223        T: Message + Serialize,
224    {
225        self.node.node_mut().publish_loopback(subject, payload)
226    }
227
228    fn start_sending_requests<T>(
229        &mut self,
230        service: ServiceId,
231        receive_timeout: MicrosecondDuration32,
232        response_payload_size_max: usize,
233        priority: <Self::Transport as Transport>::Priority,
234    ) -> Result<ServiceToken<T>, StartSendError<<N::Receiver as Receiver<N::Clock>>::Error>>
235    where
236        T: Request,
237    {
238        let token = self.node.node_mut().start_sending_requests(
239            service,
240            receive_timeout,
241            response_payload_size_max,
242            priority,
243        )?;
244        // Record that this node is a client for the service
245        self.port_list.clients.mask.set(service.into(), true);
246
247        Ok(token)
248    }
249
250    fn stop_sending_requests<T>(&mut self, token: ServiceToken<T>)
251    where
252        T: Request,
253    {
254        let service_id = token.service_id();
255        self.node.node_mut().stop_sending_requests(token);
256        self.port_list.clients.mask.set(service_id.into(), false);
257    }
258
259    fn send_request<T>(
260        &mut self,
261        token: &ServiceToken<T>,
262        payload: &T,
263        destination: <Self::Transport as Transport>::NodeId,
264    ) -> nb::Result<
265        <Self::Transport as Transport>::TransferId,
266        <N::Transmitter as Transmitter<N::Clock>>::Error,
267    >
268    where
269        T: Request + Serialize,
270    {
271        self.node
272            .node_mut()
273            .send_request(token, payload, destination)
274    }
275
276    fn send_request_loopback<T>(
277        &mut self,
278        token: &ServiceToken<T>,
279        payload: &T,
280        destination: <Self::Transport as Transport>::NodeId,
281    ) -> nb::Result<
282        <Self::Transport as Transport>::TransferId,
283        <Self::Transmitter as Transmitter<Self::Clock>>::Error,
284    >
285    where
286        T: Request + Serialize,
287    {
288        self.node
289            .node_mut()
290            .send_request_loopback(token, payload, destination)
291    }
292
293    fn subscribe_message(
294        &mut self,
295        subject: SubjectId,
296        payload_size_max: usize,
297        timeout: MicrosecondDuration32,
298    ) -> Result<(), <N::Receiver as Receiver<N::Clock>>::Error> {
299        self.node
300            .node_mut()
301            .subscribe_message(subject, payload_size_max, timeout)?;
302
303        // Record that this node is subscribed
304        insert_into_list(&mut self.port_list.subscribers, subject);
305
306        Ok(())
307    }
308
309    fn unsubscribe_message(&mut self, subject: SubjectId) {
310        self.node.node_mut().unsubscribe_message(subject);
311        remove_from_list(&mut self.port_list.subscribers, subject);
312    }
313
314    fn subscribe_request(
315        &mut self,
316        service: ServiceId,
317        payload_size_max: usize,
318        timeout: MicrosecondDuration32,
319    ) -> Result<(), ServiceSubscribeError<<N::Receiver as Receiver<N::Clock>>::Error>> {
320        self.node
321            .node_mut()
322            .subscribe_request(service, payload_size_max, timeout)?;
323
324        // Record that this node provides the service
325        self.port_list.servers.mask.set(service.into(), true);
326
327        Ok(())
328    }
329
330    fn unsubscribe_request(&mut self, service: ServiceId) {
331        self.node.node_mut().unsubscribe_request(service);
332        self.port_list.servers.mask.set(service.into(), false);
333    }
334
335    fn send_response<T>(
336        &mut self,
337        token: ResponseToken<Self::Transport>,
338        timeout: MicrosecondDuration32,
339        payload: &T,
340    ) -> nb::Result<(), <N::Transmitter as Transmitter<N::Clock>>::Error>
341    where
342        T: Response + Serialize,
343    {
344        self.node.node_mut().send_response(token, timeout, payload)
345    }
346
347    fn flush(
348        &mut self,
349    ) -> canadensis_core::nb::Result<(), <N::Transmitter as Transmitter<N::Clock>>::Error> {
350        self.node.node_mut().flush()
351    }
352
353    fn clock(&self) -> &Self::Clock {
354        self.node.node().clock()
355    }
356
357    fn clock_mut(&mut self) -> &mut Self::Clock {
358        self.node.node_mut().clock_mut()
359    }
360
361    fn transmitter(&self) -> &Self::Transmitter {
362        self.node.node().transmitter()
363    }
364
365    fn transmitter_mut(&mut self) -> &mut Self::Transmitter {
366        self.node.node_mut().transmitter_mut()
367    }
368
369    fn receiver(&self) -> &Self::Receiver {
370        self.node.node().receiver()
371    }
372
373    fn receiver_mut(&mut self) -> &mut Self::Receiver {
374        self.node.node_mut().receiver_mut()
375    }
376
377    fn node_id(&self) -> Option<<Self::Transport as Transport>::NodeId> {
378        self.node.node().node_id()
379    }
380
381    fn set_node_id(&mut self, node_id: <Self::Transport as Transport>::NodeId) {
382        self.node.node_mut().set_node_id(node_id)
383    }
384
385    fn publishers(&self) -> impl Iterator<Item = SubjectId> {
386        self.node.node().publishers()
387    }
388
389    fn subscribers(&self) -> impl Iterator<Item = SubjectId> {
390        self.node.node().subscribers()
391    }
392
393    fn clients(&self) -> impl Iterator<Item = ServiceId> {
394        self.node.node().clients()
395    }
396
397    fn servers(&self) -> impl Iterator<Item = ServiceId> {
398        self.node.node().servers()
399    }
400}
401
402fn insert_into_list(subject_list: &mut SubjectIDList, subject: SubjectId) {
403    match subject_list {
404        SubjectIDList::Mask(mask) => {
405            mask.set(subject.into(), true);
406        }
407        SubjectIDList::SparseList(list) => {
408            // Check that this subject is not already in the list
409            if !list.iter().any(|in_list| in_list.value == subject.into()) {
410                match list.push(subject_id_1_0::SubjectID {
411                    value: subject.into(),
412                }) {
413                    Ok(_) => {}
414                    Err(_) => {
415                        // The list is full, need to switch to the mask representation
416                        let mut mask = BitArray::new(SubjectIDList::CAPACITY as usize);
417                        for port in list.iter() {
418                            mask.set(port.value.into(), true);
419                        }
420                        // Set the bit for the topic that's now subscribed
421                        mask.set(subject.into(), true);
422                        *subject_list = SubjectIDList::Mask(mask);
423                    }
424                }
425            }
426        }
427        SubjectIDList::Total(_) => { /* All subject IDs in use, can't add */ }
428    };
429}
430
431fn remove_from_list(subject_list: &mut SubjectIDList, subject: SubjectId) {
432    match subject_list {
433        SubjectIDList::Mask(mask) => {
434            mask.set(subject.into(), false);
435        }
436        SubjectIDList::SparseList(list) => {
437            if let Some(position) = list
438                .iter()
439                .position(|id_in_list| id_in_list.value == u16::from(subject))
440            {
441                list.swap_remove(position);
442            }
443        }
444        SubjectIDList::Total(_) => {
445            // Convert from total into a mask with everything except subject set to 1
446            let mut mask = BitArray::new(SubjectIDList::CAPACITY.into());
447            mask.fill(true);
448            mask.set(subject.into(), false);
449            *subject_list = SubjectIDList::Mask(mask);
450        }
451    }
452}
453
454/// Responds to NodeInfo requests with the provided response
455struct NodeInfoHandler<'r> {
456    response: &'r GetInfoResponse,
457}
458
459impl<'r, T> TransferHandler<T> for NodeInfoHandler<'r>
460where
461    T: Transport,
462{
463    fn handle_request<N: Node<Transport = T>>(
464        &mut self,
465        node: &mut N,
466        token: ResponseToken<T>,
467        transfer: &ServiceTransfer<Vec<u8>, T>,
468    ) -> bool {
469        if transfer.header.service == get_info_1_0::SERVICE {
470            let _ = node.send_response(token, milliseconds(1000), self.response);
471            true
472        } else {
473            false
474        }
475    }
476}