1#![forbid(unsafe_code)]
2#![forbid(clippy::unwrap_used)]
3#![deny(clippy::pedantic)]
4#![deny(clippy::get_unwrap)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::missing_errors_doc)]
7
8mod config;
9mod context;
10mod error;
11mod expiration_manager;
12mod helpers;
13mod layer;
14mod migrations;
15mod recovery;
16mod repos;
17mod response;
18mod router;
19mod system;
20
21use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
22
23pub use config::*;
24use context::ChildProcesses;
25use covert_storage::EncryptedPool;
26pub use expiration_manager::{ExpirationManager, LeaseEntry};
27pub use router::{Router, RouterService};
28use sqlx::sqlite::SqliteConnectOptions;
29use tower::{make::Shared, ServiceBuilder};
30use tower_http::{cors::CorsLayer, limit::RequestBodyLimitLayer};
31use tracing::info;
32
33use crate::{
34 context::Context,
35 expiration_manager::clock::SystemClock,
36 layer::{
37 auth_service::AuthServiceLayer, lease_registration::LeaseRegistrationLayer,
38 namespace_extension::NamespaceExtensionLayer, request_mapper::LogicalRequestResponseLayer,
39 storage_state_extension::StorageStateExtensionLayer,
40 },
41 recovery::{recover, recover_encrypted_storage_snapshot, replicate},
42 repos::Repos,
43 system::new_system_backend,
44};
45
46pub async fn shutdown_signal() {
47 tokio::signal::ctrl_c()
49 .await
50 .expect("failed to install CTRL+C signal handler");
51}
52
53async fn shutdown_handler(child_processes: ChildProcesses) {
54 child_processes.kill_all().await;
55}
56
57pub async fn start(
58 mut config: Config,
59 shutdown_signal: impl Future<Output = ()>,
60) -> anyhow::Result<()> {
61 config.sanitize()?;
62
63 let child_processes = ChildProcesses::default();
64 let shutdown_handler = async {
65 shutdown_signal.await;
66 info!("Shutdown signal received");
67 shutdown_handler(child_processes.clone()).await;
68 };
69
70 let port_tx = config.port_tx.take();
71 let config = Arc::new(config);
72
73 if let Some(replication) = config.replication.as_ref() {
76 recover(
78 replication,
79 &config.seal_storage_path(),
80 &replication.seal_bucket_url(),
81 )?;
82
83 recover_encrypted_storage_snapshot(&config, replication);
87 }
88
89 let seal_db = sqlx::sqlite::SqlitePoolOptions::new()
91 .min_connections(1)
92 .max_connections(1)
93 .connect_with(
94 SqliteConnectOptions::new()
95 .create_if_missing(true)
96 .foreign_keys(true)
97 .filename(config.seal_storage_path()),
98 )
99 .await?;
100
101 if let Some(replication) = config.replication.as_ref() {
103 let p = replicate(
104 replication,
105 None,
106 &config.seal_storage_path(),
107 &replication.seal_bucket_url(),
108 )?;
109 child_processes.set_seal_storage_replication(p).await;
110 }
111
112 let encrypted_pool = Arc::new(EncryptedPool::new(&config.encrypted_storage_path()));
113 let repos = Repos::new(encrypted_pool, seal_db);
114
115 crate::migrations::migrate_unecrypted_db(&repos.unecrypted_pool).await?;
117
118 let router = Arc::new(Router::new(repos.mount.clone()));
119 let expiration = Arc::new(ExpirationManager::new(
120 Arc::clone(&router),
121 repos.clone(),
122 SystemClock::new(),
123 ));
124 let ctx = Context {
125 config: Arc::clone(&config),
126 repos: repos.clone(),
127 child_processes: child_processes.clone(),
128 expiration_manager: Arc::clone(&expiration),
129 router: Arc::clone(&router),
130 };
131
132 let system = new_system_backend(ctx);
134 router.mount_system(Arc::new(system));
135
136 let server_router_svc = ServiceBuilder::new()
137 .concurrency_limit(1000)
138 .timeout(Duration::from_secs(30))
139 .layer(RequestBodyLimitLayer::new(1024 * 16))
140 .layer(CorsLayer::permissive())
141 .layer(LogicalRequestResponseLayer::new())
142 .layer(StorageStateExtensionLayer::new(Arc::clone(&repos.pool)))
143 .layer(NamespaceExtensionLayer::new(repos.namespace.clone()))
144 .layer(AuthServiceLayer::new(
145 repos.token.clone(),
146 repos.namespace.clone(),
147 ))
148 .layer(LeaseRegistrationLayer::new(
149 expiration.clone(),
150 repos.token.clone(),
151 repos.entity.clone(),
152 ))
153 .service(RouterService::new(router.clone()));
154
155 let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
156 let covert_server = hyper::Server::bind(&addr).serve(Shared::new(server_router_svc));
157 let addr = covert_server.local_addr();
158 let covert_server = covert_server.with_graceful_shutdown(shutdown_handler);
159
160 info!("listening on {addr}");
161 if let Some(tx) = port_tx {
162 let _ = tx.send(addr.port());
163 }
164
165 if let Err(error) = covert_server.await {
167 tracing::error!(?error, "Encountered server error. Shutting down.");
168 return Err(error.into());
169 }
170 Ok(())
171}