1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
use std::io; use tokio_service::Service; use futures::{Future, BoxFuture}; use message::{Message, Response}; use rmpv::Value; use futures_cpupool::CpuPool; use std::marker::Sync; // FIXME: The 'static bound is quite limiting because it means that we can implement Dispatch for // types like Foo<'a>. It is required because in Service.call(), we move a dispatcher into a // closure, and it has to live long enough inside this closure, i.e. at least as long as the // closure lives. // // Is this even fixable? /// A dispatcher that performs the calls on the server. /// /// # Examples /// /// Here is how to implement a simple server with two methods `hello` (that returns "hello") and /// `world` (that return "world"). Calling any other method would result in an error. /// /// ``` /// use rmp_rpc::Dispatch; /// #[derive(Clone)] /// pub struct HelloWorld; /// /// impl Dispatch for HelloWorld { /// fn dispatch(&self, method: &str, _params: &[Value]) -> Result<Value, Value> { /// match method { /// "hello" => { Ok(Value::String(Utf8String::from("hello"))) } /// "world" => { Ok(Value::String(Utf8String::from("world"))) } /// _ => { Err(Value::String(Utf8String::from(format!("Invalid method {}", method)))) } /// } /// } /// } /// /// ``` /// pub trait Dispatch: Send + Sync + Clone + 'static { /// Respond a request. `method` is the name of the `MessagePack-RPC` method that was called, and /// `params` its arguments. fn dispatch(&self, method: &str, params: &[Value]) -> Result<Value, Value>; } /// A `MessagePack-RPC` server. It calls a dispatcher to answer requests. pub struct Server<T: Dispatch> { dispatcher: T, thread_pool: CpuPool, } impl<T: Dispatch> Server<T> { /// Instantiate a new server based on a given dispatcher. pub fn new(dispatcher: T) -> Self { Server { dispatcher: dispatcher, thread_pool: CpuPool::new(10), } } } impl<T: Dispatch> Service for Server<T> { type Request = Message; type Response = Message; type Error = io::Error; type Future = BoxFuture<Self::Response, Self::Error>; fn call(&self, message: Self::Request) -> Self::Future { match message { Message::Request(request) => { // FIXME: The whole dispatcher is cloned for every request won't that kill // performances ? How could we avoid that? let dispatcher = self.dispatcher.clone(); let future = self.thread_pool.spawn_fn(move || { match dispatcher.dispatch(request.method.as_str(), &request.params) { Ok(value) => { Ok(Message::Response(Response { id: request.id, result: Ok(value), })) } Err(value) => { Ok(Message::Response(Response { id: request.id, result: Err(value), })) } } }); future.boxed() } // TODO: Notifications _ => unimplemented!(), } } }