embedded_nano_mesh/mesh_lib/node/mod.rs
1mod constants;
2mod packet;
3mod receiver;
4mod router;
5mod timer;
6mod transmitter;
7mod types;
8
9pub use packet::{
10 ExactAddressType, GeneralAddressType, IdType, LifeTimeType, Packet, PacketDataBytes,
11};
12
13pub use router::PacketState;
14use types::PacketQueue;
15pub use types::{ms, NodeString};
16
17use self::router::{RouteError, RouteResult, Router};
18
19/// The main and only structure of the library that brings API for
20/// communication trough the mesh network.
21/// It works in the manner of listening of ether for
22/// specified period of time, which is called `listen_period`,
23/// and then sending out packets out of queues between those periods.
24///
25/// Also node resends caught packets, that were addressed to other
26/// nodes.
27///
28/// It has next methods:
29/// * `new` - Creates new instance of `Node`.
30/// * `send_to_exact` - Sends the `data` to exact device. Call of this method does not provide any
31/// response back.
32/// * `broadcast` - Sends the `data` to all devices. Call of this method does not provide any
33/// response back.
34/// * `send_ping_pong` - Sends the `data` to exact device, and the receiving device will
35/// be forsed to make answer back. The answer from receiving device
36/// may tell if sending was successful.
37/// * `send_with_transaction` - Sends the `data` to exact device, and the receiving device will
38/// be forsed to make answer back. The answer from receiving device
39/// will tell if sending was successful.
40/// * `update` - Updates the state of the node. This method should be called in
41/// every loop iteration.
42pub struct Node {
43 transmitter: transmitter::Transmitter,
44 receiver: receiver::Receiver,
45 my_address: ExactAddressType,
46 timer: timer::Timer,
47 received_packet_queue: PacketQueue,
48 router: Router,
49}
50
51/// Error that can be returned by `Node` `update` method.
52pub struct NodeUpdateError {
53 pub is_receive_queue_full: bool,
54 pub is_transit_queue_full: bool,
55}
56
57/// Error that can be returned by `Node` `send` method or `broadcast` method.
58pub enum SendError {
59 SendingQueueIsFull,
60}
61
62impl core::fmt::Debug for SendError {
63 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
64 match self {
65 SendError::SendingQueueIsFull => write!(f, "SendingQueueIsFull"),
66 }
67 }
68}
69
70/// Errors, that may occur during the call
71/// of `Node` `send_with_transaction` or `send_ping_pong` method.
72pub enum SpecialSendError {
73 /// Case when expected response was not received.
74 Timeout,
75
76 /// Case, when the limit of number of
77 /// packets to send isreached.
78 SendingQueueIsFull,
79}
80
81impl core::fmt::Debug for SpecialSendError {
82 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
83 match self {
84 SpecialSendError::Timeout => write!(f, "Timeout"),
85 SpecialSendError::SendingQueueIsFull => write!(f, "SendingQueueIsFull"),
86 }
87 }
88}
89
90impl From<SendError> for SpecialSendError {
91 fn from(value: SendError) -> Self {
92 match value {
93 SendError::SendingQueueIsFull => SpecialSendError::SendingQueueIsFull,
94 }
95 }
96}
97
98/// User-friendly `Node` configuration structure.
99pub struct NodeConfig {
100 /// Address of configurable device. Instance of `ExactAddressType`.
101 pub device_address: ExactAddressType,
102
103 /// Instance of `ms` type. The time period in
104 /// milliseconds that configured device will listen for incoming packets
105 /// before speaking back into the ether.
106 pub listen_period: ms,
107}
108
109impl Node {
110 /// New Method
111 /// To initialize a `Node`, you need to provide `NodeConfig` with values:
112 /// - `ExactAddressType`: Sets the device's identification address in the network. Multiple deivces can share same address in the same network.
113 /// - `listen_period`: Sets period in milliseconds that determines how long the device will wait before transmitting packet to the network. It prevents network congestion.
114
115 /// `main.rs`:
116 /// ```
117 /// let mut mesh_node = Node::new(NodeConfig {
118 /// device_address: ExactAddressType::new(1).unwrap(),
119 /// listen_period: 150 as ms,
120 /// });
121 /// ```
122 pub fn new(config: NodeConfig) -> Node {
123 Node {
124 transmitter: transmitter::Transmitter::new(),
125 receiver: receiver::Receiver::new(),
126 my_address: config.device_address.clone(),
127 timer: timer::Timer::new(config.listen_period),
128 received_packet_queue: PacketQueue::new(),
129 router: Router::new(config.device_address.into()),
130 }
131 }
132
133 /// Send Ping-Pong Method
134 /// Sends a message with a "ping" flag to the destination node and
135 /// waits for the same message with a "pong" flag. Return value tells that the end device have received
136 /// the message at least once or returns an error if the ping-pong exchange fails.
137 /// The following arguments are required:
138 ///
139 /// ```text
140 /// `Ping-Pong time diagram`:
141 /// +----------+ +----------+
142 /// | Sender | | Receiver |
143 /// +--------- + +----------+
144 /// | |
145 /// Ping-pong start | --------Ping-------> | <-- Receiver has received the message
146 /// | |
147 /// Ping-pong finish| <-------Pong-------- |
148 /// | |
149 ///```
150 ///`main.rs`:
151 ///```
152 ///let _ = mesh_node.send_ping_pong(
153 /// message.into_bytes(), // Content.
154 /// ExactAddressType::new(2).unwrap(), // Send to device with address 2.
155 /// 10 as LifeTimeType, // Let message travel 10 devices before being destroyed.
156 /// 1000 as ms, // Set timeout to 1000 ms.
157 /// || {
158 /// Instant::now()
159 /// .duration_since(program_start_time)
160 /// .as_millis() as ms
161 /// }, // Closure providing current time in milliseconds.
162 /// &mut serial, // IO interface.
163 ///);
164 ///```
165 ///
166 /// parameters:
167 /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
168 /// heapless vector of bytes of special size. This size is configured in the
169 /// node/packet/config.rs file, and can be adjusted for case of other data size is needed.
170 /// `Note!` That all devices should have same version of protocol flashed, in order to
171 /// be able to correctly to communicate with each other.
172 ///
173 /// * `destination_device_identifier` is instance of ExactDeviceAddressType,
174 /// This is made to presend device's address within the network.
175 ///
176 /// *`lifetime` - is the instance of `LifeTimeType`. This value configures the count of
177 /// how many nodes - the packet will be able to pass. Also this value is needed
178 /// to void the ether being jammed by packets, that in theory might be echoed
179 /// by the nodes to the infinity...
180 ///
181 /// * `timeout` - Is the period of time in milliseconds that
182 /// this device will listen for response. In case if no response was caught during that
183 /// period of time, the method will return `Err(SpecialSendError::Timeout)`.
184 ///
185 /// * `millis_provider` - Is the closure that returns current time in milliseconds,
186 ///
187 /// * `interface_driver` - Is the instance of `embedded_serial::MutNonBlockingRx`
188 /// and `MutBlockingTx` traits.
189 /// In other words the driver which will be used to
190 /// read and write from the interface.
191 pub fn send_ping_pong<I, M>(
192 &mut self,
193 data: PacketDataBytes,
194 destination_device_identifier: ExactAddressType,
195 lifetime: LifeTimeType,
196 timeout: ms,
197 millis_provider: M,
198 interface_driver: &mut I,
199 ) -> Result<(), SpecialSendError>
200 where
201 I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
202 M: Fn() -> ms,
203 {
204 self._special_send(
205 data,
206 destination_device_identifier,
207 PacketState::Ping,
208 PacketState::Pong,
209 lifetime,
210 timeout,
211 millis_provider,
212 interface_driver,
213 )
214 }
215
216 /// Send with Transaction Method
217 /// Sends a message and handles all further work to
218 /// ensure the target device have received it only once.
219 /// Method returns an error if the transaction failed.
220 ///
221 /// ```text
222 /// `Transaction time diagram`:
223 /// +----------+ +----------+
224 /// | Sender | | Receiver |
225 /// +--------- + +----------+
226 /// | |
227 /// *Transaction start | ---SendTransaction---> |
228 /// | |
229 /// / | <--AcceptTransaction-- |
230 /// (increment packet id by 1) | |
231 /// \ | ---InitTransaction---> | <--- Receiver has received the message
232 /// | |
233 /// *Transaction finish | <--FinishTransaction-- |
234 /// | |
235 /// ```
236 ///
237 /// `main.rs`:
238 /// ```
239 /// match mesh_node.send_with_transaction(
240 /// message.into_bytes(), // Content.
241 /// ExactAddressType::new(2).unwrap(), // Send to device with address 2.
242 /// 10 as LifeTimeType, // Let message travel 10 devices before being destroyed.
243 /// 2000 as ms, // Wait 2 seconds for response.
244 /// || {
245 /// Instant::now()
246 /// .duration_since(program_start_time)
247 /// .as_millis() as ms
248 /// }, // Closure providing current time in milliseconds.
249 /// &mut serial, // IO interface.
250 /// );
251 /// ```
252 /// parameters:
253 /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
254 /// heapless vector of bytes of special size. This size is configured in the
255 /// node/packet/config.rs file, and can be adjusted for case of other data size is needed.
256 /// `Note!` That all devices should have same version of protocol flashed, in order to
257 /// be able to correctly to communicate with each other.
258 ///
259 /// * `destination_device_identifier` is instance of `ExactDeviceAddressType`,
260 /// That type is made for simplicity of reading the code, and to strict possible mess-ups
261 /// during the usage of methods. It is made to present device id within the network.
262 ///
263 /// * `lifetime` - is the instance of `LifeTimeType`. This value configures the count of
264 /// how many nodes - the packet will be able to pass. Also this value is needed
265 /// to void the ether being jammed by packets, that in theory might be echoed
266 /// by the nodes to the infinity...
267 /// Each device, once passes transit packet trough it - it reduces packet's lifetime.
268 ///
269 /// * `timeout` - Is the period of time in milliseconds that
270 /// this device will wait until packet that finishes the transaction - arrives.
271 /// In case if no response was caught during that period of time, the method will
272 /// return `Err(SpecialSendError::Timeout)`.
273 ///
274 /// * `millis_provider` - Is the closure that returns current time in milliseconds,
275 ///
276 /// * `interface_driver` - Is the instance of `embedded_serial::MutNonBlockingRx`
277 /// and `MutBlockingTx` traits.
278 /// In other words the driver which will be used to
279 /// read and write from the interface.
280 pub fn send_with_transaction<I, M>(
281 &mut self,
282 data: PacketDataBytes,
283 destination_device_identifier: ExactAddressType,
284 lifetime: LifeTimeType,
285 timeout: ms,
286 millis_provider: M,
287 interface_driver: &mut I,
288 ) -> Result<(), SpecialSendError>
289 where
290 I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
291 M: Fn() -> ms,
292 {
293 self._special_send(
294 data,
295 destination_device_identifier,
296 PacketState::SendTransaction,
297 PacketState::FinishTransaction,
298 lifetime,
299 timeout,
300 millis_provider,
301 interface_driver,
302 )
303 }
304
305 fn _special_send<I, M>(
306 &mut self,
307 data: PacketDataBytes,
308 destination_device_identifier: ExactAddressType,
309 request_state: PacketState,
310 expected_response_state: PacketState,
311 lifetime: LifeTimeType,
312 timeout: ms,
313 millis_provider: M,
314 interface_driver: &mut I,
315 ) -> Result<(), SpecialSendError>
316 where
317 I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
318 M: Fn() -> ms,
319 {
320 let mut current_time = millis_provider();
321 let wait_end_time = current_time + timeout;
322
323 while let Some(_) = self.receive() {} // Flush out all messages in the queuee.
324
325 let expected_response_packet_id = match self._send(Packet::new(
326 self.my_address.into(),
327 destination_device_identifier.into(),
328 0,
329 lifetime,
330 request_state.clone(),
331 true,
332 data,
333 )) {
334 Err(any_err) => return Err(any_err.into()),
335 Ok(expected_response_packet_id) => match request_state {
336 // It is needed to wait for response packet with specific packet id.
337 // Following the transaction time diagram - it is expected the packet to
338 // have it's id increased three times.
339 PacketState::SendTransaction => expected_response_packet_id + 1,
340 PacketState::Ping => expected_response_packet_id,
341 _ => expected_response_packet_id,
342 },
343 };
344
345 while current_time < wait_end_time {
346 let _ = self.update(interface_driver, current_time);
347
348 if let Some(answer) = self.receive() {
349 if !(answer.source_device_identifier == destination_device_identifier.into()) {
350 continue;
351 }
352 if !(answer.get_spec_state() == expected_response_state) {
353 continue;
354 }
355 if !(answer.get_id() == expected_response_packet_id) {
356 continue;
357 }
358 return Ok(());
359 }
360
361 current_time = millis_provider();
362 }
363
364 Err(SpecialSendError::Timeout)
365 }
366
367 /// Send to exact Method
368 /// Sends the message to device with exact address in the network.
369 /// The `send_to_exact` method requires the following arguments:
370 ///
371 /// `main.rs`:
372 /// ```
373 /// let _ = match mesh_node.send_to_exact(
374 /// message.into_bytes(), // Content.
375 /// ExactAddressType::new(2).unwrap(), // Send to device with address 2.
376 /// 10 as LifeTimeType, // Let message travel 10 devices before being destroyed.
377 /// true, // filter_out_duplication
378 /// );
379 /// ```
380 ///
381 /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
382 /// heapless vector of bytes of special size. This size is configured in the
383 /// node/packet/config.rs file.
384 /// `Note!` That all devices should have same version of protocol flashed, in order to
385 /// have best compatibility with each other.
386 ///
387 /// * `destination_device_identifier` is instance of `ExactAddressType`,
388 /// That type is made to limit possible mess-ups during the usage of method.
389 ///
390 /// * `lifetime` - is the instance of `LifeTimeType`. This value configures the count of
391 /// how many nodes - the packet will be able to pass. Also this value is provided
392 /// to void the ether being jammed by packets, that in theory might be echoed
393 /// by other nodes to the infinity...
394 /// Each device, once passes transit packet trough it - it reduces packet's lifetime.
395 ///
396 /// * `filter_out_duplication` - Tells if the other devices shall ignore
397 /// echoes of this message. It is strongly recommended to use in order to make lower load
398 /// onto the network.
399 pub fn send_to_exact(
400 &mut self,
401 data: PacketDataBytes,
402 destination_device_identifier: ExactAddressType,
403 lifetime: LifeTimeType,
404 filter_out_duplication: bool,
405 ) -> Result<(), SendError> {
406 match self._send(Packet::new(
407 self.my_address.into(),
408 destination_device_identifier.into(),
409 0, // Anyway it will be set later in the trasmitter.
410 lifetime,
411 PacketState::Normal,
412 filter_out_duplication,
413 data,
414 )) {
415 Ok(_) => Ok(()),
416 Err(err) => Err(err),
417 }
418 }
419
420 /// Broadcast Method
421 /// Shares the message to all nodes in the network.
422 /// Distance of sharing is set by `lifetime` parameter.
423 /// It sends packet with destination address set as
424 /// `GeneralAddressType::BROADCAST`. Every device will treats `GeneralAddressType::Broadcast`
425 /// as it's own address, so they keep the message as received and transits copy of that message further.
426 /// `main.rs`:
427 /// ```
428 /// let _ = mesh_node.broadcast(
429 /// message.into_bytes(), // data.
430 /// 10 as LifeTimeType, // lifetime.
431 /// );
432 /// ```
433 /// Sends the `data` to all devices.
434 ///
435 /// * `data` - Is the instance of `PacketDataBytes`, which is just type alias of
436 /// heapless vector of bytes of special size. This size is configured in the
437 /// node/packet/config.rs file.
438 /// `Note!` That all devices should have same version of protocol flashed, in order to
439 /// be able to correctly to communicate with each other.
440 ///
441 /// * `lifetime` - is the instance of `LifeTimeType`. This value configures the count of
442 /// how many nodes - the packet will be able to pass. Also this value is provided
443 /// to void the ether being jammed by packets, that in theory might be echoed
444 /// by other nodes to the infinity...
445 /// Each device, once passes transit packet trough it - it reduces packet's lifetime.
446 pub fn broadcast(
447 &mut self,
448 data: PacketDataBytes,
449 lifetime: LifeTimeType,
450 ) -> Result<(), SendError> {
451 match self._send(Packet::new(
452 self.my_address.into(),
453 GeneralAddressType::Broadcast.into(),
454 0,
455 lifetime,
456 PacketState::Normal,
457 true,
458 data,
459 )) {
460 Ok(_) => Ok(()),
461 Err(err) => Err(err),
462 }
463 }
464
465 fn _send(&mut self, packet: Packet) -> Result<IdType, SendError> {
466 match self.transmitter.send(packet) {
467 Ok(generated_packet_id) => Ok(generated_packet_id),
468 Err(transmitter::PacketQueueIsFull) => Err(SendError::SendingQueueIsFull),
469 }
470 }
471
472 /// Receive Method
473 /// Optionally returns `PacketDataBytes` instance with data,
474 /// which has been send exactly to this device, or has been
475 /// `broadcast`ed trough all the network.
476 ///
477 /// You can tell which type the packet is by matching `special_state` field of returned `Packet` instance.
478 /// Field contains value of `PacketState` enum.
479 ///
480 /// `main.rs`:
481 /// ```
482 /// match mesh_node.receive() {
483 /// Some(packet) => ...,
484 /// Node => ....,
485 /// }
486 /// ```
487
488 pub fn receive(&mut self) -> Option<Packet> {
489 self.received_packet_queue.pop_front()
490 }
491
492 /// Update Method
493 /// The most important method.
494 /// During call of `update` method - it does all internal work:
495 /// - routes packets trough the network
496 /// - transits packets that were sent to other devices
497 /// - handles `lifetime` of packets
498 /// - handles special packets like `ping` and `pong`, or any kind of transaction one.
499 /// - saves received packets that will be available trough `receive` method.
500 /// - sends packets, that are in the `send` queue.
501 ///
502 /// As the protocol relies on physical device - it is crucial to provide
503 /// driver for communication interface.
504 /// Also node shall know if it's the time to broadcast into the ether or not,
505 /// so for that purpose the closure that counts milliseconds since program start
506 /// is required.
507 ///
508 /// Methods: `send_ping_pong`, `send_with_transaction` also relies on `millis_provider` closure and `interface_driver`.
509 /// With out call this method in a loop - the node will stop working.
510 ///
511 ///`main.rs`:
512 ///```
513 /// loop {
514 /// let current_time = Instant::now()
515 /// .duration_since(program_start_time)
516 /// .as_millis() as ms;
517 ///
518 /// let _ = mesh_node.update(&mut serial, current_time);
519 /// }
520 ///```
521
522 /// Does all necessary internal work of mesh node:
523 /// * Receives packets from ether, and manages their further life.
524 /// ** Data that is addressed to other devices are going to be send back into ether.
525 /// ** Data addressed to current device, will be unpacked and stored.
526 ///
527 /// * Call of this method also requires the general types to be passed in.
528 /// As the process relies onto timing countings and onto serial stream,
529 ///
530 /// parameters:
531 /// * `interface_driver` - is instance of `MutNonBlockingRx` and `MutBlockingTx`
532 /// traits.
533 ///
534 /// * `current_time` - Is a closure which returns current time in milliseconds
535 /// since the start of the program.
536 pub fn update<I>(
537 &mut self,
538 interface_driver: &mut I,
539 current_time: ms,
540 ) -> Result<(), NodeUpdateError>
541 where
542 I: embedded_io::ReadReady + embedded_io::Read + embedded_io::Write,
543 {
544 if self.timer.is_time_to_speak(current_time) {
545 self.transmitter.update(interface_driver);
546 self.timer.record_speak_time(current_time);
547 }
548 self.receiver.update(current_time, interface_driver);
549
550 let packet_to_route = match self.receiver.receive(current_time) {
551 Some(packet_to_handle) => packet_to_handle,
552 None => return Ok(()),
553 };
554
555 let (received_packet, transit_packet) = match self.router.route(packet_to_route) {
556 Ok(ok_case) => match ok_case {
557 RouteResult::ReceivedOnly(packet) => (Some(packet), None),
558 RouteResult::TransitOnly(transit) => (None, Some(transit)),
559 RouteResult::ReceivedAndTransit { received, transit } => {
560 (Some(received), Some(transit))
561 }
562 },
563 Err(RouteError::PacketLifetimeEnded) => (None, None),
564 Err(RouteError::RespondToBroadcastAddressError) => (None, None),
565 };
566
567 let (mut is_receive_queue_full, mut is_transit_queue_full): (bool, bool) = (false, false);
568
569 if let Some(received_packet) = received_packet {
570 match self.received_packet_queue.push_back(received_packet) {
571 Ok(()) => (),
572 Err(_) => {
573 is_receive_queue_full = true;
574 }
575 }
576 }
577
578 if let Some(transit_packet) = transit_packet {
579 match self.transmitter.send_transit(transit_packet) {
580 Ok(_) => (),
581 Err(transmitter::PacketTransitQueueIsFull) => {
582 is_transit_queue_full = true;
583 }
584 }
585 }
586
587 if is_receive_queue_full || is_transit_queue_full {
588 return Err(NodeUpdateError {
589 is_receive_queue_full,
590 is_transit_queue_full,
591 });
592 } else {
593 Ok(())
594 }
595 }
596}