use super::*;
/// Handles associated with one user.
///
/// Holds the shared per-user configuration together with the sending
/// halves of two channels.  The matching receiver types
/// (`mpsc::Receiver<WebRequest>` and `mpsc::Receiver<RoutedPacket>`) are
/// what `run` takes as its `web` and `routed` parameters, so presumably
/// these senders feed that task -- confirm at the construction site.
#[derive(Debug)]
pub struct User {
    /// Shared configuration for this user.
    pub ic: Arc<InstanceConfig>,

    /// Sender for `WebRequest` values.
    pub web: mpsc::Sender<WebRequest>,

    /// Sender for `RoutedPacket` values.
    pub route: mpsc::Sender<RoutedPacket>,
}
/// Runs the server-side task for one user.
///
/// The task owns two queues: `outstanding` (web requests that have been
/// parsed and are being held until we choose to answer them) and
/// `downbound` (packets received on `routed`, waiting to be sent to the
/// client in a response body).
///
/// Each iteration of the main loop:
///
///  1. answers at most one held request, if there is downbound data to
///     send, or if some held request is surplus to its
///     `target_requests_outstanding`, or has passed its deadline;
///  2. drops packets from the front of `downbound` while the queue
///     exceeds its size limit;
///  3. waits for the next event: a routed packet, a new web request, or
///     the earliest deadline among the held requests.
///
/// A failure while reading or parsing an individual web request is
/// reported back to that request's `reply_to` and does not stop the task.
///
/// # Parameters
///
///  * `global`: passed through to `route_packet` for upbound packets.
///  * `ic`: this user's configuration (limits, timeouts, link addresses).
///  * `web`: incoming web requests for this user.
///  * `routed`: incoming packets destined for this user.
///
/// # Errors
///
/// Returns `Err` when `routed` is closed ("routers shut down!") or when
/// `web` is closed ("webservers all shut down!").  The outer loop has no
/// `break`, so these are the only ways this function returns.
pub async fn run(global: Arc<Global>,
ic: Arc<InstanceConfig>,
mut web: mpsc::Receiver<WebRequest>,
mut routed: mpsc::Receiver<RoutedPacket>)
-> Result<Void, AE>
{
// A web request which was parsed successfully and is now being held
// until this task decides to answer it.
struct Outstanding {
reply_to: oneshot::Sender<WebResponse>,
oi: OutstandingInner,
}
// Per-request parameters, established while parsing the request (see the
// `meta!` invocations below).
#[derive(Debug)]
struct OutstandingInner {
// `Instant::now() + http_timeout`, taken once the request was parsed.
deadline: Instant,
// Value supplied by the client; checked to be equal to the server's.
target_requests_outstanding: u32,
// `min(client, server)` for this request.
max_batch_down: u32,
}
let mut outstanding: VecDeque<Outstanding> = default();
let mut downbound: PacketQueue<RoutedPacketData> = default();
// Sends `response` on the request's oneshot channel.  If the send fails
// (the unsent response is handed back as the error), this is only logged
// at trace level; the task carries on.
let try_send_response = |
reply_to: oneshot::Sender<WebResponse>,
response: WebResponse
| {
reply_to.send(response)
.unwrap_or_else(|_: WebResponse| {
trace!("unable to send response back to webserver! user={}",
&ic.link.client);
});
};
loop {
// Effective downbound batch limit: the smallest `max_batch_down` of any
// held request, or the configured value when nothing is held.
// (`.sat()` is a project helper; presumably a saturating integer
// conversion, here to `usize` -- confirm.)
let eff_max_batch_down = outstanding
.iter()
.map(|o| o.oi.max_batch_down)
.min()
.unwrap_or(ic.max_batch_down)
.sat();
// `None` when no requests are held.  Note that this is computed before
// a request is (possibly) removed just below, so it can refer to a
// request which has just been answered; the only consequence visible
// here is that the timer arm of the `select!` may fire once more than
// strictly necessary.
let earliest_deadline = outstanding
.iter()
.map(|o| o.oi.deadline)
.min();
// Choose which held request (if any) to answer now.  At most one
// request is answered per iteration of the outer loop.
if let Some(req) = {
let now = Instant::now();
if ! downbound.is_empty() {
// There is data to send: use the request at the front of the queue
// (requests are added with `push_back`, so this is the oldest).
outstanding.pop_front()
} else if let Some((i,_)) = outstanding.iter().enumerate().find({
|(_,o)| {
// No data to send.  Answer (with an empty body) the first request
// which is surplus according to its own target, or whose deadline
// is strictly in the past.
outstanding.len() > o.oi.target_requests_outstanding.sat()
||
o.oi.deadline < now
}
}) {
// `i` came from enumerating `outstanding`, so `remove` returns `Some`.
Some(outstanding.remove(i).unwrap())
} else {
None
}
} {
let mut build: FrameQueueBuf = default();
// Move packets from `downbound` into the response body for as long as
// the next one still fits under the batch limit.
loop {
let next = if let Some(n) = downbound.peek_front() { n }
else { break };
// NOTE(review): no allowance is added here for a separator; this
// appears to rely on one byte being stripped by `advance(1)` below
// -- confirm against `FrameQueueBuf::esc_push`.
if build.len() + next.len() >= eff_max_batch_down { break }
// `peek_front` just returned `Some`, so `pop_front` cannot fail here.
build.esc_push(downbound.pop_front().unwrap());
}
if ! build.is_empty() {
// Drop the first byte of the assembled body (presumably the leading
// separator written by the first `esc_push` -- confirm).
build.advance(1);
}
// NOTE(review): warnings accumulated while this request was being
// parsed are not kept in `Outstanding`, so a successful response
// always carries `default()` warnings.  Confirm that this is intended.
let response = WebResponse {
data: Ok(build),
warnings: default(),
};
try_send_response(req.reply_to, response);
}
// Bound the amount of queued downbound data:
//   max_requests_outstanding * eff_max_batch_down + 1
// using saturating arithmetic.  (The `+ 1` presumably corresponds to
// the one byte stripped by `advance(1)` above -- confirm.)
let max = usize::saturating_mul(
ic.max_requests_outstanding.sat(),
eff_max_batch_down,
).saturating_add(1 );
// Discard from the front of the queue (the packets which would have
// been sent next) until we are within the limit.
while downbound.total_len() > max {
let _ = downbound.pop_front();
trace!("{} discarding downbound-queue-full", &ic.link);
}
// Wait for the next event.  `biased;` makes `select!` poll the arms in
// the order written, so routed packets are checked first, then web
// requests, then the deadline timer.
select!{
biased;
// A packet for this user arrived: queue it.  A closed channel is fatal.
data = routed.recv() =>
{
let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
downbound.push_back(data.data);
},
// A new web request arrived.  A closed channel is fatal.
req = web.recv() =>
{
let WebRequest {
initial, initial_remaining, length_hint, mut body,
boundary_finder,
reply_to, conn, mut warnings, may_route,
} = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
// The request is processed inside an `async` block so that `?` in
// there yields an `Err` from the block (handled by the `match`)
// rather than returning from `run`.
match async {
// Number of bytes of `initial` already consumed before the request
// reached us.  Assumes `initial_remaining <= initial.len()` --
// TODO confirm that the sender guarantees this.
let initial_used = initial.len() - initial_remaining;
// Read the rest of the request body.  The first argument is
// presumably the size limit -- confirm against `read_limited_bytes`.
let whole_request = read_limited_bytes(
ic.max_batch_up.sat(),
initial,
length_hint,
&mut body
).await.context("read request body")?;
// Resume multipart parsing from where the earlier consumer stopped.
// This yields the metadata component currently in progress, and an
// iterator over the components after it.
let (meta, mut comps) =
multipart::ComponentIterator::resume_mid_component(
&whole_request[initial_used..],
boundary_finder
).context("resume parsing body, after auth checks")?;
let mut meta = MetadataFieldIterator::new(&meta);
// meta!{ FIELD, ( BADCMP? ), RESULT, let SERVER, CLIENT <binding code> }
//
// Defines a local variable named FIELD.  Its value is computed in a
// closure which:
//   - binds SERVER to `ic.FIELD`;
//   - binds CLIENT using the supplied `let` tail (which reads the next
//     field from the `meta` iterator);
//   - if BADCMP is given, fails with a "mismatch" error whenever
//     `CLIENT BADCMP SERVER` is true;
//   - otherwise evaluates to RESULT.
// Any error is given the field name as context.
//
// The invocations below each take the next field from `meta`, in the
// order written.
macro_rules! meta {
{ $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
let $server:ident, $client:ident $($code:tt)*
} => {
let $v = (||{
let $server = ic.$v;
let $client $($code)*
$(
if $client $badcmp $server {
throw!(anyhow!("mismatch: client={:?} {} server={:?}",
$client, stringify!($badcmp), $server));
}
)?
Ok::<_,AE>($ret)
})().context(stringify!($v))?;
}
}
// Required (`need_parse`); must be equal to the server's value.
meta!{
target_requests_outstanding, ( != ), client,
let server, client: u32 = meta.need_parse()?;
}
// Required, in seconds; must not exceed the server's value.
meta!{
http_timeout, ( > ), client,
let server, client = Duration::from_secs(meta.need_parse()?);
}
// Falls back to the server's value (`unwrap_or`); must be equal to it.
meta!{
mtu, ( != ), client,
let server, client: u32 = meta.parse()?.unwrap_or(server);
}
// Falls back to the server's value; no check; the smaller value is used.
meta!{
max_batch_down, (), min(client, server),
let server, client: u32 = meta.parse()?.unwrap_or(server);
}
// Falls back to the server's value; must not exceed it.  Only the
// check matters: the resulting value is discarded just below.
meta!{
max_batch_up, ( > ), client,
let server, client = meta.parse()?.unwrap_or(server);
}
let _ = max_batch_up;
// Process each remaining component of the request.
while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
// A part with any other name gets a warning, but is still processed
// below.  (`warnings.add` can itself fail, which aborts the request.)
if comp.name != PartName::d {
warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
}
// Hand the payload to `slip::processn` with three callbacks:
//  1. a header check, which rejects packets whose source address is
//     not this client's and returns the destination address.  (The
//     const parameter of `ip_packet_addr` selects source (`false`) vs
//     destination (`true`), judging by how the results are used here.)
//  2. a routing step, which passes each packet to `route_packet`;
//  3. an error handler, which records the error as a warning.
slip::processn(Mime2Slip, mtu, comp.payload, |header| {
let saddr = ip_packet_addr::<false>(header)?;
if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
let daddr = ip_packet_addr::<true>(header)?;
Ok(daddr)
}, |(daddr,packet)| route_packet(
&global, &conn, Some(&ic.link.client), daddr,
packet, may_route.clone(),
).map(Ok),
|e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
).await?;
}
// The deadline runs from now (after the upbound data has been
// processed), not from when the request arrived.
let deadline = Instant::now() + http_timeout;
let oi = OutstandingInner {
target_requests_outstanding,
max_batch_down,
deadline,
};
Ok::<_,AE>(oi)
}.await {
// Parsed OK: hold the request until we decide to answer it.
Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
// Failed: answer immediately with the error and the warnings
// collected so far.
Err(e) => {
try_send_response(reply_to, WebResponse {
data: Err(e),
warnings,
});
},
}
}
// Timer arm: completes at the earliest deadline, or never if no
// requests were held when `earliest_deadline` was computed.
() = async {if let Some(deadline) = earliest_deadline {
tokio::time::sleep_until(deadline).await;
} else {
future::pending().await
} } =>
{
// Nothing to do here: the next loop iteration re-examines
// `outstanding` and answers any request whose deadline has passed.
}
}
}
}