zbus_lib/rpc/
io.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::mpsc::{channel, Sender};
4use std::thread;
5use std::time::Duration;
6
7use log::debug;
8use serde_json::Value;
9use threadpool::ThreadPool;
10
11use crate::err::{ZbusErr, ZbusResult};
12use crate::message::{Message, Request, Response, ResponseBuilder};
13use crate::rpc::Protocol;
14use crate::wsocket::Instruct;
15
16pub struct IOHandler {
17    method: Box<dyn Handler + Send + Sync + 'static>,
18}
19
20impl IOHandler {
21    pub fn execute(&self, req: Request) -> ZbusResult<Value> {
22        self.method.execute(req.body())
23    }
24}
25
26pub struct IOHandlers {
27    methods: HashMap<String, Arc<IOHandler>>,
28    pool: ThreadPool,
29}
30
31impl IOHandlers {
32    pub fn new(n_workers: usize) -> Self {
33        Self {
34            methods: HashMap::new(),
35            pool: ThreadPool::new(n_workers),
36        }
37    }
38    pub fn add_method<H>(&mut self, name: String, handle: H) where H: Handler, H: Send + Sync + 'static {
39        self.methods.insert(name, Arc::new(IOHandler {
40            method: Box::new(handle)
41        }));
42    }
43    pub fn handler_request(&self, req: Request, tx: Sender<Instruct>) {
44        let mut builder = Response::builder();
45        let mut headers = HashMap::new();
46        headers.insert(Protocol::CMD.into(), Value::from(Protocol::ROUTE));
47        let id = req.id().unwrap();/// 这个存在判断是在调用rpc前验证过的
48        let req_headers = req.headers();
49        let source = req_headers.get(Protocol::SOURCE);
50        source.map(|s| headers.insert(Protocol::TARGET.into(), s.clone()));
51        headers.insert(Protocol::ID.into(), Value::from(id));
52        let url = req.url();
53        if url == "" {
54            tx.send(Instruct::Delivery(reply(builder, headers, 400, "url required"), None));
55            return;
56        }
57        let method = self.methods.get(req.url());
58        match method {
59            None => {
60                debug!("404 {} ", req.url());
61                tx.send(Instruct::Delivery(reply(builder, headers, 404, format!("URL={} Not Found", url)), None));
62            }
63            Some(handler) => {
64                let handler = handler.clone();
65                self.pool.execute(move || {
66                    let result: ZbusResult<Value> = handler.method.execute(req.body());
67                    let response = match result {
68                        Ok(body) => {
69                            headers.insert("Content-Type".into(), Value::from("application/json; charset=utf8"));
70                            builder.status(200).headers(headers).body(body);
71                            Message::Response(builder.build())
72                        }
73                        Err(e) => {
74                            reply(builder, headers, 500, format!("request params is error {}", e))
75                        }
76                    };
77                    tx.send(Instruct::Delivery(response, None));
78                });
79            }
80        }
81    }
82}
83
84fn reply<S: Into<String>>(mut builder: ResponseBuilder, mut headers: HashMap<String, Value>, status: u32, message: S) -> Message {
85    builder.status(status);
86    headers.insert("Content-Type".into(), Value::from("text/plain; charset=utf8"));
87    builder.body(Value::String(message.into()));
88    Message::Response(builder.build())
89}
90
91pub trait Handler: Send + Sync {
92    fn execute(&self, params: Value) -> ZbusResult<Value>;
93}
94
95impl<F> Handler for F where F: Fn(Value) -> ZbusResult<Value>, F: Sync + Send {
96    fn execute(&self, params: Value) -> ZbusResult<Value> { self(params) }
97}
98
/// Smoke test: pushes ten requests through a dispatcher thread that forwards
/// them to `IOHandlers::handler_request`. It makes no assertions; it only
/// exercises the dispatch path.
#[test]
fn handler_test() {
    let (tx, rx) = channel::<(Request, Sender<Instruct>)>();
    let mut io_handler = IOHandlers::new(10);
    io_handler.add_method("test".into(), |_params: Value| {
        Ok(Value::Null)
    });
    io_handler.add_method("test1".into(), |_params: Value| {
        Ok(Value::Null)
    });
    thread::spawn(move || {
        // `recv` fails only once every sender is gone, so leave the loop then
        // instead of spinning on the error forever.
        while let Ok((req, sender)) = rx.recv() {
            io_handler.handler_request(req, sender);
        }
    });
    // Keep the receiving end alive for the whole test so replies can be sent.
    let (sender, _receiver) = channel::<Instruct>();
    for _ in 0..10 {
        let req = Request::builder().method("test").build();
        // Stop early if the dispatcher thread has already gone away.
        if tx.send((req, sender.clone())).is_err() {
            break;
        }
    }
    // Give the dispatcher and the worker pool time to process the requests.
    thread::sleep(Duration::from_secs(10));
}
123
124