embedded_nano_mesh/mesh_lib/node/
mod.rs

1mod constants;
2mod packet;
3mod receiver;
4mod router;
5mod timer;
6mod transmitter;
7mod types;
8
9pub use packet::{
10    ExactAddressType, GeneralAddressType, IdType, LifeTimeType, Packet, PacketDataBytes,
11};
12
13pub use router::PacketState;
14use types::PacketQueue;
15pub use types::{ms, NodeString};
16
17use self::router::{RouteError, RouteResult, Router};
18
/// The main and only structure of the library that provides the API for
/// communication through the mesh network.
///
/// It works by listening to the ether for a specified period of time,
/// called `listen_period`, and then sending packets out of its queues
/// between those periods.
///
/// The node also resends caught packets that were addressed to other
/// nodes.
///
/// It has the following methods:
/// * `new` -                   Creates a new instance of `Node`.
/// * `send_to_exact` -         Sends the `data` to an exact device. A call of this method does not
///                             provide any response back.
/// * `broadcast` -             Sends the `data` to all devices. A call of this method does not
///                             provide any response back.
/// * `send_ping_pong` -        Sends the `data` to an exact device, and the receiving device will
///                             be forced to answer back. The answer from the receiving device
///                             may tell if sending was successful.
/// * `send_with_transaction` - Sends the `data` to an exact device, and the receiving device will
///                             be forced to answer back. The answer from the receiving device
///                             will tell if sending was successful.
/// * `receive` -               Takes the next packet out of the queue of received packets.
/// * `update` -                Updates the state of the node. This method should be called in
///                             every loop iteration.
pub struct Node {
    /// Queues outgoing packets and writes them to the interface during `update`.
    transmitter: transmitter::Transmitter,
    /// Reads incoming packets from the interface during `update`.
    receiver: receiver::Receiver,
    /// Address of this device, taken from `NodeConfig::device_address`.
    my_address: ExactAddressType,
    /// Decides when it is time to speak, based on the configured `listen_period`.
    timer: timer::Timer,
    /// Packets accepted for this device, waiting to be taken by `receive`.
    received_packet_queue: PacketQueue,
    /// Decides whether a caught packet is kept, passed further, or both.
    router: Router,
}
50
/// Error that can be returned by the `Node` `update` method.
///
/// Each flag reports one queue that could not accept a packet during that
/// `update` call. At least one of the flags is `true` whenever `update`
/// returns this error.
///
/// `Debug` is derived so that the error can be logged or used with
/// `Result::unwrap` / `Result::expect`.
#[derive(Debug)]
pub struct NodeUpdateError {
    /// The queue of received packets rejected the packet addressed to this device.
    pub is_receive_queue_full: bool,
    /// The transmitter rejected the packet that had to be passed further.
    pub is_transit_queue_full: bool,
}
56
/// Error that can be returned by the `Node` `send_to_exact` method or
/// `broadcast` method.
///
/// Its `Debug` representation is the bare variant name.
#[derive(Debug)]
pub enum SendError {
    /// The queue of packets waiting to be sent cannot accept one more packet.
    SendingQueueIsFull,
}
69
/// Errors that may occur during a call of the `Node`
/// `send_with_transaction` or `send_ping_pong` method.
///
/// The `Debug` representation of each variant is its bare name.
#[derive(Debug)]
pub enum SpecialSendError {
    /// Case when the expected response was not received.
    Timeout,

    /// Case when the limit of the number of
    /// packets to send is reached.
    SendingQueueIsFull,
}
89
90impl From<SendError> for SpecialSendError {
91    fn from(value: SendError) -> Self {
92        match value {
93            SendError::SendingQueueIsFull => SpecialSendError::SendingQueueIsFull,
94        }
95    }
96}
97
/// User-friendly `Node` configuration structure, consumed by `Node::new`.
pub struct NodeConfig {
    /// Address of the device being configured. Instance of `ExactAddressType`.
    pub device_address: ExactAddressType,

    /// Instance of the `ms` type. The time period in
    /// milliseconds that the configured device will listen for incoming packets
    /// before speaking back into the ether.
    pub listen_period: ms,
}
108
109impl Node {
110    /// New Method
111    /// To initialize a `Node`, you need to provide `NodeConfig` with values:
112    /// - `ExactAddressType`: Sets the device's identification address in the network. Multiple deivces can share same address in the same network.
113    /// - `listen_period`: Sets period in milliseconds that determines how long the device will wait before transmitting packet to the network. It prevents network congestion.
114
115    /// `main.rs`:
116    /// ```
117    /// let mut mesh_node = Node::new(NodeConfig {
118    ///     device_address: ExactAddressType::new(1).unwrap(),
119    ///     listen_period: 150 as ms,
120    /// });
121    /// ```
122    pub fn new(config: NodeConfig) -> Node {
123        Node {
124            transmitter: transmitter::Transmitter::new(),
125            receiver: receiver::Receiver::new(),
126            my_address: config.device_address.clone(),
127            timer: timer::Timer::new(config.listen_period),
128            received_packet_queue: PacketQueue::new(),
129            router: Router::new(config.device_address.into()),
130        }
131    }
132
133    /// Send Ping-Pong Method
134    /// Sends a message with a "ping" flag to the destination node and
135    /// waits for the same message with a "pong" flag. Return value tells that the end device have received
136    /// the message at least once or returns an error if the ping-pong exchange fails.
137    /// The following arguments are required:
138    ///
139    /// ```text
140    /// `Ping-Pong time diagram`:
141    ///            +----------+              +----------+
142    ///            |  Sender  |              | Receiver |
143    ///            +--------- +              +----------+
144    ///                 |                         |
145    /// Ping-pong start |   --------Ping------->  |   <-- Receiver has received the message
146    ///                 |                         |
147    /// Ping-pong finish|   <-------Pong--------  |
148    ///                 |                         |
149    ///```
150    ///`main.rs`:
151    ///```
152    ///let _ = mesh_node.send_ping_pong(
153    ///    message.into_bytes(),               // Content.
154    ///    ExactAddressType::new(2).unwrap(),  // Send to device with address 2.
155    ///    10 as LifeTimeType,                 // Let message travel 10 devices before being destroyed.
156    ///    1000 as ms,                         // Set timeout to 1000 ms.
157    ///    || {
158    ///        Instant::now()
159    ///            .duration_since(program_start_time)
160    ///            .as_millis() as ms
161    ///    },                                  // Closure providing current time in milliseconds.
162    ///    &mut serial,                        // IO interface.
163    ///);
164    ///```
165    ///
166    /// parameters:
167    /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
168    /// heapless vector of bytes of special size. This size is configured in the
169    /// node/packet/config.rs file, and can be adjusted for case of other data size is needed.
170    /// `Note!` That all devices should have same version of protocol flashed, in order to
171    /// be able to correctly to communicate with each other.
172    ///
173    /// * `destination_device_identifier` is instance of ExactDeviceAddressType,
174    /// This is made to presend device's address within the network.
175    ///
176    /// *`lifetime` - is the instance of `LifeTimeType`. This value configures the count of
177    /// how many nodes - the packet will be able to pass. Also this value is needed
178    /// to void the ether being jammed by packets, that in theory might be echoed
179    /// by the nodes to the infinity...
180    ///
181    /// * `timeout` - Is the period of time in milliseconds that
182    /// this device will listen for response. In case if no response was caught during that
183    /// period of time, the method will return `Err(SpecialSendError::Timeout)`.
184    ///
185    /// * `millis_provider` - Is the closure that returns current time in milliseconds,
186    ///
187    /// * `interface_driver` - Is the instance of `embedded_serial::MutNonBlockingRx`
188    ///                        and `MutBlockingTx` traits.
189    ///                        In other words the driver which will be used to
190    ///                        read and write from the interface.
191    pub fn send_ping_pong<I, M>(
192        &mut self,
193        data: PacketDataBytes,
194        destination_device_identifier: ExactAddressType,
195        lifetime: LifeTimeType,
196        timeout: ms,
197        millis_provider: M,
198        interface_driver: &mut I,
199    ) -> Result<(), SpecialSendError>
200    where
201        I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
202        M: Fn() -> ms,
203    {
204        self._special_send(
205            data,
206            destination_device_identifier,
207            PacketState::Ping,
208            PacketState::Pong,
209            lifetime,
210            timeout,
211            millis_provider,
212            interface_driver,
213        )
214    }
215
216    /// Send with Transaction Method
217    /// Sends a message and handles all further work to
218    /// ensure the target device have received it only once.
219    /// Method returns an error if the transaction failed.
220    ///
221    /// ```text
222    /// `Transaction time diagram`:
223    ///                       +----------+              +----------+
224    ///                       |  Sender  |              | Receiver |
225    ///                       +--------- +              +----------+
226    ///                            |                         |
227    ///     *Transaction start     | ---SendTransaction--->  |
228    ///                            |                         |
229    ///                    /       | <--AcceptTransaction--  |
230    /// (increment packet id by 1) |                         |
231    ///                    \       | ---InitTransaction--->  |    <--- Receiver has received the message
232    ///                            |                         |
233    ///     *Transaction finish    | <--FinishTransaction--  |
234    ///                            |                         |
235    /// ```
236    ///
237    /// `main.rs`:
238    /// ```
239    /// match mesh_node.send_with_transaction(
240    ///     message.into_bytes(),               // Content.
241    ///     ExactAddressType::new(2).unwrap(),  // Send to device with address 2.
242    ///     10 as LifeTimeType,                 // Let message travel 10 devices before being destroyed.
243    ///     2000 as ms,                         // Wait 2 seconds for response.
244    ///     || {
245    ///         Instant::now()
246    ///             .duration_since(program_start_time)
247    ///             .as_millis() as ms
248    ///     },                                  // Closure providing current time in milliseconds.
249    ///     &mut serial,                        // IO interface.
250    /// );
251    /// ```
252    /// parameters:
253    /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
254    /// heapless vector of bytes of special size. This size is configured in the
255    /// node/packet/config.rs file, and can be adjusted for case of other data size is needed.
256    /// `Note!` That all devices should have same version of protocol flashed, in order to
257    /// be able to correctly to communicate with each other.
258    ///
259    /// * `destination_device_identifier` is instance of `ExactDeviceAddressType`,
260    /// That type is made for simplicity of reading the code, and to strict possible mess-ups
261    /// during the usage of methods. It is made to present device id within the network.
262    ///
263    /// * `lifetime` - is the instance of `LifeTimeType`. This value configures the count of
264    /// how many nodes - the packet will be able to pass. Also this value is needed
265    /// to void the ether being jammed by packets, that in theory might be echoed
266    /// by the nodes to the infinity...
267    /// Each device, once passes transit packet trough it - it reduces packet's lifetime.
268    ///
269    /// * `timeout` - Is the period of time in milliseconds that
270    /// this device will wait until packet that finishes the transaction - arrives.
271    /// In case if no response was caught during that period of time, the method will
272    /// return `Err(SpecialSendError::Timeout)`.
273    ///
274    /// * `millis_provider` - Is the closure that returns current time in milliseconds,
275    ///
276    /// * `interface_driver` - Is the instance of `embedded_serial::MutNonBlockingRx`
277    ///                        and `MutBlockingTx` traits.
278    ///                        In other words the driver which will be used to
279    ///                        read and write from the interface.
280    pub fn send_with_transaction<I, M>(
281        &mut self,
282        data: PacketDataBytes,
283        destination_device_identifier: ExactAddressType,
284        lifetime: LifeTimeType,
285        timeout: ms,
286        millis_provider: M,
287        interface_driver: &mut I,
288    ) -> Result<(), SpecialSendError>
289    where
290        I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
291        M: Fn() -> ms,
292    {
293        self._special_send(
294            data,
295            destination_device_identifier,
296            PacketState::SendTransaction,
297            PacketState::FinishTransaction,
298            lifetime,
299            timeout,
300            millis_provider,
301            interface_driver,
302        )
303    }
304
305    fn _special_send<I, M>(
306        &mut self,
307        data: PacketDataBytes,
308        destination_device_identifier: ExactAddressType,
309        request_state: PacketState,
310        expected_response_state: PacketState,
311        lifetime: LifeTimeType,
312        timeout: ms,
313        millis_provider: M,
314        interface_driver: &mut I,
315    ) -> Result<(), SpecialSendError>
316    where
317        I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
318        M: Fn() -> ms,
319    {
320        let mut current_time = millis_provider();
321        let wait_end_time = current_time + timeout;
322
323        while let Some(_) = self.receive() {} // Flush out all messages in the queuee.
324
325        let expected_response_packet_id = match self._send(Packet::new(
326            self.my_address.into(),
327            destination_device_identifier.into(),
328            0,
329            lifetime,
330            request_state.clone(),
331            true,
332            data,
333        )) {
334            Err(any_err) => return Err(any_err.into()),
335            Ok(expected_response_packet_id) => match request_state {
336                // It is needed to wait for response packet with specific packet id.
337                // Following the transaction time diagram - it is expected the packet to
338                // have it's id increased three times.
339                PacketState::SendTransaction => expected_response_packet_id + 1,
340                PacketState::Ping => expected_response_packet_id,
341                _ => expected_response_packet_id,
342            },
343        };
344
345        while current_time < wait_end_time {
346            let _ = self.update(interface_driver, current_time);
347
348            if let Some(answer) = self.receive() {
349                if !(answer.source_device_identifier == destination_device_identifier.into()) {
350                    continue;
351                }
352                if !(answer.get_spec_state() == expected_response_state) {
353                    continue;
354                }
355                if !(answer.get_id() == expected_response_packet_id) {
356                    continue;
357                }
358                return Ok(());
359            }
360
361            current_time = millis_provider();
362        }
363
364        Err(SpecialSendError::Timeout)
365    }
366
367    /// Send to exact Method
368    /// Sends the message to device with exact address in the network.
369    /// The `send_to_exact` method requires the following arguments:
370    ///
371    /// `main.rs`:
372    /// ```
373    /// let _ = match mesh_node.send_to_exact(
374    ///     message.into_bytes(),              // Content.
375    ///     ExactAddressType::new(2).unwrap(), // Send to device with address 2.
376    ///     10 as LifeTimeType, // Let message travel 10 devices before being destroyed.
377    ///     true, // filter_out_duplication
378    /// );
379    /// ```
380    ///
381    /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
382    /// heapless vector of bytes of special size. This size is configured in the
383    /// node/packet/config.rs file.
384    /// `Note!` That all devices should have same version of protocol flashed, in order to
385    /// have best compatibility with each other.
386    ///
387    /// * `destination_device_identifier` is instance of `ExactAddressType`,
388    /// That type is made to limit possible mess-ups during the usage of method.
389    ///
390    /// * `lifetime` - is the instance of `LifeTimeType`. This value configures the count of
391    /// how many nodes - the packet will be able to pass. Also this value is provided
392    /// to void the ether being jammed by packets, that in theory might be echoed
393    /// by other nodes to the infinity...
394    /// Each device, once passes transit packet trough it - it reduces packet's lifetime.
395    ///
396    /// * `filter_out_duplication` - Tells if the other devices shall ignore
397    /// echoes of this message. It is strongly recommended to use in order to make lower load
398    /// onto the network.
399    pub fn send_to_exact(
400        &mut self,
401        data: PacketDataBytes,
402        destination_device_identifier: ExactAddressType,
403        lifetime: LifeTimeType,
404        filter_out_duplication: bool,
405    ) -> Result<(), SendError> {
406        match self._send(Packet::new(
407            self.my_address.into(),
408            destination_device_identifier.into(),
409            0, // Anyway it will be set later in the trasmitter.
410            lifetime,
411            PacketState::Normal,
412            filter_out_duplication,
413            data,
414        )) {
415            Ok(_) => Ok(()),
416            Err(err) => Err(err),
417        }
418    }
419
420    /// Broadcast Method
421    /// Shares the message to all nodes in the network.
422    /// Distance of sharing is set by `lifetime` parameter.
423    /// It sends packet with destination address set as
424    /// `GeneralAddressType::BROADCAST`. Every device will treats `GeneralAddressType::Broadcast`
425    /// as it's own address, so they keep the message as received and transits copy of that message further.
426    /// `main.rs`:
427    /// ```
428    /// let _ = mesh_node.broadcast(
429    ///     message.into_bytes(), // data.
430    ///     10 as LifeTimeType,   // lifetime.
431    /// );
432    /// ```
433    /// Sends the `data` to all devices.
434    ///
435    /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
436    /// heapless vector of bytes of special size. This size is configured in the
437    /// node/packet/config.rs file.
438    /// `Note!` That all devices should have same version of protocol flashed, in order to
439    /// be able to correctly to communicate with each other.
440    ///
441    /// * `lifetime` - is the instance of `LifeTimeType`. This value configures the count of
442    /// how many nodes - the packet will be able to pass. Also this value is provided
443    /// to void the ether being jammed by packets, that in theory might be echoed
444    /// by other nodes to the infinity...
445    /// Each device, once passes transit packet trough it - it reduces packet's lifetime.
446    pub fn broadcast(
447        &mut self,
448        data: PacketDataBytes,
449        lifetime: LifeTimeType,
450    ) -> Result<(), SendError> {
451        match self._send(Packet::new(
452            self.my_address.into(),
453            GeneralAddressType::Broadcast.into(),
454            0,
455            lifetime,
456            PacketState::Normal,
457            true,
458            data,
459        )) {
460            Ok(_) => Ok(()),
461            Err(err) => Err(err),
462        }
463    }
464
465    fn _send(&mut self, packet: Packet) -> Result<IdType, SendError> {
466        match self.transmitter.send(packet) {
467            Ok(generated_packet_id) => Ok(generated_packet_id),
468            Err(transmitter::PacketQueueIsFull) => Err(SendError::SendingQueueIsFull),
469        }
470    }
471
    /// Receive Method
    /// Takes the oldest packet out of the queue of received packets and
    /// returns it, or returns `None` if the queue is empty.
    /// The queue is filled by the `update` method with packets that `update`
    /// has accepted for this device.
    ///
    /// You can tell which type the packet is by matching the `PacketState`
    /// value returned by `get_spec_state()` of the returned `Packet` instance.
    ///
    /// `main.rs`:
    /// ```ignore
    /// match mesh_node.receive() {
    ///     Some(packet) => ...,
    ///     None => ....,
    /// }
    /// ```
    pub fn receive(&mut self) -> Option<Packet> {
        self.received_packet_queue.pop_front()
    }
491
492    /// Update Method
493    /// The most important method.
494    /// During call of `update` method - it does all internal work:
495    /// - routes packets trough the network
496    /// - transits packets that were sent to other devices
497    /// - handles `lifetime` of packets
498    /// - handles special packets like `ping` and `pong`, or any kind of transaction one.
499    /// - saves received packets that will be available trough `receive` method.
500    /// - sends packets, that are in the `send` queue.
501    ///
502    /// As the protocol relies on physical device - it is crucial to provide
503    /// driver for communication interface.
504    /// Also node shall know if it's the time to broadcast into the ether or not,
505    /// so for that purpose the closure that counts milliseconds since program start
506    /// is required.
507    ///
508    /// Methods: `send_ping_pong`, `send_with_transaction` also relies on `millis_provider` closure and `interface_driver`.
509    /// With out call this method in a loop - the node will stop working.
510    ///
511    ///`main.rs`:
512    ///```
513    /// loop {
514    ///    let current_time = Instant::now()
515    ///        .duration_since(program_start_time)
516    ///        .as_millis() as ms;
517    ///
518    ///    let _ = mesh_node.update(&mut serial, current_time);
519    /// }
520    ///```
521
522    /// Does all necessary internal work of mesh node:
523    /// * Receives packets from ether, and manages their further life.
524    ///     ** Data that is addressed to other devices are going to be send back into ether.
525    ///     ** Data addressed to current device, will be unpacked and stored.
526    ///
527    /// * Call of this method also requires the general types to be passed in.
528    /// As the process relies onto timing countings and onto serial stream,
529    ///
530    /// parameters:
531    /// * `interface_driver` - is instance of `MutNonBlockingRx` and `MutBlockingTx`
532    /// traits.
533    ///
534    /// * `current_time` - Is a closure which returns current time in milliseconds
535    /// since the start of the program.
536    pub fn update<I>(
537        &mut self,
538        interface_driver: &mut I,
539        current_time: ms,
540    ) -> Result<(), NodeUpdateError>
541    where
542        I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
543    {
544        if self.timer.is_time_to_speak(current_time) {
545            self.transmitter.update(interface_driver);
546            self.timer.record_speak_time(current_time);
547        }
548        self.receiver.update(current_time, interface_driver);
549
550        let packet_to_route = match self.receiver.receive(current_time) {
551            Some(packet_to_handle) => packet_to_handle,
552            None => return Ok(()),
553        };
554
555        let (received_packet, transit_packet) = match self.router.route(packet_to_route) {
556            Ok(ok_case) => match ok_case {
557                RouteResult::ReceivedOnly(packet) => (Some(packet), None),
558                RouteResult::TransitOnly(transit) => (None, Some(transit)),
559                RouteResult::ReceivedAndTransit { received, transit } => {
560                    (Some(received), Some(transit))
561                }
562            },
563            Err(RouteError::PacketLifetimeEnded) => (None, None),
564            Err(RouteError::RespondToBroadcastAddressError) => (None, None),
565        };
566
567        let (mut is_receive_queue_full, mut is_transit_queue_full): (bool, bool) = (false, false);
568
569        if let Some(received_packet) = received_packet {
570            match self.received_packet_queue.push_back(received_packet) {
571                Ok(()) => (),
572                Err(_) => {
573                    is_receive_queue_full = true;
574                }
575            }
576        }
577
578        if let Some(transit_packet) = transit_packet {
579            match self.transmitter.send_transit(transit_packet) {
580                Ok(_) => (),
581                Err(transmitter::PacketTransitQueueIsFull) => {
582                    is_transit_queue_full = true;
583                }
584            }
585        }
586
587        if is_receive_queue_full || is_transit_queue_full {
588            return Err(NodeUpdateError {
589                is_receive_queue_full,
590                is_transit_queue_full,
591            });
592        } else {
593            Ok(())
594        }
595    }
596}