mod active;
mod config;
mod idle;
mod subscription;
pub use crate::node::active::*;
pub use crate::node::config::*;
pub use crate::node::idle::*;
pub use crate::node::subscription::*;
extern crate alloc;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tokio::sync::Mutex as TokioMutex;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use tracing::*;
use std::net::SocketAddr;
use std::error::Error;
use std::marker::{PhantomData, Sync};
use std::result::Result;
use std::sync::Arc;
use alloc::vec::Vec;
use postcard::*;
use serde::{de::DeserializeOwned, Serialize};
use crate::msg::*;
use std::fmt::Debug;
pub trait Message: Serialize + DeserializeOwned + Debug + Sync + Send + Clone {}
impl<T> Message for T where T: Serialize + DeserializeOwned + Debug + Sync + Send + Clone {}
#[derive(Debug)]
pub struct Idle;
#[derive(Debug)]
pub struct Active;
#[derive(Debug)]
pub struct Subscription;
#[derive(Debug, Clone)]
pub struct SubscriptionData<T: Message> {
pub data: T,
pub timestamp: String,
}
#[derive(Debug)]
pub struct Node<State, T: Message> {
pub __state: PhantomData<State>,
pub phantom: PhantomData<T>,
pub runtime: Runtime,
pub stream: Option<TcpStream>,
pub name: String,
pub topic: String,
pub host_addr: SocketAddr,
pub subscription_data: Arc<TokioMutex<Option<SubscriptionData<T>>>>,
pub task_subscribe: Option<JoinHandle<()>>,
}
pub async fn try_connection(host_addr: SocketAddr) -> Result<TcpStream, Box<dyn Error>> {
let mut connection_attempts = 0;
let mut stream: Option<TcpStream> = None;
while connection_attempts < 5 {
match TcpStream::connect(host_addr).await {
Ok(my_stream) => {
stream = Some(my_stream);
break;
}
Err(e) => {
connection_attempts += 1;
sleep(Duration::from_millis(1_000)).await;
warn!("{:?}", e);
}
}
}
let stream = stream.unwrap();
Ok(stream)
}
pub async fn handshake(stream: TcpStream, topic: String) -> Result<TcpStream, Box<dyn Error>> {
loop {
stream.writable().await.unwrap();
match stream.try_write(topic.as_bytes()) {
Ok(_n) => {
info!("{}: Wrote {} bytes to host", topic, _n);
break;
}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
} else {
error!("NODE handshake error: {:?}", e);
}
}
}
}
info!("{}: Successfully connected to host", topic);
sleep(Duration::from_millis(20)).await;
Ok(stream)
}
pub async fn send_msg(
stream: &mut &TcpStream,
packet_as_bytes: Vec<u8>,
) -> Result<(), Box<dyn Error>> {
stream.writable().await.unwrap();
loop {
match stream.try_write(&packet_as_bytes) {
Ok(_n) => {
break;
}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {}
continue;
}
}
}
Ok(())
}
pub async fn await_response<T: Message>(
stream: &mut &TcpStream,
_max_buffer_size: usize, ) -> Result<GenericMsg, postcard::Error> {
let mut buf = [0u8; 4096];
loop {
stream.readable().await.unwrap();
match stream.try_read(&mut buf) {
Ok(0) => continue,
Ok(n) => {
let bytes = &buf[..n];
let msg: Result<GenericMsg, postcard::Error> = from_bytes(bytes);
return msg;
}
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {}
continue;
}
}
}
}