use std::collections::VecDeque;
use vox_types::{Conduit, ConduitRx, ConduitTx, LinkRx, LinkTx, MsgFamily};
use crate::{MemoryLink, memory_link_pair};
use super::*;
/// Message family used by the tests in this file: every message is an owned `String`.
struct StringFamily;
impl MsgFamily for StringFamily {
    // The message type does not depend on `'a`; it is always an owned `String`.
    type Msg<'a> = String;
    /// Returns the `SHAPE` constant associated with `String`.
    fn shape() -> &'static facet_core::Shape {
        String::SHAPE
    }
}
/// Test `LinkSource` that hands out a fixed, pre-built queue of links.
///
/// Each entry pairs a `MemoryLink` with the optional `ClientHello` that is
/// forwarded unchanged inside the resulting `Attachment`.
struct QueuedLinkSource {
    links: VecDeque<(MemoryLink, Option<ClientHello>)>,
}

impl LinkSource for QueuedLinkSource {
    type Link = MemoryLink;

    /// Pops the entry at the front of the queue and wraps it in an `Attachment`.
    ///
    /// # Errors
    ///
    /// Returns a `ConnectionRefused` I/O error ("no more links") once the
    /// queue has been exhausted.
    async fn next_link(&mut self) -> std::io::Result<Attachment<MemoryLink>> {
        match self.links.pop_front() {
            Some((link, client_hello)) => Ok(Attachment { link, client_hello }),
            None => Err(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                "no more links",
            )),
        }
    }
}
/// Builds the `ServerHello` a scripted test peer sends back during the handshake.
///
/// * `resume_key` – stored in the hello as-is.
/// * `last_received` – when `Some`, sets `SH_HAS_LAST_RECEIVED` and carries the
///   value; when `None`, the field is written as 0 and the flag stays clear.
/// * `rejected` – when `true`, sets `SH_REJECTED`.
fn server_hello(resume_key: ResumeKey, last_received: Option<u32>, rejected: bool) -> ServerHello {
    let rejected_bit = if rejected { SH_REJECTED } else { 0u8 };
    let last_received_bit = match last_received {
        Some(_) => SH_HAS_LAST_RECEIVED,
        None => 0u8,
    };

    ServerHello {
        magic: LeU32::new(SERVER_HELLO_MAGIC),
        flags: rejected_bit | last_received_bit,
        resume_key,
        last_received: LeU32::new(last_received.unwrap_or(0)),
    }
}
/// Builds a 16-byte `ResumeKey` from an arbitrary byte string.
///
/// Input shorter than 16 bytes is zero-padded on the right; input longer than
/// 16 bytes is truncated to its first 16 bytes.
fn resume_key(b: &[u8]) -> ResumeKey {
    let mut padded = [0u8; 16];
    // `zip` stops at the shorter side, which gives both the truncation and
    // the zero padding without any explicit length arithmetic.
    for (slot, byte) in padded.iter_mut().zip(b) {
        *slot = *byte;
    }
    ResumeKey(padded)
}
/// Hand-encodes a single `Frame` and sends it over the raw link `tx`.
///
/// * `seq` – sequence number placed in the frame.
/// * `ack` – when `Some(n)`, the frame carries a `PacketAck` whose
///   `max_delivered` is `n`; when `None`, the frame carries no ack.
/// * `item` – postcard-encoded as a `String` first, then embedded in the frame
///   as already-encoded bytes via `Payload::PostcardBytes`.
///
/// # Panics
///
/// Panics if encoding, building the scatter plan, or sending fails; this is a
/// test helper, so failures are surfaced immediately.
async fn send_frame<LTx: LinkTx>(tx: &LTx, seq: u32, ack: Option<u32>, item: &str) {
    let item_bytes = vox_postcard::to_vec(&item.to_string()).unwrap();
    let frame = Frame {
        seq: PacketSeq(seq),
        ack: ack.map(|n| PacketAck {
            max_delivered: PacketSeq(n),
        }),
        item: Payload::PostcardBytes(&item_bytes),
    };

    // SAFETY: the pointer is derived from `frame`, a local that stays alive and
    // unmoved until after the scatter plan has been written out below, and the
    // shape passed alongside it is `Frame`'s own shape. The shape is named with
    // a `'static` lifetime while `frame` borrows `item_bytes`.
    // NOTE(review): presumably the lifetime parameter has no effect on layout —
    // confirm against the contract of `Peek::unchecked_new`.
    let peek = unsafe {
        Peek::unchecked_new(
            PtrConst::new((&raw const frame).cast::<u8>()),
            <Frame<'static> as facet::Facet<'static>>::SHAPE,
        )
    };

    let plan = vox_postcard::peek_to_scatter_plan(peek).unwrap();
    let mut frame_bytes = vec![0u8; plan.total_size()];
    plan.write_into(&mut frame_bytes);
    tx.send(frame_bytes).await.unwrap();
}
/// Sends `item` through the conduit's typed sending half.
///
/// The string is copied into an owned `String`, prepared with `prepare_send`,
/// and then handed to `send_prepared`.
///
/// # Panics
///
/// Panics if either the prepare step or the send step fails.
async fn send_item<Tx: ConduitTx<Msg = StringFamily>>(tx: &Tx, item: &str) {
    let message = String::from(item);
    let ready = tx.prepare_send(message).unwrap();
    tx.send_prepared(ready).await.unwrap();
}
/// Decodes a raw frame as produced on the wire by the conduit under test.
///
/// Returns `(seq, ack, item)`, where `ack` is the frame's `max_delivered`
/// value when an ack is present, and `item` is the frame's payload decoded as
/// a `String`.
///
/// # Panics
///
/// Panics if the frame or its payload cannot be decoded, or if the decoded
/// payload is not the `Payload::PostcardBytes` variant.
fn decode_frame(bytes: &[u8]) -> (u32, Option<u32>, String) {
    let frame: Frame<'_> = vox_postcard::from_slice_borrowed(bytes).unwrap();
    // The panic message names the variant this code actually expects; the
    // previous text referred to a variant name that is not matched here.
    let Payload::PostcardBytes(item_bytes) = &frame.item else {
        unreachable!("deserialized Payload should be PostcardBytes")
    };
    let item: String = vox_postcard::from_slice(item_bytes).unwrap();
    (frame.seq.0, frame.ack.map(|a| a.max_delivered.0), item)
}
async fn recv_raw<LRx: LinkRx>(rx: &mut LRx) -> Vec<u8> {
let backing = rx.recv().await.unwrap().unwrap();
match backing {
vox_types::Backing::Boxed(b) => b.to_vec(),
vox_types::Backing::Shared(s) => s.as_bytes().to_vec(),
}
}
/// One item sent through a freshly established `StableConduit` is expected to
/// reach the scripted peer as a frame with sequence number 0.
#[tokio::test]
async fn stable_send_recv_single() {
    let (client_link, server_link) = memory_link_pair(16);
    let source = QueuedLinkSource {
        links: VecDeque::from([(client_link, None)]),
    };

    // Scripted peer: complete the handshake, then decode exactly one frame and
    // hand its sequence number and item back to the test body.
    let server = tokio::spawn(async move {
        let (server_tx, mut server_rx) = server_link.split();
        let _hello = recv_handshake::<_, ClientHello>(&mut server_rx)
            .await
            .unwrap();
        let accept = server_hello(resume_key(b"key"), None, false);
        send_handshake(&server_tx, &accept).await.unwrap();

        let (seq, _ack, item) = decode_frame(&recv_raw(&mut server_rx).await);
        (seq, item)
    });

    let conduit = StableConduit::<StringFamily, _>::new(source).await.unwrap();
    // The receiving half is kept bound (not `_`) so it lives until the end of the test.
    let (client_tx, _client_rx) = conduit.split();
    send_item(&client_tx, "hello").await;

    let (seq, item) = server.await.unwrap();
    assert_eq!(seq, 0);
    assert_eq!(item, "hello");
}
/// Two-link scenario: a frame the first peer never acknowledged is expected to
/// be seen again, with its original sequence number, on the second link.
#[tokio::test]
async fn reconnect_replays_unacked_frames() {
    let (client_a, server_a) = memory_link_pair(32);
    let (client_b, server_b) = memory_link_pair(32);

    // First peer: reads "alpha" (seq 0) and "beta" (seq 1), acknowledges only
    // seq 0, then returns — which drops both halves of its link.
    let server1 = tokio::spawn(async move {
        let (tx, mut rx) = server_a.split();
        let _hello = recv_handshake::<_, ClientHello>(&mut rx).await.unwrap();
        let accept = server_hello(resume_key(b"resume-key-for-test"), None, false);
        send_handshake(&tx, &accept).await.unwrap();

        let (seq_a, _, item_a) = decode_frame(&recv_raw(&mut rx).await);
        assert_eq!(seq_a, 0);
        assert_eq!(item_a, "alpha");

        let (seq_b, _, item_b) = decode_frame(&recv_raw(&mut rx).await);
        assert_eq!(seq_b, 1);
        assert_eq!(item_b, "beta");

        send_frame(&tx, 0, Some(0), "ack-for-alpha").await;
    });

    // Second peer: checks that the hello carries a resume key and reports
    // frame 0 as the last one received, answers with `last_received = 0`, and
    // then expects "beta" (seq 1) followed by "gamma" (seq 2).
    let server2 = tokio::spawn(async move {
        let (tx, mut rx) = server_b.split();
        let hello = recv_handshake::<_, ClientHello>(&mut rx).await.unwrap();
        assert!(hello.flags & CH_HAS_RESUME_KEY != 0);
        assert!(hello.flags & CH_HAS_LAST_RECEIVED != 0);
        assert_eq!(hello.last_received.get(), 0);

        let accept = server_hello(resume_key(b"resume-key-2"), Some(0), false);
        send_handshake(&tx, &accept).await.unwrap();

        let (seq, _, item) = decode_frame(&recv_raw(&mut rx).await);
        assert_eq!(seq, 1);
        assert_eq!(item, "beta");

        let (seq, _, item) = decode_frame(&recv_raw(&mut rx).await);
        assert_eq!(seq, 2);
        assert_eq!(item, "gamma");
    });

    // The conduit is given both client ends, in the order they should be used.
    let source = QueuedLinkSource {
        links: VecDeque::from([(client_a, None), (client_b, None)]),
    };
    let conduit = StableConduit::<StringFamily, _>::new(source).await.unwrap();
    let (client_tx, mut client_rx) = conduit.split();

    send_item(&client_tx, "alpha").await;
    send_item(&client_tx, "beta").await;

    let ack_msg = client_rx.recv().await.unwrap().unwrap();
    assert_eq!(&**ack_msg.get(), "ack-for-alpha");

    send_item(&client_tx, "gamma").await;

    server1.await.unwrap();
    server2.await.unwrap();
}
/// Two-link scenario in which the first peer acknowledges everything it was
/// sent: the second peer expects the next frame it sees to be the new item
/// (seq 2), not a repeat of an earlier one.
#[tokio::test]
async fn reconnect_no_replay_when_all_acked() {
    let (client_a, server_a) = memory_link_pair(32);
    let (client_b, server_b) = memory_link_pair(32);

    // First peer: consumes two frames without inspecting them, acknowledges up
    // to seq 1, then returns — which drops both halves of its link.
    let server1 = tokio::spawn(async move {
        let (tx, mut rx) = server_a.split();
        let _ = recv_handshake::<_, ClientHello>(&mut rx).await.unwrap();
        let accept = server_hello(resume_key(b"key1"), None, false);
        send_handshake(&tx, &accept).await.unwrap();

        recv_raw(&mut rx).await;
        recv_raw(&mut rx).await;
        send_frame(&tx, 0, Some(1), "ack-both").await;
    });

    // Second peer: checks that the hello carries a resume key, reports
    // `last_received = 1`, and expects the first frame to be "gamma" (seq 2).
    let server2 = tokio::spawn(async move {
        let (tx, mut rx) = server_b.split();
        let hello = recv_handshake::<_, ClientHello>(&mut rx).await.unwrap();
        assert!(hello.flags & CH_HAS_RESUME_KEY != 0);

        let accept = server_hello(resume_key(b"key2"), Some(1), false);
        send_handshake(&tx, &accept).await.unwrap();

        let (seq, _, item) = decode_frame(&recv_raw(&mut rx).await);
        assert_eq!(seq, 2);
        assert_eq!(item, "gamma");
    });

    let source = QueuedLinkSource {
        links: VecDeque::from([(client_a, None), (client_b, None)]),
    };
    let conduit = StableConduit::<StringFamily, _>::new(source).await.unwrap();
    let (client_tx, mut client_rx) = conduit.split();

    send_item(&client_tx, "alpha").await;
    send_item(&client_tx, "beta").await;

    let ack_msg = client_rx.recv().await.unwrap().unwrap();
    let ack_text = ack_msg.get();
    assert_eq!(&**ack_text, "ack-both");

    send_item(&client_tx, "gamma").await;

    server1.await.unwrap();
    server2.await.unwrap();
}
/// The scripted peer sends seq 0 twice (with different items) followed by
/// seq 1; the receiving half is expected to yield only "first" and "second".
#[tokio::test]
async fn duplicate_frames_are_skipped() {
    let (client_link, server_link) = memory_link_pair(32);
    let source = QueuedLinkSource {
        links: VecDeque::from([(client_link, None)]),
    };

    // Scripted peer: handshake, then three frames, two of which share seq 0.
    let server = tokio::spawn(async move {
        let (server_tx, mut server_rx) = server_link.split();
        let _ = recv_handshake::<_, ClientHello>(&mut server_rx)
            .await
            .unwrap();
        let accept = server_hello(resume_key(b"k"), None, false);
        send_handshake(&server_tx, &accept).await.unwrap();

        send_frame(&server_tx, 0, None, "first").await;
        send_frame(&server_tx, 0, None, "duplicate-first").await;
        send_frame(&server_tx, 1, None, "second").await;
    });

    let conduit = StableConduit::<StringFamily, _>::new(source).await.unwrap();
    // The sending half is kept bound (not `_`) so it lives until the end of the test.
    let (_client_tx, mut client_rx) = conduit.split();

    let first_msg = client_rx.recv().await.unwrap().unwrap();
    assert_eq!(&**first_msg.get(), "first");

    let second_msg = client_rx.recv().await.unwrap().unwrap();
    assert_eq!(&**second_msg.get(), "second");

    server.await.unwrap();
}
/// Two-link scenario in which the second peer answers the handshake with the
/// rejected flag set: the receiving half is expected to report
/// `StableConduitError::SessionLost`.
#[tokio::test]
async fn reconnect_failure_surfaces_session_lost() {
    let (client_a, server_a) = memory_link_pair(32);
    let (client_b, server_b) = memory_link_pair(32);

    // First peer: accepts the session, consumes one frame, then returns —
    // which drops both halves of its link.
    let server1 = tokio::spawn(async move {
        let (tx, mut rx) = server_a.split();
        let _ = recv_handshake::<_, ClientHello>(&mut rx).await.unwrap();
        let accept = server_hello(resume_key(b"known-key"), None, false);
        send_handshake(&tx, &accept).await.unwrap();
        recv_raw(&mut rx).await;
    });

    // Second peer: checks that the hello carries a resume key, then replies
    // with an all-zero key and `rejected = true`.
    let server2 = tokio::spawn(async move {
        let (tx, mut rx) = server_b.split();
        let hello = recv_handshake::<_, ClientHello>(&mut rx).await.unwrap();
        assert!(hello.flags & CH_HAS_RESUME_KEY != 0);

        let refusal = server_hello(ResumeKey([0u8; 16]), None, true);
        send_handshake(&tx, &refusal).await.unwrap();
    });

    let source = QueuedLinkSource {
        links: VecDeque::from([(client_a, None), (client_b, None)]),
    };
    let conduit = StableConduit::<StringFamily, _>::new(source).await.unwrap();
    let (client_tx, mut client_rx) = conduit.split();

    send_item(&client_tx, "hello").await;

    match client_rx.recv().await {
        Err(StableConduitError::SessionLost) => {}
        // Drop the payload before formatting so only the outcome's shape is printed.
        unexpected => panic!("expected SessionLost, got: {:?}", unexpected.map(|_| ())),
    }

    server1.await.unwrap();
    server2.await.unwrap();
}