spacegate_kernel/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//! # Spacegate kernel crate.
//!
//! This crate provides the core functionality of spacegate.

#![deny(clippy::unwrap_used, clippy::dbg_macro, clippy::unimplemented, clippy::todo, clippy::missing_safety_doc)]
#![warn(
    clippy::missing_errors_doc,
    clippy::indexing_slicing,
    clippy::inline_always,
    clippy::fn_params_excessive_bools,
    missing_debug_implementations
)]
/// https services, ws services, and static file services.
pub mod backend_service;
/// a boxed body
pub mod body;
/// extensions for request and response
pub mod extension;
/// extractors for request
pub mod extractor;
/// helper layers
pub mod helper_layers;
/// tcp listener
pub mod listener;
/// gateway service
pub mod service;
/// util functions and structs
pub mod utils;

pub use backend_service::ArcHyperService;
pub use body::SgBody;
use extension::Reflect;
pub use extractor::Extract;
use hyper::{body::Bytes, Request, Response, StatusCode};
use std::{convert::Infallible, fmt};
pub use tokio_util::sync::CancellationToken;
pub use tower_layer::Layer;

use tower_layer::layer_fn;

/// A [`Result`] whose error type is [`BoxError`].
pub type BoxResult<T> = Result<T, BoxError>;
/// A boxed error trait object that is `Send + Sync + 'static`.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Alias for a [`Request`] whose body type is [`SgBody`].
pub type SgRequest = Request<SgBody>;
/// Alias for a [`Response`] whose body type is [`SgBody`].
pub type SgResponse = Response<SgBody>;

/// Provides extension methods for [`Request`](hyper::Request).
///
/// This crate implements the trait for [`SgRequest`]; the per-method notes below
/// describe that implementation.
pub trait SgRequestExt {
    /// Make sure the request carries a [`Reflect`] extension.
    ///
    /// The [`SgRequest`] implementation inserts `Reflect::new()` only when no
    /// [`Reflect`] is present yet.
    fn with_reflect(&mut self);
    /// Get a mutable reference to the [`Reflect`] extension.
    ///
    /// # Panics
    /// The [`SgRequest`] implementation panics if the extension is not present.
    fn reflect_mut(&mut self) -> &mut Reflect;
    /// Get a shared reference to the [`Reflect`] extension.
    ///
    /// # Panics
    /// The [`SgRequest`] implementation panics if the extension is not present.
    fn reflect(&self) -> &Reflect;
    /// Look up a redis client for this request; only compiled with the `ext-redis` feature.
    ///
    /// The [`SgRequest`] implementation returns `None` when the request has no
    /// `extension::GatewayName` extension or when the global client repository
    /// has no entry for that name.
    #[cfg(feature = "ext-redis")]
    fn get_redis_client_by_gateway_name(&self) -> Option<spacegate_ext_redis::RedisClient>;
    /// Extract a value of type `M` from the request through its [`Extract`] implementation.
    fn extract<M: Extract>(&self) -> M;
    /// Register a closure that takes an [`SgRequest`] and returns an [`SgRequest`].
    ///
    /// The [`SgRequest`] implementation pushes `f` onto the request's
    /// `extension::Defer` extension, creating that extension with its default
    /// value when it is missing.
    /// NOTE(review): when the registered closures are actually run is not visible
    /// from this file; confirm against the code that consumes `extension::Defer`.
    fn defer_call<F>(&mut self, f: F)
    where
        F: FnOnce(SgRequest) -> SgRequest + Send + 'static;
}

impl SgRequestExt for SgRequest {
    /// Insert a fresh [`Reflect`] extension unless the request already has one.
    fn with_reflect(&mut self) {
        let extensions = self.extensions_mut();
        if extensions.get::<Reflect>().is_none() {
            extensions.insert(Reflect::new());
        }
    }

    /// Borrow the request's [`Reflect`] extension mutably.
    ///
    /// # Panics
    /// Panics if the request has no [`Reflect`] extension.
    /// If you are using a request created by spacegate, this should never happen.
    fn reflect_mut(&mut self) -> &mut Reflect {
        let reflect = self.extensions_mut().get_mut::<Reflect>();
        reflect.expect("reflect extension not found")
    }

    /// Borrow the request's [`Reflect`] extension.
    ///
    /// # Panics
    /// Panics if the request has no [`Reflect`] extension.
    /// If you are using a request created by spacegate, this should never happen.
    fn reflect(&self) -> &Reflect {
        let reflect = self.extensions().get::<Reflect>();
        reflect.expect("reflect extension not found")
    }

    /// Get a redis client by the [`extension::GatewayName`], which would exist once the request had entered some gateway.
    ///
    /// Returns `None` when the request carries no [`extension::GatewayName`] or when the
    /// global client repository has no client registered under that name.
    #[cfg(feature = "ext-redis")]
    fn get_redis_client_by_gateway_name(&self) -> Option<spacegate_ext_redis::RedisClient> {
        // Bail out early when the request has not been tagged with a gateway name.
        let gateway_name = self.extensions().get::<extension::GatewayName>()?;
        spacegate_ext_redis::RedisClientRepo::global().get(gateway_name)
    }

    /// Extract a value of type `M` from the request by delegating to [`Extract::extract`].
    fn extract<M: Extract>(&self) -> M {
        <M as Extract>::extract(self)
    }

    /// Append `f` to the request's [`extension::Defer`] extension.
    ///
    /// The extension is created with its default value when it is not present yet.
    fn defer_call<F>(&mut self, f: F)
    where
        F: FnOnce(SgRequest) -> SgRequest + Send + 'static,
    {
        self.extensions_mut().get_or_insert_default::<extension::Defer>().push_back(f);
    }
}

/// Provides extension methods for [`Response`](hyper::Response).
pub trait SgResponseExt {
    /// Construct `Self` from a status `code` and a `message`.
    fn with_code_message(code: StatusCode, message: impl Into<Bytes>) -> Self;

    /// Construct a `BAD_GATEWAY` response describing the error `e`.
    ///
    /// The message is `e`'s `Display` text. When `e` reports a
    /// [`source`](std::error::Error::source), that source's `Display` text is
    /// appended after `":\n "`. Only the direct source is included, not the
    /// whole chain.
    fn bad_gateway<E: std::error::Error>(e: E) -> Self
    where
        Self: Sized,
    {
        let description = e.to_string();
        let description = match e.source() {
            Some(cause) => format!("{}:\n {}", description, cause),
            None => description,
        };
        Self::with_code_message(StatusCode::BAD_GATEWAY, description)
    }
}

impl SgResponseExt for Response<SgBody> {
    fn with_code_message(code: StatusCode, message: impl Into<Bytes>) -> Self {
        let body = SgBody::full(message);
        let mut resp = Response::builder().status(code).body(body).expect("response builder error");
        resp.extensions_mut().insert(Reflect::new());
        resp
    }
}

/// A boxed [`Layer`] that can be used as a plugin layer in gateway.
///
/// The concrete layer type is hidden behind a trait object that takes an
/// [`ArcHyperService`] and produces an [`ArcHyperService`].
pub struct BoxLayer {
    /// The type-erased layer; the trait object is required to be `Send + Sync + 'static`.
    boxed: Box<dyn Layer<ArcHyperService, Service = ArcHyperService> + Send + Sync + 'static>,
}

impl BoxLayer {
    /// Wrap `inner_layer` in a [`BoxLayer`], hiding its concrete type.
    ///
    /// Whatever service `inner_layer` produces is passed to [`ArcHyperService::new`],
    /// so the stored layer always maps an [`ArcHyperService`] to an [`ArcHyperService`].
    pub fn new<L>(inner_layer: L) -> Self
    where
        L: Layer<ArcHyperService> + Send + Sync + 'static,
        L::Service: Clone + hyper::service::Service<Request<SgBody>, Response = Response<SgBody>, Error = Infallible> + Send + Sync + 'static,
        <L::Service as hyper::service::Service<Request<SgBody>>>::Future: Send + 'static,
    {
        // Apply the user's layer first, then re-wrap its output as an `ArcHyperService`.
        let erased = layer_fn(move |service: ArcHyperService| ArcHyperService::new(inner_layer.layer(service)));

        Self { boxed: Box::new(erased) }
    }

    /// Apply this layer to a service that is already an [`ArcHyperService`].
    ///
    /// Unlike the [`Layer`] implementation on this type, `inner` is handed to the
    /// boxed layer as-is, without wrapping it in another [`ArcHyperService`] first.
    #[must_use]
    pub fn layer_shared(&self, inner: ArcHyperService) -> ArcHyperService {
        self.boxed.layer(inner)
    }
}

impl<S> Layer<S> for BoxLayer
where
    S: hyper::service::Service<Request<SgBody>, Response = Response<SgBody>, Error = Infallible> + Send + Sync + 'static,
    <S as hyper::service::Service<hyper::Request<SgBody>>>::Future: std::marker::Send,
{
    type Service = ArcHyperService;

    fn layer(&self, inner: S) -> Self::Service {
        self.boxed.layer(ArcHyperService::new(inner))
    }
}

impl fmt::Debug for BoxLayer {
    /// Writes only the type name `BoxLayer`; the boxed trait object is not
    /// bounded by `Debug`, so no field is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("BoxLayer")
    }
}