asteroid_mq/protocol/node/edge/
middleware.rs1use std::{future::Future, pin::Pin, sync::Arc};
2
3use asteroid_mq_model::{EdgeError, EdgeRequestEnum, EdgeResponseEnum, NodeId};
4
5use crate::prelude::Node;
6
7pub trait EdgeConnectionHandler: Clone + Send + 'static {
8 type Future: Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send;
9 fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future;
10}
11
12pub struct EdgeConnectionHandlerObject {
14 #[allow(clippy::type_complexity)]
15 handle: Arc<
16 dyn Fn(
17 Node,
18 NodeId,
19 EdgeRequestEnum,
20 )
21 -> Pin<Box<dyn Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send>>
22 + Send
23 + Sync,
24 >,
25}
26
27impl std::fmt::Debug for EdgeConnectionHandlerObject {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("EdgeConnectionHandlerObject").finish()
30 }
31}
32
33impl Clone for EdgeConnectionHandlerObject {
34 fn clone(&self) -> Self {
35 Self {
36 handle: self.handle.clone(),
37 }
38 }
39}
40
41impl EdgeConnectionHandlerObject {
42 pub fn basic() -> Self {
44 Self {
45 handle: Arc::new(|node, from, req| {
46 Box::pin(async move { node.handle_edge_request(from, req).await })
47 }),
48 }
49 }
50 pub fn with_middleware<M>(self, middleware: M) -> Self
52 where
53 M: EdgeConnectionMiddleware<Self>,
54 {
55 Self {
56 handle: Arc::new(move |node, from, req| {
57 let this = self.clone();
58 let middleware = middleware.clone();
59 Box::pin(async move { middleware.handle(node, from, req, &this).await })
60 }),
61 }
62 }
63}
64
65impl EdgeConnectionHandler for EdgeConnectionHandlerObject {
66 type Future = Pin<Box<dyn Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send>>;
67
68 fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future {
69 (self.handle)(node, from, req)
70 }
71}
72
73pub trait EdgeConnectionMiddleware<I>: Clone + Send + Sync + 'static
74where
75 I: EdgeConnectionHandler,
76{
77 type Future: Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send;
78 fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum, inner: &I) -> Self::Future;
79}
80
81#[derive(Clone)]
82pub struct WithEdgeConnectionMiddleware<I, M> {
83 inner: I,
84 middleware: M,
85}
86
87impl<I, M> EdgeConnectionHandler for WithEdgeConnectionMiddleware<I, M>
88where
89 I: EdgeConnectionHandler,
90 M: EdgeConnectionMiddleware<I>,
91{
92 type Future = M::Future;
93
94 fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future {
95 self.middleware.handle(node, from, req, &self.inner)
96 }
97}
98#[derive(Clone)]
99pub struct FunctionalEdgeConnectionMiddleware<F>(pub F);
100
101impl<F, I> EdgeConnectionMiddleware<I> for FunctionalEdgeConnectionMiddleware<F>
102where
103 F: Fn(Node, NodeId, EdgeRequestEnum, &I) -> I::Future + Clone + Send + Sync + 'static,
104 I: EdgeConnectionHandler,
105{
106 type Future = I::Future;
107
108 fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum, inner: &I) -> Self::Future {
109 (self.0)(node, from, req, inner)
110 }
111}
112
113#[derive(Clone, Debug)]
114pub struct BasicHandler;
115
116impl EdgeConnectionHandler for BasicHandler {
117 type Future = Pin<Box<dyn Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send>>;
118
119 fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future {
120 Box::pin(async move { node.handle_edge_request(from, req).await })
121 }
122}
123
124impl Node {
125 pub async fn insert_edge_connection_middleware<M>(&self, middleware: M)
131 where
132 M: EdgeConnectionMiddleware<EdgeConnectionHandlerObject>,
133 {
134 let mut wg = self.edge_handler.write().await;
135 *wg = wg.clone().with_middleware(middleware);
136 }
137 pub async fn get_edge_connection_handler(&self) -> EdgeConnectionHandlerObject {
141 self.edge_handler.read().await.clone()
142 }
143 pub async fn set_edge_connection_handler(&self, handler: EdgeConnectionHandlerObject) {
145 let mut wg = self.edge_handler.write().await;
146 *wg = handler;
147 }
148}