use crate::channel::{Context, Handler};
use crate::transport::TaggedBytesMut;
use bytes::BytesMut;
use std::time::Instant;
mod line_based_frame_decoder;
pub use line_based_frame_decoder::{LineBasedFrameDecoder, TerminatorType};
pub trait MessageDecoder {
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, std::io::Error>;
}
pub struct TaggedByteToMessageCodec {
transport_active: bool,
message_decoder: Box<dyn MessageDecoder + Send + Sync>,
}
impl TaggedByteToMessageCodec {
pub fn new(message_decoder: Box<dyn MessageDecoder + Send + Sync>) -> Self {
Self {
transport_active: false,
message_decoder,
}
}
}
impl Handler for TaggedByteToMessageCodec {
type Rin = TaggedBytesMut;
type Rout = Self::Rin;
type Win = TaggedBytesMut;
type Wout = Self::Win;
fn name(&self) -> &str {
"TaggedByteToMessageCodec"
}
fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
self.transport_active = true;
ctx.fire_transport_active();
}
fn transport_inactive(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
self.transport_active = false;
ctx.fire_transport_inactive();
}
fn handle_read(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
mut msg: Self::Rin,
) {
while self.transport_active {
match self.message_decoder.decode(&mut msg.message) {
Ok(message) => {
if let Some(message) = message {
ctx.fire_read(TaggedBytesMut {
now: Instant::now(),
transport: msg.transport,
message,
});
} else {
return;
}
}
Err(err) => {
ctx.fire_exception(Box::new(err));
return;
}
}
}
}
fn poll_write(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
) -> Option<Self::Wout> {
ctx.fire_poll_write()
}
}