beanstalkc/
request.rs

1use std::io::{BufRead, Read, Write};
2use std::net::TcpStream;
3use std::str::FromStr;
4
5use bufstream::BufStream;
6
7use crate::command::Status;
8use crate::error::{BeanstalkcError, BeanstalkcResult};
9use crate::response::Response;
10
11#[derive(Debug)]
12pub struct Request<'b> {
13    stream: &'b mut BufStream<TcpStream>,
14}
15
16impl<'b> Request<'b> {
17    pub fn new(stream: &'b mut BufStream<TcpStream>) -> Self {
18        Request { stream }
19    }
20
21    pub fn send(&mut self, message: &[u8]) -> BeanstalkcResult<Response> {
22        let _ = self.stream.write(message)?;
23        self.stream.flush()?;
24
25        let mut line = String::new();
26        self.stream.read_line(&mut line)?;
27
28        if line.trim().is_empty() {
29            return Err(BeanstalkcError::UnexpectedResponse(
30                "empty response".to_string(),
31            ));
32        }
33
34        let line_parts: Vec<_> = line.split_whitespace().collect();
35
36        let mut response = Response::default();
37        response.status = Status::from_str(line_parts.first().unwrap_or(&""))?;
38        response.params = line_parts[1..].iter().map(|&x| x.to_string()).collect();
39
40        let body_byte_count = match response.status {
41            Status::Ok => response.get_int_param(0)?,
42            Status::Reserved => response.get_int_param(1)?,
43            Status::Found => response.get_int_param(1)?,
44            _ => {
45                return Ok(response);
46            }
47        } as usize;
48
49        let mut tmp: Vec<u8> = vec![0; body_byte_count + 2]; // +2 trailing line break
50        let body = &mut tmp[..];
51        self.stream.read_exact(body)?;
52        tmp.truncate(body_byte_count);
53        response.body = Some(tmp);
54
55        Ok(response)
56    }
57}