1pub mod config;
7pub mod error;
8pub mod grpc;
9pub mod prelude;
10pub mod tls;
11pub mod websocket;
12
13use arc_swap::ArcSwap;
14use bytes::Bytes;
15use dashmap::DashMap;
16use pingora::Error;
17use pingora::http::ResponseHeader;
18use pingora::proxy::Session;
19use pingora::server::Server;
20use std::any::{Any, TypeId};
21use std::collections::HashMap;
22use std::sync::Arc;
23
24pub trait Context: Send + Sync {
29 fn insert<T: Any + Send + Sync>(&self, value: T);
31
32 fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>>;
34
35 fn remove<T: Any + Send + Sync>(&self) -> Option<Arc<T>>;
37}
38
39#[derive(Clone)]
45pub struct AppContext {
46 data: Arc<ArcSwap<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>>,
47}
48
49impl AppContext {
50 pub fn new() -> Self {
51 Self {
52 data: Arc::new(ArcSwap::from_pointee(HashMap::new())),
53 }
54 }
55}
56
57impl Default for AppContext {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl Context for AppContext {
64 fn insert<T: Any + Send + Sync>(&self, value: T) {
65 let value = Arc::new(value) as Arc<dyn Any + Send + Sync>;
66 self.data.rcu(move |old| {
67 let mut next = (**old).clone();
68 next.insert(TypeId::of::<T>(), value.clone());
69 next
70 });
71 }
72
73 fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
74 let data = self.data.load();
75 let value = data.get(&TypeId::of::<T>()).cloned()?;
76 Arc::downcast::<T>(value).ok()
77 }
78
79 fn remove<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
80 let mut removed: Option<Arc<dyn Any + Send + Sync>> = None;
81 self.data.rcu(|old| {
82 let mut next = (**old).clone();
83 removed = next.remove(&TypeId::of::<T>());
84 next
85 });
86 removed.and_then(|value| Arc::downcast::<T>(value).ok())
87 }
88}
89
90#[derive(Clone)]
95pub struct RequestContext {
96 data: Arc<DashMap<TypeId, Arc<dyn Any + Send + Sync>>>,
97}
98
99impl RequestContext {
100 pub fn new() -> Self {
101 Self {
102 data: Arc::new(DashMap::new()),
103 }
104 }
105}
106
107impl Default for RequestContext {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
113impl Context for RequestContext {
114 fn insert<T: Any + Send + Sync>(&self, value: T) {
115 let value = Arc::new(value) as Arc<dyn Any + Send + Sync>;
116 self.data.insert(TypeId::of::<T>(), value);
117 }
118
119 fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
120 let value = self.data.get(&TypeId::of::<T>())?.clone();
121 Arc::downcast::<T>(value).ok()
122 }
123
124 fn remove<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
125 let (_, value) = self.data.remove(&TypeId::of::<T>())?;
126 Arc::downcast::<T>(value).ok()
127 }
128}
129
130#[async_trait::async_trait]
135pub trait JokowayMiddleware: Send + Sync {
136 type CTX: Send + Sync + 'static;
139
140 fn name(&self) -> &'static str;
142
143 fn new_ctx(&self) -> Self::CTX;
146
147 fn order(&self) -> i16 {
152 0
153 }
154
155 async fn request_filter(
164 &self,
165 _session: &mut Session,
166 _ctx: &mut Self::CTX,
167 _app_ctx: &AppContext,
168 _request_ctx: &RequestContext,
169 ) -> Result<bool, Box<Error>> {
170 Ok(false)
171 }
172
173 async fn upstream_response_filter(
181 &self,
182 _session: &mut Session,
183 _upstream_response: &mut ResponseHeader,
184 _ctx: &mut Self::CTX,
185 _app_ctx: &AppContext,
186 _request_ctx: &RequestContext,
187 ) -> Result<(), Box<Error>> {
188 Ok(())
189 }
190
191 fn response_body_filter(
201 &self,
202 _session: &mut Session,
203 _body: &mut Option<Bytes>,
204 _end_of_stream: bool,
205 _ctx: &mut Self::CTX,
206 _app_ctx: &AppContext,
207 _request_ctx: &RequestContext,
208 ) -> Result<Option<std::time::Duration>, Box<Error>> {
209 Ok(None)
210 }
211
212 async fn request_body_filter(
219 &self,
220 _session: &mut Session,
221 _body: &mut Option<Bytes>,
222 _end_of_stream: bool,
223 _ctx: &mut Self::CTX,
224 _app_ctx: &AppContext,
225 _request_ctx: &RequestContext,
226 ) -> Result<(), Box<Error>> {
227 Ok(())
228 }
229
230 fn on_websocket_message(
236 &self,
237 _direction: crate::websocket::WebsocketDirection,
238 frame: crate::websocket::WsFrame,
239 _ctx: &mut Self::CTX,
240 _app_ctx: &AppContext,
241 _request_ctx: &RequestContext,
242 ) -> crate::websocket::WebsocketMessageAction {
243 crate::websocket::WebsocketMessageAction::Forward(frame)
244 }
245
246 fn on_websocket_error(
252 &self,
253 _direction: crate::websocket::WebsocketDirection,
254 _error: crate::websocket::WebsocketError,
255 _ctx: &mut Self::CTX,
256 _app_ctx: &AppContext,
257 _request_ctx: &RequestContext,
258 ) -> crate::websocket::WebsocketErrorAction {
259 crate::websocket::WebsocketErrorAction::PassThrough
260 }
261
262 fn on_grpc_message(
266 &self,
267 _direction: crate::grpc::GrpcDirection,
268 message: crate::grpc::GrpcMessage,
269 _ctx: &mut Self::CTX,
270 _app_ctx: &AppContext,
271 _request_ctx: &RequestContext,
272 ) -> crate::grpc::GrpcMessageAction {
273 crate::grpc::GrpcMessageAction::Forward(message)
274 }
275}
276
277#[async_trait::async_trait]
279pub trait JokowayMiddlewareDyn: Send + Sync {
280 fn name(&self) -> &'static str;
282
283 fn order(&self) -> i16 {
285 0
286 }
287
288 fn new_ctx_dyn(&self) -> Box<dyn Any + Send + Sync>;
289
290 async fn request_filter_dyn(
291 &self,
292 session: &mut Session,
293 ctx: &mut (dyn Any + Send + Sync),
294 app_ctx: &AppContext,
295 request_ctx: &RequestContext,
296 ) -> Result<bool, Box<Error>>;
297
298 async fn upstream_response_filter_dyn(
299 &self,
300 session: &mut Session,
301 upstream_response: &mut ResponseHeader,
302 ctx: &mut (dyn Any + Send + Sync),
303 app_ctx: &AppContext,
304 request_ctx: &RequestContext,
305 ) -> Result<(), Box<Error>>;
306
307 fn response_body_filter_dyn(
308 &self,
309 session: &mut Session,
310 body: &mut Option<Bytes>,
311 end_of_stream: bool,
312 ctx: &mut (dyn Any + Send + Sync),
313 app_ctx: &AppContext,
314 request_ctx: &RequestContext,
315 ) -> Result<Option<std::time::Duration>, Box<Error>>;
316
317 async fn request_body_filter_dyn(
318 &self,
319 session: &mut Session,
320 body: &mut Option<Bytes>,
321 end_of_stream: bool,
322 ctx: &mut (dyn Any + Send + Sync),
323 app_ctx: &AppContext,
324 request_ctx: &RequestContext,
325 ) -> Result<(), Box<Error>>;
326
327 fn on_websocket_message_dyn(
328 &self,
329 direction: crate::websocket::WebsocketDirection,
330 frame: crate::websocket::WsFrame,
331 ctx: &mut (dyn Any + Send + Sync),
332 app_ctx: &AppContext,
333 request_ctx: &RequestContext,
334 ) -> crate::websocket::WebsocketMessageAction;
335
336 fn on_websocket_error_dyn(
337 &self,
338 direction: crate::websocket::WebsocketDirection,
339 error: crate::websocket::WebsocketError,
340 ctx: &mut (dyn Any + Send + Sync),
341 app_ctx: &AppContext,
342 request_ctx: &RequestContext,
343 ) -> crate::websocket::WebsocketErrorAction;
344
345 fn on_grpc_message_dyn(
346 &self,
347 direction: crate::grpc::GrpcDirection,
348 message: crate::grpc::GrpcMessage,
349 ctx: &mut (dyn Any + Send + Sync),
350 app_ctx: &AppContext,
351 request_ctx: &RequestContext,
352 ) -> crate::grpc::GrpcMessageAction;
353}
354
355#[async_trait::async_trait]
357impl<T: JokowayMiddleware> JokowayMiddlewareDyn for T {
358 fn name(&self) -> &'static str {
359 JokowayMiddleware::name(self)
360 }
361
362 fn order(&self) -> i16 {
363 JokowayMiddleware::order(self)
364 }
365
366 fn new_ctx_dyn(&self) -> Box<dyn Any + Send + Sync> {
367 Box::new(self.new_ctx())
368 }
369
370 async fn request_filter_dyn(
371 &self,
372 session: &mut Session,
373 ctx: &mut (dyn Any + Send + Sync),
374 app_ctx: &AppContext,
375 request_ctx: &RequestContext,
376 ) -> Result<bool, Box<Error>> {
377 let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
378 Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
379 })?;
380 self.request_filter(session, ctx, app_ctx, request_ctx)
381 .await
382 }
383
384 async fn upstream_response_filter_dyn(
385 &self,
386 session: &mut Session,
387 upstream_response: &mut ResponseHeader,
388 ctx: &mut (dyn Any + Send + Sync),
389 app_ctx: &AppContext,
390 request_ctx: &RequestContext,
391 ) -> Result<(), Box<Error>> {
392 let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
393 Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
394 })?;
395 self.upstream_response_filter(session, upstream_response, ctx, app_ctx, request_ctx)
396 .await
397 }
398
399 fn response_body_filter_dyn(
400 &self,
401 session: &mut Session,
402 body: &mut Option<Bytes>,
403 end_of_stream: bool,
404 ctx: &mut (dyn Any + Send + Sync),
405 app_ctx: &AppContext,
406 request_ctx: &RequestContext,
407 ) -> Result<Option<std::time::Duration>, Box<Error>> {
408 let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
409 Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
410 })?;
411 self.response_body_filter(session, body, end_of_stream, ctx, app_ctx, request_ctx)
412 }
413
414 async fn request_body_filter_dyn(
415 &self,
416 session: &mut Session,
417 body: &mut Option<Bytes>,
418 end_of_stream: bool,
419 ctx: &mut (dyn Any + Send + Sync),
420 app_ctx: &AppContext,
421 request_ctx: &RequestContext,
422 ) -> Result<(), Box<Error>> {
423 let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
424 Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
425 })?;
426 self.request_body_filter(session, body, end_of_stream, ctx, app_ctx, request_ctx)
427 .await
428 }
429
430 fn on_websocket_message_dyn(
431 &self,
432 direction: crate::websocket::WebsocketDirection,
433 frame: crate::websocket::WsFrame,
434 ctx: &mut (dyn Any + Send + Sync),
435 app_ctx: &AppContext,
436 request_ctx: &RequestContext,
437 ) -> crate::websocket::WebsocketMessageAction {
438 let ctx = ctx
439 .downcast_mut::<T::CTX>()
440 .expect("Invalid context type for JokowayMiddleware");
441 self.on_websocket_message(direction, frame, ctx, app_ctx, request_ctx)
442 }
443
444 fn on_websocket_error_dyn(
445 &self,
446 direction: crate::websocket::WebsocketDirection,
447 error: crate::websocket::WebsocketError,
448 ctx: &mut (dyn Any + Send + Sync),
449 app_ctx: &AppContext,
450 request_ctx: &RequestContext,
451 ) -> crate::websocket::WebsocketErrorAction {
452 let ctx = ctx
453 .downcast_mut::<T::CTX>()
454 .expect("Invalid context type for JokowayMiddleware");
455 self.on_websocket_error(direction, error, ctx, app_ctx, request_ctx)
456 }
457
458 fn on_grpc_message_dyn(
459 &self,
460 direction: crate::grpc::GrpcDirection,
461 message: crate::grpc::GrpcMessage,
462 ctx: &mut (dyn Any + Send + Sync),
463 app_ctx: &AppContext,
464 request_ctx: &RequestContext,
465 ) -> crate::grpc::GrpcMessageAction {
466 let ctx = ctx
467 .downcast_mut::<T::CTX>()
468 .expect("Invalid context type for JokowayMiddleware");
469 self.on_grpc_message(direction, message, ctx, app_ctx, request_ctx)
470 }
471}
472
473pub trait JokowayExtension: Send + Sync {
477 fn order(&self) -> i16 {
482 0
483 }
484
485 fn init(
490 &self,
491 _server: &mut Server,
492 _app_ctx: &mut AppContext,
493 _middlewares: &mut Vec<Arc<dyn JokowayMiddlewareDyn>>,
494 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
495 Ok(())
496 }
497}