Skip to main content

asteroid_mq/protocol/node/edge/
middleware.rs

1use std::{future::Future, pin::Pin, sync::Arc};
2
3use asteroid_mq_model::{EdgeError, EdgeRequestEnum, EdgeResponseEnum, NodeId};
4
5use crate::prelude::Node;
6
7pub trait EdgeConnectionHandler: Clone + Send + 'static {
8    type Future: Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send;
9    fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future;
10}
11
12/// The dynamic object wrapper for [`EdgeConnectionHandler`].
13pub struct EdgeConnectionHandlerObject {
14    #[allow(clippy::type_complexity)]
15    handle: Arc<
16        dyn Fn(
17                Node,
18                NodeId,
19                EdgeRequestEnum,
20            )
21                -> Pin<Box<dyn Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send>>
22            + Send
23            + Sync,
24    >,
25}
26
27impl std::fmt::Debug for EdgeConnectionHandlerObject {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("EdgeConnectionHandlerObject").finish()
30    }
31}
32
33impl Clone for EdgeConnectionHandlerObject {
34    fn clone(&self) -> Self {
35        Self {
36            handle: self.handle.clone(),
37        }
38    }
39}
40
41impl EdgeConnectionHandlerObject {
42    /// Basic handler: call [`Node::handle_edge_request`] directly.
43    pub fn basic() -> Self {
44        Self {
45            handle: Arc::new(|node, from, req| {
46                Box::pin(async move { node.handle_edge_request(from, req).await })
47            }),
48        }
49    }
50    /// Wrap the handler with a middleware.
51    pub fn with_middleware<M>(self, middleware: M) -> Self
52    where
53        M: EdgeConnectionMiddleware<Self>,
54    {
55        Self {
56            handle: Arc::new(move |node, from, req| {
57                let this = self.clone();
58                let middleware = middleware.clone();
59                Box::pin(async move { middleware.handle(node, from, req, &this).await })
60            }),
61        }
62    }
63}
64
65impl EdgeConnectionHandler for EdgeConnectionHandlerObject {
66    type Future = Pin<Box<dyn Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send>>;
67
68    fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future {
69        (self.handle)(node, from, req)
70    }
71}
72
73pub trait EdgeConnectionMiddleware<I>: Clone + Send + Sync + 'static
74where
75    I: EdgeConnectionHandler,
76{
77    type Future: Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send;
78    fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum, inner: &I) -> Self::Future;
79}
80
/// Statically typed pairing of an inner handler `I` with a middleware `M`.
///
/// Both fields are private, so values are built through
/// [`WithEdgeConnectionMiddleware::new`].
#[derive(Clone, Debug)]
pub struct WithEdgeConnectionMiddleware<I, M> {
    /// Handler the middleware may delegate to.
    inner: I,
    /// Layer invoked for every request.
    middleware: M,
}

impl<I, M> WithEdgeConnectionMiddleware<I, M> {
    /// Combines `inner` with `middleware`.
    ///
    /// Without this constructor the type could not be created outside of this
    /// module, because its fields are private.
    pub fn new(inner: I, middleware: M) -> Self {
        Self { inner, middleware }
    }

    /// Splits the value back into `(inner, middleware)`.
    pub fn into_parts(self) -> (I, M) {
        (self.inner, self.middleware)
    }
}
86
87impl<I, M> EdgeConnectionHandler for WithEdgeConnectionMiddleware<I, M>
88where
89    I: EdgeConnectionHandler,
90    M: EdgeConnectionMiddleware<I>,
91{
92    type Future = M::Future;
93
94    fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future {
95        self.middleware.handle(node, from, req, &self.inner)
96    }
97}
98#[derive(Clone)]
99pub struct FunctionalEdgeConnectionMiddleware<F>(pub F);
100
101impl<F, I> EdgeConnectionMiddleware<I> for FunctionalEdgeConnectionMiddleware<F>
102where
103    F: Fn(Node, NodeId, EdgeRequestEnum, &I) -> I::Future + Clone + Send + Sync + 'static,
104    I: EdgeConnectionHandler,
105{
106    type Future = I::Future;
107
108    fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum, inner: &I) -> Self::Future {
109        (self.0)(node, from, req, inner)
110    }
111}
112
113#[derive(Clone, Debug)]
114pub struct BasicHandler;
115
116impl EdgeConnectionHandler for BasicHandler {
117    type Future = Pin<Box<dyn Future<Output = Result<EdgeResponseEnum, EdgeError>> + Send>>;
118
119    fn handle(&self, node: Node, from: NodeId, req: EdgeRequestEnum) -> Self::Future {
120        Box::pin(async move { node.handle_edge_request(from, req).await })
121    }
122}
123
124impl Node {
125    /// Insert a middleware to the edge connection handler.
126    ///
127    /// Middlewares may be used to intercept and modify the behavior of the edge connection handler, such as logging, authentication, etc.
128    ///
129    /// Refer to the [`EdgeConnectionMiddleware`] trait for more information.
130    pub async fn insert_edge_connection_middleware<M>(&self, middleware: M)
131    where
132        M: EdgeConnectionMiddleware<EdgeConnectionHandlerObject>,
133    {
134        let mut wg = self.edge_handler.write().await;
135        *wg = wg.clone().with_middleware(middleware);
136    }
137    /// Get the current edge connection handler.
138    ///
139    /// You may access the [`Node::edge_handler`](`crate::protocol::node::NodeInner::edge_handler`) field directly to get the lock.
140    pub async fn get_edge_connection_handler(&self) -> EdgeConnectionHandlerObject {
141        self.edge_handler.read().await.clone()
142    }
143    /// Set the edge connection handler.
144    pub async fn set_edge_connection_handler(&self, handler: EdgeConnectionHandlerObject) {
145        let mut wg = self.edge_handler.write().await;
146        *wg = handler;
147    }
148}