mod constants;
mod packet;
mod receiver;
mod router;
mod timer;
mod transmitter;
mod types;
pub use packet::{AddressType, MULTICAST_RESERVED_IDENTIFIER};
use platform_serial::PlatformSerial;
pub use router::PacketState;
pub use types::NodeString;
pub use packet::{meta_data::PacketMetaData, LifeTimeType, PacketDataBytes};
use self::{
router::{PacketLifetimeEnded, PacketRouter, RouteResult},
types::PacketDataQueue,
};
use platform_millis::{ms, PlatformTime};
/// A single device participating in the network.
///
/// The node owns everything needed to queue outgoing packets, read incoming
/// ones and route them. All periodic work is driven by calling `update`.
pub struct Node {
    /// Accepts packets queued by `send`-family methods and transit packets
    /// produced by the router.
    transmitter: transmitter::Transmitter,

    /// Polled on every `update` call for newly received packets.
    receiver: receiver::Receiver,

    /// Address of this device; stamped as the source of every packet
    /// created by this node.
    my_address: AddressType,

    /// Decides, based on the current time, when the transmitter is allowed
    /// to be updated.
    timer: timer::Timer,

    /// Packets the router classified as received by this node, waiting to be
    /// taken out with `receive`.
    received_packet_meta_data_queue: PacketDataQueue,

    /// Classifies every packet coming from the receiver as received,
    /// transit, or both.
    packet_router: PacketRouter,
}
/// Error returned by `Node::update` when at least one internal queue
/// rejected a packet during that call.
///
/// Both flags may be set at the same time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NodeUpdateError {
    /// Set when a packet routed as received by this node could not be pushed
    /// into the node's queue of received packets.
    ///
    /// NOTE(review): despite the field name, `Node::update` raises this flag
    /// for the *received* packets queue, not for the sending queue. The name
    /// is kept for backward compatibility.
    pub is_send_queue_full: bool,

    /// Set when a packet that has to be forwarded further could not be
    /// handed to the transmitter's transit queue.
    pub is_transit_queue_full: bool,
}
/// Error returned by `Node::send` when a packet could not be queued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError {
    /// The transmitter refused the packet because its queue is full.
    SendingQueueIsFull,
}
/// Error returned by the send methods that wait for a reply
/// (`Node::send_ping_pong` and `Node::send_with_transaction`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpecialSendError {
    /// No matching reply from the destination device was received before
    /// the timeout elapsed.
    Timeout,

    /// The destination was the reserved multicast identifier, which these
    /// methods reject.
    MulticastAddressForbidden,

    /// The transmitter refused the packet because its queue is full.
    SendingQueueIsFull,
}
impl From<SendError> for SpecialSendError {
    /// Maps every plain send failure onto its counterpart in
    /// `SpecialSendError`, so the reply-waiting send methods can propagate
    /// queueing failures unchanged.
    fn from(value: SendError) -> Self {
        // Exhaustive on purpose: a new `SendError` variant must be mapped here.
        match value {
            SendError::SendingQueueIsFull => Self::SendingQueueIsFull,
        }
    }
}
/// Parameters required to construct a `Node` with `Node::new`.
pub struct NodeConfig {
    /// Address of the device being configured. It becomes the node's own
    /// address and is also handed to the packet router.
    pub device_address: AddressType,

    /// Period passed to the node's internal timer, which decides when the
    /// transmitter is allowed to be updated.
    pub listen_period: ms,
}
impl Node {
pub fn new(config: NodeConfig) -> Node {
Node {
transmitter: transmitter::Transmitter::new(),
receiver: receiver::Receiver::new(),
my_address: config.device_address.clone(),
timer: timer::Timer::new(config.listen_period),
received_packet_meta_data_queue: PacketDataQueue::new(),
packet_router: PacketRouter::new(config.device_address),
}
}
pub fn send_ping_pong<TIMER: PlatformTime, SERIAL: PlatformSerial<u8>>(
&mut self,
data: PacketDataBytes,
destination_device_identifier: AddressType,
lifetime: LifeTimeType,
filter_out_duplication: bool,
timeout: ms,
) -> Result<(), SpecialSendError> {
if destination_device_identifier == MULTICAST_RESERVED_IDENTIFIER {
return Err(SpecialSendError::MulticastAddressForbidden);
}
let mut current_time = TIMER::millis();
let wait_end_time = current_time + timeout;
while let Some(_) = self.receive() {}
if let Err(any_err) = self._send(PacketMetaData {
data,
source_device_identifier: self.my_address.clone(),
destination_device_identifier: destination_device_identifier.clone(),
lifetime,
filter_out_duplication,
spec_state: PacketState::Ping,
packet_id: 0,
}) {
return Err(any_err.into());
}
while current_time < wait_end_time {
let _ = self.update::<TIMER, SERIAL>();
if let Some(answer) = self.receive() {
if !(answer.spec_state == PacketState::Pong) {
continue;
}
if !(answer.source_device_identifier == destination_device_identifier) {
continue;
}
return Ok(());
}
current_time = TIMER::millis();
}
Err(SpecialSendError::Timeout)
}
pub fn send_with_transaction<TIMER: PlatformTime, SERIAL: PlatformSerial<u8>>(
&mut self,
data: PacketDataBytes,
destination_device_identifier: AddressType,
lifetime: LifeTimeType,
filter_out_duplication: bool,
timeout: ms,
) -> Result<(), SpecialSendError> {
if destination_device_identifier == MULTICAST_RESERVED_IDENTIFIER {
return Err(SpecialSendError::MulticastAddressForbidden);
}
let mut current_time = TIMER::millis();
let wait_end_time = current_time + timeout;
while let Some(_) = self.receive() {}
if let Err(any_err) = self._send(PacketMetaData {
data,
source_device_identifier: self.my_address.clone(),
destination_device_identifier: destination_device_identifier.clone(),
lifetime,
filter_out_duplication,
spec_state: PacketState::SendTransaction,
packet_id: 0,
}) {
return Err(any_err.into());
}
while current_time < wait_end_time {
let _ = self.update::<TIMER, SERIAL>();
if let Some(answer) = self.receive() {
if !(answer.spec_state == PacketState::FinishTransaction) {
continue;
}
if !(answer.source_device_identifier == destination_device_identifier) {
continue;
}
return Ok(());
}
current_time = TIMER::millis();
}
Err(SpecialSendError::Timeout)
}
pub fn send(
&mut self,
data: PacketDataBytes,
destination_device_identifier: AddressType,
lifetime: LifeTimeType,
filter_out_duplication: bool,
) -> Result<(), SendError> {
self._send(PacketMetaData {
data,
source_device_identifier: self.my_address.clone(),
destination_device_identifier,
lifetime,
filter_out_duplication,
spec_state: PacketState::Normal,
packet_id: 0,
})
}
fn _send(&mut self, packet_meta_data: PacketMetaData) -> Result<(), SendError> {
match self.transmitter.send(packet_meta_data) {
Ok(_) => Ok(()),
Err(transmitter::PacketQueueIsFull) => Err(SendError::SendingQueueIsFull),
}
}
pub fn receive(&mut self) -> Option<PacketMetaData> {
self.received_packet_meta_data_queue.pop_front()
}
pub fn update<TIMER: PlatformTime, SERIAL: PlatformSerial<u8>>(
&mut self,
) -> Result<(), NodeUpdateError> {
let current_time = TIMER::millis();
if self.timer.is_time_to_speak(current_time) {
self.transmitter.update::<SERIAL>();
self.timer.record_speak_time(current_time);
}
self.receiver.update::<SERIAL>(current_time);
let packet_to_handle = match self.receiver.receive(current_time) {
Some(packet_to_handle) => packet_to_handle,
None => return Ok(()),
};
let (received_packet, transit_packet): (Option<PacketMetaData>, Option<PacketMetaData>) =
match self.packet_router.route(packet_to_handle) {
Ok(ok_case) => match ok_case {
RouteResult::Received(packet) => (Some(packet), None),
RouteResult::Transit(transit) => (None, Some(transit)),
RouteResult::ReceivedAndTransit { received, transit } => {
(Some(received), Some(transit))
}
},
Err(PacketLifetimeEnded) => (None, None),
};
let (mut is_send_queue_full, mut is_transit_queue_full): (bool, bool) = (false, false);
if let Some(received_packet) = received_packet {
match self
.received_packet_meta_data_queue
.push_back(received_packet)
{
Ok(()) => (),
Err(_) => {
is_send_queue_full = true;
}
}
}
if let Some(transit_packet) = transit_packet {
match self.transmitter.send_transit(transit_packet) {
Ok(_) => (),
Err(transmitter::PacketTransitQueueIsFull) => {
is_transit_queue_full = true;
}
}
}
if (!is_send_queue_full) && (!is_transit_queue_full) {
Ok(())
} else {
Err(NodeUpdateError {
is_send_queue_full,
is_transit_queue_full,
})
}
}
}