spacegate_shell/
config.rs1use std::collections::VecDeque;
2
3use crate::server::RunningSgGateway;
4use futures_util::{Stream, StreamExt};
5
6use spacegate_plugin::PluginRepository;
7use tokio_util::sync::CancellationToken;
8
9pub use spacegate_config::model::*;
10pub use spacegate_config::service::*;
11use tracing::info;
12
13pub(crate) mod matches_convert;
14pub mod plugin_filter_dto;
15
16pub struct ListenerWrapper<L: Listen>(L);
17
18impl<L: Listen> Stream for ListenerWrapper<L> {
19 type Item = ListenEvent;
20
21 fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
22 self.0.poll_next(cx).map_err(|e| tracing::error!("[SG.Config] listening gateway error: {e}")).map(Result::ok)
23 }
24}
25
26pub async fn startup_with_shutdown_signal<C>(config: C, shutdown_signal: CancellationToken) -> Result<(), BoxError>
28where
29 C: Retrieve + CreateListener + 'static,
30{
31 let (init_config, listener) = config.create_listener().await?;
32 #[cfg(feature = "ext-axum")]
33 let listener = {
34 use crate::ext_features::axum::{shell_routers, App};
35 use spacegate_ext_axum::axum::Extension;
36 info!("Starting web server...");
37 let server = spacegate_ext_axum::GlobalAxumServer::default();
38 let (listen_event_tx, listen_event_rx) = tokio::sync::mpsc::channel(64);
39 server.modify_router(move |router| shell_routers(router).layer(Extension(App { listen_event_tx }))).await;
40 spacegate_plugin::ext::axum::register_plugin_routes().await;
41 server.set_cancellation(shutdown_signal.child_token()).await;
42 if let Some(port) = init_config.api_port {
43 server.set_bind(std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port)).await;
44 }
45 let server_addr = server.get_bind().await;
46 server.start().await?;
47
48 info!(%server_addr, "Web server started.");
49 listener.join(listen_event_rx)
50 };
51 let mut listener = ListenerWrapper(listener);
52 RunningSgGateway::global_init(init_config, shutdown_signal.clone()).await;
53 info!("[SG.Config] Entering listening");
54 let mut local_queue = VecDeque::new();
55 let gateway_shutdown_signal = shutdown_signal.child_token();
56
57 loop {
58 let event = if let Some(next) = local_queue.pop_front() {
59 next
60 } else {
61 tokio::select! {
62 _ = shutdown_signal.cancelled() => {
63 tracing::info!("[SG.Config] config listener {CONFIG_LISTENER_NAME} shutdown", CONFIG_LISTENER_NAME = C::CONFIG_LISTENER_NAME);
64 return Ok(());
66 }
67 event = listener.next() => {
68 match event {
69 Some(event) => {
70 tracing::debug!(?event, "received event from listener");
71 event
72 },
73 None => {
74 tracing::info!("[SG.Config] config event stream end");
75 tracing::info!("[SG.Config] config listener {CONFIG_LISTENER_NAME} shutdown", CONFIG_LISTENER_NAME = C::CONFIG_LISTENER_NAME);
76 return Ok(());
78 }
79 }
80 }
81 }
82 };
83
84 if let Err(error) = handler(event, &config, &gateway_shutdown_signal).await {
85 tracing::error!(%error, "[SG.Config] handle event failed");
86 }
87 }
88}
89
90async fn handler<C: Retrieve>(event: ListenEvent, config: &C, gateway_shutdown_signal: &CancellationToken) -> Result<(), BoxError> {
91 match (event.config, event.r#type) {
92 (ConfigType::Gateway { name }, ConfigEventType::Create) => {
93 if let Some(config) = config.retrieve_config_item(&name).await? {
94 tracing::info!("[SG.Config] gateway {name} created", name = name);
95 if let Ok(gateway) = RunningSgGateway::create(config, gateway_shutdown_signal.child_token()) {
96 RunningSgGateway::global_save(name, gateway);
97 }
98 }
99 }
100 (ConfigType::Gateway { name }, ConfigEventType::Update) => {
101 if let Some(config) = config.retrieve_config_item(&name).await? {
102 tracing::info!("[SG.Config] gateway {name} updated", name = name);
103 if let Some(inst) = RunningSgGateway::global_remove(&name) {
104 inst.shutdown().await;
105 }
106 if let Ok(gateway) = RunningSgGateway::create(config, gateway_shutdown_signal.child_token()) {
107 RunningSgGateway::global_save(name, gateway);
108 }
109 }
110 }
111 (ConfigType::Gateway { name }, ConfigEventType::Delete) => {
112 tracing::info!("[SG.Config] gateway {name} deleted", name = name);
113 if let Some(inst) = RunningSgGateway::global_remove(name) {
114 inst.shutdown().await;
115 }
116 }
117 (ConfigType::Route { gateway_name, name }, _) => {
118 let routes = config.retrieve_config_item_all_routes(&gateway_name).await?;
119 tracing::info!("[SG.Config] route {name} modified", name = name);
120 if let Err(e) = RunningSgGateway::global_update(&gateway_name, routes) {
121 tracing::error!("[SG.Config] route {name} modified failed: {e}", name = name, e = e);
122 }
123 }
124 (ConfigType::Plugin { id }, ConfigEventType::Create | ConfigEventType::Update) => {
125 let config = config.retrieve_plugin(&id).await?;
126 if let Some(config) = config {
127 if let Err(e) = PluginRepository::global().create_or_update_instance(config) {
128 tracing::error!("[SG.Config] plugin {id:?} create failed: {e}", id = id, e = e);
129 }
130 } else {
131 tracing::error!("[SG.Config] plugin {id:?} not found");
132 }
133 }
134 (ConfigType::Plugin { id }, ConfigEventType::Delete) => match PluginRepository::global().remove_instance(&id) {
135 Ok(_mount_points) => {}
136 Err(e) => {
137 tracing::error!("[SG.Config] file to remove plugin {id:?} : {e}", id = id, e = e);
138 return Err(e);
139 }
140 },
141 (ConfigType::Global, _) => {
142 let config = config.retrieve_config().await?;
143 RunningSgGateway::global_reset().await;
144 RunningSgGateway::global_init(config, gateway_shutdown_signal.child_token()).await;
145 }
146 }
147
148 Result::<(), BoxError>::Ok(())
149}