spacegate_config/
service.rs

1/// Config file format
2pub mod config_format;
3/// File system backend
4#[cfg(feature = "fs")]
5pub mod fs;
6/// Kubernetes backend
7#[cfg(feature = "k8s")]
8pub mod k8s;
9/// In-memory backend
10pub mod memory;
11/// Redis backend
12#[cfg(feature = "redis")]
13pub mod redis;
14use std::{collections::BTreeMap, error::Error, fmt::Display, str::FromStr};
15
16use futures_util::Future;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use spacegate_model::*;
20
21pub trait Create: Sync + Send {
22    fn create_config_item_gateway(&self, gateway_name: &str, gateway: SgGateway) -> impl Future<Output = Result<(), BoxError>> + Send;
23    fn create_config_item_route(&self, gateway_name: &str, route_name: &str, route: SgHttpRoute) -> impl Future<Output = Result<(), BoxError>> + Send;
24    fn create_config_item(&self, name: &str, item: ConfigItem) -> impl Future<Output = Result<(), BoxError>> + Send {
25        async move {
26            self.create_config_item_gateway(name, item.gateway).await?;
27            for (route_name, route) in item.routes {
28                self.create_config_item_route(name, &route_name, route).await?;
29            }
30            Ok(())
31        }
32    }
33    fn create_config(&self, config: Config) -> impl Future<Output = Result<(), BoxError>> + Send {
34        async move {
35            for (name, item) in config.gateways {
36                self.create_config_item(&name, item).await?;
37            }
38            Ok(())
39        }
40    }
41    fn create_plugin(&self, id: &PluginInstanceId, value: Value) -> impl Future<Output = Result<(), BoxError>> + Send;
42}
43
44pub trait Update: Sync + Send {
45    fn update_config_item_gateway(&self, gateway_name: &str, gateway: SgGateway) -> impl Future<Output = Result<(), BoxError>> + Send;
46    fn update_config_item_route(&self, gateway_name: &str, route_name: &str, route: SgHttpRoute) -> impl Future<Output = Result<(), BoxError>> + Send;
47
48    fn update_config_item(&self, name: &str, item: ConfigItem) -> impl Future<Output = Result<(), BoxError>> + Send {
49        async move {
50            self.update_config_item_gateway(name, item.gateway).await?;
51            for (route_name, route) in item.routes {
52                self.update_config_item_route(name, &route_name, route).await?;
53            }
54            Ok(())
55        }
56    }
57    fn update_config(&self, config: Config) -> impl Future<Output = Result<(), BoxError>> + Send {
58        async move {
59            for (name, item) in config.gateways {
60                self.update_config_item(&name, item).await?;
61            }
62            Ok(())
63        }
64    }
65    fn update_plugin(&self, id: &PluginInstanceId, value: Value) -> impl Future<Output = Result<(), BoxError>> + Send;
66}
67
68pub trait Delete: Sync + Send {
69    fn delete_config_item_gateway(&self, gateway_name: &str) -> impl Future<Output = Result<(), BoxError>> + Send;
70    fn delete_config_item_route(&self, gateway_name: &str, route_name: &str) -> impl Future<Output = Result<(), BoxError>> + Send;
71    fn delete_config_item_all_routes(&self, gateway_name: &str) -> impl Future<Output = Result<(), BoxError>> + Send
72    where
73        Self: Retrieve,
74    {
75        async move {
76            for route_name in self.retrieve_config_item_route_names(gateway_name).await? {
77                self.delete_config_item_route(gateway_name, &route_name).await?;
78            }
79            Ok(())
80        }
81    }
82    fn delete_config_item(&self, name: &str) -> impl Future<Output = Result<(), BoxError>> + Send
83    where
84        Self: Retrieve,
85    {
86        async move {
87            self.delete_config_item_gateway(name).await?;
88            self.delete_config_item_all_routes(name).await?;
89            Ok(())
90        }
91    }
92    fn delete_plugin(&self, id: &PluginInstanceId) -> impl Future<Output = Result<(), BoxError>> + Send;
93}
94
95pub trait Retrieve: Sync + Send {
96    fn retrieve_config_item_gateway(&self, gateway_name: &str) -> impl Future<Output = Result<Option<SgGateway>, BoxError>> + Send;
97    fn retrieve_config_item_route(&self, gateway_name: &str, route_name: &str) -> impl Future<Output = Result<Option<SgHttpRoute>, BoxError>> + Send;
98    fn retrieve_config_item_route_names(&self, name: &str) -> impl Future<Output = Result<Vec<String>, BoxError>> + Send;
99    fn retrieve_config_item_all_routes(&self, name: &str) -> impl Future<Output = Result<BTreeMap<String, SgHttpRoute>, BoxError>> + Send {
100        async move {
101            let mut routes = BTreeMap::new();
102            for route_name in self.retrieve_config_item_route_names(name).await? {
103                if let Ok(Some(route)) = self.retrieve_config_item_route(name, &route_name).await {
104                    routes.insert(route_name, route);
105                }
106            }
107            Ok(routes)
108        }
109    }
110    fn retrieve_config_item(&self, name: &str) -> impl Future<Output = Result<Option<ConfigItem>, BoxError>> + Send {
111        async move {
112            let Some(gateway) = self.retrieve_config_item_gateway(name).await? else {
113                return Ok(None);
114            };
115            let routes = self.retrieve_config_item_all_routes(name).await?;
116            Ok(Some(ConfigItem { gateway, routes }))
117        }
118    }
119    fn retrieve_config_names(&self) -> impl Future<Output = Result<Vec<String>, BoxError>> + Send;
120    fn retrieve_config(&self) -> impl Future<Output = Result<Config, BoxError>> + Send
121    where
122        Self: Sync,
123        BoxError: Send,
124    {
125        async move {
126            let mut gateways = BTreeMap::new();
127            for name in self.retrieve_config_names().await? {
128                if let Some(item) = self.retrieve_config_item(&name).await? {
129                    gateways.insert(name, item);
130                }
131            }
132            let plugins = self.retrieve_all_plugins().await?;
133            Ok(Config {
134                gateways,
135                plugins: PluginInstanceMap::from_config_vec(plugins),
136                api_port: None,
137            })
138        }
139    }
140    fn retrieve_all_plugins(&self) -> impl Future<Output = Result<Vec<PluginConfig>, BoxError>> + Send;
141    fn retrieve_plugin(&self, id: &PluginInstanceId) -> impl Future<Output = Result<Option<PluginConfig>, BoxError>> + Send;
142    fn retrieve_plugins_by_code(&self, code: &str) -> impl Future<Output = Result<Vec<PluginConfig>, BoxError>> + Send;
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub enum ConfigEventType {
147    Create,
148    Update,
149    Delete,
150}
151
152impl FromStr for ConfigEventType {
153    type Err = BoxError;
154    fn from_str(s: &str) -> Result<Self, Self::Err> {
155        match s {
156            "create" => Ok(Self::Create),
157            "update" => Ok(Self::Update),
158            "delete" => Ok(Self::Delete),
159            _ => Err(format!("unknown ConfigEventType: {}", s).into()),
160        }
161    }
162}
163
164impl Display for ConfigEventType {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        match self {
167            Self::Create => write!(f, "create"),
168            Self::Update => write!(f, "update"),
169            Self::Delete => write!(f, "delete"),
170        }
171    }
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[serde(tag = "kind", content = "value")]
176pub enum ConfigType {
177    Gateway {
178        name: String,
179    },
180    Route {
181        gateway_name: String,
182        name: String,
183    },
184    Plugin {
185        id: PluginInstanceId,
186    },
187    /// update global config, the shell would reload all
188    Global,
189}
190
191impl Display for ConfigType {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        match self {
194            Self::Gateway { name } => write!(f, "gateway/{}", name),
195            Self::Route { gateway_name, name } => write!(f, "httproute/{}/{}", gateway_name, name),
196            Self::Plugin { id } => write!(f, "plugin/{}/{}", id.code, id.name),
197            Self::Global => write!(f, "global"),
198        }
199    }
200}
201
202impl FromStr for ConfigType {
203    type Err = BoxError;
204    fn from_str(s: &str) -> Result<Self, Self::Err> {
205        let f = s.split('/').collect::<Vec<_>>();
206        match &f[..] {
207            ["gateway", gateway_name] => Ok(Self::Gateway { name: gateway_name.to_string() }),
208            ["httproute", gateway, route_name] => Ok(Self::Route {
209                gateway_name: gateway.to_string(),
210                name: route_name.to_string(),
211            }),
212            ["plugin", code, name] => {
213                let name = PluginInstanceName::from_str(name)?;
214                Ok(Self::Plugin {
215                    id: PluginInstanceId::new(code.to_string(), name),
216                })
217            }
218            _ => Err(format!("unknown ConfigType: {}", s).into()),
219        }
220    }
221}
222
223pub trait CreateListener {
224    const CONFIG_LISTENER_NAME: &'static str;
225    type Listener: Listen;
226    fn create_listener(&self) -> impl Future<Output = Result<(Config, Self::Listener), Box<dyn Error + Sync + Send + 'static>>> + Send;
227}
/// A single instance exposed through [`Discovery`].
pub trait Instance: Send + Sync {
    /// Identifier of this instance.
    fn id(&self) -> &str;
    /// API URL of this instance.
    fn api_url(&self) -> &str;
}
232pub trait Discovery: 'static {
233    fn instances(&self) -> impl Future<Output = Result<Vec<impl Instance>, BoxError>> + Send;
234    fn backends(&self) -> impl Future<Output = Result<Vec<BackendHost>, BoxError>> + Send {
235        std::future::ready(Ok(vec![]))
236    }
237}
238#[derive(Debug, Serialize, Deserialize)]
239pub struct ListenEvent {
240    pub r#type: ConfigEventType,
241    pub config: ConfigType,
242}
243
244impl From<(ConfigType, ConfigEventType)> for ListenEvent {
245    fn from((config, r#type): (ConfigType, ConfigEventType)) -> Self {
246        Self { r#type, config }
247    }
248}
249
250pub trait Listen: Unpin {
251    fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<ListenEvent, BoxError>>;
252}
253
254pub trait ListenExt: Listen {
255    fn join<L1>(self, l1: L1) -> Joint<Self, L1>
256    where
257        L1: Listen,
258        Self: Sized,
259    {
260        Joint { l0: self, l1 }
261    }
262}
263
264impl<T: Listen> ListenExt for T {}
265
/// A pair of values joined into one; created by `ListenExt::join`.
pub struct Joint<L0, L1> {
    /// First member of the pair.
    l0: L0,
    /// Second member of the pair.
    l1: L1,
}
270
271impl<L0, L1> Listen for Joint<L0, L1>
272where
273    L0: Listen,
274    L1: Listen,
275{
276    fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<ListenEvent, BoxError>> {
277        // l0 has higher priority
278        let l0 = self.l0.poll_next(cx);
279        if l0.is_ready() {
280            return l0;
281        }
282        self.l1.poll_next(cx)
283    }
284}
285
286impl Listen for tokio::sync::mpsc::Receiver<ListenEvent> {
287    fn poll_next(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<ListenEvent, BoxError>> {
288        self.poll_recv(cx).map(|r| r.ok_or("channel closed".into()))
289    }
290}