1use crate::connector::utils::{
2 extract_headers_and_data_from_binary_message, extract_headers_and_data_from_text_message,
3 make_binary_payload, make_text_payload,
4};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
8pub enum Data {
9 Binary(Option<Vec<u8>>),
10 Text(Option<String>),
11}
12
13pub type Headers = Vec<(String, String)>;
15pub static REQUEST_ID_HEADER: &str = "X-RequestId";
16pub static STREAM_ID_HEADER: &str = "X-StreamId";
17pub static PATH_HEADER: &str = "Path";
18
19#[derive(Debug, Clone, PartialEq, Eq)]
21pub struct Message {
22 pub id: String,
23 pub path: String,
24 pub headers: Headers,
25 pub data: Data,
26}
27
28impl Message {
29 pub(crate) fn new(id: String, path: String, headers: Headers, data: Data) -> Self {
30 Self {
31 id,
32 path: path.to_lowercase(),
33 headers,
34 data,
35 }
36 }
37
38 #[allow(dead_code)]
39 pub(crate) fn get_header<I: Into<String> + Eq>(&self, key: I) -> Option<String> {
40 let key = key.into();
41 self.headers
42 .iter()
43 .find(|(k, _)| k == &key)
44 .map(|(_, v)| v.clone())
45 }
46
47 pub(crate) fn from_headers_and_data(mut headers: Headers, data: Data) -> Self {
48 Self::new(
49 extract_header(&mut headers, REQUEST_ID_HEADER),
50 extract_header(&mut headers, PATH_HEADER),
51 headers,
52 data,
53 )
54 }
55}
56
57impl From<Message> for tokio_websockets::Message {
58 fn from(message: Message) -> Self {
59 let headers = vec![
60 (REQUEST_ID_HEADER.to_string(), message.id),
61 (PATH_HEADER.to_string(), message.path),
62 ];
63
64 let headers = headers.into_iter().chain(message.headers).collect();
65
66 match message.data {
67 Data::Binary(data) => {
68 tokio_websockets::Message::binary(make_binary_payload(headers, data.as_deref()))
69 }
70 Data::Text(data) => {
71 tokio_websockets::Message::text(make_text_payload(headers, data.as_deref()))
72 }
73 }
74 }
75}
76
77impl TryFrom<&str> for Message {
78 type Error = crate::Error;
79 fn try_from(value: &str) -> crate::Result<Self> {
80 let (headers, text) = extract_headers_and_data_from_text_message(value)?;
81 Ok(Message::from_headers_and_data(headers, Data::Text(text)))
82 }
83}
84
85impl TryFrom<&[u8]> for Message {
86 type Error = crate::Error;
87 fn try_from(value: &[u8]) -> crate::Result<Self> {
88 let (headers, data) = extract_headers_and_data_from_binary_message(value)?;
89 Ok(Message::from_headers_and_data(headers, Data::Binary(data)))
90 }
91}
92
93impl TryFrom<tokio_websockets::Message> for Message {
94 type Error = crate::Error;
95 fn try_from(value: tokio_websockets::Message) -> crate::Result<Self> {
96 if value.is_text() {
97 return Message::try_from(value.as_text().unwrap());
98 }
99 if value.is_binary() {
100 return Message::try_from(value.as_payload().to_vec().as_slice());
101 }
102
103 Err(crate::Error::InternalError(
104 "Cannot convert message to binary".into(),
105 ))
106 }
107}
108
109pub(crate) fn extract_header(headers: &mut Headers, header_name: &str) -> String {
110 match headers.iter().position(|(k, _)| k == header_name) {
111 Some(index) => headers.remove(index).1,
112 None => Default::default(),
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119
120 #[test]
121 fn test_message_to_twsmessage() {
122 let message = Message::new(
123 "id".to_string(),
124 "path".to_string(),
125 vec![("header".to_string(), "value".to_string())],
126 Data::Text(Some("data".to_string())),
127 );
128
129 let ezmessage: tokio_websockets::Message = message.clone().into();
130 let headers = vec![
131 (REQUEST_ID_HEADER.to_string(), "id".to_string()),
132 (PATH_HEADER.to_string(), "path".to_string()),
133 ("header".to_string(), "value".to_string()),
134 ];
135
136 match ezmessage.as_text() {
137 Some(text) => {
138 let text_from_message = make_text_payload(headers, Some("data"));
139 assert_eq!(text, text_from_message);
140 }
141 None => unreachable!(),
142 }
143 }
144
145 #[test]
146 fn test_string_to_message() {
147 let text = make_text_payload(
148 vec![
149 (REQUEST_ID_HEADER.to_string(), "id".to_string()),
150 (PATH_HEADER.to_string(), "path".to_string()),
151 ("header".to_string(), "value".to_string()),
152 ],
153 Some("data"),
154 );
155
156 let message = Message::try_from(text.as_str()).unwrap();
157 assert_eq!(
158 message,
159 Message::new(
160 "id".to_string(),
161 "path".to_string(),
162 vec![("header".to_string(), "value".to_string())],
163 Data::Text(Some("data".to_string()))
164 )
165 );
166 }
167
168 #[test]
169 fn test_binary_to_message() {
170 let data = make_binary_payload(
171 vec![
172 (REQUEST_ID_HEADER.to_string(), "id".to_string()),
173 (PATH_HEADER.to_string(), "path".to_string()),
174 ("header".to_string(), "value".to_string()),
175 ],
176 Some("data".as_bytes()),
177 );
178
179 let message = Message::try_from(data.as_slice()).unwrap();
180 assert_eq!(
181 message,
182 Message::new(
183 "id".to_string(),
184 "path".to_string(),
185 vec![("header".to_string(), "value".to_string())],
186 Data::Binary(Some("data".as_bytes().to_vec()))
187 )
188 );
189 }
190
191 #[test]
192 fn test_binary_to_message_binary_no_data() {
193 let message = make_binary_payload(
194 vec![
195 (REQUEST_ID_HEADER.to_string(), "id".to_string()),
196 (PATH_HEADER.to_string(), "path".to_string()),
197 ("header".to_string(), "value".to_string()),
198 ],
199 None,
200 );
201
202 let message = Message::try_from(message.as_slice()).unwrap();
203 assert_eq!(
204 message,
205 Message::new(
206 "id".to_string(),
207 "path".to_string(),
208 vec![("header".to_string(), "value".to_string())],
209 Data::Binary(None)
210 )
211 );
212 }
213}