use tailtalk::DataLinkProtocol;
use std::sync::Arc;
use std::time::Duration;
use tailtalk::{
DataLinkPacket, OutboundHandle,
addressing::Addressing,
adsp::{Adsp, AdspAddress},
ddp::DdpProcessor,
};
use tailtalk_packets::aarp::AddressSource;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{broadcast, mpsc};
#[derive(Clone, Debug)]
struct WirePacket {
packet: Arc<DataLinkPacket>,
src_mac: [u8; 6],
}
struct TestHub {
tx: broadcast::Sender<WirePacket>,
_rx: broadcast::Receiver<WirePacket>,
}
impl TestHub {
fn new() -> Self {
let (tx, _rx) = broadcast::channel(100);
Self { tx, _rx }
}
fn subscribe(&self) -> broadcast::Receiver<WirePacket> {
self.tx.subscribe()
}
async fn run(&self, mut frame_rx: mpsc::Receiver<WirePacket>) {
while let Some(pkt) = frame_rx.recv().await {
let _ = self.tx.send(pkt);
}
}
}
struct TestClient {
_mac: [u8; 6],
addressing: tailtalk::addressing::AddressingHandle,
ddp: tailtalk::ddp::DdpHandle,
}
impl TestClient {
async fn new(
mac: [u8; 6],
hub_tx: mpsc::Sender<WirePacket>,
hub_rx: broadcast::Receiver<WirePacket>,
) -> Self {
let (out_tx, mut out_rx) = mpsc::channel(100);
let outbound_handle = OutboundHandle::new(out_tx);
let hub_tx_clone = hub_tx.clone();
tokio::spawn(async move {
while let Some(pkt) = out_rx.recv().await {
let wire_pkt = WirePacket {
packet: Arc::new(pkt),
src_mac: mac,
};
let _ = hub_tx_clone.send(wire_pkt).await;
}
});
let addressing = Addressing::spawn(Some(mac), outbound_handle.clone(), None, AddressSource::EtherTalkPhase2);
let ddp = DdpProcessor::spawn(Some(addressing.clone()), None, outbound_handle.clone());
let mut rx = hub_rx;
let ddp_handle = ddp.clone();
let addressing_handle = addressing.clone();
tokio::spawn(async move {
loop {
if let Ok(wire_pkt) = rx.recv().await {
let src_mac = wire_pkt.src_mac;
let pkt = &wire_pkt.packet;
if src_mac == mac {
continue;
}
let dest_mac = if let tailtalk::addressing::Node::EtherTalkPhase2(m) = pkt.dest_node { m } else { [0; 6] };
if dest_mac == mac || tailtalk::addressing::Addressing::is_broadcast_mac(dest_mac, AddressSource::EtherTalkPhase2) {
match pkt.protocol {
DataLinkProtocol::Ddp => ddp_handle.received_pkt(
&pkt.payload,
AddressSource::EtherTalkPhase2,
src_mac,
),
DataLinkProtocol::Aarp => {
let _ = addressing_handle
.received_pkt(&pkt.payload, AddressSource::EtherTalkPhase2);
}
DataLinkProtocol::LlapEnq | DataLinkProtocol::LlapAck => {}
}
}
}
}
});
Self {
_mac: mac,
addressing,
ddp,
}
}
}
#[tokio::test]
async fn test_adsp_connection() {
let _ = tracing_subscriber::fmt().try_init();
let hub = TestHub::new();
let (hub_in_tx, hub_in_rx) = mpsc::channel(100);
let hub_ref = Arc::new(hub);
let hub_runner = hub_ref.clone();
let hub_task = tokio::spawn(async move {
hub_runner.run(hub_in_rx).await;
});
let mac_a = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05];
let client_a = TestClient::new(mac_a, hub_in_tx.clone(), hub_ref.subscribe()).await;
let mac_b = [0x00, 0x01, 0x02, 0x03, 0x04, 0x06];
let client_b = TestClient::new(mac_b, hub_in_tx.clone(), hub_ref.subscribe()).await;
tokio::time::sleep(Duration::from_millis(1500)).await;
let addr_a = client_a
.addressing
.addr()
.await
.expect("failed to get addr");
let _addr_b = client_b
.addressing
.addr()
.await
.expect("failed to get addr");
let (socket_a, mut listener) = Adsp::bind(&client_a.ddp, Some(100))
.await
.expect("failed to bind listener");
tracing::info!("Client A listening on socket {}", socket_a);
let accept_task = tokio::spawn(async move { listener.accept().await });
tokio::time::sleep(Duration::from_millis(100)).await;
let remote_addr = AdspAddress {
network_number: addr_a.network_number,
node_number: addr_a.node_number,
socket_number: 100,
};
tracing::info!("Client B connecting to {:?}", remote_addr);
let mut stream_b = Adsp::connect(&client_b.ddp, remote_addr)
.await
.expect("failed to connect");
tracing::info!("Client B connected!");
let mut stream_a = accept_task
.await
.expect("accept task failed")
.expect("failed to accept");
tracing::info!("Client A accepted connection!");
let test_data = b"Hello from Client B!";
stream_b
.write_all(test_data)
.await
.expect("failed to write");
stream_b.flush().await.expect("failed to flush");
tracing::info!("Client B sent data");
tokio::time::sleep(Duration::from_millis(500)).await;
let mut buffer = vec![0u8; 1024];
let n = stream_a.read(&mut buffer).await.expect("failed to read");
assert!(n > 0, "No data received");
assert_eq!(&buffer[..n], test_data, "Data mismatch");
tracing::info!("Client A received data: {:?}", &buffer[..n]);
stream_a
.write_all(&buffer[..n])
.await
.expect("failed to write echo");
stream_a.flush().await.expect("failed to flush echo");
tokio::time::sleep(Duration::from_millis(500)).await;
let mut echo_buffer = vec![0u8; 1024];
let echo_n = stream_b
.read(&mut echo_buffer)
.await
.expect("failed to read echo");
assert_eq!(&echo_buffer[..echo_n], test_data, "Echo data mismatch");
tracing::info!("✓ ADSP test passed: bidirectional communication successful!");
stream_a.close().await.expect("failed to close stream A");
stream_b.close().await.expect("failed to close stream B");
hub_task.abort();
}