1#![deny(clippy::unwrap_used, clippy::dbg_macro, clippy::unimplemented, clippy::todo, clippy::inline_always)]
2use std::{
3 any::Any,
4 borrow::Cow,
5 collections::{HashMap, HashSet},
6 fmt::Debug,
7 sync::{Arc, OnceLock, RwLock},
8};
9
10use futures_util::{future::BoxFuture, Future};
11use hyper::{Request, Response};
12use instance::{PluginInstance, PluginInstanceSnapshot};
13use layer::{InnerBoxPf, PluginFunction};
14use mount::{MountPoint, MountPointIndex};
15use serde::{Deserialize, Serialize};
16pub use serde_json;
17pub use serde_json::{Error as SerdeJsonError, Value as JsonValue};
18pub use spacegate_kernel::helper_layers::function::Inner;
19pub use spacegate_kernel::BoxError;
20pub use spacegate_kernel::BoxLayer;
21pub use spacegate_kernel::{SgBody, SgRequest, SgRequestExt, SgResponse, SgResponseExt};
22pub mod error;
23pub mod instance;
24pub mod model;
25pub mod mount;
26pub use error::PluginError;
27#[cfg(feature = "dylib")]
28pub mod dynamic;
29pub mod ext;
30pub mod layer;
31pub mod plugins;
32#[cfg(feature = "schema")]
33pub use schemars;
34pub use spacegate_model;
35pub use spacegate_model::{plugin_meta, PluginAttributes, PluginConfig, PluginInstanceId, PluginInstanceMap, PluginInstanceName, PluginMetaData};
36pub trait Plugin: Any + Sized + Send + Sync {
67 const CODE: &'static str;
71 const MONO: bool = false;
73 fn meta() -> PluginMetaData {
74 PluginMetaData::default()
75 }
76 fn call(&self, req: SgRequest, inner: Inner) -> impl Future<Output = Result<SgResponse, BoxError>> + Send;
85 fn create(plugin_config: PluginConfig) -> Result<Self, BoxError>;
86 fn create_by_spec(spec: JsonValue, name: PluginInstanceName) -> Result<Self, BoxError> {
87 Self::create(PluginConfig {
88 id: PluginInstanceId { code: Self::CODE.into(), name },
89 spec,
90 })
91 }
92 fn register(repo: &PluginRepository) {
96 repo.plugins.write().expect("SgPluginRepository register error").insert(Self::CODE.into(), PluginDefinitionObject::from_trait::<Self>());
97 }
98
99 #[cfg(feature = "schema")]
100 fn schema_opt() -> Option<schemars::schema::RootSchema> {
102 None
103 }
104}
105
106pub struct PluginDefinitionObject {
116 pub mono: bool,
118 pub code: Cow<'static, str>,
120 pub meta: PluginMetaData,
122 #[cfg(feature = "schema")]
124 pub schema: Option<schemars::schema::RootSchema>,
125 pub make_pf: Box<MakePfMethod>,
127}
128
129impl Debug for PluginDefinitionObject {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 let mut formatter = f.debug_struct("PluginAttributes");
132 formatter.field("mono", &self.mono).field("code", &self.code).field("meta", &self.meta);
133 #[cfg(feature = "schema")]
134 {
135 formatter.field("schema", &self.schema.is_some());
136 }
137
138 formatter.finish()
139 }
140}
141
142#[derive(Debug, Default, Serialize, Deserialize, Clone)]
143pub struct PluginRepoSnapshot {
144 pub mono: bool,
145 pub code: Cow<'static, str>,
146 pub meta: PluginMetaData,
147 pub instances: HashMap<String, PluginInstanceSnapshot>,
148}
149
150impl PluginDefinitionObject {
151 pub fn attr(&self) -> PluginAttributes {
152 PluginAttributes {
153 mono: self.mono,
154 code: self.code.clone(),
155 meta: self.meta.clone(),
156 }
157 }
158 pub fn from_trait<P: Plugin>() -> Self {
160 let constructor = move |config: PluginConfig| {
161 let plugin = Arc::new(P::create(config)?);
162 let function = move |req: Request<SgBody>, inner: Inner| {
163 let plugin = plugin.clone();
164 let task = async move {
166 match plugin.call(req, inner).await {
168 Ok(resp) => resp,
169 Err(e) => {
170 tracing::error!("{code} plugin error: {e}", code = P::CODE);
171 PluginError::internal_error::<P>(e).into()
172 }
173 }
174 };
175 Box::pin(task) as BoxFuture<'static, Response<SgBody>>
176 };
177 Ok(Box::new(function) as InnerBoxPf)
178 };
179 let make_pf = Box::new(constructor);
180 Self {
181 code: P::CODE.into(),
182 #[cfg(feature = "schema")]
183 schema: P::schema_opt(),
184 mono: P::MONO,
185 meta: P::meta(),
186 make_pf,
187 }
188 }
189 #[inline]
190 pub(crate) fn make_pf(&self, config: PluginConfig) -> Result<InnerBoxPf, BoxError> {
191 (self.make_pf)(config)
192 }
193 pub(crate) fn make_instance(&self, config: PluginConfig) -> Result<PluginInstance, BoxError> {
194 let pf = PluginFunction::new(self.make_pf(config.clone())?);
195 let instance = PluginInstance {
196 config,
197 plugin_function: pf,
198 mount_points: Default::default(),
199 hooks: Default::default(),
200 };
201 Ok(instance)
202 }
203}
204
205#[cfg(feature = "schema")]
206pub trait PluginSchemaExt {
207 fn schema() -> schemars::schema::RootSchema;
208}
209
210pub type MakePfMethod = dyn Fn(PluginConfig) -> Result<InnerBoxPf, BoxError> + Send + Sync + 'static;
212
213#[derive(Default, Clone)]
218pub struct PluginRepository {
219 plugins: Arc<RwLock<HashMap<String, PluginDefinitionObject>>>,
220 instances: Arc<RwLock<HashMap<PluginInstanceId, PluginInstance>>>,
221}
222
223impl PluginRepository {
224 pub fn global() -> &'static Self {
228 static INIT: OnceLock<PluginRepository> = OnceLock::new();
229 INIT.get_or_init(|| {
230 let repo = PluginRepository::new();
231 repo.register_prelude();
232 repo
233 })
234 }
235
236 pub fn register_prelude(&self) {
238 self.register::<plugins::static_resource::StaticResourcePlugin>();
239 #[cfg(feature = "limit")]
240 self.register::<plugins::limit::RateLimitPlugin>();
241 #[cfg(feature = "redirect")]
242 self.register::<plugins::redirect::RedirectPlugin>();
243 #[cfg(feature = "header-modifier")]
246 self.register::<plugins::header_modifier::HeaderModifierPlugin>();
247 #[cfg(feature = "inject")]
248 self.register::<plugins::inject::InjectPlugin>();
249 #[cfg(feature = "rewrite")]
250 self.register::<plugins::rewrite::RewritePlugin>();
251 #[cfg(feature = "maintenance")]
252 self.register::<plugins::maintenance::MaintenancePlugin>();
253 #[cfg(feature = "set-version")]
258 self.register::<plugins::set_version::SetVersionPlugin>();
259 #[cfg(feature = "set-scheme")]
260 self.register::<plugins::set_scheme::SetSchemePlugin>();
261 #[cfg(feature = "redis")]
262 {
263 self.register::<ext::redis::plugins::redis_count::RedisCountPlugin>();
264 self.register::<ext::redis::plugins::redis_limit::RedisLimitPlugin>();
265 self.register::<ext::redis::plugins::redis_time_range::RedisTimeRangePlugin>();
266 self.register::<ext::redis::plugins::redis_dynamic_route::RedisDynamicRoutePlugin>();
267 }
268 #[cfg(feature = "east-west-traffic-white-list")]
269 self.register::<plugins::east_west_traffic_white_list::EastWestTrafficWhiteListPlugin>();
270 }
271
272 pub fn new() -> Self {
274 Self::default()
275 }
276
277 pub fn register<P: Plugin>(&self) {
279 self.register_custom(PluginDefinitionObject::from_trait::<P>())
280 }
281
282 pub fn register_custom<A: Into<PluginDefinitionObject>>(&self, attr: A) {
284 let attr: PluginDefinitionObject = attr.into();
285 let mut map = self.plugins.write().expect("SgPluginRepository register error");
286 let _old_attr = map.insert(attr.code.to_string(), attr);
287 }
288
289 pub fn clear_instances(&self) {
291 let mut instances = self.instances.write().expect("SgPluginRepository register error");
292 for (_, inst) in instances.drain() {
293 if let Err(e) = inst.before_destroy() {
294 tracing::error!("plugin {id:?} before_destroy error: {e}", id = inst.config.id, e = e);
295 }
296 }
297 }
298
299 pub fn create_or_update_instance(&self, config: PluginConfig) -> Result<(), BoxError> {
301 let attr_rg = self.plugins.read().expect("SgPluginRepository register error");
302 let code = config.code();
303 let id = config.id.clone();
304 let Some(attr) = attr_rg.get(code) else {
305 return Err(format!("[Sg.Plugin] unregistered sg plugin type {code}").into());
306 };
307 let mut instances = self.instances.write().expect("SgPluginRepository register error");
308 if let Some(instance) = instances.get_mut(&id) {
309 let new_inner_pf = attr.make_pf(config)?;
310 instance.plugin_function.swap(new_inner_pf);
311 } else {
312 let instance = attr.make_instance(config)?;
313 instance.after_create()?;
314 instances.insert(id, instance);
315 }
316 Ok(())
317 }
318
319 pub fn remove_instance(&self, id: &PluginInstanceId) -> Result<HashSet<MountPointIndex>, BoxError> {
321 let mut instances = self.instances.write().expect("SgPluginRepository register error");
322 if let Some(instance) = instances.remove(id) {
323 instance.before_destroy()?;
324 Ok(instance.mount_points.into_iter().filter_map(|(index, tracer)| (!tracer.all_dropped()).then_some(index)).collect())
325 } else {
326 Err(format!("[Sg.Plugin] missing instance {id:?}").into())
327 }
328 }
329
330 pub fn mount<M: MountPoint>(&self, mount_point: &mut M, mount_index: MountPointIndex, id: PluginInstanceId) -> Result<(), BoxError> {
332 let attr_rg = self.plugins.read().expect("SgPluginRepository register error");
333 let code = id.code.as_ref();
334 let Some(_attr) = attr_rg.get(code) else {
335 return Err(format!("[Sg.Plugin] unregistered sg plugin type {code}").into());
336 };
337 let mut instances = self.instances.write().expect("SgPluginRepository register error");
338 if let Some(instance) = instances.get_mut(&id) {
339 instance.mount_points_gc();
340 instance.before_mount()?;
342 instance.mount_at(mount_point, mount_index)?;
343 instance.after_mount()?;
345 Ok(())
346 } else {
347 Err(format!("[Sg.Plugin] missing instance {id:?}").into())
348 }
349 }
350
351 pub fn instance_snapshot(&self, id: PluginInstanceId) -> Option<PluginInstanceSnapshot> {
352 let map = self.instances.read().expect("SgPluginRepository register error");
353 map.get(&id).map(PluginInstance::snapshot)
354 }
355
356 pub fn plugin_list(&self) -> Vec<PluginAttributes> {
357 let map = self.plugins.read().expect("SgPluginRepository register error");
358 map.values().map(PluginDefinitionObject::attr).collect()
359 }
360
361 pub fn repo_snapshot(&self) -> HashMap<String, PluginRepoSnapshot> {
363 let plugins = self.plugins.read().expect("SgPluginRepository register error");
364 plugins
365 .iter()
366 .map(|(code, attr)| {
367 let instances = self.instances.read().expect("SgPluginRepository register error");
368 let instances = instances.iter().filter_map(|(id, instance)| if &id.code == code { Some((id.name.to_string(), instance.snapshot())) } else { None }).collect();
369 (
370 code.clone(),
371 PluginRepoSnapshot {
372 code: code.clone().into(),
373 mono: attr.mono,
374 meta: attr.meta.clone(),
375 instances,
376 },
377 )
378 })
379 .collect()
380 }
381}
382
383#[cfg(feature = "schema")]
384#[macro_export]
385macro_rules! schema {
386 ($plugin:ident, $schema:ty) => {
387 impl $crate::PluginSchemaExt for $plugin {
388 fn schema() -> $crate::schemars::schema::RootSchema {
389 $crate::schemars::schema_for!($schema)
390 }
391 }
392 };
393 ($plugin:ident, $schema:expr) => {
394 impl $crate::PluginSchemaExt for $plugin {
395 fn schema() -> $crate::schemars::schema::RootSchema {
396 $crate::schemars::schema_for_value!($schema)
397 }
398 }
399 };
400}