1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::mpsc::{channel, Sender};
4use std::thread;
5use std::time::Duration;
6
7use log::debug;
8use serde_json::Value;
9use threadpool::ThreadPool;
10
11use crate::err::{ZbusErr, ZbusResult};
12use crate::message::{Message, Request, Response, ResponseBuilder};
13use crate::rpc::Protocol;
14use crate::wsocket::Instruct;
15
16pub struct IOHandler {
17 method: Box<dyn Handler + Send + Sync + 'static>,
18}
19
20impl IOHandler {
21 pub fn execute(&self, req: Request) -> ZbusResult<Value> {
22 self.method.execute(req.body())
23 }
24}
25
26pub struct IOHandlers {
27 methods: HashMap<String, Arc<IOHandler>>,
28 pool: ThreadPool,
29}
30
31impl IOHandlers {
32 pub fn new(n_workers: usize) -> Self {
33 Self {
34 methods: HashMap::new(),
35 pool: ThreadPool::new(n_workers),
36 }
37 }
38 pub fn add_method<H>(&mut self, name: String, handle: H) where H: Handler, H: Send + Sync + 'static {
39 self.methods.insert(name, Arc::new(IOHandler {
40 method: Box::new(handle)
41 }));
42 }
43 pub fn handler_request(&self, req: Request, tx: Sender<Instruct>) {
44 let mut builder = Response::builder();
45 let mut headers = HashMap::new();
46 headers.insert(Protocol::CMD.into(), Value::from(Protocol::ROUTE));
47 let id = req.id().unwrap();let req_headers = req.headers();
49 let source = req_headers.get(Protocol::SOURCE);
50 source.map(|s| headers.insert(Protocol::TARGET.into(), s.clone()));
51 headers.insert(Protocol::ID.into(), Value::from(id));
52 let url = req.url();
53 if url == "" {
54 tx.send(Instruct::Delivery(reply(builder, headers, 400, "url required"), None));
55 return;
56 }
57 let method = self.methods.get(req.url());
58 match method {
59 None => {
60 debug!("404 {} ", req.url());
61 tx.send(Instruct::Delivery(reply(builder, headers, 404, format!("URL={} Not Found", url)), None));
62 }
63 Some(handler) => {
64 let handler = handler.clone();
65 self.pool.execute(move || {
66 let result: ZbusResult<Value> = handler.method.execute(req.body());
67 let response = match result {
68 Ok(body) => {
69 headers.insert("Content-Type".into(), Value::from("application/json; charset=utf8"));
70 builder.status(200).headers(headers).body(body);
71 Message::Response(builder.build())
72 }
73 Err(e) => {
74 reply(builder, headers, 500, format!("request params is error {}", e))
75 }
76 };
77 tx.send(Instruct::Delivery(response, None));
78 });
79 }
80 }
81 }
82}
83
84fn reply<S: Into<String>>(mut builder: ResponseBuilder, mut headers: HashMap<String, Value>, status: u32, message: S) -> Message {
85 builder.status(status);
86 headers.insert("Content-Type".into(), Value::from("text/plain; charset=utf8"));
87 builder.body(Value::String(message.into()));
88 Message::Response(builder.build())
89}
90
91pub trait Handler: Send + Sync {
92 fn execute(&self, params: Value) -> ZbusResult<Value>;
93}
94
95impl<F> Handler for F where F: Fn(Value) -> ZbusResult<Value>, F: Sync + Send {
96 fn execute(&self, params: Value) -> ZbusResult<Value> { self(params) }
97}
98
/// Pushes a batch of requests through a dispatcher thread and waits for every
/// response channel clone to be released.
///
/// The original slept for a fixed 10 seconds and asserted nothing. Here the
/// wait ends as soon as all senders are dropped; the timeout only guards
/// against a hang.
///
/// NOTE(review): the requests set `method("test")` while handlers are looked
/// up by URL, so these requests may never reach the registered "test"
/// handler -- confirm the intended builder call.
#[test]
fn handler_test() {
    const REQUESTS: usize = 10;

    let (tx, rx) = channel::<(Request, Sender<Instruct>)>();
    let mut io_handler = IOHandlers::new(10);
    io_handler.add_method("test".into(), |_params: Value| Ok(Value::Null));
    io_handler.add_method("test1".into(), |_params: Value| Ok(Value::Null));

    // Iterating the receiver ends the thread once every sender is dropped,
    // instead of spinning on a closed channel.
    thread::spawn(move || {
        for (req, sender) in rx {
            io_handler.handler_request(req, sender);
        }
    });

    let (sender, receiv) = channel::<Instruct>();
    for _ in 0..REQUESTS {
        let req = Request::builder().method("test").build();
        if tx.send((req, sender.clone())).is_err() {
            // The dispatcher is gone; further sends cannot succeed.
            break;
        }
    }
    // Release our own ends so the response channel disconnects when the last
    // in-flight request has been answered or discarded.
    drop(tx);
    drop(sender);

    let mut delivered = 0;
    while receiv.recv_timeout(Duration::from_secs(10)).is_ok() {
        delivered += 1;
    }
    assert!(
        delivered <= REQUESTS,
        "received {} responses for {} requests",
        delivered,
        REQUESTS
    );
}
123
124