stomp_agnostic/transport/
client.rs1use crate::frame::parse_frame;
2use crate::transport::{ReadData, ReadError, WriteError};
3use crate::{FromServer, Message, ToServer};
4use async_trait::async_trait;
5use bytes::{Buf, Bytes, BytesMut};
6use std::fmt::Debug;
7use winnow::Partial;
8use winnow::error::ErrMode;
9use winnow::stream::Offset;
10
11#[async_trait]
12pub trait ClientTransport: Send + Sync {
13 type ProtocolSideChannel;
16
17 async fn write(&mut self, message: Message<ToServer>) -> Result<(), WriteError>;
18 async fn read(&mut self) -> Result<ReadData<Self::ProtocolSideChannel>, ReadError>;
19}
20
21#[derive(Debug)]
24pub enum ServerResponse<T>
25where
26 T: Debug,
27{
28 Message(Message<FromServer>),
29 Custom(T),
30}
31
32pub(crate) struct BufferedTransport<T>
33where
34 T: ClientTransport,
35 T::ProtocolSideChannel: Debug,
36{
37 transport: T,
38 buffer: BytesMut,
39}
40
41impl<T> BufferedTransport<T>
42where
43 T: ClientTransport,
44 T::ProtocolSideChannel: Debug,
45{
46 pub(crate) fn new(transport: T) -> Self {
47 Self {
48 transport,
49 buffer: BytesMut::with_capacity(4096),
50 }
51 }
52
53 fn append(&mut self, data: Bytes) {
54 self.buffer.extend_from_slice(&data);
55 }
56
57 fn decode(&mut self) -> Result<Option<Message<FromServer>>, ReadError> {
58 let buf = &mut Partial::new(self.buffer.chunk());
60
61 let item = match parse_frame(buf) {
63 Ok(frame) => Message::<FromServer>::from_frame(frame),
64 Err(ErrMode::Incomplete(_)) => return Ok(None),
66 Err(e) => return Err(ReadError::Parser(e)),
67 };
68
69 let len = buf.offset_from(&Partial::new(self.buffer.chunk()));
71
72 self.buffer.advance(len);
74
75 item.map_err(|e| e.into()).map(Some)
77 }
78
79 pub(crate) async fn send(&mut self, message: Message<ToServer>) -> Result<(), WriteError> {
80 self.transport.write(message).await
81 }
82
83 pub(crate) async fn next(
84 &mut self,
85 ) -> Result<ServerResponse<T::ProtocolSideChannel>, ReadError> {
86 loop {
87 let response = self.transport.read().await?;
88 match response {
89 ReadData::Binary(buffer) => {
90 self.append(buffer);
91 }
92 ReadData::Custom(custom) => {
93 return Ok(ServerResponse::Custom(custom));
94 }
95 }
96
97 if let Some(message) = self.decode()? {
98 return Ok(ServerResponse::Message(message));
99 }
100 }
101 }
102
103 pub(crate) fn into_transport(self) -> T {
104 self.transport
105 }
106
107 pub(crate) fn as_mut_inner(&mut self) -> &mut T {
108 &mut self.transport
109 }
110}