stomp_agnostic/handle/
client.rs1use crate::transport::client::{BufferedTransport, ClientTransport, ServerResponse};
2use crate::{FromServer, Message, ReadError, ToServer, WriteError};
3use anyhow::anyhow;
4use std::fmt::Debug;
5
6pub struct ClientStompHandle<T>
8where
9 T: ClientTransport,
10 T::ProtocolSideChannel: Debug,
11{
12 transport: BufferedTransport<T>,
13}
14
15impl<T> ClientStompHandle<T>
16where
17 T: ClientTransport,
18 T::ProtocolSideChannel: Debug,
19{
20 pub async fn connect(
23 transport: T,
24 virtualhost: String,
25 login: Option<String>,
26 passcode: Option<String>,
27 headers: Vec<(String, String)>,
28 ) -> anyhow::Result<ClientStompHandle<T>> {
29 let transport = client_handshake(
30 BufferedTransport::new(transport),
31 virtualhost.clone(),
32 login,
33 passcode,
34 headers,
35 )
36 .await?;
37
38 Ok(ClientStompHandle { transport })
39 }
40
41 pub async fn send_message(&mut self, message: Message<ToServer>) -> Result<(), WriteError> {
43 self.transport.send(message).await
44 }
45
46 pub async fn read_response(
48 &mut self,
49 ) -> Result<ServerResponse<T::ProtocolSideChannel>, ReadError> {
50 self.transport.next().await
51 }
52
53 pub fn into_transport(self) -> T {
55 self.transport.into_transport()
56 }
57
58 pub fn as_mut_transport(&mut self) -> &mut T {
60 self.transport.as_mut_inner()
61 }
62}
63
64async fn client_handshake<T>(
70 mut transport: BufferedTransport<T>,
71 virtualhost: String,
72 login: Option<String>,
73 passcode: Option<String>,
74 headers: Vec<(String, String)>,
75) -> anyhow::Result<BufferedTransport<T>>
76where
77 T: ClientTransport,
78 T::ProtocolSideChannel: Debug,
79{
80 let extra_headers = headers
82 .iter()
83 .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
84 .collect();
85
86 let connect = Message {
88 content: ToServer::Connect {
89 accept_version: "1.2".into(),
90 host: virtualhost,
91 login,
92 passcode,
93 heartbeat: None,
94 },
95 extra_headers,
96 };
97
98 transport.send(connect).await?;
100
101 let response = transport.next().await?;
103
104 match response {
105 ServerResponse::Message(msg) => {
106 if let FromServer::Connected { .. } = msg.content {
108 Ok(transport)
109 } else {
110 Err(anyhow!("Unexpected response: {msg:?}"))
111 }
112 }
113 ServerResponse::Custom(custom) => Err(anyhow!("Unexpected response: {custom:?}")),
114 }
115}