sozu_command_futures/
lib.rs

1#[macro_use]
2extern crate log;
3extern crate sozu_command_lib as sozu_command;
4
5use bytes::BytesMut;
6use futures::{SinkExt, TryStreamExt};
7use sozu_command::command::{CommandRequest, CommandResponse, CommandStatus};
8use std::io::{self, Error, ErrorKind};
9use std::str::from_utf8;
10use tokio::net::UnixStream;
11use tokio_util::codec::{Decoder, Encoder, Framed};
12
13pub struct CommandCodec;
14
15impl Decoder for CommandCodec {
16    type Item = CommandResponse;
17    type Error = io::Error;
18
19    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<CommandResponse>, io::Error> {
20        if let Some(pos) = (&buf[..]).iter().position(|&x| x == 0) {
21            let res = if let Ok(s) = from_utf8(&buf[..pos]) {
22                match serde_json::from_str(s) {
23                    Ok(message) => Ok(Some(message)),
24                    Err(e) => Err(io::Error::new(
25                        io::ErrorKind::Other,
26                        format!("parse error: {:?}", e),
27                    )),
28                }
29            } else {
30                Err(io::Error::new(
31                    io::ErrorKind::InvalidData,
32                    String::from("could not parse UTF-8 data"),
33                ))
34            };
35
36            if pos < buf.len() {
37                let _ = buf.split_to(pos + 1);
38            }
39
40            res
41        } else {
42            Ok(None)
43        }
44    }
45}
46
47impl Encoder<CommandRequest> for CommandCodec {
48    type Error = io::Error;
49
50    fn encode(&mut self, message: CommandRequest, buf: &mut BytesMut) -> Result<(), Self::Error> {
51        match serde_json::to_string(&message) {
52            Ok(data) => {
53                trace!("encoded message: {}", data);
54                buf.extend(data.as_bytes());
55                buf.extend(&[0u8][..]);
56                trace!("buffer content: {:?}", from_utf8(&buf[..]));
57                Ok(())
58            }
59            Err(e) => Err(io::Error::new(
60                io::ErrorKind::Other,
61                format!("serialization error: {:?}", e),
62            )),
63        }
64    }
65}
66
67pub struct SozuCommandClient {
68    transport: Framed<UnixStream, CommandCodec>,
69}
70
71impl SozuCommandClient {
72    pub fn new(stream: UnixStream) -> SozuCommandClient {
73        SozuCommandClient {
74            transport: CommandCodec.framed(stream),
75        }
76    }
77
78    pub async fn send(&mut self, message: CommandRequest) -> Result<CommandResponse, io::Error> {
79        trace!("will send message: {:?}", message);
80
81        let id = message.id.clone();
82        self.transport.send(message).await?;
83
84        loop {
85            match self.transport.try_next().await? {
86                None => {}
87                Some(msg) => {
88                    if msg.id != id {
89                        return Err(Error::new(
90                            ErrorKind::ConnectionAborted,
91                            format!("could not send message"),
92                        ));
93                    }
94
95                    if msg.status == CommandStatus::Processing {
96                        info!("processing: {:?}", msg);
97                    } else {
98                        return Ok(msg);
99                    }
100                }
101            }
102        }
103    }
104}