use crate::bpv7::bundle::Bundle;
use crate::cla::ConvergenceLayer;
use anyhow::Result;
use serde_cbor;
use std::sync::Arc;
use tokio::net::TcpListener;
#[derive(Clone)]
pub struct TcpClaListener {
pub bind_addr: String,
pub receive_callback: Arc<dyn Fn(Bundle) + Send + Sync>,
}
#[async_trait::async_trait]
impl ConvergenceLayer for TcpClaListener {
fn address(&self) -> String {
self.bind_addr.clone()
}
async fn activate(&self) -> Result<()> {
let listener = TcpListener::bind(&self.bind_addr).await?;
println!("TCP CLA Listener listening on {}", self.bind_addr);
loop {
let (stream, addr) = listener.accept().await?;
println!("📨 New connection from: {}", addr);
let callback = Arc::clone(&self.receive_callback);
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, callback).await {
eprintln!("❌ Error handling connection: {}", e);
}
});
}
}
}
pub async fn handle_connection<S>(
mut stream: S,
callback: Arc<dyn Fn(Bundle) + Send + Sync>,
) -> Result<()>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
use tokio::io::{AsyncReadExt, AsyncWriteExt};
loop {
let mut len_buf = [0u8; 4];
match stream.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(_) => break, }
let len = u32::from_be_bytes(len_buf) as usize;
let mut data = vec![0u8; len];
if stream.read_exact(&mut data).await.is_err() {
break;
}
match serde_cbor::from_slice::<Bundle>(&data) {
Ok(bundle) => {
callback(bundle);
let _ = stream.write_all(b"OK").await;
}
Err(e) => {
eprintln!("❌ Failed to deserialize bundle: {}", e);
let _ = stream.write_all(b"ERROR").await;
}
}
}
Ok(())
}