use alloc::vec;
use alloc::vec::Vec;
use core::time::Duration;
use socket2::{Domain, Socket, Type};
use std::io::Read;
use std::net::TcpListener;
use std::net::{SocketAddr, TcpStream};
use std::thread;
use crate::tmtc::{ReceivesTc, TmPacketSource};
use thiserror::Error;
pub use crate::hal::std::tcp_cobs_server::{CobsTcParser, CobsTmSender, TcpTmtcInCobsServer};
pub use crate::hal::std::tcp_spacepackets_server::{
SpacepacketsTcParser, SpacepacketsTmSender, TcpSpacepacketsServer,
};
#[derive(Debug, Copy, Clone)]
pub struct ServerConfig {
pub addr: SocketAddr,
pub inner_loop_delay: Duration,
pub tm_buffer_size: usize,
pub tc_buffer_size: usize,
pub reuse_addr: bool,
pub reuse_port: bool,
}
impl ServerConfig {
pub fn new(
addr: SocketAddr,
inner_loop_delay: Duration,
tm_buffer_size: usize,
tc_buffer_size: usize,
) -> Self {
Self {
addr,
inner_loop_delay,
tm_buffer_size,
tc_buffer_size,
reuse_addr: false,
reuse_port: false,
}
}
}
#[derive(Error, Debug)]
pub enum TcpTmtcError<TmError, TcError> {
#[error("TM retrieval error: {0}")]
TmError(TmError),
#[error("TC retrieval error: {0}")]
TcError(TcError),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
}
#[derive(Debug, Default)]
pub struct ConnectionResult {
pub addr: Option<SocketAddr>,
pub num_received_tcs: u32,
pub num_sent_tms: u32,
}
pub trait TcpTcParser<TmError, TcError> {
fn handle_tc_parsing(
&mut self,
tc_buffer: &mut [u8],
tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
conn_result: &mut ConnectionResult,
current_write_idx: usize,
next_write_idx: &mut usize,
) -> Result<(), TcpTmtcError<TmError, TcError>>;
}
pub trait TcpTmSender<TmError, TcError> {
fn handle_tm_sending(
&mut self,
tm_buffer: &mut [u8],
tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
conn_result: &mut ConnectionResult,
stream: &mut TcpStream,
) -> Result<bool, TcpTmtcError<TmError, TcError>>;
}
pub struct TcpTmtcGenericServer<
TmError,
TcError,
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>,
> {
pub(crate) listener: TcpListener,
pub(crate) inner_loop_delay: Duration,
pub(crate) tm_source: TmSource,
pub(crate) tm_buffer: Vec<u8>,
pub(crate) tc_receiver: TcReceiver,
pub(crate) tc_buffer: Vec<u8>,
tc_handler: TcParser,
tm_handler: TmSender,
}
impl<
TmError: 'static,
TcError: 'static,
TmSource: TmPacketSource<Error = TmError>,
TcReceiver: ReceivesTc<Error = TcError>,
TmSender: TcpTmSender<TmError, TcError>,
TcParser: TcpTcParser<TmError, TcError>,
> TcpTmtcGenericServer<TmError, TcError, TmSource, TcReceiver, TmSender, TcParser>
{
pub fn new(
cfg: ServerConfig,
tc_parser: TcParser,
tm_sender: TmSender,
tm_source: TmSource,
tc_receiver: TcReceiver,
) -> Result<Self, std::io::Error> {
let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
socket.set_reuse_address(cfg.reuse_addr)?;
#[cfg(unix)]
socket.set_reuse_port(cfg.reuse_port)?;
let addr = (cfg.addr).into();
socket.bind(&addr)?;
socket.listen(128)?;
Ok(Self {
tc_handler: tc_parser,
tm_handler: tm_sender,
listener: socket.into(),
inner_loop_delay: cfg.inner_loop_delay,
tm_source,
tm_buffer: vec![0; cfg.tm_buffer_size],
tc_receiver,
tc_buffer: vec![0; cfg.tc_buffer_size],
})
}
pub fn listener(&mut self) -> &mut TcpListener {
&mut self.listener
}
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.listener.local_addr()
}
pub fn handle_next_connection(
&mut self,
) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>> {
let mut connection_result = ConnectionResult::default();
let mut current_write_idx;
let mut next_write_idx = 0;
let (mut stream, addr) = self.listener.accept()?;
stream.set_nonblocking(true)?;
connection_result.addr = Some(addr);
current_write_idx = next_write_idx;
loop {
let read_result = stream.read(&mut self.tc_buffer[current_write_idx..]);
match read_result {
Ok(0) => {
if current_write_idx > 0 {
self.tc_handler.handle_tc_parsing(
&mut self.tc_buffer,
&mut self.tc_receiver,
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
}
break;
}
Ok(read_len) => {
current_write_idx += read_len;
if current_write_idx == self.tc_buffer.capacity() {
self.tc_handler.handle_tc_parsing(
&mut self.tc_buffer,
&mut self.tc_receiver,
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
}
}
Err(e) => match e.kind() {
std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut => {
self.tc_handler.handle_tc_parsing(
&mut self.tc_buffer,
&mut self.tc_receiver,
&mut connection_result,
current_write_idx,
&mut next_write_idx,
)?;
current_write_idx = next_write_idx;
if !self.tm_handler.handle_tm_sending(
&mut self.tm_buffer,
&mut self.tm_source,
&mut connection_result,
&mut stream,
)? {
thread::sleep(self.inner_loop_delay);
}
}
_ => {
return Err(TcpTmtcError::Io(e));
}
},
}
}
self.tm_handler.handle_tm_sending(
&mut self.tm_buffer,
&mut self.tm_source,
&mut connection_result,
&mut stream,
)?;
Ok(connection_result)
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::sync::Mutex;
use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
use crate::tmtc::{ReceivesTcCore, TmPacketSourceCore};
#[derive(Default, Clone)]
pub(crate) struct SyncTcCacher {
pub(crate) tc_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl ReceivesTcCore for SyncTcCacher {
type Error = ();
fn pass_tc(&mut self, tc_raw: &[u8]) -> Result<(), Self::Error> {
let mut tc_queue = self.tc_queue.lock().expect("tc forwarder failed");
tc_queue.push_back(tc_raw.to_vec());
Ok(())
}
}
#[derive(Default, Clone)]
pub(crate) struct SyncTmSource {
tm_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}
impl SyncTmSource {
pub(crate) fn add_tm(&mut self, tm: &[u8]) {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failec");
tm_queue.push_back(tm.to_vec());
}
}
impl TmPacketSourceCore for SyncTmSource {
type Error = ();
fn retrieve_packet(&mut self, buffer: &mut [u8]) -> Result<usize, Self::Error> {
let mut tm_queue = self.tm_queue.lock().expect("locking tm queue failed");
if !tm_queue.is_empty() {
let next_vec = tm_queue.front().unwrap();
if buffer.len() < next_vec.len() {
panic!(
"provided buffer too small, must be at least {} bytes",
next_vec.len()
);
}
let next_vec = tm_queue.pop_front().unwrap();
buffer[0..next_vec.len()].copy_from_slice(&next_vec);
return Ok(next_vec.len());
}
Ok(0)
}
}
}