spacegate_plugin/
lib.rs

1#![deny(clippy::unwrap_used, clippy::dbg_macro, clippy::unimplemented, clippy::todo, clippy::inline_always)]
2use std::{
3    any::Any,
4    borrow::Cow,
5    collections::{HashMap, HashSet},
6    fmt::Debug,
7    sync::{Arc, OnceLock, RwLock},
8};
9
10use futures_util::{future::BoxFuture, Future};
11use hyper::{Request, Response};
12use instance::{PluginInstance, PluginInstanceSnapshot};
13use layer::{InnerBoxPf, PluginFunction};
14use mount::{MountPoint, MountPointIndex};
15use serde::{Deserialize, Serialize};
16pub use serde_json;
17pub use serde_json::{Error as SerdeJsonError, Value as JsonValue};
18pub use spacegate_kernel::helper_layers::function::Inner;
19pub use spacegate_kernel::BoxError;
20pub use spacegate_kernel::BoxLayer;
21pub use spacegate_kernel::{SgBody, SgRequest, SgRequestExt, SgResponse, SgResponseExt};
22pub mod error;
23pub mod instance;
24pub mod model;
25pub mod mount;
26pub use error::PluginError;
27#[cfg(feature = "dylib")]
28pub mod dynamic;
29pub mod ext;
30pub mod layer;
31pub mod plugins;
32#[cfg(feature = "schema")]
33pub use schemars;
34pub use spacegate_model;
35pub use spacegate_model::{plugin_meta, PluginAttributes, PluginConfig, PluginInstanceId, PluginInstanceMap, PluginInstanceName, PluginMetaData};
36/// # Plugin Trait
37/// It's a easy way to define a plugin through this trait.
38/// You should give a unique [`code`](Plugin::CODE) for the plugin,
39/// and implement the [`call`](Plugin::call) function and the [`create`](Plugin::create) function.
40///
41/// # Example
42/// In the follow example, we add a server header for each response.
43/// ```rust
44/// # use spacegate_plugin::{Plugin, SgRequest, SgResponse, Inner, BoxError, PluginConfig};
45/// pub struct ServerHeaderPlugin {
46///     header_value: String,
47/// }
48///
49/// impl Plugin for ServerHeaderPlugin {
50///     const CODE: &'static str = "server-header";
51///     async fn call(&self, req: SgRequest, inner: Inner) -> Result<SgResponse, BoxError> {
52///         let mut resp = inner.call(req).await;    
53///         resp.headers_mut().insert("server", self.header_value.parse()?);
54///         Ok(resp)
55///     }
56///     fn create(plugin_config: PluginConfig) -> Result<Self, BoxError> {
57///         let Some(header_value) = plugin_config.spec.get("header_value") else {
58///             return Err("missing header_value".into())
59///         };
60///         Ok(Self {
61///            header_value: header_value.as_str().unwrap_or("spacegate").to_string(),
62///         })
63///     }
64/// }
65/// ```
66pub trait Plugin: Any + Sized + Send + Sync {
67    /// Plugin code, it should be unique repository-wise.
68    ///
69    /// It's **recommended** to use a **kebab-case** string, witch would make k8s happy.
70    const CODE: &'static str;
71    /// is this plugin mono instance
72    const MONO: bool = false;
73    fn meta() -> PluginMetaData {
74        PluginMetaData::default()
75    }
76    /// This function will be called when the plugin is invoked.
77    ///
78    /// The error will be wrapped with a response with status code 500, and the error message will be response's body.
79    ///
80    /// If you want to return a custom response, wrap it with `Ok` and return it.
81    ///
82    /// If you want to return a error response with other status code, use `PluginError::new` to create a error response, and wrap
83    /// it with `Ok`.
84    fn call(&self, req: SgRequest, inner: Inner) -> impl Future<Output = Result<SgResponse, BoxError>> + Send;
85    fn create(plugin_config: PluginConfig) -> Result<Self, BoxError>;
86    fn create_by_spec(spec: JsonValue, name: PluginInstanceName) -> Result<Self, BoxError> {
87        Self::create(PluginConfig {
88            id: PluginInstanceId { code: Self::CODE.into(), name },
89            spec,
90        })
91    }
92    /// Register the plugin to the repository.
93    ///
94    /// You can also register axum server route here.
95    fn register(repo: &PluginRepository) {
96        repo.plugins.write().expect("SgPluginRepository register error").insert(Self::CODE.into(), PluginDefinitionObject::from_trait::<Self>());
97    }
98
99    #[cfg(feature = "schema")]
100    /// Return the schema of the plugin config.
101    fn schema_opt() -> Option<schemars::schema::RootSchema> {
102        None
103    }
104}
105
106/// Plugin Trait Object
107///
108/// Firstly, a [`PluginInstance`] will be created when the plugin is loading.
109///
110/// Then, a [`BoxLayer`] will be created when the plugin is being mounted to a certain mount point.
111///
112/// [`PluginDefinitionObject`] -> [`PluginDefinitionObject::make_instance`] -> [`PluginInstance`]
113/// -> [`PluginInstance::make`] -> [`BoxLayer`]
114///
115pub struct PluginDefinitionObject {
116    /// should this plugin be a singleton?
117    pub mono: bool,
118    /// Plugin code
119    pub code: Cow<'static, str>,
120    /// Plugin meta data, which is just for display and debug information
121    pub meta: PluginMetaData,
122    /// Plugin config json schema
123    #[cfg(feature = "schema")]
124    pub schema: Option<schemars::schema::RootSchema>,
125    /// Plugin Function maker
126    pub make_pf: Box<MakePfMethod>,
127}
128
129impl Debug for PluginDefinitionObject {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        let mut formatter = f.debug_struct("PluginAttributes");
132        formatter.field("mono", &self.mono).field("code", &self.code).field("meta", &self.meta);
133        #[cfg(feature = "schema")]
134        {
135            formatter.field("schema", &self.schema.is_some());
136        }
137
138        formatter.finish()
139    }
140}
141
142#[derive(Debug, Default, Serialize, Deserialize, Clone)]
143pub struct PluginRepoSnapshot {
144    pub mono: bool,
145    pub code: Cow<'static, str>,
146    pub meta: PluginMetaData,
147    pub instances: HashMap<String, PluginInstanceSnapshot>,
148}
149
150impl PluginDefinitionObject {
151    pub fn attr(&self) -> PluginAttributes {
152        PluginAttributes {
153            mono: self.mono,
154            code: self.code.clone(),
155            meta: self.meta.clone(),
156        }
157    }
158    /// Make a plugin trait object from [`Plugin`] Trait
159    pub fn from_trait<P: Plugin>() -> Self {
160        let constructor = move |config: PluginConfig| {
161            let plugin = Arc::new(P::create(config)?);
162            let function = move |req: Request<SgBody>, inner: Inner| {
163                let plugin = plugin.clone();
164                // let plugin_span = tracing::span!(tracing::Level::INFO, "plugin", code = P::CODE);
165                let task = async move {
166                    // let _entered = plugin_span.enter();
167                    match plugin.call(req, inner).await {
168                        Ok(resp) => resp,
169                        Err(e) => {
170                            tracing::error!("{code} plugin error: {e}", code = P::CODE);
171                            PluginError::internal_error::<P>(e).into()
172                        }
173                    }
174                };
175                Box::pin(task) as BoxFuture<'static, Response<SgBody>>
176            };
177            Ok(Box::new(function) as InnerBoxPf)
178        };
179        let make_pf = Box::new(constructor);
180        Self {
181            code: P::CODE.into(),
182            #[cfg(feature = "schema")]
183            schema: P::schema_opt(),
184            mono: P::MONO,
185            meta: P::meta(),
186            make_pf,
187        }
188    }
189    #[inline]
190    pub(crate) fn make_pf(&self, config: PluginConfig) -> Result<InnerBoxPf, BoxError> {
191        (self.make_pf)(config)
192    }
193    pub(crate) fn make_instance(&self, config: PluginConfig) -> Result<PluginInstance, BoxError> {
194        let pf = PluginFunction::new(self.make_pf(config.clone())?);
195        let instance = PluginInstance {
196            config,
197            plugin_function: pf,
198            mount_points: Default::default(),
199            hooks: Default::default(),
200        };
201        Ok(instance)
202    }
203}
204
/// Extension trait for plugins that can describe their config with a json schema.
///
/// The `schema!` macro in this crate generates implementations of this trait.
#[cfg(feature = "schema")]
pub trait PluginSchemaExt {
    /// Return the root json schema of the plugin config.
    fn schema() -> schemars::schema::RootSchema;
}
209
210/// Plugin function maker, which received a [`PluginConfig`] and return a [`InnerBoxPf`]
211pub type MakePfMethod = dyn Fn(PluginConfig) -> Result<InnerBoxPf, BoxError> + Send + Sync + 'static;
212
213/// # Plugin Repository
214/// A repository to manage plugins, it stores plugin definitions and instances.
215///
216/// You can get a global instance through [`PluginRepository::global`].
217#[derive(Default, Clone)]
218pub struct PluginRepository {
219    plugins: Arc<RwLock<HashMap<String, PluginDefinitionObject>>>,
220    instances: Arc<RwLock<HashMap<PluginInstanceId, PluginInstance>>>,
221}
222
223impl PluginRepository {
224    /// Get a global instance of this repository.
225    ///
226    /// Once the repository is initialized, it will register all plugins in this crate.
227    pub fn global() -> &'static Self {
228        static INIT: OnceLock<PluginRepository> = OnceLock::new();
229        INIT.get_or_init(|| {
230            let repo = PluginRepository::new();
231            repo.register_prelude();
232            repo
233        })
234    }
235
236    /// register all plugins in this crates
237    pub fn register_prelude(&self) {
238        self.register::<plugins::static_resource::StaticResourcePlugin>();
239        #[cfg(feature = "limit")]
240        self.register::<plugins::limit::RateLimitPlugin>();
241        #[cfg(feature = "redirect")]
242        self.register::<plugins::redirect::RedirectPlugin>();
243        // #[cfg(feature = "retry")]
244        // self.register::<plugins::retry::RetryPlugin>();
245        #[cfg(feature = "header-modifier")]
246        self.register::<plugins::header_modifier::HeaderModifierPlugin>();
247        #[cfg(feature = "inject")]
248        self.register::<plugins::inject::InjectPlugin>();
249        #[cfg(feature = "rewrite")]
250        self.register::<plugins::rewrite::RewritePlugin>();
251        #[cfg(feature = "maintenance")]
252        self.register::<plugins::maintenance::MaintenancePlugin>();
253        // #[cfg(feature = "status")]
254        // self.register::<plugins::status::StatusPlugin>();
255        // #[cfg(feature = "decompression")]
256        // self.register::<plugins::decompression::DecompressionPlugin>();
257        #[cfg(feature = "set-version")]
258        self.register::<plugins::set_version::SetVersionPlugin>();
259        #[cfg(feature = "set-scheme")]
260        self.register::<plugins::set_scheme::SetSchemePlugin>();
261        #[cfg(feature = "redis")]
262        {
263            self.register::<ext::redis::plugins::redis_count::RedisCountPlugin>();
264            self.register::<ext::redis::plugins::redis_limit::RedisLimitPlugin>();
265            self.register::<ext::redis::plugins::redis_time_range::RedisTimeRangePlugin>();
266            self.register::<ext::redis::plugins::redis_dynamic_route::RedisDynamicRoutePlugin>();
267        }
268        #[cfg(feature = "east-west-traffic-white-list")]
269        self.register::<plugins::east_west_traffic_white_list::EastWestTrafficWhiteListPlugin>();
270    }
271
272    /// create a new empty repository
273    pub fn new() -> Self {
274        Self::default()
275    }
276
277    /// register by [`Plugin`] trait
278    pub fn register<P: Plugin>(&self) {
279        self.register_custom(PluginDefinitionObject::from_trait::<P>())
280    }
281
282    /// register a custom plugin
283    pub fn register_custom<A: Into<PluginDefinitionObject>>(&self, attr: A) {
284        let attr: PluginDefinitionObject = attr.into();
285        let mut map = self.plugins.write().expect("SgPluginRepository register error");
286        let _old_attr = map.insert(attr.code.to_string(), attr);
287    }
288
289    /// clear all instances
290    pub fn clear_instances(&self) {
291        let mut instances = self.instances.write().expect("SgPluginRepository register error");
292        for (_, inst) in instances.drain() {
293            if let Err(e) = inst.before_destroy() {
294                tracing::error!("plugin {id:?} before_destroy error: {e}", id = inst.config.id, e = e);
295            }
296        }
297    }
298
299    /// create or update a plugin instance by config
300    pub fn create_or_update_instance(&self, config: PluginConfig) -> Result<(), BoxError> {
301        let attr_rg = self.plugins.read().expect("SgPluginRepository register error");
302        let code = config.code();
303        let id = config.id.clone();
304        let Some(attr) = attr_rg.get(code) else {
305            return Err(format!("[Sg.Plugin] unregistered sg plugin type {code}").into());
306        };
307        let mut instances = self.instances.write().expect("SgPluginRepository register error");
308        if let Some(instance) = instances.get_mut(&id) {
309            let new_inner_pf = attr.make_pf(config)?;
310            instance.plugin_function.swap(new_inner_pf);
311        } else {
312            let instance = attr.make_instance(config)?;
313            instance.after_create()?;
314            instances.insert(id, instance);
315        }
316        Ok(())
317    }
318
319    /// remove a plugin instance by id
320    pub fn remove_instance(&self, id: &PluginInstanceId) -> Result<HashSet<MountPointIndex>, BoxError> {
321        let mut instances = self.instances.write().expect("SgPluginRepository register error");
322        if let Some(instance) = instances.remove(id) {
323            instance.before_destroy()?;
324            Ok(instance.mount_points.into_iter().filter_map(|(index, tracer)| (!tracer.all_dropped()).then_some(index)).collect())
325        } else {
326            Err(format!("[Sg.Plugin] missing instance {id:?}").into())
327        }
328    }
329
330    /// mount a plugin instance to a mount point
331    pub fn mount<M: MountPoint>(&self, mount_point: &mut M, mount_index: MountPointIndex, id: PluginInstanceId) -> Result<(), BoxError> {
332        let attr_rg = self.plugins.read().expect("SgPluginRepository register error");
333        let code = id.code.as_ref();
334        let Some(_attr) = attr_rg.get(code) else {
335            return Err(format!("[Sg.Plugin] unregistered sg plugin type {code}").into());
336        };
337        let mut instances = self.instances.write().expect("SgPluginRepository register error");
338        if let Some(instance) = instances.get_mut(&id) {
339            instance.mount_points_gc();
340            // before mount hook
341            instance.before_mount()?;
342            instance.mount_at(mount_point, mount_index)?;
343            // after mount hook
344            instance.after_mount()?;
345            Ok(())
346        } else {
347            Err(format!("[Sg.Plugin] missing instance {id:?}").into())
348        }
349    }
350
351    pub fn instance_snapshot(&self, id: PluginInstanceId) -> Option<PluginInstanceSnapshot> {
352        let map = self.instances.read().expect("SgPluginRepository register error");
353        map.get(&id).map(PluginInstance::snapshot)
354    }
355
356    pub fn plugin_list(&self) -> Vec<PluginAttributes> {
357        let map = self.plugins.read().expect("SgPluginRepository register error");
358        map.values().map(PluginDefinitionObject::attr).collect()
359    }
360
361    /// Get a snapshot for repository plugins
362    pub fn repo_snapshot(&self) -> HashMap<String, PluginRepoSnapshot> {
363        let plugins = self.plugins.read().expect("SgPluginRepository register error");
364        plugins
365            .iter()
366            .map(|(code, attr)| {
367                let instances = self.instances.read().expect("SgPluginRepository register error");
368                let instances = instances.iter().filter_map(|(id, instance)| if &id.code == code { Some((id.name.to_string(), instance.snapshot())) } else { None }).collect();
369                (
370                    code.clone(),
371                    PluginRepoSnapshot {
372                        code: code.clone().into(),
373                        mono: attr.mono,
374                        meta: attr.meta.clone(),
375                        instances,
376                    },
377                )
378            })
379            .collect()
380    }
381}
382
/// Implement [`PluginSchemaExt`] for a plugin type.
///
/// Two forms are declared:
/// - `schema!(Plugin, ConfigType)` builds the schema with `schemars::schema_for!`;
/// - `schema!(Plugin, value_expr)` builds the schema with `schemars::schema_for_value!`.
///
/// NOTE(review): the arms are tried in declaration order, so an argument that parses as a
/// type is handled by the first arm — confirm the expression arm is reachable for the
/// inputs it is meant to support.
#[cfg(feature = "schema")]
#[macro_export]
macro_rules! schema {
    ($plugin:ident, $config:ty) => {
        impl $crate::PluginSchemaExt for $plugin {
            fn schema() -> $crate::schemars::schema::RootSchema {
                $crate::schemars::schema_for!($config)
            }
        }
    };
    ($plugin:ident, $sample:expr) => {
        impl $crate::PluginSchemaExt for $plugin {
            fn schema() -> $crate::schemars::schema::RootSchema {
                $crate::schemars::schema_for_value!($sample)
            }
        }
    };
}