use std::io::{self, Read, Write};
use std::net::TcpStream;
use kevy_embedded::PubsubFrame;
use kevy_resp::{Reply, encode_command, parse_reply};
use crate::subscribe::PubsubEvent;
pub(crate) fn send_to(
stream: &mut TcpStream,
verb: &[u8],
args: &[&[u8]],
) -> io::Result<()> {
let mut argv = Vec::with_capacity(args.len() + 1);
argv.push(verb.to_vec());
argv.extend(args.iter().map(|a| a.to_vec()));
let mut frame = Vec::new();
encode_command(&mut frame, &argv);
stream.write_all(&frame)
}
pub(crate) fn recv_remote(
stream: &mut TcpStream,
buf: &mut Vec<u8>,
) -> io::Result<PubsubEvent> {
let mut chunk = [0u8; 8192];
loop {
match parse_reply(buf) {
Ok(Some((reply, used))) => {
buf.drain(..used);
return classify(reply);
}
Ok(None) => {}
Err(_) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"malformed reply",
));
}
}
let n = stream.read(&mut chunk)?;
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"server closed connection",
));
}
buf.extend_from_slice(&chunk[..n]);
}
}
pub(crate) fn frame_to_event(frame: PubsubFrame) -> PubsubEvent {
match frame {
PubsubFrame::Subscribe { channel, count } => PubsubEvent::Subscribe {
channel,
count: count as i64,
},
PubsubFrame::Psubscribe { pattern, count } => PubsubEvent::Psubscribe {
pattern,
count: count as i64,
},
PubsubFrame::Unsubscribe { channel, count } => PubsubEvent::Unsubscribe {
channel,
count: count as i64,
},
PubsubFrame::Punsubscribe { pattern, count } => PubsubEvent::Punsubscribe {
pattern,
count: count as i64,
},
PubsubFrame::Message { channel, payload } => PubsubEvent::Message { channel, payload },
PubsubFrame::Pmessage {
pattern,
channel,
payload,
} => PubsubEvent::Pmessage {
pattern,
channel,
payload,
},
}
}
pub(crate) fn classify(reply: Reply) -> io::Result<PubsubEvent> {
let items = match reply {
Reply::Array(v) | Reply::Push(v) => v,
other => return Err(invalid(format!("expected array frame, got {}", shape(&other)))),
};
let kind = match items.first() {
Some(Reply::Bulk(b)) => b.clone(),
_ => return Err(invalid("pubsub frame missing kind field")),
};
match kind.as_slice() {
b"subscribe" => {
let [_, ch, n] = into_array3(items)?;
Ok(PubsubEvent::Subscribe {
channel: take_bulk(ch, "channel")?,
count: take_int(n, "count")?,
})
}
b"psubscribe" => {
let [_, p, n] = into_array3(items)?;
Ok(PubsubEvent::Psubscribe {
pattern: take_bulk(p, "pattern")?,
count: take_int(n, "count")?,
})
}
b"unsubscribe" => {
let [_, ch, n] = into_array3(items)?;
Ok(PubsubEvent::Unsubscribe {
channel: take_bulk_or_nil(ch, "channel")?,
count: take_int(n, "count")?,
})
}
b"punsubscribe" => {
let [_, p, n] = into_array3(items)?;
Ok(PubsubEvent::Punsubscribe {
pattern: take_bulk_or_nil(p, "pattern")?,
count: take_int(n, "count")?,
})
}
b"message" => {
let [_, ch, payload] = into_array3(items)?;
Ok(PubsubEvent::Message {
channel: take_bulk(ch, "channel")?,
payload: take_bulk(payload, "payload")?,
})
}
b"pmessage" => {
let [_, pat, ch, payload] = into_array4(items)?;
Ok(PubsubEvent::Pmessage {
pattern: take_bulk(pat, "pattern")?,
channel: take_bulk(ch, "channel")?,
payload: take_bulk(payload, "payload")?,
})
}
other => Err(invalid(format!(
"unknown pubsub kind '{}'",
String::from_utf8_lossy(other)
))),
}
}
fn into_array3(items: Vec<Reply>) -> io::Result<[Reply; 3]> {
items
.try_into()
.map_err(|v: Vec<Reply>| invalid(format!("expected 3-element pubsub frame, got {}", v.len())))
}
fn into_array4(items: Vec<Reply>) -> io::Result<[Reply; 4]> {
items
.try_into()
.map_err(|v: Vec<Reply>| invalid(format!("expected 4-element pubsub frame, got {}", v.len())))
}
fn take_bulk(r: Reply, field: &str) -> io::Result<Vec<u8>> {
match r {
Reply::Bulk(b) => Ok(b),
other => Err(invalid(format!(
"expected bulk for {field}, got {}",
shape(&other)
))),
}
}
fn take_bulk_or_nil(r: Reply, field: &str) -> io::Result<Option<Vec<u8>>> {
match r {
Reply::Bulk(b) => Ok(Some(b)),
Reply::Nil => Ok(None),
other => Err(invalid(format!(
"expected bulk/nil for {field}, got {}",
shape(&other)
))),
}
}
fn take_int(r: Reply, field: &str) -> io::Result<i64> {
match r {
Reply::Int(n) => Ok(n),
other => Err(invalid(format!(
"expected integer for {field}, got {}",
shape(&other)
))),
}
}
pub(crate) fn shape(r: &Reply) -> &'static str {
match r {
Reply::Simple(_) => "simple-string",
Reply::Error(_) => "error",
Reply::Int(_) => "integer",
Reply::Bulk(_) => "bulk-string",
Reply::Nil | Reply::Null => "nil",
Reply::Array(_) => "array",
Reply::Map(_) => "map",
Reply::Set(_) => "set",
Reply::Double(_) => "double",
Reply::Boolean(_) => "boolean",
Reply::Verbatim { .. } => "verbatim-string",
Reply::BigNumber(_) => "big-number",
Reply::Push(_) => "push",
Reply::BlobError(_) => "blob-error",
}
}
pub(crate) fn invalid(msg: impl Into<String>) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, msg.into())
}