use std::fmt;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::{cmp, collections::HashMap, mem};
use bytes::{BufMut, Bytes, BytesMut};
use tokio::io;
use tokio_util::codec::Decoder;
use tokio_util::codec::Encoder;
use crate::err::Error;
use crate::{KVLines, Params, Telegram};
#[derive(Clone, Debug, PartialEq)]
enum CodecState {
Telegram,
Params,
KVLines,
Chunks,
Buf,
File,
Writer,
Skip
}
pub enum Input {
Telegram(Telegram),
KVLines(KVLines),
Params(Params),
Chunk(BytesMut, usize),
Buf(BytesMut),
File(PathBuf),
WriteDone,
SkipDone
}
pub struct Codec {
next_line_index: usize,
max_line_length: usize,
tg: Telegram,
params: Params,
kvlines: KVLines,
state: CodecState,
bin_remain: usize,
pathname: Option<PathBuf>,
writer: Option<Box<dyn Write + Send + Sync>>,
buf: BytesMut
}
impl fmt::Debug for Codec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Codec").field("state", &self.state).finish()
}
}
impl Default for Codec {
fn default() -> Self {
Codec::new()
}
}
impl Codec {
pub fn new() -> Codec {
Codec {
next_line_index: 0,
max_line_length: usize::MAX,
tg: Telegram::new(),
params: Params::new(),
kvlines: KVLines::new(),
state: CodecState::Telegram,
bin_remain: 0,
pathname: None,
writer: None,
buf: BytesMut::new()
}
}
pub fn new_with_max_length(max_line_length: usize) -> Self {
Codec {
max_line_length,
..Codec::new()
}
}
pub fn max_line_length(&self) -> usize {
self.max_line_length
}
fn find_newline(&self, buf: &BytesMut) -> (usize, Option<usize>) {
let read_to = cmp::min(self.max_line_length.saturating_add(1), buf.len());
let newline_offset = buf[self.next_line_index..read_to]
.iter()
.position(|b| *b == b'\n');
(read_to, newline_offset)
}
fn decode_telegram_line(&mut self, line: &str) -> Result<(), Error> {
if self.tg.get_topic().is_none() {
self.tg.set_topic(line)?;
} else {
let idx = line.find(' ');
if let Some(idx) = idx {
let (k, v) = line.split_at(idx);
let v = &v[1..v.len()];
self.tg.add_param(k, v)?;
}
}
Ok(())
}
fn get_eol_idx(&mut self, buf: &BytesMut) -> Result<Option<usize>, Error> {
let (read_to, newline_offset) = self.find_newline(&buf);
match newline_offset {
Some(offset) => {
let newline_index = offset + self.next_line_index;
self.next_line_index = 0;
Ok(Some(newline_index + 1))
}
None if buf.len() > self.max_line_length => Err(Error::BadFormat(
"Exceeded maximum line length.".to_string()
)),
None => {
self.next_line_index = read_to;
Ok(None)
}
}
}
fn decode_telegram_lines(
&mut self,
buf: &mut BytesMut
) -> Result<Option<Telegram>, Error> {
loop {
if let Some(idx) = self.get_eol_idx(buf)? {
let line = buf.split_to(idx);
let line = &line[..line.len() - 1];
let line = utf8(without_carriage_return(line))?;
if line.is_empty() {
return Ok(Some(mem::take(&mut self.tg)));
} else {
self.decode_telegram_line(&line)?;
}
} else {
return Ok(None);
}
}
}
fn decode_params_lines(
&mut self,
buf: &mut BytesMut
) -> Result<Option<Params>, Error> {
loop {
if let Some(idx) = self.get_eol_idx(buf)? {
let line = buf.split_to(idx);
let line = &line[..line.len() - 1];
let line = utf8(without_carriage_return(line))?;
if line.is_empty() {
self.state = CodecState::Telegram;
return Ok(Some(mem::take(&mut self.params)));
} else {
let idx = line.find(' ');
if let Some(idx) = idx {
let (k, v) = line.split_at(idx);
let v = &v[1..v.len()];
self.params.add_param(k, v)?;
}
}
} else {
return Ok(None);
}
}
}
fn decode_kvlines(
&mut self,
buf: &mut BytesMut
) -> Result<Option<KVLines>, Error> {
loop {
if let Some(idx) = self.get_eol_idx(buf)? {
let line = buf.split_to(idx);
let line = &line[..line.len() - 1];
let line = utf8(without_carriage_return(line))?;
if line.is_empty() {
self.state = CodecState::Telegram;
return Ok(Some(mem::take(&mut self.kvlines)));
} else {
let idx = line.find(' ');
if let Some(idx) = idx {
let (k, v) = line.split_at(idx);
let v = &v[1..v.len()];
self.kvlines.append(k, v);
}
}
} else {
return Ok(None);
}
}
}
pub fn expect_chunks(&mut self, size: usize) {
self.state = CodecState::Chunks;
self.bin_remain = size;
}
pub fn expect_buf(&mut self, size: usize) -> Result<(), Error> {
if size == 0 {
return Err(Error::InvalidSize("The size must not be zero".to_string()));
}
self.state = CodecState::Buf;
self.bin_remain = size;
self.buf = BytesMut::with_capacity(size);
Ok(())
}
pub fn expect_file<P: Into<PathBuf>>(
&mut self,
pathname: P,
size: usize
) -> Result<(), Error> {
if size == 0 {
return Err(Error::InvalidSize("The size must not be zero".to_string()));
}
self.state = CodecState::File;
let pathname = pathname.into();
self.writer = Some(Box::new(File::create(&pathname)?));
self.pathname = Some(pathname);
self.bin_remain = size;
Ok(())
}
pub fn expect_writer<W: 'static + Write + Send + Sync>(
&mut self,
writer: W,
size: usize
) -> Result<(), Error> {
if size == 0 {
return Err(Error::InvalidSize("The size must not be zero".to_string()));
}
self.state = CodecState::Writer;
self.writer = Some(Box::new(writer));
self.bin_remain = size;
Ok(())
}
pub fn expect_params(&mut self) {
self.state = CodecState::Params;
}
pub fn expect_kvlines(&mut self) {
self.state = CodecState::KVLines;
}
pub fn skip(&mut self, size: usize) -> Result<(), Error> {
if size == 0 {
return Err(Error::InvalidSize("The size must not be zero".to_string()));
}
self.state = CodecState::Skip;
self.bin_remain = size;
Ok(())
}
}
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
std::str::from_utf8(buf).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
"Unable to decode input as UTF8"
)
})
}
fn without_carriage_return(s: &[u8]) -> &[u8] {
if let Some(&b'\r') = s.last() {
&s[..s.len() - 1]
} else {
s
}
}
impl Decoder for Codec {
type Item = Input;
type Error = crate::err::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Input>, Error> {
match self.state {
CodecState::Telegram => {
let tg = self.decode_telegram_lines(buf)?;
if let Some(tg) = tg {
return Ok(Some(Input::Telegram(tg)));
}
Ok(None)
}
CodecState::Params => {
let params = self.decode_params_lines(buf)?;
if let Some(params) = params {
return Ok(Some(Input::Params(params)));
}
Ok(None)
}
CodecState::KVLines => {
let kvlines = self.decode_kvlines(buf)?;
if let Some(kvlines) = kvlines {
return Ok(Some(Input::KVLines(kvlines)));
}
Ok(None)
}
CodecState::Chunks => {
if buf.is_empty() {
return Ok(None);
}
let read_to = cmp::min(self.bin_remain, buf.len());
self.bin_remain -= read_to;
if self.bin_remain == 0 {
self.state = CodecState::Telegram;
}
Ok(Some(Input::Chunk(buf.split_to(read_to), self.bin_remain)))
}
CodecState::Buf => {
if buf.is_empty() {
return Ok(None);
}
let read_to = cmp::min(self.bin_remain, buf.len());
self.buf.put(buf.split_to(read_to));
self.bin_remain -= read_to;
if self.bin_remain != 0 {
return Ok(None);
}
self.state = CodecState::Telegram;
Ok(Some(Input::Buf(mem::take(&mut self.buf))))
}
CodecState::File | CodecState::Writer => {
if buf.is_empty() {
return Ok(None); }
let read_to = cmp::min(self.bin_remain, buf.len());
if let Some(ref mut f) = self.writer {
f.write_all(&buf.split_to(read_to))?;
}
self.bin_remain -= read_to;
if self.bin_remain != 0 {
return Ok(None); }
self.writer = None;
let ret = if self.state == CodecState::File {
let pathname = if let Some(ref fname) = self.pathname {
fname.clone()
} else {
return Err(Error::BadState("Missing pathname".to_string()));
};
self.pathname = None;
Input::File(pathname)
} else {
Input::WriteDone
};
self.state = CodecState::Telegram;
Ok(Some(ret))
} CodecState::Skip => {
if buf.is_empty() {
return Ok(None); }
let read_to = cmp::min(self.bin_remain, buf.len());
let _ = buf.split_to(read_to);
self.bin_remain -= read_to;
if self.bin_remain != 0 {
return Ok(None); }
self.state = CodecState::Telegram;
Ok(Some(Input::SkipDone))
} } }
}
impl Encoder<&Telegram> for Codec {
type Error = crate::err::Error;
fn encode(
&mut self,
tg: &Telegram,
buf: &mut BytesMut
) -> Result<(), Error> {
tg.encoder_write(buf)?;
Ok(())
}
}
impl Encoder<&Params> for Codec {
type Error = crate::err::Error;
fn encode(
&mut self,
params: &Params,
buf: &mut BytesMut
) -> Result<(), Error> {
params.encoder_write(buf)?;
Ok(())
}
}
impl Encoder<&HashMap<String, String>> for Codec {
type Error = crate::err::Error;
fn encode(
&mut self,
data: &HashMap<String, String>,
buf: &mut BytesMut
) -> Result<(), Error> {
let mut sz = 0;
for (k, v) in data.iter() {
sz += k.len() + 1 + v.len() + 1;
}
sz += 1;
buf.reserve(sz);
for (k, v) in data.iter() {
buf.put(k.as_bytes());
buf.put_u8(b' ');
buf.put(v.as_bytes());
buf.put_u8(b'\n');
}
buf.put_u8(b'\n');
Ok(())
}
}
impl Encoder<&KVLines> for Codec {
type Error = crate::err::Error;
fn encode(
&mut self,
kvlines: &KVLines,
buf: &mut BytesMut
) -> Result<(), Error> {
kvlines.encoder_write(buf)?;
Ok(())
}
}
impl Encoder<Bytes> for Codec {
type Error = crate::err::Error;
fn encode(
&mut self,
data: Bytes,
buf: &mut BytesMut
) -> Result<(), crate::err::Error> {
buf.reserve(data.len());
buf.put(data);
Ok(())
}
}
impl Encoder<&[u8]> for Codec {
type Error = crate::err::Error;
fn encode(
&mut self,
data: &[u8],
buf: &mut BytesMut
) -> Result<(), crate::err::Error> {
buf.reserve(data.len());
buf.put(data);
Ok(())
}
}