wascc_host/middleware/
mod.rs

1use crate::Result;
2use crate::{plugins::PluginManager, Invocation, InvocationResponse};
3use std::sync::Arc;
4use std::sync::RwLock;
5use wapc::WapcHost;
6
7#[cfg(feature = "prometheus_middleware")]
8pub mod prometheus;
9
10/// The trait that must be implemented by all waSCC middleware
11pub trait Middleware: Send + Sync + 'static {
12    fn actor_pre_invoke(&self, inv: Invocation) -> Result<Invocation>;
13    fn actor_invoke(
14        &self,
15        inv: Invocation,
16        handler: InvocationHandler,
17    ) -> Result<MiddlewareResponse>;
18    fn actor_post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse>;
19
20    fn capability_pre_invoke(&self, inv: Invocation) -> Result<Invocation>;
21    fn capability_invoke(
22        &self,
23        inv: Invocation,
24        handler: InvocationHandler,
25    ) -> Result<MiddlewareResponse>;
26    fn capability_post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse>;
27}
28
29pub enum MiddlewareResponse {
30    Continue(InvocationResponse),
31    Halt(InvocationResponse),
32}
33
34pub struct InvocationHandler<'a> {
35    operation: &'a dyn Fn(Invocation) -> InvocationResponse,
36}
37
38impl<'a> InvocationHandler<'a> {
39    fn new(operation: &'a dyn Fn(Invocation) -> InvocationResponse) -> Self {
40        Self { operation }
41    }
42
43    pub fn invoke(&self, inv: Invocation) -> InvocationResponse {
44        (self.operation)(inv)
45    }
46}
47
48/// Follows a chain of middleware, ultimately executing the native plugin
49pub(crate) fn invoke_native_capability(
50    middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
51    inv: Invocation,
52    plugins: Arc<RwLock<PluginManager>>,
53) -> Result<InvocationResponse> {
54    let inv = match run_capability_pre_invoke(inv.clone(), &middlewares.read().unwrap()) {
55        Ok(i) => i,
56        Err(e) => {
57            error!("Middleware failure: {}", e);
58            inv
59        }
60    };
61
62    match run_native_capability_invoke(&middlewares.read().unwrap(), &plugins.read().unwrap(), inv)
63    {
64        Ok(response) => {
65            match run_capability_post_invoke(response.clone(), &middlewares.read().unwrap()) {
66                Ok(r) => Ok(r),
67                Err(e) => {
68                    error!("Middleware failure: {}", e);
69                    Ok(response)
70                }
71            }
72        }
73        Err(e) => Err(e),
74    }
75}
76
77/// Follows a chain of middleware, ultimately executing a portable capability provider function
78pub(crate) fn invoke_portable_capability(
79    middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
80    inv: Invocation,
81    guest: &WapcHost,
82) -> Result<InvocationResponse> {
83    let inv = match run_capability_pre_invoke(inv.clone(), &middlewares.read().unwrap()) {
84        Ok(i) => i,
85        Err(e) => {
86            error!("Middleware failure: {}", e);
87            inv
88        }
89    };
90
91    match run_portable_capability_invoke(&middlewares.read().unwrap(), inv, guest) {
92        Ok(response) => {
93            match run_capability_post_invoke(response.clone(), &middlewares.read().unwrap()) {
94                Ok(r) => Ok(r),
95                Err(e) => {
96                    error!("Middleware failure: {}", e);
97                    Ok(response)
98                }
99            }
100        }
101        Err(e) => Err(e),
102    }
103}
104
105pub(crate) fn invoke_actor(
106    middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
107    inv: Invocation,
108    guest: &WapcHost,
109) -> Result<InvocationResponse> {
110    let inv = match run_actor_pre_invoke(inv.clone(), &middlewares.read().unwrap()) {
111        Ok(i) => i,
112        Err(e) => {
113            error!("Middleware failure: {}", e);
114            inv
115        }
116    };
117
118    match run_actor_invoke(&middlewares.read().unwrap(), inv, guest) {
119        Ok(response) => {
120            match run_actor_post_invoke(response.clone(), &middlewares.read().unwrap()) {
121                Ok(r) => Ok(r),
122                Err(e) => {
123                    error!("Middleware failure: {}", e);
124                    Ok(response)
125                }
126            }
127        }
128        Err(e) => Err(e),
129    }
130}
131
132fn run_actor_pre_invoke(
133    inv: Invocation,
134    middlewares: &[Box<dyn Middleware>],
135) -> Result<Invocation> {
136    let mut cur_inv = inv;
137    for m in middlewares {
138        match m.actor_pre_invoke(cur_inv) {
139            Ok(i) => cur_inv = i.clone(),
140            Err(e) => return Err(e),
141        }
142    }
143    Ok(cur_inv)
144}
145
146fn run_actor_invoke(
147    middlewares: &[Box<dyn Middleware>],
148    inv: Invocation,
149    guest: &WapcHost,
150) -> Result<InvocationResponse> {
151    let invoke_operation = |inv: Invocation| match guest.call(&inv.operation, &inv.msg) {
152        Ok(v) => InvocationResponse::success(&inv, v),
153        Err(e) => InvocationResponse::error(&inv, &format!("failed to invoke actor: {}", e)),
154    };
155
156    run_invoke(middlewares, inv, &invoke_operation)
157}
158
159fn run_actor_post_invoke(
160    resp: InvocationResponse,
161    middlewares: &[Box<dyn Middleware>],
162) -> Result<InvocationResponse> {
163    let mut cur_resp = resp;
164    for m in middlewares {
165        match m.actor_post_invoke(cur_resp) {
166            Ok(i) => cur_resp = i.clone(),
167            Err(e) => return Err(e),
168        }
169    }
170    Ok(cur_resp)
171}
172
173pub(crate) fn run_capability_pre_invoke(
174    inv: Invocation,
175    middlewares: &[Box<dyn Middleware>],
176) -> Result<Invocation> {
177    let mut cur_inv = inv;
178    for m in middlewares {
179        match m.capability_pre_invoke(cur_inv) {
180            Ok(i) => cur_inv = i.clone(),
181            Err(e) => return Err(e),
182        }
183    }
184    Ok(cur_inv)
185}
186
187pub(crate) fn run_native_capability_invoke(
188    middlewares: &[Box<dyn Middleware>],
189    plugins: &PluginManager,
190    inv: Invocation,
191) -> Result<InvocationResponse> {
192    let invoke_operation = |inv: Invocation| match plugins.call(&inv) {
193        Ok(r) => r,
194        Err(e) => InvocationResponse::error(&inv, &format!("failed to invoke capability: {}", e)),
195    };
196
197    run_invoke(middlewares, inv, &invoke_operation)
198}
199
200pub(crate) fn run_portable_capability_invoke(
201    middlewares: &[Box<dyn Middleware>],
202    inv: Invocation,
203    guest: &WapcHost,
204) -> Result<InvocationResponse> {
205    let invoke_operation = |inv: Invocation| match guest.call(&inv.operation, &inv.msg) {
206        Ok(v) => InvocationResponse::success(&inv, v),
207        Err(e) => InvocationResponse::error(&inv, &format!("failed to invoke capability: {}", e)),
208    };
209
210    run_invoke(middlewares, inv, &invoke_operation)
211}
212
213fn run_invoke(
214    middlewares: &[Box<dyn Middleware>],
215    inv: Invocation,
216    invoke_operation: &dyn Fn(Invocation) -> InvocationResponse,
217) -> Result<InvocationResponse> {
218    let mut cur_resp = Ok(InvocationResponse::error(
219        &inv,
220        "No middleware invoked the operation",
221    ));
222
223    for m in middlewares.iter() {
224        match m.capability_invoke(inv.clone(), InvocationHandler::new(&invoke_operation)) {
225            Ok(mr) => match mr {
226                MiddlewareResponse::Continue(res) => cur_resp = Ok(res),
227                MiddlewareResponse::Halt(res) => return Ok(res),
228            },
229            Err(e) => return Err(e),
230        }
231    }
232
233    if middlewares.is_empty() {
234        Ok(invoke_operation(inv))
235    } else {
236        cur_resp
237    }
238}
239
240pub(crate) fn run_capability_post_invoke(
241    resp: InvocationResponse,
242    middlewares: &[Box<dyn Middleware>],
243) -> Result<InvocationResponse> {
244    let mut cur_resp = resp;
245    for m in middlewares {
246        match m.capability_post_invoke(cur_resp) {
247            Ok(i) => cur_resp = i.clone(),
248            Err(e) => return Err(e),
249        }
250    }
251    Ok(cur_resp)
252}
253
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::Middleware;
    use crate::inthost::{Invocation, InvocationResponse, WasccEntity};
    use crate::middleware::{InvocationHandler, MiddlewareResponse};
    use crate::Result;
    use wascap::prelude::KeyPair;

    /// Test middleware that counts how often each pre/post hook fires and
    /// otherwise passes invocations and responses through untouched. Its
    /// invoke hooks run the handler and ask the chain to continue.
    struct IncMiddleware {
        pre: &'static AtomicUsize,
        post: &'static AtomicUsize,
        cap_pre: &'static AtomicUsize,
        cap_post: &'static AtomicUsize,
    }

    impl Middleware for IncMiddleware {
        fn actor_pre_invoke(&self, inv: Invocation) -> Result<Invocation> {
            self.pre.fetch_add(1, Ordering::SeqCst);
            Ok(inv)
        }
        fn actor_invoke(
            &self,
            inv: Invocation,
            handler: InvocationHandler,
        ) -> Result<MiddlewareResponse> {
            Ok(MiddlewareResponse::Continue(handler.invoke(inv)))
        }
        fn actor_post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse> {
            self.post.fetch_add(1, Ordering::SeqCst);
            Ok(response)
        }
        fn capability_pre_invoke(&self, inv: Invocation) -> Result<Invocation> {
            self.cap_pre.fetch_add(1, Ordering::SeqCst);
            Ok(inv)
        }
        fn capability_invoke(
            &self,
            inv: Invocation,
            handler: InvocationHandler,
        ) -> Result<MiddlewareResponse> {
            Ok(MiddlewareResponse::Continue(handler.invoke(inv)))
        }
        fn capability_post_invoke(
            &self,
            response: InvocationResponse,
        ) -> Result<InvocationResponse> {
            self.cap_post.fetch_add(1, Ordering::SeqCst);
            Ok(response)
        }
    }

    #[test]
    fn simple_add() {
        // The counters are scoped to this test so that no other test can touch
        // them and skew the exact counts asserted below.
        static PRE: AtomicUsize = AtomicUsize::new(0);
        static POST: AtomicUsize = AtomicUsize::new(0);
        static CAP_PRE: AtomicUsize = AtomicUsize::new(0);
        static CAP_POST: AtomicUsize = AtomicUsize::new(0);

        let inc_mid = IncMiddleware {
            pre: &PRE,
            post: &POST,
            cap_pre: &CAP_PRE,
            cap_post: &CAP_POST,
        };
        let hk = KeyPair::new_server();

        let mids: Vec<Box<dyn Middleware>> = vec![Box::new(inc_mid)];
        let inv = Invocation::new(
            &hk,
            WasccEntity::Actor("test".to_string()),
            WasccEntity::Capability {
                capid: "testing:sample".to_string(),
                binding: "default".to_string(),
            },
            "testing",
            b"abc1234".to_vec(),
        );
        let res = super::run_actor_pre_invoke(inv.clone(), &mids);
        assert!(res.is_ok());
        let res2 = super::run_actor_pre_invoke(inv, &mids);
        assert!(res2.is_ok());

        // Two passes through a single-middleware chain fire the actor pre hook
        // twice and leave every other hook untouched.
        assert_eq!(PRE.load(Ordering::SeqCst), 2);
        assert_eq!(POST.load(Ordering::SeqCst), 0);
        assert_eq!(CAP_PRE.load(Ordering::SeqCst), 0);
        assert_eq!(CAP_POST.load(Ordering::SeqCst), 0);
    }
}
339}