sansio_codec/byte_to_message_decoder/
mod.rs1use bytes::BytesMut;
3use sansio::{Context, Handler};
4use sansio_transport::TaggedBytesMut;
5use std::time::Instant;
6
7mod line_based_frame_decoder;
8
9pub use line_based_frame_decoder::{LineBasedFrameDecoder, TerminatorType};
10
11pub trait MessageDecoder {
13 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, std::io::Error>;
15}
16
17pub struct TaggedByteToMessageCodec {
20 transport_active: bool,
21 message_decoder: Box<dyn MessageDecoder + Send + Sync>,
22}
23
24impl TaggedByteToMessageCodec {
25 pub fn new(message_decoder: Box<dyn MessageDecoder + Send + Sync>) -> Self {
27 Self {
28 transport_active: false,
29 message_decoder,
30 }
31 }
32}
33
34impl Handler for TaggedByteToMessageCodec {
35 type Rin = TaggedBytesMut;
36 type Rout = Self::Rin;
37 type Win = TaggedBytesMut;
38 type Wout = Self::Win;
39
40 fn name(&self) -> &str {
41 "TaggedByteToMessageCodec"
42 }
43
44 fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
45 self.transport_active = true;
46 ctx.fire_transport_active();
47 }
48 fn transport_inactive(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
49 self.transport_active = false;
50 ctx.fire_transport_inactive();
51 }
52 fn handle_read(
53 &mut self,
54 ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
55 mut msg: Self::Rin,
56 ) {
57 while self.transport_active {
58 match self.message_decoder.decode(&mut msg.message) {
59 Ok(message) => {
60 if let Some(message) = message {
61 ctx.fire_handle_read(TaggedBytesMut {
62 now: Instant::now(),
63 transport: msg.transport,
64 message,
65 });
66 } else {
67 return;
68 }
69 }
70 Err(err) => {
71 ctx.fire_handle_error(Box::new(err));
72 return;
73 }
74 }
75 }
76 }
77
78 fn poll_write(
79 &mut self,
80 ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
81 ) -> Option<Self::Wout> {
82 ctx.fire_poll_write()
83 }
84}