Skip to main content

jokoway_core/
lib.rs

1//! Core traits and types for Jokoway API Gateway
2//!
3//! This crate provides the fundamental traits that extension developers
4//! need to implement to create middlewares and extensions for Jokoway.
5
6pub mod config;
7pub mod error;
8pub mod grpc;
9pub mod prelude;
10pub mod tls;
11pub mod websocket;
12
13use arc_swap::ArcSwap;
14use bytes::Bytes;
15use dashmap::DashMap;
16use pingora::Error;
17use pingora::http::ResponseHeader;
18use pingora::proxy::Session;
19use pingora::server::Server;
20use std::any::{Any, TypeId};
21use std::collections::HashMap;
22use std::sync::Arc;
23
24/// Shared interface for type-safe, heterogeneous key-value storage.
25///
26/// Both [`AppContext`] and [`RequestContext`] implement this trait,
27/// allowing generic helper functions to work with either context type.
28pub trait Context: Send + Sync {
29    /// Insert a value of any type into the context.
30    fn insert<T: Any + Send + Sync>(&self, value: T);
31
32    /// Retrieve a value by its type from the context.
33    fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>>;
34
35    /// Remove and return a value by its type from the context.
36    fn remove<T: Any + Send + Sync>(&self) -> Option<Arc<T>>;
37}
38
39/// Global application context, shared across all requests.
40///
41/// Uses [`ArcSwap`] internally for lock-free concurrent reads,
42/// making it ideal for configuration or state that is written rarely
43/// (e.g., at startup) but read on every request from many threads.
44#[derive(Clone)]
45pub struct AppContext {
46    data: Arc<ArcSwap<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>>,
47}
48
49impl AppContext {
50    pub fn new() -> Self {
51        Self {
52            data: Arc::new(ArcSwap::from_pointee(HashMap::new())),
53        }
54    }
55}
56
57impl Default for AppContext {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl Context for AppContext {
64    fn insert<T: Any + Send + Sync>(&self, value: T) {
65        let value = Arc::new(value) as Arc<dyn Any + Send + Sync>;
66        self.data.rcu(move |old| {
67            let mut next = (**old).clone();
68            next.insert(TypeId::of::<T>(), value.clone());
69            next
70        });
71    }
72
73    fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
74        let data = self.data.load();
75        let value = data.get(&TypeId::of::<T>()).cloned()?;
76        Arc::downcast::<T>(value).ok()
77    }
78
79    fn remove<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
80        let mut removed: Option<Arc<dyn Any + Send + Sync>> = None;
81        self.data.rcu(|old| {
82            let mut next = (**old).clone();
83            removed = next.remove(&TypeId::of::<T>());
84            next
85        });
86        removed.and_then(|value| Arc::downcast::<T>(value).ok())
87    }
88}
89
90/// Per-request context for sharing state between middlewares within a single request lifecycle.
91///
92/// Uses [`DashMap`] internally for fast, zero-clone inserts and removes.
93/// Created fresh for every HTTP session and dropped when the request completes.
94#[derive(Clone)]
95pub struct RequestContext {
96    data: Arc<DashMap<TypeId, Arc<dyn Any + Send + Sync>>>,
97}
98
99impl RequestContext {
100    pub fn new() -> Self {
101        Self {
102            data: Arc::new(DashMap::new()),
103        }
104    }
105}
106
107impl Default for RequestContext {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl Context for RequestContext {
114    fn insert<T: Any + Send + Sync>(&self, value: T) {
115        let value = Arc::new(value) as Arc<dyn Any + Send + Sync>;
116        self.data.insert(TypeId::of::<T>(), value);
117    }
118
119    fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
120        let value = self.data.get(&TypeId::of::<T>())?.clone();
121        Arc::downcast::<T>(value).ok()
122    }
123
124    fn remove<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
125        let (_, value) = self.data.remove(&TypeId::of::<T>())?;
126        Arc::downcast::<T>(value).ok()
127    }
128}
129
130/// Middleware trait for processing requests and responses.
131///
132/// Middlewares can inspect and modify requests before they reach the upstream,
133/// and responses before they are sent to the client.
134#[async_trait::async_trait]
135pub trait JokowayMiddleware: Send + Sync {
136    /// Per-middleware context type, instantiated once per request lifecycle
137    /// to hold state across different filtering phases.
138    type CTX: Send + Sync + 'static;
139
140    /// The name of the middleware, used for identification and debugging.
141    fn name(&self) -> &'static str;
142
143    /// Create a new context instance for this middleware.
144    /// This is called early and the context is passed to all subsequent filter hooks for a given request.
145    fn new_ctx(&self) -> Self::CTX;
146
147    /// The execution order of the middleware.
148    ///
149    /// The higher the value, the earlier it runs relative to other middlewares in the chain.
150    /// Default is 0. Middlewares with the same order value are executed in the order they were registered.
151    fn order(&self) -> i16 {
152        0
153    }
154
155    /// Invoked after receiving the client's request headers, but before routing or connecting to the upstream.
156    ///
157    /// This hook is ideal for authentication, request block-listing, rate limiting, and
158    /// modifying request headers before the router processes them.
159    ///
160    /// Return `Ok(true)` if the middleware has fully handled the request (e.g., sent an early response)
161    /// and further proxy processing should be aborted.
162    /// Return `Ok(false)` to continue to the next middleware and eventual routing.
163    async fn request_filter(
164        &self,
165        _session: &mut Session,
166        _ctx: &mut Self::CTX,
167        _app_ctx: &AppContext,
168        _request_ctx: &RequestContext,
169    ) -> Result<bool, Box<Error>> {
170        Ok(false)
171    }
172
173    /// Invoked after receiving the HTTP response headers from the upstream server,
174    /// but before they are forwarded to the downstream client.
175    ///
176    /// Useful for modifying response headers (e.g., injecting security headers, CORS),
177    /// inspecting the status code, or logging the upstream response details.
178    ///
179    /// *Note: This is currently bypassed for upgraded WebSocket connections (101 Switching Protocols).*
180    async fn upstream_response_filter(
181        &self,
182        _session: &mut Session,
183        _upstream_response: &mut ResponseHeader,
184        _ctx: &mut Self::CTX,
185        _app_ctx: &AppContext,
186        _request_ctx: &RequestContext,
187    ) -> Result<(), Box<Error>> {
188        Ok(())
189    }
190
191    /// Invoked for each chunk of the response body streamed from the upstream to the client.
192    ///
193    /// Allows inspecting or mutating response body chunks before they reach the client.
194    /// If there is no more body to process, `_end_of_stream` will be true.
195    ///
196    /// Return `Ok(Some(duration))` if you want to record the time spent processing this chunk
197    /// in the metrics/logs, or `Ok(None)` otherwise.
198    ///
199    /// *Note: This hook is not invoked for WebSocket traffic. Use `on_websocket_message` instead.*
200    fn response_body_filter(
201        &self,
202        _session: &mut Session,
203        _body: &mut Option<Bytes>,
204        _end_of_stream: bool,
205        _ctx: &mut Self::CTX,
206        _app_ctx: &AppContext,
207        _request_ctx: &RequestContext,
208    ) -> Result<Option<std::time::Duration>, Box<Error>> {
209        Ok(None)
210    }
211
212    /// Invoked for each chunk of the request body streamed from the client to the upstream.
213    ///
214    /// Allows inspecting, buffering, or mutating request body chunks (e.g., parsing JSON validation)
215    /// before they reach the upstream server. If there is no more body, `_end_of_stream` will be true.
216    ///
217    /// *Note: This hook is not invoked for WebSocket traffic. Use `on_websocket_message` instead.*
218    async fn request_body_filter(
219        &self,
220        _session: &mut Session,
221        _body: &mut Option<Bytes>,
222        _end_of_stream: bool,
223        _ctx: &mut Self::CTX,
224        _app_ctx: &AppContext,
225        _request_ctx: &RequestContext,
226    ) -> Result<(), Box<Error>> {
227        Ok(())
228    }
229
230    /// Invoked whenever a discrete, fully-parsed WebSocket frame is intercepted.
231    ///
232    /// This hook operates on both directions of the WebSocket connection, distinguished by the `_direction` parameter.
233    /// Middlewares can inspect the payload, modify the message, drop the frame silently, or
234    /// close the connection altogether using the returned `WebsocketMessageAction`.
235    fn on_websocket_message(
236        &self,
237        _direction: crate::websocket::WebsocketDirection,
238        frame: crate::websocket::WsFrame,
239        _ctx: &mut Self::CTX,
240        _app_ctx: &AppContext,
241        _request_ctx: &RequestContext,
242    ) -> crate::websocket::WebsocketMessageAction {
243        crate::websocket::WebsocketMessageAction::Forward(frame)
244    }
245
246    /// Invoked when an error occurs while parsing raw stream bytes into WebSocket frames.
247    ///
248    /// This provides a facility for dealing with malformed or invalid WebSocket data.
249    /// The middleware can decide to pass the raw unparsed bytes through unmodified, drop the invalid
250    /// data, or force close the WebSocket connection.
251    fn on_websocket_error(
252        &self,
253        _direction: crate::websocket::WebsocketDirection,
254        _error: crate::websocket::WebsocketError,
255        _ctx: &mut Self::CTX,
256        _app_ctx: &AppContext,
257        _request_ctx: &RequestContext,
258    ) -> crate::websocket::WebsocketErrorAction {
259        crate::websocket::WebsocketErrorAction::PassThrough
260    }
261
262    /// Invoked whenever a discrete, fully-parsed gRPC message is intercepted.
263    ///
264    /// This hook operates on both directions of the gRPC connection, distinguished by the `_direction` parameter.
265    fn on_grpc_message(
266        &self,
267        _direction: crate::grpc::GrpcDirection,
268        message: crate::grpc::GrpcMessage,
269        _ctx: &mut Self::CTX,
270        _app_ctx: &AppContext,
271        _request_ctx: &RequestContext,
272    ) -> crate::grpc::GrpcMessageAction {
273        crate::grpc::GrpcMessageAction::Forward(message)
274    }
275}
276
277/// Dynamic dispatch version of JokowayMiddleware for trait objects
278#[async_trait::async_trait]
279pub trait JokowayMiddlewareDyn: Send + Sync {
280    /// The name of the middleware
281    fn name(&self) -> &'static str;
282
283    /// The order the middleware will run
284    fn order(&self) -> i16 {
285        0
286    }
287
288    fn new_ctx_dyn(&self) -> Box<dyn Any + Send + Sync>;
289
290    async fn request_filter_dyn(
291        &self,
292        session: &mut Session,
293        ctx: &mut (dyn Any + Send + Sync),
294        app_ctx: &AppContext,
295        request_ctx: &RequestContext,
296    ) -> Result<bool, Box<Error>>;
297
298    async fn upstream_response_filter_dyn(
299        &self,
300        session: &mut Session,
301        upstream_response: &mut ResponseHeader,
302        ctx: &mut (dyn Any + Send + Sync),
303        app_ctx: &AppContext,
304        request_ctx: &RequestContext,
305    ) -> Result<(), Box<Error>>;
306
307    fn response_body_filter_dyn(
308        &self,
309        session: &mut Session,
310        body: &mut Option<Bytes>,
311        end_of_stream: bool,
312        ctx: &mut (dyn Any + Send + Sync),
313        app_ctx: &AppContext,
314        request_ctx: &RequestContext,
315    ) -> Result<Option<std::time::Duration>, Box<Error>>;
316
317    async fn request_body_filter_dyn(
318        &self,
319        session: &mut Session,
320        body: &mut Option<Bytes>,
321        end_of_stream: bool,
322        ctx: &mut (dyn Any + Send + Sync),
323        app_ctx: &AppContext,
324        request_ctx: &RequestContext,
325    ) -> Result<(), Box<Error>>;
326
327    fn on_websocket_message_dyn(
328        &self,
329        direction: crate::websocket::WebsocketDirection,
330        frame: crate::websocket::WsFrame,
331        ctx: &mut (dyn Any + Send + Sync),
332        app_ctx: &AppContext,
333        request_ctx: &RequestContext,
334    ) -> crate::websocket::WebsocketMessageAction;
335
336    fn on_websocket_error_dyn(
337        &self,
338        direction: crate::websocket::WebsocketDirection,
339        error: crate::websocket::WebsocketError,
340        ctx: &mut (dyn Any + Send + Sync),
341        app_ctx: &AppContext,
342        request_ctx: &RequestContext,
343    ) -> crate::websocket::WebsocketErrorAction;
344
345    fn on_grpc_message_dyn(
346        &self,
347        direction: crate::grpc::GrpcDirection,
348        message: crate::grpc::GrpcMessage,
349        ctx: &mut (dyn Any + Send + Sync),
350        app_ctx: &AppContext,
351        request_ctx: &RequestContext,
352    ) -> crate::grpc::GrpcMessageAction;
353}
354
355/// Blanket implementation for all JokowayMiddleware types
356#[async_trait::async_trait]
357impl<T: JokowayMiddleware> JokowayMiddlewareDyn for T {
358    fn name(&self) -> &'static str {
359        JokowayMiddleware::name(self)
360    }
361
362    fn order(&self) -> i16 {
363        JokowayMiddleware::order(self)
364    }
365
366    fn new_ctx_dyn(&self) -> Box<dyn Any + Send + Sync> {
367        Box::new(self.new_ctx())
368    }
369
370    async fn request_filter_dyn(
371        &self,
372        session: &mut Session,
373        ctx: &mut (dyn Any + Send + Sync),
374        app_ctx: &AppContext,
375        request_ctx: &RequestContext,
376    ) -> Result<bool, Box<Error>> {
377        let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
378            Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
379        })?;
380        self.request_filter(session, ctx, app_ctx, request_ctx)
381            .await
382    }
383
384    async fn upstream_response_filter_dyn(
385        &self,
386        session: &mut Session,
387        upstream_response: &mut ResponseHeader,
388        ctx: &mut (dyn Any + Send + Sync),
389        app_ctx: &AppContext,
390        request_ctx: &RequestContext,
391    ) -> Result<(), Box<Error>> {
392        let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
393            Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
394        })?;
395        self.upstream_response_filter(session, upstream_response, ctx, app_ctx, request_ctx)
396            .await
397    }
398
399    fn response_body_filter_dyn(
400        &self,
401        session: &mut Session,
402        body: &mut Option<Bytes>,
403        end_of_stream: bool,
404        ctx: &mut (dyn Any + Send + Sync),
405        app_ctx: &AppContext,
406        request_ctx: &RequestContext,
407    ) -> Result<Option<std::time::Duration>, Box<Error>> {
408        let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
409            Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
410        })?;
411        self.response_body_filter(session, body, end_of_stream, ctx, app_ctx, request_ctx)
412    }
413
414    async fn request_body_filter_dyn(
415        &self,
416        session: &mut Session,
417        body: &mut Option<Bytes>,
418        end_of_stream: bool,
419        ctx: &mut (dyn Any + Send + Sync),
420        app_ctx: &AppContext,
421        request_ctx: &RequestContext,
422    ) -> Result<(), Box<Error>> {
423        let ctx = ctx.downcast_mut::<T::CTX>().ok_or_else(|| {
424            Error::explain(pingora::ErrorType::InternalError, "Invalid context type")
425        })?;
426        self.request_body_filter(session, body, end_of_stream, ctx, app_ctx, request_ctx)
427            .await
428    }
429
430    fn on_websocket_message_dyn(
431        &self,
432        direction: crate::websocket::WebsocketDirection,
433        frame: crate::websocket::WsFrame,
434        ctx: &mut (dyn Any + Send + Sync),
435        app_ctx: &AppContext,
436        request_ctx: &RequestContext,
437    ) -> crate::websocket::WebsocketMessageAction {
438        let ctx = ctx
439            .downcast_mut::<T::CTX>()
440            .expect("Invalid context type for JokowayMiddleware");
441        self.on_websocket_message(direction, frame, ctx, app_ctx, request_ctx)
442    }
443
444    fn on_websocket_error_dyn(
445        &self,
446        direction: crate::websocket::WebsocketDirection,
447        error: crate::websocket::WebsocketError,
448        ctx: &mut (dyn Any + Send + Sync),
449        app_ctx: &AppContext,
450        request_ctx: &RequestContext,
451    ) -> crate::websocket::WebsocketErrorAction {
452        let ctx = ctx
453            .downcast_mut::<T::CTX>()
454            .expect("Invalid context type for JokowayMiddleware");
455        self.on_websocket_error(direction, error, ctx, app_ctx, request_ctx)
456    }
457
458    fn on_grpc_message_dyn(
459        &self,
460        direction: crate::grpc::GrpcDirection,
461        message: crate::grpc::GrpcMessage,
462        ctx: &mut (dyn Any + Send + Sync),
463        app_ctx: &AppContext,
464        request_ctx: &RequestContext,
465    ) -> crate::grpc::GrpcMessageAction {
466        let ctx = ctx
467            .downcast_mut::<T::CTX>()
468            .expect("Invalid context type for JokowayMiddleware");
469        self.on_grpc_message(direction, message, ctx, app_ctx, request_ctx)
470    }
471}
472
473/// Extension trait for adding custom functionality to Jokoway
474///
475/// Extensions can add background services, modify server configuration, etc.
476pub trait JokowayExtension: Send + Sync {
477    /// The order the extension will run
478    ///
479    /// The higher the value, the earlier it runs relative to other extensions.
480    /// If the order of the extension is not important, leave it to the default 0.
481    fn order(&self) -> i16 {
482        0
483    }
484
485    /// Called during server bootstrap to add background services etc.
486    ///
487    /// Note: This uses `dyn Any` for app_ctx to avoid circular dependencies.
488    /// Extensions should downcast to the concrete Context type.
489    fn init(
490        &self,
491        _server: &mut Server,
492        _app_ctx: &mut AppContext,
493        _middlewares: &mut Vec<Arc<dyn JokowayMiddlewareDyn>>,
494    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
495        Ok(())
496    }
497}