spacegate_shell/
config.rs

1use std::collections::VecDeque;
2
3use crate::server::RunningSgGateway;
4use futures_util::{Stream, StreamExt};
5
6use spacegate_plugin::PluginRepository;
7use tokio_util::sync::CancellationToken;
8
9pub use spacegate_config::model::*;
10pub use spacegate_config::service::*;
11use tracing::info;
12
13pub(crate) mod matches_convert;
14pub mod plugin_filter_dto;
15
16pub struct ListenerWrapper<L: Listen>(L);
17
18impl<L: Listen> Stream for ListenerWrapper<L> {
19    type Item = ListenEvent;
20
21    fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
22        self.0.poll_next(cx).map_err(|e| tracing::error!("[SG.Config] listening gateway error: {e}")).map(Result::ok)
23    }
24}
25
26/// Startup the gateway with custom shutdown signal
27pub async fn startup_with_shutdown_signal<C>(config: C, shutdown_signal: CancellationToken) -> Result<(), BoxError>
28where
29    C: Retrieve + CreateListener + 'static,
30{
31    let (init_config, listener) = config.create_listener().await?;
32    #[cfg(feature = "ext-axum")]
33    let listener = {
34        use crate::ext_features::axum::{shell_routers, App};
35        use spacegate_ext_axum::axum::Extension;
36        info!("Starting web server...");
37        let server = spacegate_ext_axum::GlobalAxumServer::default();
38        let (listen_event_tx, listen_event_rx) = tokio::sync::mpsc::channel(64);
39        server.modify_router(move |router| shell_routers(router).layer(Extension(App { listen_event_tx }))).await;
40        spacegate_plugin::ext::axum::register_plugin_routes().await;
41        server.set_cancellation(shutdown_signal.child_token()).await;
42        if let Some(port) = init_config.api_port {
43            server.set_bind(std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port)).await;
44        }
45        let server_addr = server.get_bind().await;
46        server.start().await?;
47
48        info!(%server_addr, "Web server started.");
49        listener.join(listen_event_rx)
50    };
51    let mut listener = ListenerWrapper(listener);
52    RunningSgGateway::global_init(init_config, shutdown_signal.clone()).await;
53    info!("[SG.Config] Entering listening");
54    let mut local_queue = VecDeque::new();
55    let gateway_shutdown_signal = shutdown_signal.child_token();
56
57    loop {
58        let event = if let Some(next) = local_queue.pop_front() {
59            next
60        } else {
61            tokio::select! {
62                _ = shutdown_signal.cancelled() => {
63                    tracing::info!("[SG.Config] config listener {CONFIG_LISTENER_NAME} shutdown", CONFIG_LISTENER_NAME = C::CONFIG_LISTENER_NAME);
64                    // listener.shutdown();
65                    return Ok(());
66                }
67                event = listener.next() => {
68                    match event {
69                        Some(event) => {
70                            tracing::debug!(?event, "received event from listener");
71                            event
72                        },
73                        None => {
74                            tracing::info!("[SG.Config] config event stream end");
75                            tracing::info!("[SG.Config] config listener {CONFIG_LISTENER_NAME} shutdown", CONFIG_LISTENER_NAME = C::CONFIG_LISTENER_NAME);
76                            // config.shutdown();
77                            return Ok(());
78                        }
79                    }
80                }
81            }
82        };
83
84        if let Err(error) = handler(event, &config, &gateway_shutdown_signal).await {
85            tracing::error!(%error, "[SG.Config] handle event failed");
86        }
87    }
88}
89
90async fn handler<C: Retrieve>(event: ListenEvent, config: &C, gateway_shutdown_signal: &CancellationToken) -> Result<(), BoxError> {
91    match (event.config, event.r#type) {
92        (ConfigType::Gateway { name }, ConfigEventType::Create) => {
93            if let Some(config) = config.retrieve_config_item(&name).await? {
94                tracing::info!("[SG.Config] gateway {name} created", name = name);
95                if let Ok(gateway) = RunningSgGateway::create(config, gateway_shutdown_signal.child_token()) {
96                    RunningSgGateway::global_save(name, gateway);
97                }
98            }
99        }
100        (ConfigType::Gateway { name }, ConfigEventType::Update) => {
101            if let Some(config) = config.retrieve_config_item(&name).await? {
102                tracing::info!("[SG.Config] gateway {name} updated", name = name);
103                if let Some(inst) = RunningSgGateway::global_remove(&name) {
104                    inst.shutdown().await;
105                }
106                if let Ok(gateway) = RunningSgGateway::create(config, gateway_shutdown_signal.child_token()) {
107                    RunningSgGateway::global_save(name, gateway);
108                }
109            }
110        }
111        (ConfigType::Gateway { name }, ConfigEventType::Delete) => {
112            tracing::info!("[SG.Config] gateway {name} deleted", name = name);
113            if let Some(inst) = RunningSgGateway::global_remove(name) {
114                inst.shutdown().await;
115            }
116        }
117        (ConfigType::Route { gateway_name, name }, _) => {
118            let routes = config.retrieve_config_item_all_routes(&gateway_name).await?;
119            tracing::info!("[SG.Config] route {name} modified", name = name);
120            if let Err(e) = RunningSgGateway::global_update(&gateway_name, routes) {
121                tracing::error!("[SG.Config] route {name} modified failed: {e}", name = name, e = e);
122            }
123        }
124        (ConfigType::Plugin { id }, ConfigEventType::Create | ConfigEventType::Update) => {
125            let config = config.retrieve_plugin(&id).await?;
126            if let Some(config) = config {
127                if let Err(e) = PluginRepository::global().create_or_update_instance(config) {
128                    tracing::error!("[SG.Config] plugin {id:?} create failed: {e}", id = id, e = e);
129                }
130            } else {
131                tracing::error!("[SG.Config] plugin {id:?} not found");
132            }
133        }
134        (ConfigType::Plugin { id }, ConfigEventType::Delete) => match PluginRepository::global().remove_instance(&id) {
135            Ok(_mount_points) => {}
136            Err(e) => {
137                tracing::error!("[SG.Config] file to remove plugin {id:?} : {e}", id = id, e = e);
138                return Err(e);
139            }
140        },
141        (ConfigType::Global, _) => {
142            let config = config.retrieve_config().await?;
143            RunningSgGateway::global_reset().await;
144            RunningSgGateway::global_init(config, gateway_shutdown_signal.child_token()).await;
145        }
146    }
147
148    Result::<(), BoxError>::Ok(())
149}