use std::borrow::Cow;
use std::cmp::min;
use std::io::{self, Cursor};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
#[derive(Debug)]
pub struct RawLinedEvent {
name: Cursor<Cow<'static, [u8]>>,
value: Take<Cursor<Cow<'static, [u8]>>>,
state: State,
}
fn farm(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
match cow {
Cow::Borrowed(slice) => Cow::Borrowed(slice.as_bytes()),
Cow::Owned(vec) => Cow::Owned(vec.into_bytes()),
}
}
fn farm_name(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
let mut i = 0;
let mut cow = farm(cow);
while i < cow.len() {
if let Some(k) = memchr::memchr3(b'\r', b'\n', b':', &cow[i..]) {
cow.to_mut()[i + k] = b' ';
i += k + 1;
} else {
break;
}
}
cow
}
fn farm_value(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
let mut i = 0;
let mut cow = farm(cow);
while i < cow.len() {
if let Some(k) = memchr::memchr2(b'\r', b'\n', &cow[i..]) {
cow.to_mut()[i + k] = b' ';
i += k + 1;
} else {
break;
}
}
cow
}
impl RawLinedEvent {
fn prefarmed(name: Cow<'static, [u8]>, value: Cow<'static, [u8]>) -> RawLinedEvent {
let name = Cursor::new(name);
let mut value = Cursor::new(value).take(0);
advance(&mut value);
RawLinedEvent {
name,
value,
state: State::Name,
}
}
pub fn one<N, V>(name: N, value: V) -> RawLinedEvent
where
N: Into<Cow<'static, str>>,
V: Into<Cow<'static, str>>,
{
RawLinedEvent::prefarmed(farm_name(name.into()), farm_value(value.into()))
}
pub fn many<N, V>(name: N, value: V) -> RawLinedEvent
where
N: Into<Cow<'static, str>>,
V: Into<Cow<'static, str>>,
{
RawLinedEvent::prefarmed(farm_name(name.into()), farm(value.into()))
}
pub fn raw<V: Into<Cow<'static, str>>>(value: V) -> RawLinedEvent {
let value = value.into();
let len = value.len();
RawLinedEvent {
name: Cursor::new(Cow::Borrowed(&[])),
value: Cursor::new(farm(value)).take(len as u64),
state: State::Value,
}
}
}
#[derive(Debug, PartialEq, Copy, Clone)]
enum State {
Name,
Colon,
Value,
NewLine,
Done,
}
fn advance<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
let pos = min(
buf.get_ref().get_ref().as_ref().len() as u64,
buf.get_ref().position(),
);
let inner = buf.get_ref().get_ref().as_ref();
let next = memchr::memchr2(b'\n', b'\r', &inner[(pos as usize)..])
.map(|i| pos + i as u64)
.unwrap_or_else(|| inner.len() as u64);
let limit = next - pos;
buf.set_limit(limit);
}
fn skip<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
let pos = min(
buf.get_ref().get_ref().as_ref().len() as u64,
buf.get_ref().position(),
);
match buf.get_ref().get_ref().as_ref().get(pos as usize) {
Some(b'\n') => buf.get_mut().set_position(pos + 1),
Some(b'\r') => {
let next = (pos as usize).saturating_add(1);
if buf.get_ref().get_ref().as_ref().get(next) == Some(&b'\n') {
buf.get_mut().set_position(pos + 2);
} else {
buf.get_mut().set_position(pos + 1);
}
}
_ => (),
}
}
macro_rules! dbg_assert_ready {
($e:expr) => {{
let poll = $e;
debug_assert!(poll.is_ready());
::futures::ready!(poll)
}};
}
impl AsyncRead for RawLinedEvent {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
use bytes::Buf;
loop {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}
match self.state {
State::Name => {
dbg_assert_ready!(Pin::new(&mut self.name).poll_read(cx, buf))?;
if !self.name.has_remaining() {
self.name.set_position(0);
self.state = State::Colon;
}
}
State::Colon => {
buf.put_slice(b":");
self.state = State::Value;
}
State::Value => {
dbg_assert_ready!(Pin::new(&mut self.value).poll_read(cx, buf))?;
if self.value.limit() == 0 {
self.state = State::NewLine;
}
}
State::NewLine => {
buf.put_slice(b"\n");
if self.value.get_ref().has_remaining() {
skip(&mut self.value);
advance(&mut self.value);
self.state = State::Name;
} else {
self.state = State::Done;
}
}
State::Done => return Poll::Ready(Ok(())),
}
}
}
}