1use crate::{
21 Params,
22 error::{ClientError, ClientResult},
23 io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
24};
25use std::{
26 borrow::Cow,
27 cmp::min,
28 collections::HashMap,
29 fmt::{self, Debug, Display},
30 mem::size_of,
31 ops::{Deref, DerefMut},
32};
33
34pub(crate) const VERSION_1: u8 = 1;
36pub(crate) const MAX_LENGTH: usize = 0xffff;
38pub(crate) const HEADER_LEN: usize = size_of::<Header>();
40
41#[derive(Debug, Clone)]
43#[repr(u8)]
44pub enum RequestType {
45 BeginRequest = 1,
47 AbortRequest = 2,
49 EndRequest = 3,
51 Params = 4,
53 Stdin = 5,
55 Stdout = 6,
57 Stderr = 7,
59 Data = 8,
61 GetValues = 9,
63 GetValuesResult = 10,
65 UnknownType = 11,
67}
68
69impl RequestType {
70 fn from_u8(u: u8) -> Self {
76 match u {
77 1 => RequestType::BeginRequest,
78 2 => RequestType::AbortRequest,
79 3 => RequestType::EndRequest,
80 4 => RequestType::Params,
81 5 => RequestType::Stdin,
82 6 => RequestType::Stdout,
83 7 => RequestType::Stderr,
84 8 => RequestType::Data,
85 9 => RequestType::GetValues,
86 10 => RequestType::GetValuesResult,
87 _ => RequestType::UnknownType,
88 }
89 }
90}
91
92impl Display for RequestType {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
94 Display::fmt(&(self.clone() as u8), f)
95 }
96}
97
98#[derive(Debug, Clone)]
99pub(crate) struct Header {
100 pub(crate) version: u8,
102 pub(crate) r#type: RequestType,
104 pub(crate) request_id: u16,
106 pub(crate) content_length: u16,
108 pub(crate) padding_length: u8,
110 pub(crate) reserved: u8,
112}
113
114impl Header {
115 pub(crate) async fn write_to_stream_batches<F, R, W>(
125 r#type: RequestType, request_id: u16, writer: &mut W, content: &mut R,
126 before_write: Option<F>,
127 ) -> io::Result<()>
128 where
129 F: Fn(Header) -> Header,
130 R: AsyncRead + Unpin,
131 W: AsyncWrite + Unpin,
132 {
133 let mut buf = vec![0; MAX_LENGTH];
134 let mut had_written = false;
135
136 loop {
137 let read = content.read(&mut buf).await?;
138 if had_written && read == 0 {
139 break;
140 }
141
142 let buf = &buf[..read];
143 let mut header = Self::new(r#type.clone(), request_id, buf);
144 if let Some(ref f) = before_write {
145 header = f(header);
146 }
147 header.write_to_stream(writer, buf).await?;
148
149 had_written = true;
150 }
151 Ok(())
152 }
153
154 fn new(r#type: RequestType, request_id: u16, content: &[u8]) -> Self {
162 let content_length = min(content.len(), MAX_LENGTH) as u16;
163 Self {
164 version: VERSION_1,
165 r#type,
166 request_id,
167 content_length,
168 padding_length: (-(content_length as i16) & 7) as u8,
169 reserved: 0,
170 }
171 }
172
173 async fn write_to_stream<W: AsyncWrite + Unpin>(
180 self, writer: &mut W, content: &[u8],
181 ) -> io::Result<()> {
182 let mut buf: Vec<u8> = Vec::with_capacity(HEADER_LEN);
183 buf.push(self.version);
184 buf.push(self.r#type as u8);
185 buf.extend_from_slice(&self.request_id.to_be_bytes());
186 buf.extend_from_slice(&self.content_length.to_be_bytes());
187 buf.push(self.padding_length);
188 buf.push(self.reserved);
189
190 writer.write_all(&buf).await?;
191 writer.write_all(content).await?;
192 if self.padding_length != 0 {
193 writer
194 .write_all(&vec![0; self.padding_length as usize])
195 .await?;
196 }
197
198 Ok(())
199 }
200
201 pub(crate) async fn new_from_stream<R: AsyncRead + Unpin>(reader: &mut R) -> io::Result<Self> {
207 let mut buf: [u8; HEADER_LEN] = [0; HEADER_LEN];
208 reader.read_exact(&mut buf).await?;
209
210 Ok(Self::new_from_buf(&buf))
211 }
212
213 #[inline]
219 pub(crate) fn new_from_buf(buf: &[u8; HEADER_LEN]) -> Self {
220 Self {
221 version: buf[0],
222 r#type: RequestType::from_u8(buf[1]),
223 request_id: be_buf_to_u16(&buf[2..4]),
224 content_length: be_buf_to_u16(&buf[4..6]),
225 padding_length: buf[6],
226 reserved: buf[7],
227 }
228 }
229
230 pub(crate) async fn read_content_from_stream<R: AsyncRead + Unpin>(
236 &self, reader: &mut R,
237 ) -> io::Result<Vec<u8>> {
238 let mut buf = vec![0; self.content_length as usize];
239 reader.read_exact(&mut buf).await?;
240 let mut padding_buf = vec![0; self.padding_length as usize];
241 reader.read_exact(&mut padding_buf).await?;
242 Ok(buf)
243 }
244}
245
246#[derive(Debug, Clone, Copy)]
248#[repr(u16)]
249#[allow(dead_code)]
250pub enum Role {
251 Responder = 1,
253 Authorizer = 2,
255 Filter = 3,
257}
258
259#[derive(Debug)]
261pub(crate) struct BeginRequest {
262 pub(crate) role: Role,
264 pub(crate) flags: u8,
266 pub(crate) reserved: [u8; 5],
268}
269
270impl BeginRequest {
271 pub(crate) fn new(role: Role, keep_alive: bool) -> Self {
278 Self {
279 role,
280 flags: keep_alive as u8,
281 reserved: [0; 5],
282 }
283 }
284
285 pub(crate) async fn to_content(&self) -> io::Result<Vec<u8>> {
287 let mut buf: Vec<u8> = Vec::with_capacity(8);
288 buf.extend_from_slice(&(self.role as u16).to_be_bytes());
289 buf.push(self.flags);
290 buf.extend_from_slice(&self.reserved);
291 Ok(buf)
292 }
293}
294
295pub(crate) struct BeginRequestRec {
297 pub(crate) header: Header,
299 pub(crate) begin_request: BeginRequest,
301 pub(crate) content: Vec<u8>,
303}
304
305impl BeginRequestRec {
306 pub(crate) async fn new(request_id: u16, role: Role, keep_alive: bool) -> io::Result<Self> {
314 let begin_request = BeginRequest::new(role, keep_alive);
315 let content = begin_request.to_content().await?;
316 let header = Header::new(RequestType::BeginRequest, request_id, &content);
317 Ok(Self {
318 header,
319 begin_request,
320 content,
321 })
322 }
323
324 pub(crate) async fn write_to_stream<W: AsyncWrite + Unpin>(
330 self, writer: &mut W,
331 ) -> io::Result<()> {
332 self.header.write_to_stream(writer, &self.content).await
333 }
334}
335
336impl Debug for BeginRequestRec {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
338 Debug::fmt(
339 &format!(
340 "BeginRequestRec {{header: {:?}, begin_request: {:?}}}",
341 self.header, self.begin_request
342 ),
343 f,
344 )
345 }
346}
347
348#[derive(Debug, Clone, Copy)]
350pub enum ParamLength {
351 Short(u8),
353 Long(u32),
355}
356
357impl ParamLength {
358 pub fn new(length: usize) -> Self {
364 if length < 128 {
365 ParamLength::Short(length as u8)
366 } else {
367 let mut length = length;
368 length |= 1 << 31;
369 ParamLength::Long(length as u32)
370 }
371 }
372
373 pub async fn content(self) -> io::Result<Vec<u8>> {
375 let buf = match self {
376 ParamLength::Short(l) => vec![l],
377 ParamLength::Long(l) => l.to_be_bytes().to_vec(),
378 };
379 Ok(buf)
380 }
381}
382
383#[derive(Debug)]
385pub struct ParamPair<'a> {
386 name_length: ParamLength,
388 value_length: ParamLength,
390 name_data: Cow<'a, str>,
392 value_data: Cow<'a, str>,
394}
395
396impl<'a> ParamPair<'a> {
397 fn new(name: Cow<'a, str>, value: Cow<'a, str>) -> Self {
404 let name_length = ParamLength::new(name.len());
405 let value_length = ParamLength::new(value.len());
406 Self {
407 name_length,
408 value_length,
409 name_data: name,
410 value_data: value,
411 }
412 }
413
414 async fn write_to_stream<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
420 writer.write_all(&self.name_length.content().await?).await?;
421 writer
422 .write_all(&self.value_length.content().await?)
423 .await?;
424 writer.write_all(self.name_data.as_bytes()).await?;
425 writer.write_all(self.value_data.as_bytes()).await?;
426 Ok(())
427 }
428}
429
430#[derive(Debug)]
432pub(crate) struct ParamPairs<'a>(Vec<ParamPair<'a>>);
433
434impl<'a> ParamPairs<'a> {
435 pub(crate) fn new(params: Params<'a>) -> Self {
441 let mut param_pairs = Vec::new();
442 let params: HashMap<Cow<'a, str>, Cow<'a, str>> = params.into();
443 for (name, value) in params.into_iter() {
444 let param_pair = ParamPair::new(name, value);
445 param_pairs.push(param_pair);
446 }
447
448 Self(param_pairs)
449 }
450
451 pub(crate) async fn to_content(&self) -> io::Result<Vec<u8>> {
453 let mut buf: Vec<u8> = Vec::new();
454
455 for param_pair in self.iter() {
456 param_pair.write_to_stream(&mut buf).await?;
457 }
458
459 Ok(buf)
460 }
461}
462
463impl<'a> Deref for ParamPairs<'a> {
464 type Target = Vec<ParamPair<'a>>;
465
466 fn deref(&self) -> &Self::Target {
467 &self.0
468 }
469}
470
471impl DerefMut for ParamPairs<'_> {
472 fn deref_mut(&mut self) -> &mut Self::Target {
473 &mut self.0
474 }
475}
476
477#[derive(Debug)]
479#[repr(u8)]
480pub enum ProtocolStatus {
481 RequestComplete = 0,
483 CantMpxConn = 1,
485 Overloaded = 2,
487 UnknownRole = 3,
489}
490
491impl ProtocolStatus {
492 pub fn from_u8(u: u8) -> Self {
498 match u {
499 0 => ProtocolStatus::RequestComplete,
500 1 => ProtocolStatus::CantMpxConn,
501 2 => ProtocolStatus::Overloaded,
502 _ => ProtocolStatus::UnknownRole,
503 }
504 }
505
506 pub(crate) fn convert_to_client_result(self, app_status: u32) -> ClientResult<()> {
512 match self {
513 ProtocolStatus::RequestComplete => Ok(()),
514 _ => Err(ClientError::new_end_request_with_protocol_status(
515 self, app_status,
516 )),
517 }
518 }
519}
520
521#[derive(Debug)]
523pub struct EndRequest {
524 pub(crate) app_status: u32,
526 pub(crate) protocol_status: ProtocolStatus,
528 #[allow(dead_code)]
530 reserved: [u8; 3],
531}
532
533#[derive(Debug)]
535pub(crate) struct EndRequestRec {
536 #[allow(dead_code)]
538 header: Header,
539 pub(crate) end_request: EndRequest,
541}
542
543impl EndRequestRec {
544 pub(crate) async fn from_header<R: AsyncRead + Unpin>(
551 header: &Header, reader: &mut R,
552 ) -> io::Result<Self> {
553 let header = header.clone();
554 let content = &*header.read_content_from_stream(reader).await?;
555 Ok(Self::new_from_buf(header, content))
556 }
557
558 pub(crate) fn new_from_buf(header: Header, buf: &[u8]) -> Self {
565 let app_status = u32::from_be_bytes(<[u8; 4]>::try_from(&buf[0..4]).unwrap());
566 let protocol_status =
567 ProtocolStatus::from_u8(u8::from_be_bytes(<[u8; 1]>::try_from(&buf[4..5]).unwrap()));
568 let reserved = <[u8; 3]>::try_from(&buf[5..8]).unwrap();
569 Self {
570 header,
571 end_request: EndRequest {
572 app_status,
573 protocol_status,
574 reserved,
575 },
576 }
577 }
578}
579
580fn be_buf_to_u16(buf: &[u8]) -> u16 {
586 u16::from_be_bytes(<[u8; 2]>::try_from(buf).unwrap())
587}