1use crate::Result;
2use crate::{plugins::PluginManager, Invocation, InvocationResponse};
3use std::sync::Arc;
4use std::sync::RwLock;
5use wapc::WapcHost;
6
7#[cfg(feature = "prometheus_middleware")]
8pub mod prometheus;
9
10pub trait Middleware: Send + Sync + 'static {
12 fn actor_pre_invoke(&self, inv: Invocation) -> Result<Invocation>;
13 fn actor_invoke(
14 &self,
15 inv: Invocation,
16 handler: InvocationHandler,
17 ) -> Result<MiddlewareResponse>;
18 fn actor_post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse>;
19
20 fn capability_pre_invoke(&self, inv: Invocation) -> Result<Invocation>;
21 fn capability_invoke(
22 &self,
23 inv: Invocation,
24 handler: InvocationHandler,
25 ) -> Result<MiddlewareResponse>;
26 fn capability_post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse>;
27}
28
29pub enum MiddlewareResponse {
30 Continue(InvocationResponse),
31 Halt(InvocationResponse),
32}
33
34pub struct InvocationHandler<'a> {
35 operation: &'a dyn Fn(Invocation) -> InvocationResponse,
36}
37
38impl<'a> InvocationHandler<'a> {
39 fn new(operation: &'a dyn Fn(Invocation) -> InvocationResponse) -> Self {
40 Self { operation }
41 }
42
43 pub fn invoke(&self, inv: Invocation) -> InvocationResponse {
44 (self.operation)(inv)
45 }
46}
47
48pub(crate) fn invoke_native_capability(
50 middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
51 inv: Invocation,
52 plugins: Arc<RwLock<PluginManager>>,
53) -> Result<InvocationResponse> {
54 let inv = match run_capability_pre_invoke(inv.clone(), &middlewares.read().unwrap()) {
55 Ok(i) => i,
56 Err(e) => {
57 error!("Middleware failure: {}", e);
58 inv
59 }
60 };
61
62 match run_native_capability_invoke(&middlewares.read().unwrap(), &plugins.read().unwrap(), inv)
63 {
64 Ok(response) => {
65 match run_capability_post_invoke(response.clone(), &middlewares.read().unwrap()) {
66 Ok(r) => Ok(r),
67 Err(e) => {
68 error!("Middleware failure: {}", e);
69 Ok(response)
70 }
71 }
72 }
73 Err(e) => Err(e),
74 }
75}
76
77pub(crate) fn invoke_portable_capability(
79 middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
80 inv: Invocation,
81 guest: &WapcHost,
82) -> Result<InvocationResponse> {
83 let inv = match run_capability_pre_invoke(inv.clone(), &middlewares.read().unwrap()) {
84 Ok(i) => i,
85 Err(e) => {
86 error!("Middleware failure: {}", e);
87 inv
88 }
89 };
90
91 match run_portable_capability_invoke(&middlewares.read().unwrap(), inv, guest) {
92 Ok(response) => {
93 match run_capability_post_invoke(response.clone(), &middlewares.read().unwrap()) {
94 Ok(r) => Ok(r),
95 Err(e) => {
96 error!("Middleware failure: {}", e);
97 Ok(response)
98 }
99 }
100 }
101 Err(e) => Err(e),
102 }
103}
104
105pub(crate) fn invoke_actor(
106 middlewares: Arc<RwLock<Vec<Box<dyn Middleware>>>>,
107 inv: Invocation,
108 guest: &WapcHost,
109) -> Result<InvocationResponse> {
110 let inv = match run_actor_pre_invoke(inv.clone(), &middlewares.read().unwrap()) {
111 Ok(i) => i,
112 Err(e) => {
113 error!("Middleware failure: {}", e);
114 inv
115 }
116 };
117
118 match run_actor_invoke(&middlewares.read().unwrap(), inv, guest) {
119 Ok(response) => {
120 match run_actor_post_invoke(response.clone(), &middlewares.read().unwrap()) {
121 Ok(r) => Ok(r),
122 Err(e) => {
123 error!("Middleware failure: {}", e);
124 Ok(response)
125 }
126 }
127 }
128 Err(e) => Err(e),
129 }
130}
131
132fn run_actor_pre_invoke(
133 inv: Invocation,
134 middlewares: &[Box<dyn Middleware>],
135) -> Result<Invocation> {
136 let mut cur_inv = inv;
137 for m in middlewares {
138 match m.actor_pre_invoke(cur_inv) {
139 Ok(i) => cur_inv = i.clone(),
140 Err(e) => return Err(e),
141 }
142 }
143 Ok(cur_inv)
144}
145
146fn run_actor_invoke(
147 middlewares: &[Box<dyn Middleware>],
148 inv: Invocation,
149 guest: &WapcHost,
150) -> Result<InvocationResponse> {
151 let invoke_operation = |inv: Invocation| match guest.call(&inv.operation, &inv.msg) {
152 Ok(v) => InvocationResponse::success(&inv, v),
153 Err(e) => InvocationResponse::error(&inv, &format!("failed to invoke actor: {}", e)),
154 };
155
156 run_invoke(middlewares, inv, &invoke_operation)
157}
158
159fn run_actor_post_invoke(
160 resp: InvocationResponse,
161 middlewares: &[Box<dyn Middleware>],
162) -> Result<InvocationResponse> {
163 let mut cur_resp = resp;
164 for m in middlewares {
165 match m.actor_post_invoke(cur_resp) {
166 Ok(i) => cur_resp = i.clone(),
167 Err(e) => return Err(e),
168 }
169 }
170 Ok(cur_resp)
171}
172
173pub(crate) fn run_capability_pre_invoke(
174 inv: Invocation,
175 middlewares: &[Box<dyn Middleware>],
176) -> Result<Invocation> {
177 let mut cur_inv = inv;
178 for m in middlewares {
179 match m.capability_pre_invoke(cur_inv) {
180 Ok(i) => cur_inv = i.clone(),
181 Err(e) => return Err(e),
182 }
183 }
184 Ok(cur_inv)
185}
186
187pub(crate) fn run_native_capability_invoke(
188 middlewares: &[Box<dyn Middleware>],
189 plugins: &PluginManager,
190 inv: Invocation,
191) -> Result<InvocationResponse> {
192 let invoke_operation = |inv: Invocation| match plugins.call(&inv) {
193 Ok(r) => r,
194 Err(e) => InvocationResponse::error(&inv, &format!("failed to invoke capability: {}", e)),
195 };
196
197 run_invoke(middlewares, inv, &invoke_operation)
198}
199
200pub(crate) fn run_portable_capability_invoke(
201 middlewares: &[Box<dyn Middleware>],
202 inv: Invocation,
203 guest: &WapcHost,
204) -> Result<InvocationResponse> {
205 let invoke_operation = |inv: Invocation| match guest.call(&inv.operation, &inv.msg) {
206 Ok(v) => InvocationResponse::success(&inv, v),
207 Err(e) => InvocationResponse::error(&inv, &format!("failed to invoke capability: {}", e)),
208 };
209
210 run_invoke(middlewares, inv, &invoke_operation)
211}
212
213fn run_invoke(
214 middlewares: &[Box<dyn Middleware>],
215 inv: Invocation,
216 invoke_operation: &dyn Fn(Invocation) -> InvocationResponse,
217) -> Result<InvocationResponse> {
218 let mut cur_resp = Ok(InvocationResponse::error(
219 &inv,
220 "No middleware invoked the operation",
221 ));
222
223 for m in middlewares.iter() {
224 match m.capability_invoke(inv.clone(), InvocationHandler::new(&invoke_operation)) {
225 Ok(mr) => match mr {
226 MiddlewareResponse::Continue(res) => cur_resp = Ok(res),
227 MiddlewareResponse::Halt(res) => return Ok(res),
228 },
229 Err(e) => return Err(e),
230 }
231 }
232
233 if middlewares.is_empty() {
234 Ok(invoke_operation(inv))
235 } else {
236 cur_resp
237 }
238}
239
240pub(crate) fn run_capability_post_invoke(
241 resp: InvocationResponse,
242 middlewares: &[Box<dyn Middleware>],
243) -> Result<InvocationResponse> {
244 let mut cur_resp = resp;
245 for m in middlewares {
246 match m.capability_post_invoke(cur_resp) {
247 Ok(i) => cur_resp = i.clone(),
248 Err(e) => return Err(e),
249 }
250 }
251 Ok(cur_resp)
252}
253
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::Middleware;
    use crate::inthost::{Invocation, InvocationResponse, WasccEntity};
    use crate::middleware::{InvocationHandler, MiddlewareResponse};
    use crate::Result;
    use wascap::prelude::KeyPair;

    // Counters shared with the middleware under test. They are statics so the
    // middleware can hold `'static` references to them.
    static PRE: AtomicUsize = AtomicUsize::new(0);
    static POST: AtomicUsize = AtomicUsize::new(0);
    static CAP_PRE: AtomicUsize = AtomicUsize::new(0);
    static CAP_POST: AtomicUsize = AtomicUsize::new(0);

    /// Middleware that counts how often each pre/post hook fires and passes
    /// its input through untouched.
    struct IncMiddleware {
        pre: &'static AtomicUsize,
        post: &'static AtomicUsize,
        cap_pre: &'static AtomicUsize,
        cap_post: &'static AtomicUsize,
    }

    impl Middleware for IncMiddleware {
        fn actor_pre_invoke(&self, inv: Invocation) -> Result<Invocation> {
            self.pre.fetch_add(1, Ordering::SeqCst);
            Ok(inv)
        }

        fn actor_invoke(
            &self,
            inv: Invocation,
            handler: InvocationHandler,
        ) -> Result<MiddlewareResponse> {
            let response = handler.invoke(inv);
            Ok(MiddlewareResponse::Continue(response))
        }

        fn actor_post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse> {
            self.post.fetch_add(1, Ordering::SeqCst);
            Ok(response)
        }

        fn capability_pre_invoke(&self, inv: Invocation) -> Result<Invocation> {
            self.cap_pre.fetch_add(1, Ordering::SeqCst);
            Ok(inv)
        }

        fn capability_invoke(
            &self,
            inv: Invocation,
            handler: InvocationHandler,
        ) -> Result<MiddlewareResponse> {
            let response = handler.invoke(inv);
            Ok(MiddlewareResponse::Continue(response))
        }

        fn capability_post_invoke(
            &self,
            response: InvocationResponse,
        ) -> Result<InvocationResponse> {
            self.cap_post.fetch_add(1, Ordering::SeqCst);
            Ok(response)
        }
    }

    /// Two passes through the actor pre-invoke chain must bump the `PRE`
    /// counter exactly twice.
    #[test]
    fn simple_add() {
        let counting = IncMiddleware {
            pre: &PRE,
            post: &POST,
            cap_pre: &CAP_PRE,
            cap_post: &CAP_POST,
        };
        let host_key = KeyPair::new_server();

        let mids: Vec<Box<dyn Middleware>> = vec![Box::new(counting)];

        let origin = WasccEntity::Actor("test".to_string());
        let target = WasccEntity::Capability {
            capid: "testing:sample".to_string(),
            binding: "default".to_string(),
        };
        let inv = Invocation::new(&host_key, origin, target, "testing", b"abc1234".to_vec());

        let first = super::run_actor_pre_invoke(inv.clone(), &mids);
        assert!(first.is_ok());

        let second = super::run_actor_pre_invoke(inv, &mids);
        assert!(second.is_ok());

        assert_eq!(PRE.load(Ordering::SeqCst), 2);
    }
}