1use std::io::{BufRead, Read, Write};
2use std::net::TcpStream;
3use std::str::FromStr;
4
5use bufstream::BufStream;
6
7use crate::command::Status;
8use crate::error::{BeanstalkcError, BeanstalkcResult};
9use crate::response::Response;
10
11#[derive(Debug)]
12pub struct Request<'b> {
13 stream: &'b mut BufStream<TcpStream>,
14}
15
16impl<'b> Request<'b> {
17 pub fn new(stream: &'b mut BufStream<TcpStream>) -> Self {
18 Request { stream }
19 }
20
21 pub fn send(&mut self, message: &[u8]) -> BeanstalkcResult<Response> {
22 let _ = self.stream.write(message)?;
23 self.stream.flush()?;
24
25 let mut line = String::new();
26 self.stream.read_line(&mut line)?;
27
28 if line.trim().is_empty() {
29 return Err(BeanstalkcError::UnexpectedResponse(
30 "empty response".to_string(),
31 ));
32 }
33
34 let line_parts: Vec<_> = line.split_whitespace().collect();
35
36 let mut response = Response::default();
37 response.status = Status::from_str(line_parts.first().unwrap_or(&""))?;
38 response.params = line_parts[1..].iter().map(|&x| x.to_string()).collect();
39
40 let body_byte_count = match response.status {
41 Status::Ok => response.get_int_param(0)?,
42 Status::Reserved => response.get_int_param(1)?,
43 Status::Found => response.get_int_param(1)?,
44 _ => {
45 return Ok(response);
46 }
47 } as usize;
48
49 let mut tmp: Vec<u8> = vec![0; body_byte_count + 2]; let body = &mut tmp[..];
51 self.stream.read_exact(body)?;
52 tmp.truncate(body_byte_count);
53 response.body = Some(tmp);
54
55 Ok(response)
56 }
57}