1use add_data::{AddData, AddDataEndpoint};
2use ahash::AHashMap;
3use bytes::Bytes;
4use crossbeam::channel::{bounded, Receiver, Sender};
5use extensions::Extensions;
6use serde::Serialize;
7use std::{
8 collections::HashMap,
9 fmt::{Debug, Formatter},
10 ops::Deref,
11 sync::{Arc, RwLock},
12};
13
14pub mod add_data;
15pub mod common;
16pub mod extensions;
17pub mod request;
18pub mod response;
19
20pub mod prelude;
21
22pub use channel_server_derive::handler;
23
24#[derive(Default, Clone)]
25pub struct Body(Option<Bytes>);
26
27#[derive(Clone)]
28pub struct Param(Option<String>);
29
30impl Body {
31 pub fn empty() -> Body {
32 Self(None)
33 }
34
35 pub fn take(&mut self) -> Result<Bytes, ChannelError> {
36 self.0.take().ok_or(ChannelError::BodyNoData)
37 }
38
39 pub fn from_string(body: String) -> Body {
40 Self(Some(body.into()))
41 }
42
43 pub fn from_bytes(body: Bytes) -> Body {
44 Self(Some(body))
45 }
46}
47
48impl Param {
49 pub fn empty() -> Self {
50 Self(None)
51 }
52
53 pub fn from_obj(obj: impl Serialize) -> Self {
54 Self(Some(serde_json::to_string(&obj).unwrap()))
55 }
56
57 pub fn as_ref(&self) -> Result<&String, ChannelError> {
58 if let Some(param) = self.0.as_ref() {
59 Ok(param)
60 } else {
61 Err(ChannelError::ParamNoData)
62 }
63 }
64}
65
66impl Deref for Body {
67 type Target = Option<Bytes>;
68
69 fn deref(&self) -> &Self::Target {
70 &self.0
71 }
72}
73
74pub struct Request {
75 uri: String,
76 param: Param,
78 body: Body,
80 extensions: Extensions,
82}
83
84impl Request {
85 pub fn new(uri: impl Into<String>, param: Param, body: Body) -> Request {
86 Self {
87 uri: uri.into(),
88 param,
89 body,
90 extensions: Extensions::new(),
91 }
92 }
93
94 pub fn with_param(uri: String, param: Param) -> Request {
95 Self::new(uri, param, Body::empty())
96 }
97
98 pub fn with_body(uri: String, body: Body) -> Request {
99 Self::new(uri, Param::empty(), body)
100 }
101
102 pub fn split(mut self) -> (Request, Body) {
104 let body = std::mem::take(&mut self.body);
105 (self, body)
106 }
107
108 #[inline]
109 pub fn uri_ref(&self) -> &str {
110 &self.uri
111 }
112
113 #[inline]
115 pub fn extensions(&self) -> &Extensions {
116 &self.extensions
117 }
118
119 #[inline]
120 pub fn param(&self) -> &Param {
121 &self.param
122 }
123
124 #[inline]
126 pub fn extensions_mut(&mut self) -> &mut Extensions {
127 &mut self.extensions
128 }
129}
130
131impl Debug for Request {
132 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("Request").field("uri", &self.uri).finish()
134 }
135}
136
137#[derive(Debug, Clone)]
138pub enum StatusCode {
139 Ok(String),
141 Fail(String),
143 Ready(String),
145 Pending(String),
147 NotStart(String),
149}
150
151impl Default for StatusCode {
152 fn default() -> Self {
153 Self::ready()
154 }
155}
156
157impl StatusCode {
158 pub fn ok() -> Self {
159 Self::Ok("执行成功".into())
160 }
161 pub fn fail() -> Self {
162 Self::Fail("执行失败".into())
163 }
164 pub fn pending() -> Self {
165 Self::Pending("正在执行".into())
166 }
167 pub fn ready() -> Self {
168 Self::Ready("准备就绪".into())
169 }
170
171 pub fn not_start() -> Self {
172 Self::NotStart("未执行".into())
173 }
174
175 pub fn is_ok(&self) -> bool {
176 matches!(self, StatusCode::Ok(_))
177 }
178}
179
180#[derive(Default, Clone)]
181pub struct Response {
182 uri: String,
183 status: StatusCode,
184 body: Body,
185}
186
187impl Response {
188 pub fn new() -> Self {
189 Self {
190 uri: String::new(),
191 status: StatusCode::ready(),
192 body: Body(None),
193 }
194 }
195
196 pub fn topic(uri: &str) -> Self {
197 Self {
198 uri: uri.into(),
199 status: StatusCode::ok(),
200 body: Body(None),
201 }
202 }
203
204 pub fn body(mut self, body: Bytes) -> Self {
205 self.body = Body(Some(body));
206 self
207 }
208
209 pub fn status(mut self, status: StatusCode) -> Self {
210 self.status = status;
211 self
212 }
213
214 pub fn status_ref(&self) -> &StatusCode {
215 &self.status
216 }
217
218 pub fn uri(mut self, uri: String) -> Response {
219 self.uri = uri;
220 self
221 }
222
223 pub fn uri_ref(&self) -> &str {
224 &self.uri
225 }
226
227 pub fn is_ok(&self) -> bool {
228 self.status.is_ok()
229 }
230
231 #[inline]
232 pub fn take_body(&mut self) -> Body {
233 std::mem::take(&mut self.body)
234 }
235}
236
237impl Debug for Response {
238 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239 let len = if let Some(body) = self.body.as_ref() {
240 body.len()
241 } else {
242 0
243 };
244 f.debug_struct("Response")
245 .field("uri", &self.uri)
246 .field("status", &self.status)
247 .field("body length", &len)
248 .finish()
249 }
250}
251
252pub trait IntoResponse: Send {
253 fn into_response(self) -> Response;
254}
255
256pub trait Endpoint: Send + Sync {
257 type Output: IntoResponse;
259
260 fn call(&self, req: Request) -> Result<Self::Output, ChannelError>;
262
263 fn get_response(&self, req: Request) -> Response {
264 let uri = req.uri_ref().to_string();
265 let res = self
266 .call(req)
267 .map(IntoResponse::into_response)
268 .unwrap_or_else(|err| err.into_response());
269 res.uri(uri)
270 }
271}
272
273#[derive(Debug, thiserror::Error)]
274pub enum ChannelError {
275 #[error("请求已经在队列中")]
276 ReqExistInQueue,
277 #[error("请求发送失败")]
278 ReqSendError,
279
280 #[error("io: {0}")]
282 Io(#[from] std::io::Error),
283
284 #[error("解析json异常")]
285 ParseJsonError,
286
287 #[error("the request body has no data")]
289 BodyNoData,
290
291 #[error("the request param has no data")]
292 ParamNoData,
293
294 #[error("parse utf8: {0}")]
296 NotUtf8(#[from] std::string::FromUtf8Error),
297
298 #[error("路径未找到: {0}")]
299 PathNotFoundError(String),
300
301 #[error("Get data 异常: {0}")]
303 GetDataError(String),
304
305 #[error("异常: {0}")]
306 Custom(String),
307}
308
309impl IntoResponse for ChannelError {
310 fn into_response(self) -> Response {
311 Response::new().status(StatusCode::Fail(self.to_string()))
312 }
313}
314
315pub trait FromRequest<'a>: Sized {
316 fn from_request(req: &'a Request, body: &mut Body) -> Result<Self, ChannelError>;
317 fn from_request_without_body(req: &'a Request) -> Result<Self, ChannelError> {
318 Self::from_request(req, &mut Default::default())
319 }
320}
321
322pub type BoxEndpoint<'a, T = Response> = Box<dyn Endpoint<Output = T> + 'a>;
323
324pub trait EndpointExt: IntoEndpoint {
325 fn boxed<'a>(self) -> BoxEndpoint<'a, <Self::Endpoint as Endpoint>::Output>
326 where
327 Self: Sized + 'a,
328 {
329 Box::new(self.into_endpoint())
330 }
331
332 fn data<T>(self, data: T) -> AddDataEndpoint<Self::Endpoint, T>
333 where
334 T: Clone + Send + Sync + 'static,
335 Self: Sized,
336 {
337 self.with(AddData::new(data))
338 }
339
340 fn with<T>(self, middleware: T) -> T::Output
341 where
342 T: Middleware<Self::Endpoint>,
343 Self: Sized,
344 {
345 middleware.transform(self.into_endpoint())
346 }
347}
348
349impl<T: IntoEndpoint> EndpointExt for T {}
350
351pub trait IntoEndpoint {
352 type Endpoint: Endpoint;
353 fn into_endpoint(self) -> Self::Endpoint;
354}
355
356impl<T: Endpoint> IntoEndpoint for T {
357 type Endpoint = T;
358
359 fn into_endpoint(self) -> Self::Endpoint {
360 self
361 }
362}
363
364pub trait Middleware<E: Endpoint> {
365 type Output: Endpoint;
366
367 fn transform(&self, ep: E) -> Self::Output;
369}
370
371#[derive(Clone)]
377pub struct Route {
378 map: Arc<RwLock<AHashMap<&'static str, BoxEndpoint<'static>>>>,
379}
380
381impl Route {
382 pub fn new() -> Self {
383 Self {
384 map: Arc::default(),
385 }
386 }
387}
388
389impl Endpoint for Route {
390 type Output = Response;
391
392 fn call(&self, req: Request) -> Result<Self::Output, ChannelError> {
393 let map = self.map.read().unwrap();
394 if map.contains_key(req.uri_ref()) {
395 let ep = &map[req.uri_ref()];
396 ep.call(req)
397 } else {
398 Err(ChannelError::PathNotFoundError(req.uri_ref().into()))
399 }
400 }
401}
402
403impl Route {
404 #[must_use]
405 pub fn at(self, path: &'static str, ep: impl Endpoint<Output = Response> + 'static) -> Self {
406 {
407 let mut map = self.map.write().unwrap();
408 if map.contains_key(path) {
409 panic!("duplicate path: {}", path);
410 }
411 map.insert(path, ep.boxed());
412 }
413 self
414 }
415}
416
417struct ChannelServer {
418 res_rx: Receiver<Request>,
419 req_tx: Sender<Response>,
420}
421
422impl ChannelServer {
423 pub(crate) fn new(req_rx: Receiver<Request>, res_tx: Sender<Response>) -> ChannelServer {
424 Self {
425 res_rx: req_rx,
426 req_tx: res_tx,
427 }
428 }
429
430 pub fn run(self, ep: impl Endpoint + 'static + Clone) {
431 std::thread::spawn(move || {
432 while let Ok(req) = self.res_rx.recv() {
433 let ep = ep.clone();
434 let req_tx = self.req_tx.clone();
435 std::thread::spawn(move || {
436 let res = ep.get_response(req);
437 req_tx.try_send(res).ok();
438 });
439 }
440 });
441 }
442}
443
444pub struct ChannelClient {
445 req_tx: Sender<Request>,
446 res_rx: Receiver<Response>,
447 res_queue: Vec<Response>,
448 topic_rx: Receiver<Response>,
449 topic_queue: HashMap<&'static str, Vec<Response>>,
450}
451
452#[derive(Clone)]
453pub struct ChannelTopic {
454 topic_tx: Sender<Response>,
455}
456
457impl ChannelTopic {
458 pub fn new(topic_tx: Sender<Response>) -> Self {
459 Self { topic_tx }
460 }
461 pub fn publish(&self, res: Response) {
462 self.topic_tx.send(res).ok();
464 }
465}
466
467impl ChannelClient {
468 pub fn req_with_param(
469 &mut self,
470 uri: impl Into<String>,
471 param: Param,
472 ) -> Result<(), ChannelError> {
473 let req = Request::new(uri.into(), param, Body::empty());
474 self.req(req)
475 }
476
477 pub fn req_with_body(
478 &mut self,
479 uri: impl Into<String>,
480 body: Body,
481 ) -> Result<(), ChannelError> {
482 let req = Request::new(uri.into(), Param::empty(), body);
483 self.req(req)
484 }
485
486 pub fn req_with_param_body(
487 &mut self,
488 uri: impl Into<String>,
489 param: Param,
490 body: Body,
491 ) -> Result<(), ChannelError> {
492 let req = Request::new(uri.into(), param, body);
493 self.req(req)
494 }
495
496 pub fn req(&mut self, req: Request) -> Result<(), ChannelError> {
498 let item = self
500 .res_queue
501 .iter()
502 .find(|res| res.uri_ref() == req.uri_ref());
503 if item.is_some() {
504 return Err(ChannelError::ReqExistInQueue);
505 }
506
507 self.res_queue
509 .push(Response::new().uri(req.uri_ref().into()));
510
511 self.req_tx
513 .send(req)
514 .map_err(|_e| ChannelError::ReqSendError)
515 }
516
517 pub fn run_once(&mut self) -> bool {
520 let mut recved = false;
521 while let Ok(res) = self.res_rx.try_recv() {
522 let item = self
523 .res_queue
524 .iter_mut()
525 .find(|r| r.uri_ref() == res.uri_ref());
526 if let Some(r) = item {
527 *r = res;
528 recved = true;
529 }
530 }
531 while let Ok(res) = self.topic_rx.try_recv() {
532 if let Some(queue) = self.topic_queue.get_mut(res.uri_ref()) {
534 queue.push(res);
535 recved = true;
536 }
537 }
538 recved
539 }
540
541 pub fn fetch(&self, uri: &str) -> Option<&Response> {
543 self.res_queue.iter().find(|res| res.uri_ref() == uri)
544 }
545
546 pub fn clean(&mut self, uri: &str) {
548 self.res_queue.retain(|res| res.uri_ref() != uri);
549 }
550
551 pub fn subject(&mut self, uri: &'static str) {
552 self.topic_queue.insert(uri, Vec::new());
553 }
554
555 pub fn fetch_topic(&mut self, uri: &'static str) -> Option<Vec<Response>> {
556 if self.topic_queue.contains_key(uri) {
557 self.topic_queue.insert(uri, Vec::new())
558 } else {
559 None
560 }
561 }
562
563 pub(crate) fn new(
564 req_tx: Sender<Request>,
565 res_rx: Receiver<Response>,
566 topic_rx: Receiver<Response>,
567 ) -> ChannelClient {
568 Self {
569 req_tx,
570 res_rx,
571 topic_rx,
572 res_queue: Vec::new(),
573 topic_queue: HashMap::new(),
574 }
575 }
576}
577
578pub struct ChannelService {}
579
580impl ChannelService {
581 pub fn start(ep: impl Endpoint + 'static + Clone) -> (ChannelClient, ChannelTopic) {
582 let (req_tx, req_rx) = bounded::<Request>(100);
583 let (res_tx, res_rx) = bounded::<Response>(100);
584 let (topic_tx, topic_rx) = bounded::<Response>(100);
585 let client = ChannelClient::new(req_tx, res_rx, topic_rx);
586 let server = ChannelServer::new(req_rx, res_tx);
587 let topic = ChannelTopic::new(topic_tx);
588 server.run(ep);
589 (client, topic)
590 }
591}