channel_server/
lib.rs

1use add_data::{AddData, AddDataEndpoint};
2use ahash::AHashMap;
3use bytes::Bytes;
4use crossbeam::channel::{bounded, Receiver, Sender};
5use extensions::Extensions;
6use serde::Serialize;
7use std::{
8    collections::HashMap,
9    fmt::{Debug, Formatter},
10    ops::Deref,
11    sync::{Arc, RwLock},
12};
13
14pub mod add_data;
15pub mod common;
16pub mod extensions;
17pub mod request;
18pub mod response;
19
20pub mod prelude;
21
22pub use channel_server_derive::handler;
23
24#[derive(Default, Clone)]
25pub struct Body(Option<Bytes>);
26
27#[derive(Clone)]
28pub struct Param(Option<String>);
29
30impl Body {
31    pub fn empty() -> Body {
32        Self(None)
33    }
34
35    pub fn take(&mut self) -> Result<Bytes, ChannelError> {
36        self.0.take().ok_or(ChannelError::BodyNoData)
37    }
38
39    pub fn from_string(body: String) -> Body {
40        Self(Some(body.into()))
41    }
42
43    pub fn from_bytes(body: Bytes) -> Body {
44        Self(Some(body))
45    }
46}
47
48impl Param {
49    pub fn empty() -> Self {
50        Self(None)
51    }
52
53    pub fn from_obj(obj: impl Serialize) -> Self {
54        Self(Some(serde_json::to_string(&obj).unwrap()))
55    }
56
57    pub fn as_ref(&self) -> Result<&String, ChannelError> {
58        if let Some(param) = self.0.as_ref() {
59            Ok(param)
60        } else {
61            Err(ChannelError::ParamNoData)
62        }
63    }
64}
65
66impl Deref for Body {
67    type Target = Option<Bytes>;
68
69    fn deref(&self) -> &Self::Target {
70        &self.0
71    }
72}
73
74pub struct Request {
75    uri: String,
76    /// 用于传递参数, json序列化字符串
77    param: Param,
78    /// 用于传递大数据
79    body: Body,
80    /// 主要是 Middleware 使用的
81    extensions: Extensions,
82}
83
84impl Request {
85    pub fn new(uri: impl Into<String>, param: Param, body: Body) -> Request {
86        Self {
87            uri: uri.into(),
88            param,
89            body,
90            extensions: Extensions::new(),
91        }
92    }
93
94    pub fn with_param(uri: String, param: Param) -> Request {
95        Self::new(uri, param, Body::empty())
96    }
97
98    pub fn with_body(uri: String, body: Body) -> Request {
99        Self::new(uri, Param::empty(), body)
100    }
101
102    /// Returns the parameters used by the extractor.
103    pub fn split(mut self) -> (Request, Body) {
104        let body = std::mem::take(&mut self.body);
105        (self, body)
106    }
107
108    #[inline]
109    pub fn uri_ref(&self) -> &str {
110        &self.uri
111    }
112
113    /// Returns a reference to the associated extensions.
114    #[inline]
115    pub fn extensions(&self) -> &Extensions {
116        &self.extensions
117    }
118
119    #[inline]
120    pub fn param(&self) -> &Param {
121        &self.param
122    }
123
124    /// Returns a mutable reference to the associated extensions.
125    #[inline]
126    pub fn extensions_mut(&mut self) -> &mut Extensions {
127        &mut self.extensions
128    }
129}
130
131impl Debug for Request {
132    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133        f.debug_struct("Request").field("uri", &self.uri).finish()
134    }
135}
136
137#[derive(Debug, Clone)]
138pub enum StatusCode {
139    /// 执行成功
140    Ok(String),
141    /// 执行失败
142    Fail(String),
143    /// 准备就绪
144    Ready(String),
145    /// 执行中
146    Pending(String),
147    /// 还未开始
148    NotStart(String),
149}
150
151impl Default for StatusCode {
152    fn default() -> Self {
153        Self::ready()
154    }
155}
156
157impl StatusCode {
158    pub fn ok() -> Self {
159        Self::Ok("执行成功".into())
160    }
161    pub fn fail() -> Self {
162        Self::Fail("执行失败".into())
163    }
164    pub fn pending() -> Self {
165        Self::Pending("正在执行".into())
166    }
167    pub fn ready() -> Self {
168        Self::Ready("准备就绪".into())
169    }
170
171    pub fn not_start() -> Self {
172        Self::NotStart("未执行".into())
173    }
174
175    pub fn is_ok(&self) -> bool {
176        matches!(self, StatusCode::Ok(_))
177    }
178}
179
180#[derive(Default, Clone)]
181pub struct Response {
182    uri: String,
183    status: StatusCode,
184    body: Body,
185}
186
187impl Response {
188    pub fn new() -> Self {
189        Self {
190            uri: String::new(),
191            status: StatusCode::ready(),
192            body: Body(None),
193        }
194    }
195
196    pub fn topic(uri: &str) -> Self {
197        Self {
198            uri: uri.into(),
199            status: StatusCode::ok(),
200            body: Body(None),
201        }
202    }
203
204    pub fn body(mut self, body: Bytes) -> Self {
205        self.body = Body(Some(body));
206        self
207    }
208
209    pub fn status(mut self, status: StatusCode) -> Self {
210        self.status = status;
211        self
212    }
213
214    pub fn status_ref(&self) -> &StatusCode {
215        &self.status
216    }
217
218    pub fn uri(mut self, uri: String) -> Response {
219        self.uri = uri;
220        self
221    }
222
223    pub fn uri_ref(&self) -> &str {
224        &self.uri
225    }
226
227    pub fn is_ok(&self) -> bool {
228        self.status.is_ok()
229    }
230
231    #[inline]
232    pub fn take_body(&mut self) -> Body {
233        std::mem::take(&mut self.body)
234    }
235}
236
237impl Debug for Response {
238    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239        let len = if let Some(body) = self.body.as_ref() {
240            body.len()
241        } else {
242            0
243        };
244        f.debug_struct("Response")
245            .field("uri", &self.uri)
246            .field("status", &self.status)
247            .field("body length", &len)
248            .finish()
249    }
250}
251
252pub trait IntoResponse: Send {
253    fn into_response(self) -> Response;
254}
255
256pub trait Endpoint: Send + Sync {
257    /// Represents the response of the endpoint.
258    type Output: IntoResponse;
259
260    /// Get the response to the request.
261    fn call(&self, req: Request) -> Result<Self::Output, ChannelError>;
262
263    fn get_response(&self, req: Request) -> Response {
264        let uri = req.uri_ref().to_string();
265        let res = self
266            .call(req)
267            .map(IntoResponse::into_response)
268            .unwrap_or_else(|err| err.into_response());
269        res.uri(uri)
270    }
271}
272
273#[derive(Debug, thiserror::Error)]
274pub enum ChannelError {
275    #[error("请求已经在队列中")]
276    ReqExistInQueue,
277    #[error("请求发送失败")]
278    ReqSendError,
279
280    /// Io error.
281    #[error("io: {0}")]
282    Io(#[from] std::io::Error),
283
284    #[error("解析json异常")]
285    ParseJsonError,
286
287    /// Body has been taken by other extractors.
288    #[error("the request body has no data")]
289    BodyNoData,
290
291    #[error("the request param has no data")]
292    ParamNoData,
293
294    /// Body is not a valid utf8 string.
295    #[error("parse utf8: {0}")]
296    NotUtf8(#[from] std::string::FromUtf8Error),
297
298    #[error("路径未找到: {0}")]
299    PathNotFoundError(String),
300
301    /// 获取Data异常.
302    #[error("Get data 异常: {0}")]
303    GetDataError(String),
304
305    #[error("异常: {0}")]
306    Custom(String),
307}
308
309impl IntoResponse for ChannelError {
310    fn into_response(self) -> Response {
311        Response::new().status(StatusCode::Fail(self.to_string()))
312    }
313}
314
315pub trait FromRequest<'a>: Sized {
316    fn from_request(req: &'a Request, body: &mut Body) -> Result<Self, ChannelError>;
317    fn from_request_without_body(req: &'a Request) -> Result<Self, ChannelError> {
318        Self::from_request(req, &mut Default::default())
319    }
320}
321
322pub type BoxEndpoint<'a, T = Response> = Box<dyn Endpoint<Output = T> + 'a>;
323
324pub trait EndpointExt: IntoEndpoint {
325    fn boxed<'a>(self) -> BoxEndpoint<'a, <Self::Endpoint as Endpoint>::Output>
326    where
327        Self: Sized + 'a,
328    {
329        Box::new(self.into_endpoint())
330    }
331
332    fn data<T>(self, data: T) -> AddDataEndpoint<Self::Endpoint, T>
333    where
334        T: Clone + Send + Sync + 'static,
335        Self: Sized,
336    {
337        self.with(AddData::new(data))
338    }
339
340    fn with<T>(self, middleware: T) -> T::Output
341    where
342        T: Middleware<Self::Endpoint>,
343        Self: Sized,
344    {
345        middleware.transform(self.into_endpoint())
346    }
347}
348
349impl<T: IntoEndpoint> EndpointExt for T {}
350
351pub trait IntoEndpoint {
352    type Endpoint: Endpoint;
353    fn into_endpoint(self) -> Self::Endpoint;
354}
355
356impl<T: Endpoint> IntoEndpoint for T {
357    type Endpoint = T;
358
359    fn into_endpoint(self) -> Self::Endpoint {
360        self
361    }
362}
363
364pub trait Middleware<E: Endpoint> {
365    type Output: Endpoint;
366
367    /// Transform the input [`Endpoint`] to another one.
368    fn transform(&self, ep: E) -> Self::Output;
369}
370
371// #[handler]
372// fn hello(name: String) -> String {
373//     format!("hello: {}", name)
374// }
375
376#[derive(Clone)]
377pub struct Route {
378    map: Arc<RwLock<AHashMap<&'static str, BoxEndpoint<'static>>>>,
379}
380
381impl Route {
382    pub fn new() -> Self {
383        Self {
384            map: Arc::default(),
385        }
386    }
387}
388
389impl Endpoint for Route {
390    type Output = Response;
391
392    fn call(&self, req: Request) -> Result<Self::Output, ChannelError> {
393        let map = self.map.read().unwrap();
394        if map.contains_key(req.uri_ref()) {
395            let ep = &map[req.uri_ref()];
396            ep.call(req)
397        } else {
398            Err(ChannelError::PathNotFoundError(req.uri_ref().into()))
399        }
400    }
401}
402
403impl Route {
404    #[must_use]
405    pub fn at(self, path: &'static str, ep: impl Endpoint<Output = Response> + 'static) -> Self {
406        {
407            let mut map = self.map.write().unwrap();
408            if map.contains_key(path) {
409                panic!("duplicate path: {}", path);
410            }
411            map.insert(path, ep.boxed());
412        }
413        self
414    }
415}
416
417struct ChannelServer {
418    res_rx: Receiver<Request>,
419    req_tx: Sender<Response>,
420}
421
422impl ChannelServer {
423    pub(crate) fn new(req_rx: Receiver<Request>, res_tx: Sender<Response>) -> ChannelServer {
424        Self {
425            res_rx: req_rx,
426            req_tx: res_tx,
427        }
428    }
429
430    pub fn run(self, ep: impl Endpoint + 'static + Clone) {
431        std::thread::spawn(move || {
432            while let Ok(req) = self.res_rx.recv() {
433                let ep = ep.clone();
434                let req_tx = self.req_tx.clone();
435                std::thread::spawn(move || {
436                    let res = ep.get_response(req);
437                    req_tx.try_send(res).ok();
438                });
439            }
440        });
441    }
442}
443
444pub struct ChannelClient {
445    req_tx: Sender<Request>,
446    res_rx: Receiver<Response>,
447    res_queue: Vec<Response>,
448    topic_rx: Receiver<Response>,
449    topic_queue: HashMap<&'static str, Vec<Response>>,
450}
451
452#[derive(Clone)]
453pub struct ChannelTopic {
454    topic_tx: Sender<Response>,
455}
456
457impl ChannelTopic {
458    pub fn new(topic_tx: Sender<Response>) -> Self {
459        Self { topic_tx }
460    }
461    pub fn publish(&self, res: Response) {
462        // 发送成功还是失败并不重要
463        self.topic_tx.send(res).ok();
464    }
465}
466
467impl ChannelClient {
468    pub fn req_with_param(
469        &mut self,
470        uri: impl Into<String>,
471        param: Param,
472    ) -> Result<(), ChannelError> {
473        let req = Request::new(uri.into(), param, Body::empty());
474        self.req(req)
475    }
476
477    pub fn req_with_body(
478        &mut self,
479        uri: impl Into<String>,
480        body: Body,
481    ) -> Result<(), ChannelError> {
482        let req = Request::new(uri.into(), Param::empty(), body);
483        self.req(req)
484    }
485
486    pub fn req_with_param_body(
487        &mut self,
488        uri: impl Into<String>,
489        param: Param,
490        body: Body,
491    ) -> Result<(), ChannelError> {
492        let req = Request::new(uri.into(), param, body);
493        self.req(req)
494    }
495
496    /// 发起请求
497    pub fn req(&mut self, req: Request) -> Result<(), ChannelError> {
498        // 先检查队列中是否有这个请求
499        let item = self
500            .res_queue
501            .iter()
502            .find(|res| res.uri_ref() == req.uri_ref());
503        if item.is_some() {
504            return Err(ChannelError::ReqExistInQueue);
505        }
506
507        // 添加 请求状态
508        self.res_queue
509            .push(Response::new().uri(req.uri_ref().into()));
510
511        // 发送请求
512        self.req_tx
513            .send(req)
514            .map_err(|_e| ChannelError::ReqSendError)
515    }
516
517    /// 处理消息队列
518    /// 返回值为 true 表示 接收到 响应
519    pub fn run_once(&mut self) -> bool {
520        let mut recved = false;
521        while let Ok(res) = self.res_rx.try_recv() {
522            let item = self
523                .res_queue
524                .iter_mut()
525                .find(|r| r.uri_ref() == res.uri_ref());
526            if let Some(r) = item {
527                *r = res;
528                recved = true;
529            }
530        }
531        while let Ok(res) = self.topic_rx.try_recv() {
532            // 只有明确订阅的数据才会被添加到队列中
533            if let Some(queue) = self.topic_queue.get_mut(res.uri_ref()) {
534                queue.push(res);
535                recved = true;
536            }
537        }
538        recved
539    }
540
541    /// 根据 uri 获得请求结果
542    pub fn fetch(&self, uri: &str) -> Option<&Response> {
543        self.res_queue.iter().find(|res| res.uri_ref() == uri)
544    }
545
546    /// 清除 response
547    pub fn clean(&mut self, uri: &str) {
548        self.res_queue.retain(|res| res.uri_ref() != uri);
549    }
550
551    pub fn subject(&mut self, uri: &'static str) {
552        self.topic_queue.insert(uri, Vec::new());
553    }
554
555    pub fn fetch_topic(&mut self, uri: &'static str) -> Option<Vec<Response>> {
556        if self.topic_queue.contains_key(uri) {
557            self.topic_queue.insert(uri, Vec::new())
558        } else {
559            None
560        }
561    }
562
563    pub(crate) fn new(
564        req_tx: Sender<Request>,
565        res_rx: Receiver<Response>,
566        topic_rx: Receiver<Response>,
567    ) -> ChannelClient {
568        Self {
569            req_tx,
570            res_rx,
571            topic_rx,
572            res_queue: Vec::new(),
573            topic_queue: HashMap::new(),
574        }
575    }
576}
577
578pub struct ChannelService {}
579
580impl ChannelService {
581    pub fn start(ep: impl Endpoint + 'static + Clone) -> (ChannelClient, ChannelTopic) {
582        let (req_tx, req_rx) = bounded::<Request>(100);
583        let (res_tx, res_rx) = bounded::<Response>(100);
584        let (topic_tx, topic_rx) = bounded::<Response>(100);
585        let client = ChannelClient::new(req_tx, res_rx, topic_rx);
586        let server = ChannelServer::new(req_rx, res_tx);
587        let topic = ChannelTopic::new(topic_tx);
588        server.run(ep);
589        (client, topic)
590    }
591}