use std::sync::Arc;
use std::time::Duration;
use atomr_core::actor::Address;
use atomr_remote::pdu::{AkkaPdu, AssociateInfo, DisassociateReason, PROTOCOL_VERSION};
use atomr_remote::transport::Transport;
use atomr_remote_serial::SerialTransport;
const MAX_FRAME: usize = 64 * 1024;
fn associate_pdu(origin: Address, uid: u64) -> AkkaPdu {
AkkaPdu::Associate(AssociateInfo { origin, uid, cookie: None, protocol_version: PROTOCOL_VERSION })
}
#[tokio::test]
async fn pdu_roundtrip_over_duplex() {
let (a_io, b_io) = tokio::io::duplex(8192);
let (a_reader, a_writer) = tokio::io::split(a_io);
let (b_reader, b_writer) = tokio::io::split(b_io);
let a = Arc::new(SerialTransport::with_streams("A", a_reader, a_writer, MAX_FRAME));
let b = Arc::new(SerialTransport::with_streams("B", b_reader, b_writer, MAX_FRAME));
let addr_a = a.local_address().expect("local address");
let addr_b = b.local_address().expect("local address");
let mut inbound_a = a.inbound();
let mut inbound_b = b.inbound();
a.send(&addr_b, associate_pdu(addr_a.clone(), 7)).await.unwrap();
b.send(&addr_a, associate_pdu(addr_b.clone(), 11)).await.unwrap();
let frame_b = tokio::time::timeout(Duration::from_millis(500), inbound_b.recv())
.await
.expect("timeout")
.expect("inbound closed");
match frame_b.pdu {
AkkaPdu::Associate(info) => {
assert_eq!(info.origin, addr_a);
assert_eq!(info.uid, 7);
assert_eq!(frame_b.from, addr_a, "B attributes incoming frames to A's advertised address");
}
other => panic!("unexpected pdu on B: {other:?}"),
}
let frame_a = tokio::time::timeout(Duration::from_millis(500), inbound_a.recv())
.await
.expect("timeout")
.expect("inbound closed");
match frame_a.pdu {
AkkaPdu::Associate(info) => {
assert_eq!(info.origin, addr_b);
assert_eq!(info.uid, 11);
assert_eq!(frame_a.from, addr_b);
}
other => panic!("unexpected pdu on A: {other:?}"),
}
a.send(&addr_b, AkkaPdu::Heartbeat).await.unwrap();
let hb = tokio::time::timeout(Duration::from_millis(500), inbound_b.recv())
.await
.expect("timeout")
.expect("inbound closed");
assert!(matches!(hb.pdu, AkkaPdu::Heartbeat));
assert_eq!(hb.from, addr_a);
a.disassociate(&addr_b).await.unwrap();
let dis = tokio::time::timeout(Duration::from_millis(500), inbound_b.recv())
.await
.expect("timeout")
.expect("inbound closed");
assert!(matches!(dis.pdu, AkkaPdu::Disassociate(DisassociateReason::Normal)));
a.shutdown().await.unwrap();
b.shutdown().await.unwrap();
}
#[tokio::test]
async fn unassociated_send_returns_closed() {
use atomr_remote::transport::TransportError;
let (a_io, _b_io) = tokio::io::duplex(8192);
let (a_reader, a_writer) = tokio::io::split(a_io);
let a = SerialTransport::with_streams("A", a_reader, a_writer, MAX_FRAME);
a.shutdown().await.unwrap();
let result = a.send(&Address::remote("akka.serial", "B", "/dev/null", 0), AkkaPdu::Heartbeat).await;
assert!(matches!(result, Err(TransportError::Closed)));
}