macro_rules! bg_stream {
($name:ident, $handler:ty, $bytes:ty, $cfg:ty) => {
async fn $name<S, P>(
mut stream: PacketStream<S, P>,
handler: &mut $handler,
cfg_rx: &mut watch::Receiver<$cfg>,
mut close: &mut oneshot::Receiver<()>
) -> Result<(), TaskError>
where
S: ByteStream,
P: Packet<$bytes>
{
let mut should_close = false;
let mut close_packet = None;
let timeout = stream.timeout();
let diff = match timeout.as_secs() {
0..=1 => 0,
0..=10 => 1,
_ => 5
};
let mut interval = interval(timeout - Duration::from_secs(diff));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select!{
packet = stream.receive(), if !should_close => {
let r_packet = match packet {
Ok(p) => {
handler.send(p).await?
},
Err(PacketReceiverError::Io(e)) => {
return Err(TaskError::Io(e))
},
Err(PacketReceiverError::Hard(e)) => {
return Err(TaskError::Packet(e))
},
Err(PacketReceiverError::Soft(h, e)) => {
handler.packet_error(h, e)?
}
};
match r_packet {
SendBack::None => {},
SendBack::Packet(p) => {
stream.send(p).await
.map_err(TaskError::Io)?;
},
SendBack::Close => {
should_close = true;
let _ = handler.close();
},
SendBack::CloseWithPacket => {
should_close = true;
let packet = handler.close();
close_packet = Some(packet);
}
}
},
Some(packet) = handler.to_send() => {
stream.send(packet).await
.map_err(TaskError::Io)?;
},
_ping = interval.tick(), if !should_close => {
stream.send(handler.ping_packet()).await
.map_err(TaskError::Io)?;
},
_ = &mut close, if !should_close => {
should_close = true;
let packet = handler.close();
close_packet = Some(packet);
},
Some(cfg) = cfg_rx.recv(), if !should_close => {
stream.stream.set_timeout(cfg.timeout);
stream.builder.set_body_limit(cfg.body_limit);
},
else => {
if let Some(packet) = close_packet.take() {
if let Err(e) = stream.send(packet).await {
tracing::error!(
"error sending close packet {:?}", e
);
}
}
if let Err(e) = stream.shutdown().await {
tracing::error!("error shutting down {:?}", e);
}
return Ok(())
}
}
}
}
}
}
macro_rules! client_bg_reconnect {
(
$fn:ident(
$stream:ident,
$bg_handler:ident,
$cfg_rx:ident,
$rx_close:ident,
$recon_strat:ident,
// return Result<PacketStream, TaskError>
|$n_stream:ident, $cfg:ident| $block:block
)
) => (
let mut stream = Some($stream);
loop {
let stream = match (stream.take(), &mut $recon_strat) {
(Some(s), _) => s,
(None, None) => unreachable!(),
(None, Some(recon)) => {
let mut err_counter = 0;
loop {
let stream = (recon.inner)(err_counter).await;
match stream {
Ok(s) => break s,
Err(e) => {
tracing::error!(
"reconnect failed attempt {} {:?}",
err_counter,
e
);
err_counter += 1;
}
}
}
}
};
let cfg = $cfg_rx.newest();
let stream = {
let $n_stream = stream;
let $cfg = cfg;
$block
};
let stream = match stream {
Ok(s) => s,
Err(e) => {
tracing::error!("creating packetstream failed {:?}", e);
if $recon_strat.is_none() {
return Err(e)
}
continue
}
};
let r = $fn(
stream,
&mut $bg_handler,
&mut $cfg_rx,
&mut $rx_close
).await;
match r {
Ok(o) => return Ok(o),
Err(e) => {
tracing::error!(
"fire stream client connection failed {:?}", e
);
if $recon_strat.is_none() {
return Err(e)
}
}
}
$bg_handler.close_all_started();
}
)
}