use std::io;
use kevy_resp::parse_command_borrowed;
use crate::Commands;
use crate::message::{Inbound, Op};
use crate::shard::Shard;
impl<C: Commands> Shard<C> {
pub(crate) fn conn_readable(&mut self, conn_id: u64) -> io::Result<()> {
{
let Some(conn) = self.conns.get_mut(&conn_id) else {
return Ok(());
};
loop {
match conn.sock.read(&mut self.read_buf) {
Ok(0) => {
conn.closing = true;
break;
}
Ok(n) => conn.input.extend_from_slice(&self.read_buf[..n]),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(_) => {
conn.closing = true;
break;
}
}
}
}
let mut input_buf = match self.conns.get_mut(&conn_id) {
Some(c) => std::mem::take(&mut c.input),
None => return Ok(()),
};
let mut had_protocol_error = false;
loop {
let parse = parse_command_borrowed(&input_buf);
let (argv, consumed) = match parse {
Ok(Some(t)) => t,
Ok(None) => break,
Err(_) => {
had_protocol_error = true;
break;
}
};
if let Some(key) = argv.get(1) {
self.store.prefetch_for_key(key);
}
self.handle_command(conn_id, &argv);
drop(argv);
input_buf.drain(..consumed);
if !self.conns.contains_key(&conn_id) {
return Ok(());
}
}
if let Some(c) = self.conns.get_mut(&conn_id) {
c.input = input_buf;
}
if had_protocol_error {
self.protocol_error(conn_id);
}
self.flush_conn(conn_id)
}
pub(crate) fn drain_inbound(&mut self) -> io::Result<bool> {
let mut did = false;
for src in 0..self.nshards {
if src == self.id {
continue; }
while let Some(msg) = self.inboxes[src].as_mut().expect("peer inbox").pop() {
did = true;
match msg {
Inbound::Request {
origin,
conn,
seq,
op,
} => {
let part = self.exec_op(op);
self.send_to(origin, Inbound::Response { conn, seq, part });
}
Inbound::Response { conn, seq, part } => {
self.fold(conn, seq, part);
self.flush_conn(conn)?;
}
Inbound::RequestBatch { origin, reqs } => {
let mut resps = Vec::with_capacity(reqs.len());
for (conn, seq, argv) in reqs {
let part = self.exec_op(Op::Dispatch(argv));
resps.push((conn, seq, part));
}
self.send_to(origin, Inbound::ResponseBatch(resps));
}
Inbound::ResponseBatch(resps) => {
let mut to_flush: Vec<u64> = Vec::new();
for (conn, seq, part) in resps {
self.fold(conn, seq, part);
if !to_flush.contains(&conn) {
to_flush.push(conn);
}
}
for conn in to_flush {
self.flush_conn(conn)?;
}
}
Inbound::DeliverPublish(batch) => {
for m in &batch {
self.deliver_publish(&m.0, &m.1);
}
}
}
}
}
Ok(did)
}
pub(crate) fn close_conn(&mut self, conn_id: u64) {
if let Some(conn) = self.conns.remove(&conn_id) {
let fd = conn.sock.raw();
let _ = self.poller.delete(fd);
self.fd_to_conn.remove(&fd);
self.unregister_subs(&conn.sub);
}
}
}