use log::*;
use crate::communication::connection::{
Address, Connection, ConnectionFactory, InMsgQueue, OutMsgQueue, Readable, Writable, Write,
};
use crate::communication::messages::{GenericPackage, GenericPackageBuilder, PackageBuilder};
use crate::communication::router::Router;
use crate::communication::ConnectionError;
use crate::nodes::SystemNodeId;
use crate::properties::{init_buffer, SystemBufferVec, MAX_QUEUED_MESSAGES};
pub trait CommunicationService<Pkg: Writable + Readable>:
OutMsgQueue<Pkg> + InMsgQueue<Pkg>
{
}
struct QueuedPackage {
pub receiver_id: SystemNodeId,
pub package: GenericPackage,
pub ready: bool,
}
pub struct GenericCommunicationService<
Conn: Connection,
Addr: Address,
F: ConnectionFactory<Address = Addr, Connection = Conn>,
R: Router<Address = Addr>,
> {
pub id: SystemNodeId,
pub router: R,
pub connection_factory: F,
pub package_builder: GenericPackageBuilder,
package_queue: heapless::Vec<QueuedPackage, MAX_QUEUED_MESSAGES>,
}
impl<
Conn: Connection,
Addr: Address,
F: ConnectionFactory<Address = Addr, Connection = Conn>,
R: Router<Address = Addr>,
> GenericCommunicationService<Conn, Addr, F, R>
{
pub fn new(id: SystemNodeId, router: R, connection_factory: F) -> Self {
Self {
id,
router,
connection_factory,
package_builder: GenericPackageBuilder::default(),
package_queue: heapless::Vec::default(),
}
}
fn unqueue_available_package(&mut self) -> Option<GenericPackage> {
let mut package_ready_idx = None;
for (idx, package) in self.package_queue.iter().enumerate() {
if package.ready {
package_ready_idx = Some(idx);
}
}
package_ready_idx.map(|idx| self.package_queue.remove(idx).package)
}
pub fn update(&mut self, new_id: SystemNodeId, previous_id: SystemNodeId, address: Addr) {
if self.router.update(new_id, address).is_err() {
return;
}
for package in self.package_queue.iter_mut() {
if package.receiver_id == previous_id {
package.receiver_id = new_id;
package.ready = true
}
}
}
pub fn unpeer(&mut self, previous_address: Addr) -> Option<SystemNodeId> {
self.router.drop(previous_address)
}
pub fn handshake(&mut self, address: Addr, message: SystemBufferVec) {
match self.connection_factory.connect(&address) {
Ok(connection) => match Write::write(connection, message.as_slice()) {
Ok(bytes) => {
trace!("Wrote {} bytes to {}.", bytes, address);
self.connection_factory.clear();
}
Err(err) => {
warn!(
"Got error {:?} while trying to write message to {}.",
err, address
)
}
},
Err(ConnectionError::NotAvailable) => {
warn!("Connection to {} not available. Disconnecting.", address);
self.connection_factory.clear();
}
Err(_) => {
error!("Couldn't connect to {}. Dropping output package.", address);
}
}
}
pub fn queue(&mut self, peer_id: SystemNodeId, package: GenericPackage) {
if self
.package_queue
.push(QueuedPackage {
receiver_id: peer_id,
package: package.clone(),
ready: false,
})
.is_err()
{
error!("Couldn't queue package {:?} with peer {}", package, peer_id);
}
}
}
impl<
Conn: Connection,
Addr: Address,
F: ConnectionFactory<Address = Addr, Connection = Conn>,
R: Router<Address = Addr>,
> OutMsgQueue<GenericPackage> for GenericCommunicationService<Conn, Addr, F, R>
where
F: ConnectionFactory<Address = Addr, Connection = Conn>,
{
fn push(&mut self, mut pkg: GenericPackage) {
if let Some(destiny) = self.router.route(&pkg.get_receiver()) {
match self.connection_factory.connect(destiny) {
Ok(connection) => match Write::write(connection, pkg.get_mut_message()) {
Ok(bytes) => {
trace!("Wrote {} bytes to {}.", bytes, destiny);
self.connection_factory.clear();
self.router.keep(pkg.get_receiver(), true);
return;
}
Err(err) => {
warn!("Got error {:?} while trying to write message to {}.", err, destiny)
}
},
Err(ConnectionError::NotAvailable) => {
warn!("Connection to {} not available. Disconnecting.", destiny);
self.connection_factory.clear();
}
Err(_) => {
error!("Couldn't connect to {}. Dropping output package.", destiny);
}
}
self.router.keep(pkg.get_receiver(), false);
} else {
error!("Couldn't find receiver with ID #{} to route package. Dropping output package.", pkg.get_receiver());
}
}
}
impl<
Conn: Connection,
Addr: Address,
F: ConnectionFactory<Address = Addr, Connection = Conn>,
R: Router<Address = Addr>,
> InMsgQueue<GenericPackage> for GenericCommunicationService<Conn, Addr, F, R>
where
F: ConnectionFactory<Address = Addr, Connection = Conn>,
{
fn pop(&mut self) -> Option<GenericPackage> {
if let Some(package) = self.unqueue_available_package() {
return Some(package);
}
let (conn, from) = self.connection_factory.listen().ok()?;
let mut buffer = init_buffer();
let bytes = conn.read(&mut buffer).ok()?;
let buffer = SystemBufferVec::from_slice(&buffer[..bytes]).ok()?;
let from = self
.router
.inverse(&from)
.copied()
.unwrap_or_else(|| self.router.register(from).unwrap());
self.router.clean();
self.package_builder
.clean_copy()
.to(self.id)
.from(from)
.with_message(buffer)
.build()
.ok()
}
}
impl<
Conn: Connection,
Addr: Address,
F: ConnectionFactory<Address = Addr, Connection = Conn>,
R: Router<Address = Addr>,
> CommunicationService<GenericPackage> for GenericCommunicationService<Conn, Addr, F, R>
where
F: ConnectionFactory<Address = Addr, Connection = Conn>,
{
}