sansio_codec/byte_to_message_decoder/
mod.rs

1//! Handlers for converting byte to message
2use bytes::BytesMut;
3use sansio::{Context, Handler};
4use sansio_transport::TaggedBytesMut;
5use std::time::Instant;
6
7mod line_based_frame_decoder;
8
9pub use line_based_frame_decoder::{LineBasedFrameDecoder, TerminatorType};
10
11/// This trait allows for decoding messages.
12pub trait MessageDecoder {
13    /// Decodes byte buffer to message buffer
14    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, std::io::Error>;
15}
16
17/// A tagged Byte to Message Codec handler that reads with input of TaggedBytesMut and output of TaggedBytesMut,
18/// or writes with input of TaggedBytesMut and output of TaggedBytesMut
19pub struct TaggedByteToMessageCodec {
20    transport_active: bool,
21    message_decoder: Box<dyn MessageDecoder + Send + Sync>,
22}
23
24impl TaggedByteToMessageCodec {
25    /// Creates a new TaggedByteToMessageCodec handler
26    pub fn new(message_decoder: Box<dyn MessageDecoder + Send + Sync>) -> Self {
27        Self {
28            transport_active: false,
29            message_decoder,
30        }
31    }
32}
33
34impl Handler for TaggedByteToMessageCodec {
35    type Rin = TaggedBytesMut;
36    type Rout = Self::Rin;
37    type Win = TaggedBytesMut;
38    type Wout = Self::Win;
39
40    fn name(&self) -> &str {
41        "TaggedByteToMessageCodec"
42    }
43
44    fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
45        self.transport_active = true;
46        ctx.fire_transport_active();
47    }
48    fn transport_inactive(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
49        self.transport_active = false;
50        ctx.fire_transport_inactive();
51    }
52    fn handle_read(
53        &mut self,
54        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
55        mut msg: Self::Rin,
56    ) {
57        while self.transport_active {
58            match self.message_decoder.decode(&mut msg.message) {
59                Ok(message) => {
60                    if let Some(message) = message {
61                        ctx.fire_handle_read(TaggedBytesMut {
62                            now: Instant::now(),
63                            transport: msg.transport,
64                            message,
65                        });
66                    } else {
67                        return;
68                    }
69                }
70                Err(err) => {
71                    ctx.fire_handle_error(Box::new(err));
72                    return;
73                }
74            }
75        }
76    }
77
78    fn poll_write(
79        &mut self,
80        ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
81    ) -> Option<Self::Wout> {
82        ctx.fire_poll_write()
83    }
84}