1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#[macro_use]
extern crate log;
extern crate sozu_command_lib as sozu_command;

use bytes::BytesMut;
use futures::{SinkExt, TryStreamExt};
use sozu_command::command::{CommandRequest, CommandResponse, CommandStatus};
use std::io::{self, Error, ErrorKind};
use std::str::from_utf8;
use tokio::net::UnixStream;
use tokio_util::codec::{Decoder, Encoder, Framed};

/// Stateless codec for the command socket wire format.
///
/// Each message is a JSON document followed by a single NUL (`0`) byte.
/// The codec carries no state, so it is cheap to copy and can be created
/// with `CommandCodec` or `CommandCodec::default()`.
#[derive(Debug, Default, Clone, Copy)]
pub struct CommandCodec;

impl Decoder for CommandCodec {
    type Item = CommandResponse;
    type Error = io::Error;

    /// Extracts one NUL-terminated JSON frame from `buf`, if one is complete.
    ///
    /// Returns `Ok(None)` while no NUL byte is present in `buf`. Once a
    /// terminator is found, the frame and its terminator are removed from
    /// `buf` whether or not the payload parses, so a malformed frame is
    /// never seen twice.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidData` when the payload is not valid UTF-8.
    /// * `ErrorKind::Other` when the payload is not a valid JSON response.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<CommandResponse>, io::Error> {
        // A frame is only complete once its NUL terminator has arrived.
        let end = match buf[..].iter().position(|&byte| byte == 0) {
            Some(index) => index,
            None => return Ok(None),
        };

        // Detach the frame (terminator included) before parsing, so the
        // buffer advances on both the success and the error paths.
        let frame = buf.split_to(end + 1);

        let text = from_utf8(&frame[..end]).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                String::from("could not parse UTF-8 data"),
            )
        })?;

        serde_json::from_str::<CommandResponse>(text)
            .map(Some)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("parse error: {:?}", e)))
    }
}

impl Encoder<CommandRequest> for CommandCodec {
    type Error = io::Error;

    /// Appends `message` to `buf` as JSON followed by a NUL terminator.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error when `message` cannot be
    /// serialized; `buf` is left untouched in that case.
    fn encode(&mut self, message: CommandRequest, buf: &mut BytesMut) -> Result<(), Self::Error> {
        let json = serde_json::to_string(&message).map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("serialization error: {:?}", e),
            )
        })?;

        trace!("encoded message: {}", json);

        // Payload first, then the single zero byte that delimits frames.
        buf.extend_from_slice(json.as_bytes());
        buf.extend_from_slice(&[0u8]);

        trace!("buffer content: {:?}", from_utf8(&buf[..]));
        Ok(())
    }
}

/// Client that exchanges `CommandRequest`s and `CommandResponse`s over a
/// Unix socket, using `CommandCodec` to frame the messages.
pub struct SozuCommandClient {
    /// Framed view of the underlying Unix stream; encodes outgoing requests
    /// and decodes incoming responses.
    transport: Framed<UnixStream, CommandCodec>,
}

impl SozuCommandClient {
    /// Builds a client on top of an already connected Unix `stream`,
    /// framing it with `CommandCodec`.
    pub fn new(stream: UnixStream) -> SozuCommandClient {
        SozuCommandClient {
            transport: CommandCodec.framed(stream),
        }
    }

    /// Sends `message` and waits for its final response.
    ///
    /// Responses whose status is `CommandStatus::Processing` are logged and
    /// skipped; the first response with any other status is returned.
    ///
    /// # Errors
    ///
    /// * Any I/O or codec error raised while writing the request or reading
    ///   a response.
    /// * `ErrorKind::ConnectionAborted` when a response carries an id that
    ///   differs from the request id.
    /// * `ErrorKind::UnexpectedEof` when the peer closes the connection
    ///   before a final response arrives.
    pub async fn send(&mut self, message: CommandRequest) -> Result<CommandResponse, io::Error> {
        trace!("will send message: {:?}", message);

        // Keep the id: `message` is moved into the transport below.
        let id = message.id.clone();
        self.transport.send(message).await?;

        loop {
            // `None` means the stream has ended. Ignoring it would poll a
            // closed socket forever, so surface it as an error instead.
            let msg = self.transport.try_next().await?.ok_or_else(|| {
                Error::new(
                    ErrorKind::UnexpectedEof,
                    "connection closed before a final response was received",
                )
            })?;

            if msg.id != id {
                return Err(Error::new(
                    ErrorKind::ConnectionAborted,
                    "could not send message",
                ));
            }

            if msg.status == CommandStatus::Processing {
                // Intermediate progress report: keep waiting for the final status.
                info!("processing: {:?}", msg);
            } else {
                return Ok(msg);
            }
        }
    }
}