sozu_command_futures/
lib.rs1#[macro_use]
2extern crate log;
3extern crate sozu_command_lib as sozu_command;
4
5use bytes::BytesMut;
6use futures::{SinkExt, TryStreamExt};
7use sozu_command::command::{CommandRequest, CommandResponse, CommandStatus};
8use std::io::{self, Error, ErrorKind};
9use std::str::from_utf8;
10use tokio::net::UnixStream;
11use tokio_util::codec::{Decoder, Encoder, Framed};
12
13pub struct CommandCodec;
14
15impl Decoder for CommandCodec {
16 type Item = CommandResponse;
17 type Error = io::Error;
18
19 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<CommandResponse>, io::Error> {
20 if let Some(pos) = (&buf[..]).iter().position(|&x| x == 0) {
21 let res = if let Ok(s) = from_utf8(&buf[..pos]) {
22 match serde_json::from_str(s) {
23 Ok(message) => Ok(Some(message)),
24 Err(e) => Err(io::Error::new(
25 io::ErrorKind::Other,
26 format!("parse error: {:?}", e),
27 )),
28 }
29 } else {
30 Err(io::Error::new(
31 io::ErrorKind::InvalidData,
32 String::from("could not parse UTF-8 data"),
33 ))
34 };
35
36 if pos < buf.len() {
37 let _ = buf.split_to(pos + 1);
38 }
39
40 res
41 } else {
42 Ok(None)
43 }
44 }
45}
46
47impl Encoder<CommandRequest> for CommandCodec {
48 type Error = io::Error;
49
50 fn encode(&mut self, message: CommandRequest, buf: &mut BytesMut) -> Result<(), Self::Error> {
51 match serde_json::to_string(&message) {
52 Ok(data) => {
53 trace!("encoded message: {}", data);
54 buf.extend(data.as_bytes());
55 buf.extend(&[0u8][..]);
56 trace!("buffer content: {:?}", from_utf8(&buf[..]));
57 Ok(())
58 }
59 Err(e) => Err(io::Error::new(
60 io::ErrorKind::Other,
61 format!("serialization error: {:?}", e),
62 )),
63 }
64 }
65}
66
67pub struct SozuCommandClient {
68 transport: Framed<UnixStream, CommandCodec>,
69}
70
71impl SozuCommandClient {
72 pub fn new(stream: UnixStream) -> SozuCommandClient {
73 SozuCommandClient {
74 transport: CommandCodec.framed(stream),
75 }
76 }
77
78 pub async fn send(&mut self, message: CommandRequest) -> Result<CommandResponse, io::Error> {
79 trace!("will send message: {:?}", message);
80
81 let id = message.id.clone();
82 self.transport.send(message).await?;
83
84 loop {
85 match self.transport.try_next().await? {
86 None => {}
87 Some(msg) => {
88 if msg.id != id {
89 return Err(Error::new(
90 ErrorKind::ConnectionAborted,
91 format!("could not send message"),
92 ));
93 }
94
95 if msg.status == CommandStatus::Processing {
96 info!("processing: {:?}", msg);
97 } else {
98 return Ok(msg);
99 }
100 }
101 }
102 }
103 }
104}