use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4};
use bytes::Bytes;
use test_strategy::Arbitrary;
use tracing::{debug, error, info, trace};
use crate::{
ClientConfig, Connection, ConnectionHandle, Dir, FourTuple, PathId, PathStatus, Side, StreamId,
tests::{Pair, TestEndpoint},
};
#[derive(Debug, Clone, Copy, Arbitrary)]
pub(super) enum TestOp {
Drive { side: Side },
AdvanceTime,
DropInbound { side: Side },
ReorderInbound { side: Side },
ForceKeyUpdate { side: Side },
PassiveMigration {
side: Side,
#[strategy(0..3usize)]
addr_idx: usize,
},
OpenPath {
side: Side,
status: PathStatus,
#[strategy(0..3usize)]
addr_idx: usize,
},
ClosePath {
side: Side,
#[strategy(0..3usize)]
path_idx: usize,
error_code: u32,
},
PathSetStatus {
side: Side,
#[strategy(0..3usize)]
path_idx: usize,
status: PathStatus,
},
StreamOp { side: Side, stream_op: StreamOp },
CloseConn {
side: Side,
error_code: u32,
},
AddHpAddr {
side: Side,
#[strategy(0..3usize)]
addr_idx: usize,
},
InitiateHpRound { side: Side },
}
#[derive(Debug, Clone, Copy, Arbitrary)]
pub(super) enum StreamOp {
Open(Dir),
Send {
#[strategy(0..3usize)]
stream: usize,
#[strategy(0..10_000usize)]
num_bytes: usize,
},
Finish(#[strategy(0..3usize)] usize),
Reset(#[strategy(0..3usize)] usize, u32),
Accept(Dir),
Receive(#[strategy(0..3usize)] usize, bool),
Stop(#[strategy(0..3usize)] usize, u32),
}
pub(super) struct State {
send_streams: Vec<StreamId>,
recv_streams: Vec<StreamId>,
handle: ConnectionHandle,
side: Side,
}
impl TestOp {
fn run(self, pair: &mut Pair, client: &mut State, server: &mut State) -> Option<()> {
let now = pair.time;
match self {
Self::Drive { side: Side::Client } => pair.drive_client(),
Self::Drive { side: Side::Server } => pair.drive_server(),
Self::AdvanceTime => {
if !pair.client.is_idle() || !pair.server.is_idle() {
pair.advance_time();
}
}
Self::DropInbound { side: Side::Client } => {
debug!(len = pair.client.inbound.len(), "dropping inbound");
pair.client.inbound.clear();
}
Self::DropInbound { side: Side::Server } => {
debug!(len = pair.server.inbound.len(), "dropping inbound");
pair.server.inbound.clear();
}
Self::ReorderInbound { side: Side::Client } => {
let item = pair.client.inbound.pop_front()?;
pair.client.inbound.push_back(item);
}
Self::ReorderInbound { side: Side::Server } => {
let item = pair.server.inbound.pop_front()?;
pair.server.inbound.push_back(item);
}
Self::ForceKeyUpdate { side: Side::Client } => client.conn(pair)?.force_key_update(),
Self::ForceKeyUpdate { side: Side::Server } => server.conn(pair)?.force_key_update(),
Self::PassiveMigration {
side: Side::Client,
addr_idx,
} => {
let routes = pair.routes.as_mut()?;
routes.sim_client_migration(addr_idx, inc_last_addr_octet);
}
Self::PassiveMigration {
side: Side::Server,
addr_idx,
} => {
let routes = pair.routes.as_mut()?;
routes.sim_server_migration(addr_idx, inc_last_addr_octet);
}
Self::OpenPath {
side,
status,
addr_idx,
} => {
let routes = pair.routes.as_ref()?;
let remote = match side {
Side::Client => routes.server_addr(addr_idx)?,
Side::Server => routes.client_addr(addr_idx)?,
};
let state = match side {
Side::Client => client,
Side::Server => server,
};
let conn = state.conn(pair)?;
let network_path = FourTuple {
remote,
local_ip: None,
};
conn.open_path(network_path, status, now)
.inspect_err(|err| error!(?err, "OpenPath failed"))
.ok();
}
Self::ClosePath {
side,
path_idx,
error_code,
} => {
let state = match side {
Side::Client => client,
Side::Server => server,
};
let conn = state.conn(pair)?;
let path_id = get_path_id(conn, path_idx)?;
conn.close_path(now, path_id, error_code.into())
.inspect_err(|err| error!(?err, "ClosePath failed"))
.ok();
}
Self::PathSetStatus {
side,
path_idx,
status,
} => {
let state = match side {
Side::Client => client,
Side::Server => server,
};
let conn = state.conn(pair)?;
let path_id = get_path_id(conn, path_idx)?;
conn.set_path_status(path_id, status)
.inspect_err(|err| error!(?err, "PathSetStatus failed"))
.ok();
}
Self::StreamOp { side, stream_op } => {
let state = match side {
Side::Client => client,
Side::Server => server,
};
stream_op.run(pair, state);
}
Self::CloseConn { side, error_code } => {
let state = match side {
Side::Client => client,
Side::Server => server,
};
let conn = state.conn(pair)?;
conn.close(now, error_code.into(), Bytes::new());
}
Self::AddHpAddr { side, addr_idx } => {
let routes = pair.routes.as_ref()?;
let address = match side {
Side::Client => routes.client_addr(addr_idx)?,
Side::Server => routes.server_addr(addr_idx)?,
};
let state = match side {
Side::Client => client,
Side::Server => server,
};
let conn = state.conn(pair)?;
conn.add_nat_traversal_address(address)
.inspect_err(|err| error!(?err, "AddHpAddr failed"))
.ok();
}
Self::InitiateHpRound { side } => {
let state = match side {
Side::Client => client,
Side::Server => server,
};
let conn = state.conn(pair)?;
let addrs = conn
.initiate_nat_traversal_round(now)
.inspect_err(|err| error!(?err, "InitiateHpRound failed"))
.ok()?;
trace!(?addrs, "initiating NAT Traversal");
}
}
Some(())
}
}
impl StreamOp {
fn run(self, pair: &mut Pair, state: &mut State) -> Option<()> {
let conn = state.conn(pair)?;
match self {
Self::Open(kind) => state.send_streams.extend(conn.streams().open(kind)),
Self::Send { stream, num_bytes } => {
let stream_id = state.send_streams.get(stream)?;
let data = vec![0; num_bytes];
let bytes = conn.send_stream(*stream_id).write(&data).ok()?;
trace!(attempted_write = %num_bytes, actually_written = %bytes, "random interaction: Wrote stream bytes");
}
Self::Finish(stream) => {
let stream_id = state.send_streams.get(stream)?;
conn.send_stream(*stream_id).finish().ok();
}
Self::Reset(stream, code) => {
let stream_id = state.send_streams.get(stream)?;
conn.send_stream(*stream_id).reset(code.into()).ok();
}
Self::Accept(kind) => state.recv_streams.extend(conn.streams().accept(kind)),
Self::Receive(stream, ordered) => {
let stream_id = state.recv_streams.get(stream)?;
let mut recv_stream = conn.recv_stream(*stream_id);
let mut chunks = recv_stream.read(ordered).ok()?;
let chunk = chunks.next(usize::MAX).ok()??;
trace!(chunk_len = %chunk.bytes.len(), offset = %chunk.offset, "read from stream");
}
Self::Stop(stream, code) => {
let stream_id = state.recv_streams.get(stream)?;
conn.recv_stream(*stream_id).stop(code.into()).ok();
}
};
Some(())
}
}
impl State {
fn new(side: Side, handle: ConnectionHandle) -> Self {
Self {
send_streams: Vec::new(),
recv_streams: Vec::new(),
handle,
side,
}
}
fn endpoint<'a>(&self, pair: &'a mut Pair) -> &'a mut TestEndpoint {
match self.side {
Side::Server => &mut pair.server,
Side::Client => &mut pair.client,
}
}
fn conn<'a>(&self, pair: &'a mut Pair) -> Option<&'a mut Connection> {
self.endpoint(pair).connections.get_mut(&self.handle)
}
}
fn get_path_id(conn: &mut Connection, idx: usize) -> Option<PathId> {
let paths = conn.paths();
paths
.get(idx.clamp(0, paths.len().saturating_sub(1)))
.copied()
}
fn inc_last_addr_octet(addr: SocketAddr) -> SocketAddr {
match addr {
SocketAddr::V4(socket_addr_v4) => {
let [a, b, c, d] = socket_addr_v4.ip().octets();
SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(a, b, c, d.wrapping_add(1)),
socket_addr_v4.port(),
))
}
SocketAddr::V6(mut socket_addr_v6) => {
let [a, b, c, d, e, f, g, h] = socket_addr_v6.ip().segments();
socket_addr_v6.set_ip(Ipv6Addr::new(a, b, c, d, e, f, g, h.wrapping_add(1)));
SocketAddr::V6(socket_addr_v6)
}
}
}
pub(super) fn run_random_interaction(
pair: &mut Pair,
interactions: Vec<TestOp>,
client_config: ClientConfig,
) -> (ConnectionHandle, ConnectionHandle) {
let (client_ch, server_ch) = pair.connect_with(client_config);
pair.drive(); info!("INTERACTION SETUP FINISHED");
let mut client = State::new(Side::Client, client_ch);
let mut server = State::new(Side::Server, server_ch);
for interaction in interactions {
info!(?interaction, "INTERACTION STEP");
interaction.run(pair, &mut client, &mut server);
}
(client.handle, server.handle)
}