slacker 0.1.3

Slacker server built on top of tokio.
Documentation
use std::io;
use std::sync::Arc;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicIsize, Ordering};

use tproto::TcpClient;
use tproto::multiplex::ClientService;
use tcore::net::TcpStream;
use tcore::reactor::Core;
use futures::{Future, IntoFuture};
use futures::future::err;
use tservice::Service;

use serde_json::value::Value as Json;

use serializer::*;
use parser::*;
use json::*;

pub struct ClientManager {
    serializer: Arc<JsonSerializer>,
}

impl ClientManager {
    pub fn new() -> ClientManager {
        let serializer = Arc::new(JsonSerializer);
        ClientManager { serializer }
    }

    pub fn connect(
        &self,
        core: &mut Core,
        addr: &SocketAddr,
    ) -> Box<Future<Item = Client, Error = io::Error>> {
        let handle = core.handle();
        let serializer = self.serializer.clone();
        let rt = TcpClient::new(JsonSlacker).connect(addr, &handle).map(
            |client_service| {
                Client {
                    inner: client_service,
                    serial_id_gen: AtomicIsize::new(0),
                    serializer,
                }
            },
        );
        Box::new(rt)
    }
}

pub struct Client {
    inner: ClientService<TcpStream, JsonSlacker>,
    serial_id_gen: AtomicIsize,
    serializer: Arc<JsonSerializer>,
}

impl Service for Client {
    type Request = SlackerPacket;
    type Response = SlackerPacket;
    type Error = io::Error;
    type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, req: Self::Request) -> Self::Future {
        Box::new(self.inner.call(req))
    }
}

impl Client {
    pub fn rpc_call(
        &self,
        ns_name: &str,
        fn_name: &str,
        args: Vec<Json>,
    ) -> Box<Future<Item=Json, Error=io::Error>> {
        let mut fname = String::new();
        fname.push_str(ns_name);
        fname.push_str("/");
        fname.push_str(fn_name);

        let sid = self.serial_id_gen.fetch_add(1, Ordering::SeqCst) as i32;
        let header = SlackerPacketHeader {
            version: PROTOCOL_VERSION_5,
            serial_id: sid,
            packet_type: PACKET_TYPE_REQUEST,
        };

        let serializer = self.serializer.clone();
        let body_result = serializer.serialize(&args.into()).map(|serialized_args| {
            SlackerPacketBody::Request(SlackerRequestPacket {
                content_type: JSON_CONTENT_TYPE,
                fname: fname,
                arguments: serialized_args,
            })
        });
        match body_result {
            Ok(body) => {
                Box::new(self.call(SlackerPacket(header, body))
                    .and_then(move |SlackerPacket(_, body)| {
                        debug!("getting results {:?}", body);
                        match body {
                            SlackerPacketBody::Response(r) => {
                                serializer.deserialize(&r.data).into_future()
                            }
                            _ => {
                                err(io::Error::new(
                                    io::ErrorKind::InvalidData,
                                    "Unexpect packet.",
                                ))
                            }
                        }
                    }))
                    
            }
            Err(e) => Box::new(err(e)),
        }
    }

    pub fn ping(&self) -> Box<Future<Item=(), Error=io::Error>> {
        let sid = self.serial_id_gen.fetch_add(1, Ordering::SeqCst) as i32;
        let header = SlackerPacketHeader {
            version: PROTOCOL_VERSION_5,
            serial_id: sid,
            packet_type: PACKET_TYPE_PING,
        };

        let body = SlackerPacketBody::Ping;
        Box::new(self.call(SlackerPacket(header, body)).map(|_| ()))
    }
}