covert_system/
lib.rs

1#![forbid(unsafe_code)]
2#![forbid(clippy::unwrap_used)]
3#![deny(clippy::pedantic)]
4#![deny(clippy::get_unwrap)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::missing_errors_doc)]
7
8mod config;
9mod context;
10mod error;
11mod expiration_manager;
12mod helpers;
13mod layer;
14mod migrations;
15mod recovery;
16mod repos;
17mod response;
18mod router;
19mod system;
20
21use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
22
23pub use config::*;
24use context::ChildProcesses;
25use covert_storage::EncryptedPool;
26pub use expiration_manager::{ExpirationManager, LeaseEntry};
27pub use router::{Router, RouterService};
28use sqlx::sqlite::SqliteConnectOptions;
29use tower::{make::Shared, ServiceBuilder};
30use tower_http::{cors::CorsLayer, limit::RequestBodyLimitLayer};
31use tracing::info;
32
33use crate::{
34    context::Context,
35    expiration_manager::clock::SystemClock,
36    layer::{
37        auth_service::AuthServiceLayer, lease_registration::LeaseRegistrationLayer,
38        namespace_extension::NamespaceExtensionLayer, request_mapper::LogicalRequestResponseLayer,
39        storage_state_extension::StorageStateExtensionLayer,
40    },
41    recovery::{recover, recover_encrypted_storage_snapshot, replicate},
42    repos::Repos,
43    system::new_system_backend,
44};
45
46pub async fn shutdown_signal() {
47    // Wait for the CTRL+C signal
48    tokio::signal::ctrl_c()
49        .await
50        .expect("failed to install CTRL+C signal handler");
51}
52
53async fn shutdown_handler(child_processes: ChildProcesses) {
54    child_processes.kill_all().await;
55}
56
57pub async fn start(
58    mut config: Config,
59    shutdown_signal: impl Future<Output = ()>,
60) -> anyhow::Result<()> {
61    config.sanitize()?;
62
63    let child_processes = ChildProcesses::default();
64    let shutdown_handler = async {
65        shutdown_signal.await;
66        info!("Shutdown signal received");
67        shutdown_handler(child_processes.clone()).await;
68    };
69
70    let port_tx = config.port_tx.take();
71    let config = Arc::new(config);
72
73    // Try to recover as far as possible if replication has configured and we
74    // have a backup available
75    if let Some(replication) = config.replication.as_ref() {
76        // Recover seal storage
77        recover(
78            replication,
79            &config.seal_storage_path(),
80            &replication.seal_bucket_url(),
81        )?;
82
83        // Recover latest snapshot of encrypted storage. Changes applied to DB
84        // after latest snapshot will be applied after we have the encryption key
85        // available during unseal.
86        recover_encrypted_storage_snapshot(&config, replication);
87    }
88
89    // Create seal storage DB
90    let seal_db = sqlx::sqlite::SqlitePoolOptions::new()
91        .min_connections(1)
92        .max_connections(1)
93        .connect_with(
94            SqliteConnectOptions::new()
95                .create_if_missing(true)
96                .foreign_keys(true)
97                .filename(config.seal_storage_path()),
98        )
99        .await?;
100
101    // Start replication of seal storage if configured
102    if let Some(replication) = config.replication.as_ref() {
103        let p = replicate(
104            replication,
105            None,
106            &config.seal_storage_path(),
107            &replication.seal_bucket_url(),
108        )?;
109        child_processes.set_seal_storage_replication(p).await;
110    }
111
112    let encrypted_pool = Arc::new(EncryptedPool::new(&config.encrypted_storage_path()));
113    let repos = Repos::new(encrypted_pool, seal_db);
114
115    // Run migration
116    crate::migrations::migrate_unecrypted_db(&repos.unecrypted_pool).await?;
117
118    let router = Arc::new(Router::new(repos.mount.clone()));
119    let expiration = Arc::new(ExpirationManager::new(
120        Arc::clone(&router),
121        repos.clone(),
122        SystemClock::new(),
123    ));
124    let ctx = Context {
125        config: Arc::clone(&config),
126        repos: repos.clone(),
127        child_processes: child_processes.clone(),
128        expiration_manager: Arc::clone(&expiration),
129        router: Arc::clone(&router),
130    };
131
132    // Mount system backend
133    let system = new_system_backend(ctx);
134    router.mount_system(Arc::new(system));
135
136    let server_router_svc = ServiceBuilder::new()
137        .concurrency_limit(1000)
138        .timeout(Duration::from_secs(30))
139        .layer(RequestBodyLimitLayer::new(1024 * 16))
140        .layer(CorsLayer::permissive())
141        .layer(LogicalRequestResponseLayer::new())
142        .layer(StorageStateExtensionLayer::new(Arc::clone(&repos.pool)))
143        .layer(NamespaceExtensionLayer::new(repos.namespace.clone()))
144        .layer(AuthServiceLayer::new(
145            repos.token.clone(),
146            repos.namespace.clone(),
147        ))
148        .layer(LeaseRegistrationLayer::new(
149            expiration.clone(),
150            repos.token.clone(),
151            repos.entity.clone(),
152        ))
153        .service(RouterService::new(router.clone()));
154
155    let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
156    let covert_server = hyper::Server::bind(&addr).serve(Shared::new(server_router_svc));
157    let addr = covert_server.local_addr();
158    let covert_server = covert_server.with_graceful_shutdown(shutdown_handler);
159
160    info!("listening on {addr}");
161    if let Some(tx) = port_tx {
162        let _ = tx.send(addr.port());
163    }
164
165    // And run forever...
166    if let Err(error) = covert_server.await {
167        tracing::error!(?error, "Encountered server error. Shutting down.");
168        return Err(error.into());
169    }
170    Ok(())
171}