mco_rpc/
server.rs

1use mco::{co};
2use mco::net::{TcpListener, TcpStream};
3use codec::{BinCodec, Codec, Codecs};
4use stub::ServerStub;
5use std::net::ToSocketAddrs;
6use std::sync::Arc;
7use log::error;
8use mco::std::sync::SyncHashMap;
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use mco::std::errors::Result;
12
13pub struct Server {
14    pub handles: SyncHashMap<String, Box<dyn Stub>>,
15    pub codec: Codecs,
16    pub stub: ServerStub,
17}
18
19impl Default for Server {
20    fn default() -> Self {
21        Self {
22            handles: SyncHashMap::new(),
23            codec: Codecs::BinCodec(BinCodec {}),
24            stub: ServerStub::new(),
25        }
26    }
27}
28
29impl Server {
30    #[inline]
31    pub fn call(&self, stream: TcpStream) {
32        self.stub.call(&self.handles, &self.codec, stream);
33    }
34}
35
36pub trait Stub {
37    fn accept(&self, arg: &[u8], codec: &Codecs) -> Result<Vec<u8>>;
38}
39
40pub trait Handler: Stub {
41    type Req: DeserializeOwned;
42    type Resp: Serialize;
43    fn accept(&self, arg: &[u8], codec: &Codecs) -> Result<Vec<u8>> {
44        //.or_else(|e| Result::Err(err!("{}",e)))?
45        let req: Self::Req = codec.decode(arg)?;
46        let data = self.handle(req)?;
47        Ok(codec.encode(data)?)
48    }
49    fn handle(&self, req: Self::Req) -> Result<Self::Resp>;
50}
51
52impl<H: Handler> Stub for H {
53    fn accept(&self, arg: &[u8], codec: &Codecs) -> Result<Vec<u8>> {
54        <H as Handler>::accept(self, arg, codec)
55    }
56}
57
58pub struct HandleFn<Req: DeserializeOwned, Resp: Serialize> {
59    pub f: Box<dyn Fn(Req) -> Result<Resp>>,
60}
61
62impl<Req: DeserializeOwned, Resp: Serialize> Handler for HandleFn<Req, Resp> {
63    type Req = Req;
64    type Resp = Resp;
65
66    fn handle(&self, req: Self::Req) -> mco::std::errors::Result<Self::Resp> {
67        (self.f)(req)
68    }
69}
70
71impl<Req: DeserializeOwned, Resp: Serialize> HandleFn<Req, Resp> {
72    pub fn new<F: 'static>(f: F) -> Self where F: Fn(Req) -> Result<Resp> {
73        Self {
74            f: Box::new(f),
75        }
76    }
77}
78
79
80impl Server {
81    ///register a handle to server
82    /// ```
83    /// use mco_rpc::server::{Handler};
84    ///
85    /// pub struct H{}
86    ///
87    /// impl Handler for H{
88    ///     type Req = i32;
89    ///     type Resp = i32;
90    ///
91    ///     fn handle(&self, req: Self::Req) -> mco::std::errors::Result<Self::Resp> {
92    ///         return Ok(req);
93    ///     }
94    /// }
95    ///
96    /// ```
97    pub fn register<H: 'static>(&mut self, name: &str, handle: H) where H: Stub {
98        self.handles.insert(name.to_owned(), Box::new(handle));
99    }
100
101    /// register a func into server
102    /// for example:
103    /// ```
104    /// use mco_rpc::server::{Server};
105    /// let mut s = Server::default();
106    /// fn handle(req: i32) -> mco::std::errors::Result<i32> {
107    ///     return Ok(req + 1);
108    /// }
109    ///     //s.codec = Codecs::JsonCodec(JsonCodec{});
110    ///     s.register_fn("handle", handle);
111    ///     s.register_fn("handle_fn2", |arg:i32| -> mco::std::errors::Result<i32>{
112    ///         Ok(1)
113    ///     });
114    /// ```
115    pub fn register_fn<Req: DeserializeOwned + 'static, Resp: Serialize + 'static, F: 'static>(&mut self, name: &str, f: F) where F: Fn(Req) -> Result<Resp> {
116        self.handles.insert(name.to_owned(), Box::new(HandleFn::new(f)));
117    }
118
119    pub fn serve<A>(self, addr: A) where A: ToSocketAddrs {
120        let listener = TcpListener::bind(addr).unwrap();
121        println!(
122            "Starting tcp echo server on {:?}",
123            listener.local_addr().unwrap(),
124        );
125        let server = Arc::new(self);
126        for stream in listener.incoming() {
127            match stream {
128                Ok(s) => {
129                    let server = server.clone();
130                    co!(move || server.call(s));
131                }
132                Err(e) => error!("err = {:?}", e),
133            }
134        }
135    }
136}