use std::slice::Iter;
use std::time::Duration;
use std::time::Instant;
use ring::rand::*;
use crate::client::QUIC_VERSION;
use crate::frame::H3iFrame;
use crate::quiche;
use crate::actions::h3::Action;
use crate::actions::h3::StreamEventType;
use crate::actions::h3::WaitType;
use crate::actions::h3::WaitingFor;
use crate::client::execute_action;
use crate::client::parse_streams;
use crate::client::ClientError;
use crate::client::ConnectionCloseDetails;
use crate::client::MAX_DATAGRAM_SIZE;
use crate::config::Config;
use super::parse_args;
use super::Client;
use super::CloseTriggerFrames;
use super::ConnectionSummary;
use super::ParsedArgs;
use super::StreamMap;
use super::StreamParserMap;
#[derive(Default)]
struct SyncClient {
streams: StreamMap,
stream_parsers: StreamParserMap,
}
impl SyncClient {
fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
Self {
streams: StreamMap::new(close_trigger_frames),
..Default::default()
}
}
}
impl Client for SyncClient {
fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
&mut self.stream_parsers
}
fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) {
self.streams.insert(stream_id, frame);
}
}
fn create_config(args: &Config, should_log_keys: bool) -> quiche::Config {
let mut config = quiche::Config::new(QUIC_VERSION).unwrap();
config.verify_peer(args.verify_peer);
config.set_application_protos(&[b"h3"]).unwrap();
config.set_max_idle_timeout(args.idle_timeout);
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_initial_max_data(10_000_000);
config
.set_initial_max_stream_data_bidi_local(args.max_stream_data_bidi_local);
config.set_initial_max_stream_data_bidi_remote(
args.max_stream_data_bidi_remote,
);
config.set_initial_max_stream_data_uni(args.max_stream_data_uni);
config.set_initial_max_streams_bidi(args.max_streams_bidi);
config.set_initial_max_streams_uni(args.max_streams_uni);
config.set_disable_active_migration(true);
config.set_active_connection_id_limit(0);
config.set_max_connection_window(args.max_window);
config.set_max_stream_window(args.max_stream_window);
config.grease(false);
if args.enable_early_data {
config.enable_early_data();
}
if args.enable_dgram {
config.enable_dgram(
true,
args.dgram_recv_queue_len,
args.dgram_send_queue_len,
);
}
if should_log_keys {
config.log_keys()
}
config
}
pub fn connect(
args: Config, actions: Vec<Action>,
close_trigger_frames: Option<CloseTriggerFrames>,
) -> std::result::Result<ConnectionSummary, ClientError> {
connect_with_early_data(args, None, actions, close_trigger_frames)
}
pub fn connect_with_early_data(
args: Config, early_actions: Option<Vec<Action>>, actions: Vec<Action>,
close_trigger_frames: Option<CloseTriggerFrames>,
) -> std::result::Result<ConnectionSummary, ClientError> {
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let ParsedArgs {
connect_url,
bind_addr,
peer_addr,
} = parse_args(&args);
let mut poll = mio::Poll::new().unwrap();
let mut events = mio::Events::with_capacity(1024);
let mut socket = mio::net::UdpSocket::bind(bind_addr).unwrap();
poll.registry()
.register(&mut socket, mio::Token(0), mio::Interest::READABLE)
.unwrap();
let mut keylog = None;
if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(keylog_path)
.unwrap();
keylog = Some(file);
}
let mut config = create_config(&args, keylog.is_some());
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
let rng = SystemRandom::new();
rng.fill(&mut scid[..]).unwrap();
let scid = quiche::ConnectionId::from_ref(&scid);
let Ok(local_addr) = socket.local_addr() else {
return Err(ClientError::Other("invalid socket".to_string()));
};
let mut conn =
quiche::connect(connect_url, &scid, local_addr, peer_addr, &mut config)
.map_err(|e| ClientError::Other(e.to_string()))?;
if let Some(session) = &args.session {
conn.set_session(session)
.map_err(|error| ClientError::Other(error.to_string()))?;
}
if let Some(keylog) = &mut keylog {
if let Ok(keylog) = keylog.try_clone() {
conn.set_keylog(Box::new(keylog));
}
}
log::info!(
"connecting to {peer_addr:} from {local_addr:} with scid {scid:?}",
);
let mut app_proto_selected = false;
let (write, send_info) = conn.send(&mut out).expect("initial send failed");
let mut client = SyncClient::new(close_trigger_frames);
if conn.is_in_early_data() {
if let Some(early_actions) = early_actions {
let mut early_action_iter = early_actions.iter();
let mut wait_duration = None;
let mut wait_instant = None;
let mut waiting_for = WaitingFor::default();
check_duration_and_do_actions(
&mut wait_duration,
&mut wait_instant,
&mut early_action_iter,
&mut conn,
&mut waiting_for,
client.stream_parsers_mut(),
);
}
}
while let Err(e) = socket.send_to(&out[..write], send_info.to) {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!(
"{} -> {}: send() would block",
socket.local_addr().unwrap(),
send_info.to
);
continue;
}
return Err(ClientError::Other(format!("send() failed: {e:?}")));
}
let app_data_start = std::time::Instant::now();
let mut action_iter = actions.iter();
let mut wait_duration = None;
let mut wait_instant = None;
let mut waiting_for = WaitingFor::default();
loop {
let actual_sleep = match (wait_duration, conn.timeout()) {
(Some(wait), Some(timeout)) => {
#[allow(clippy::comparison_chain)]
if timeout < wait {
let new = wait - timeout;
wait_duration = Some(new);
Some(timeout)
} else if wait < timeout {
Some(wait)
} else {
Some(timeout)
}
},
(None, Some(timeout)) => Some(timeout),
(Some(wait), None) => Some(wait),
_ => None,
};
log::debug!("actual sleep is {actual_sleep:?}");
poll.poll(&mut events, actual_sleep).unwrap();
if events.is_empty() {
log::debug!("timed out");
conn.on_timeout();
}
for event in &events {
let socket = match event.token() {
mio::Token(0) => &socket,
_ => unreachable!(),
};
let local_addr = socket.local_addr().unwrap();
'read: loop {
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
break 'read;
}
return Err(ClientError::Other(format!(
"{local_addr}: recv() failed: {e:?}"
)));
},
};
let recv_info = quiche::RecvInfo {
to: local_addr,
from,
};
let _read = match conn.recv(&mut buf[..len], recv_info) {
Ok(v) => v,
Err(e) => {
log::debug!("{local_addr}: recv failed: {e:?}");
continue 'read;
},
};
}
}
log::debug!("done reading");
if conn.is_closed() {
log::info!(
"connection closed with error={:?} did_idle_timeout={}, stats={:?} path_stats={:?}",
conn.peer_error(),
conn.is_timed_out(),
conn.stats(),
conn.path_stats().collect::<Vec<quiche::PathStats>>(),
);
if !conn.is_established() {
log::info!(
"connection timed out after {:?}",
app_data_start.elapsed(),
);
return Err(ClientError::HandshakeFail);
}
break;
}
if (conn.is_established() || conn.is_in_early_data()) &&
!app_proto_selected
{
app_proto_selected = true;
}
if app_proto_selected {
check_duration_and_do_actions(
&mut wait_duration,
&mut wait_instant,
&mut action_iter,
&mut conn,
&mut waiting_for,
client.stream_parsers_mut(),
);
let mut wait_cleared = false;
for response in parse_streams(&mut conn, &mut client) {
let stream_id = response.stream_id;
if let StreamEventType::Finished = response.event_type {
waiting_for.clear_waits_on_stream(stream_id);
} else {
waiting_for.remove_wait(response);
}
wait_cleared = true;
}
if client.streams.all_close_trigger_frames_seen() {
client.streams.close_due_to_trigger_frames(&mut conn);
}
if wait_cleared {
check_duration_and_do_actions(
&mut wait_duration,
&mut wait_instant,
&mut action_iter,
&mut conn,
&mut waiting_for,
client.stream_parsers_mut(),
);
}
}
while conn.scids_left() > 0 {
let (scid, reset_token) = generate_cid_and_reset_token();
if conn.new_scid(&scid, reset_token, false).is_err() {
break;
}
}
let sockets = vec![&socket];
for socket in sockets {
let local_addr = socket.local_addr().unwrap();
for peer_addr in conn.paths_iter(local_addr) {
loop {
let (write, send_info) = match conn.send_on_path(
&mut out,
Some(local_addr),
Some(peer_addr),
) {
Ok(v) => v,
Err(quiche::Error::Done) => {
break;
},
Err(e) => {
log::error!(
"{local_addr} -> {peer_addr}: send failed: {e:?}"
);
conn.close(false, 0x1, b"fail").ok();
break;
},
};
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!(
"{} -> {}: send() would block",
local_addr,
send_info.to
);
break;
}
return Err(ClientError::Other(format!(
"{} -> {}: send() failed: {:?}",
local_addr, send_info.to, e
)));
}
}
}
}
if conn.is_closed() {
log::info!(
"connection closed, {:?} {:?}",
conn.stats(),
conn.path_stats().collect::<Vec<quiche::PathStats>>()
);
if !conn.is_established() {
log::info!(
"connection timed out after {:?}",
app_data_start.elapsed(),
);
return Err(ClientError::HandshakeFail);
}
break;
}
}
Ok(ConnectionSummary {
stream_map: client.streams,
stats: Some(conn.stats()),
path_stats: conn.path_stats().collect(),
conn_close_details: ConnectionCloseDetails::new(&conn),
})
}
fn check_duration_and_do_actions(
wait_duration: &mut Option<Duration>, wait_instant: &mut Option<Instant>,
action_iter: &mut Iter<Action>, conn: &mut quiche::Connection,
waiting_for: &mut WaitingFor, stream_parsers: &mut StreamParserMap,
) {
match wait_duration.as_ref() {
None => {
if let Some(idle_wait) =
handle_actions(action_iter, conn, waiting_for, stream_parsers)
{
*wait_duration = Some(idle_wait);
*wait_instant = Some(Instant::now());
log::info!(
"waiting for {idle_wait:?} before executing more actions"
);
}
},
Some(period) => {
let now = Instant::now();
let then = wait_instant.unwrap();
log::debug!(
"checking if actions wait period elapsed {:?} > {:?}",
now.duration_since(then),
wait_duration
);
if now.duration_since(then) >= *period {
log::debug!("yup!");
*wait_duration = None;
if let Some(idle_wait) =
handle_actions(action_iter, conn, waiting_for, stream_parsers)
{
*wait_duration = Some(idle_wait);
}
}
},
}
}
pub fn generate_cid_and_reset_token() -> (quiche::ConnectionId<'static>, u128) {
let rng = SystemRandom::new();
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
rng.fill(&mut scid[..]).unwrap();
let scid = scid.to_vec().into();
let mut reset_token = [0; 16];
rng.fill(&mut reset_token[..]).unwrap();
let reset_token = u128::from_be_bytes(reset_token);
(scid, reset_token)
}
fn handle_actions<'a, I>(
iter: &mut I, conn: &mut quiche::Connection, waiting_for: &mut WaitingFor,
stream_parsers: &mut StreamParserMap,
) -> Option<Duration>
where
I: Iterator<Item = &'a Action>,
{
if !waiting_for.is_empty() {
log::debug!(
"won't fire an action due to waiting for responses: {waiting_for:?}"
);
return None;
}
for action in iter {
match action {
Action::FlushPackets => return None,
Action::Wait { wait_type } => match wait_type {
WaitType::WaitDuration(period) => return Some(*period),
WaitType::StreamEvent(response) => {
log::info!(
"waiting for {response:?} before executing more actions"
);
waiting_for.add_wait(response);
return None;
},
},
action => execute_action(action, conn, stream_parsers),
}
}
None
}