use axum::Json;
use ordinary_app::server::{OrdinaryAppServer, SecurityMode};
use ordinary_auth::{Auth, OsRng};
use ordinary_config::{OrdinaryConfig, RuntimeMode};
use ordinary_storage::Storage;
use ordinary_storage::saferlmdb::{self, EnvBuilder};
use ordinary_types::ContentObject;
use rand_chacha::rand_core::{Rng, SeedableRng};
use rustls_acme::AcmeConfig;
use rustls_acme::caches::DirCache;
use serde::Deserialize;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::IntoResponse;
use tokio::net::TcpStream;
use tokio_rustls::StartHandshake;
use tokio_rustls::rustls::ServerConfig;
use tokio_rustls::rustls::crypto::ring;
use tracing::{Instrument, Span};
use crate::server::ops::{
LogFilesParams, LogMetadataParams, log_files_helper, log_files_metadata_helper,
};
use crate::server::{ADMIN, APPLICATION, ROOT, WrappedOrdinaryAppServer};
use crate::server::{OrdinaryApiServerState, ProvisionMode};
use anyhow::{anyhow, bail};
use getrandom::SysRng;
use ordinary_action::PrivilegedComponents;
use ordinary_monitor::{LOG_FILE_FORMAT, LogFileMetadata, LogLine};
use ordinary_utils::shutdown_signal;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::net::{Ipv6Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use time::UtcDateTime;
use utoipa::IntoParams;
use x25519_dalek::{PublicKey, StaticSecret};
// Handler for `GET /app/logs/metadata`.
//
// Runs `check_ordinary_auth` for the domain given in `query.d` (third argument `1`); when that
// fails the returned code is sent back unchanged. Otherwise the response is whatever
// `log_files_metadata_helper` yields. All work happens inside the nested `app` -> `logs` spans.
#[utoipa::path(
    get,
    path = "/app/logs/metadata",
    tags = [ROOT, ADMIN],
    params(LogMetadataParams),
    responses(
        (status = 401, description = "unauthorized for operation"),
        (status = 200, description = "log files metadata", body = [LogFileMetadata]),
    ),
    security(
        ("access" = []),
    ),
)]
pub async fn log_files_metadata(
    State(state): State<Arc<OrdinaryApiServerState>>,
    Query(query): Query<LogMetadataParams>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let app_span = tracing::info_span!("app", domain = %query.d);
    let logs_span = app_span.in_scope(|| tracing::info_span!("logs"));

    let handler = async {
        // Guard clause: bail out with the auth failure before touching any log data.
        if let Err(code) =
            crate::server::check_ordinary_auth(&state, &headers, 1, query.d.as_str())
        {
            return code.into_response();
        }

        let metadata = log_files_metadata_helper(&state, &query).await;
        metadata.into_response()
    };

    handler.instrument(logs_span).await
}
// Handler for `GET /app/logs/files/{file}`.
//
// Runs `check_ordinary_auth` for the domain given in `query.d` (third argument `1`); when that
// fails the returned code is sent back unchanged. Otherwise the response is whatever
// `log_files_helper` yields for the requested file name. All work happens inside the nested
// `app` -> `logs` spans.
#[utoipa::path(
    get,
    path = "/app/logs/files/{file}",
    tags = [ROOT, ADMIN],
    params(LogFilesParams),
    responses(
        (status = 401, description = "unauthorized for operation"),
        (status = 200, description = "jsonl log lines compressed with Zstd", body = [LogLine]),
    ),
    security(
        ("access" = []),
    ),
)]
pub async fn log_files(
    State(state): State<Arc<OrdinaryApiServerState>>,
    Query(query): Query<LogFilesParams>,
    Path(file_name): Path<String>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let app_span = tracing::info_span!("app", domain = %query.d);
    let logs_span = app_span.in_scope(|| tracing::info_span!("logs"));

    let handler = async move {
        // Guard clause: bail out with the auth failure before reading any log file.
        if let Err(code) =
            crate::server::check_ordinary_auth(&state, &headers, 1, query.d.as_str())
        {
            return code.into_response();
        }

        let lines = log_files_helper(&state, &query, file_name.as_str()).await;
        lines.into_response()
    };

    handler.instrument(logs_span).await
}
#[utoipa::path(
put,
path = "/app/deploy",
tag = APPLICATION,
request_body(content = OrdinaryConfig, content_type = "application/json"),
responses(
(status = 500, description = "internal error"),
(status = 401, description = "unauthorized for operation"),
(status = 200, description = "deploy application success, return port as BE bytes", body = [u8]),
),
security(
("access" = []),
),
)]
#[allow(clippy::too_many_lines)]
pub async fn deploy(
State(state): State<Arc<OrdinaryApiServerState>>,
headers: HeaderMap,
Json(mut config): Json<OrdinaryConfig>,
) -> impl IntoResponse {
let span = tracing::info_span!("app", domain = %config.domain);
let span = span.in_scope(|| tracing::info_span!("deploy", version = %config.version));
let domain = config.domain.clone();
let Ok(shared_rt) = tokio::runtime::Handle::try_current() else {
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
};
async {
match crate::server::check_ordinary_auth(&state, &headers, 6, &domain) {
Ok(a) => a,
Err(err) => return err.into_response(),
};
if let Err(err) = config.validate() {
tracing::error!(%err);
return (StatusCode::BAD_REQUEST, err.to_string()).into_response();
}
if let Err(err) = OrdinaryAppServer::validate_cnames(&config).await {
tracing::error!(%err);
return (StatusCode::BAD_REQUEST, err.to_string()).into_response();
}
if let Err(err) =
config.check_config_against_limits(&state.config.limits, &state.privileged_domains)
{
tracing::error!(%err);
return (StatusCode::BAD_REQUEST, err.to_string()).into_response();
}
config.load_internal_compression();
let app_path = state.apps_dir.join(&config.domain);
let mut apps = state.apps.write().await;
let config_path = app_path.join("ordinary.json");
let is_first_deploy = !config_path.exists();
let (auth, storage) = if is_first_deploy {
let erased_path = app_path.join("erased");
if erased_path.exists()
&& let Err(err) = tokio::fs::remove_file(&erased_path).await
{
tracing::error!(%err);
}
if state.stored_logs
&& let Some(monitor) = &*state.monitor
&& let Err(err) = monitor.add(
config.domain.as_str(),
&config.cnames.clone().unwrap_or_default(),
)
{
tracing::error!(%err, "failed to add domain to log manager");
}
let (auth, storage) = match dbs(&state, &config, &app_path, None).await {
Ok(value) => value,
Err(err) => {
tracing::error!(%err);
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
(auth, storage)
} else {
let killed_path = app_path.join("killed");
if killed_path.exists()
&& let Err(err) = tokio::fs::remove_file(&killed_path).await
{
tracing::error!(%err);
}
let dbs = state.dbs.lock().await;
let Some((_env, auth, storage)) = dbs.get(&domain) else {
tracing::error!("failed to get dbs for existing app");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
};
(auth.clone(), storage.clone())
};
match start(
&state,
&config,
auth,
storage,
match state
.provision_mode
.as_ref()
.unwrap_or(&ProvisionMode::Localhost)
{
ProvisionMode::Localhost => ordinary_app::server::ProvisionMode::Localhost,
ProvisionMode::Staging => ordinary_app::server::ProvisionMode::Staging,
ProvisionMode::Production => ordinary_app::server::ProvisionMode::Production,
},
app_path.join("certs"),
shared_rt,
Some(
state
.server_span
.clone()
.in_scope(|| tracing::info_span!("app", domain = %domain)),
),
)
.await
{
Ok((port, app, terminate_tx, stream_tx)) => {
if let Some(wrapped_app) = apps.get(&domain) {
if let Err(err) = wrapped_app.terminate_tx.send(false) {
tracing::error!(%err, "terminate failed");
} else {
tracing::info!(
wrapped_app.port,
version = &wrapped_app.app.config.version,
"killed"
);
}
}
let Ok(mut config_file) = tokio::fs::File::create(config_path).await else {
tracing::error!("failed to create ordinary.json");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
};
let ordinary_json = match simd_json::serde::to_vec_pretty(&*app.config) {
Ok(oj) => oj,
Err(err) => {
tracing::error!(%err, "failed to serialize ordinary.json");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
};
if let Err(err) = config_file.write_all(&ordinary_json).await {
tracing::error!(%err, "failed to write_all ordinary.json");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
if let Err(err) = config_file.flush().await {
tracing::error!(%err, "failed to flush ordinary.json");
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
let cnames = app.config.cnames.clone();
let static_secret = StaticSecret::random_from_rng(OsRng);
let public_key = PublicKey::from(&static_secret);
let app = Arc::new(WrappedOrdinaryAppServer {
port,
app,
terminate_tx,
stream_tx,
dh_keypair: (static_secret, public_key),
});
if let Some(cnames) = cnames {
for cname in cnames {
apps.insert(cname, app.clone());
}
}
apps.insert(domain, app);
if port > 0 {
tracing::info!(port, "deployed");
} else {
tracing::info!("deployed");
}
(StatusCode::OK, Bytes::copy_from_slice(&port.to_be_bytes())).into_response()
}
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
.instrument(span)
.await
}
/// JSON request body accepted by the `migrate` handler.
#[derive(Deserialize)]
pub struct MigrateBody {
/// Configuration of the application being migrated.
config: OrdinaryConfig,
/// Content objects sent along with the config. The `migrate` handler in this file binds
/// the field but never reads it.
_content: Vec<ContentObject>,
}
/// Handler for migrating an application.
///
/// Currently this only authorizes the caller for `config.domain` (`check_ordinary_auth`,
/// third argument `6`), validates the config and checks it against the server limits, then
/// answers `200 OK`. The `_content` payload is accepted but not used.
///
/// Authorization runs before validation, matching `deploy`, so unauthenticated callers
/// never receive validation details about a submitted config.
///
/// Responses: the auth failure code, `400` with the error text when validation or the limit
/// check fails, `200` otherwise.
pub async fn migrate(
    State(state): State<Arc<OrdinaryApiServerState>>,
    headers: HeaderMap,
    Json(MigrateBody {
        mut config,
        _content,
    }): Json<MigrateBody>,
) -> impl IntoResponse {
    let span = tracing::info_span!("app", domain = %config.domain);
    let span = span.in_scope(|| tracing::info_span!("migrate", version = %config.version));
    async {
        if let Err(code) = crate::server::check_ordinary_auth(&state, &headers, 6, &config.domain)
        {
            return code.into_response();
        }

        if let Err(err) = config.validate() {
            tracing::error!(%err);
            return (StatusCode::BAD_REQUEST, err.to_string()).into_response();
        }
        if let Err(err) =
            config.check_config_against_limits(&state.config.limits, &state.privileged_domains)
        {
            tracing::error!(%err);
            return (StatusCode::BAD_REQUEST, err.to_string()).into_response();
        }
        config.load_internal_compression();

        StatusCode::OK.into_response()
    }
    .instrument(span)
    .await
}
/// Handler for listing applications.
///
/// Runs `check_ordinary_auth` against the literal domain `"root"` (third argument `1`) inside
/// the nested `root` -> `apps` -> `list` spans. The body is always empty: the status is the
/// auth failure code, or `200 OK` when the check passes.
pub async fn list(
    State(state): State<Arc<OrdinaryApiServerState>>,
    headers: HeaderMap,
    _body: Bytes,
) -> impl IntoResponse {
    let root_span = tracing::info_span!("root");
    let apps_span = root_span.in_scope(|| tracing::info_span!("apps"));
    let list_span = apps_span.in_scope(|| tracing::info_span!("list"));

    list_span.in_scope(|| {
        let status = match crate::server::check_ordinary_auth(&state, &headers, 1, "root") {
            Ok(_account) => StatusCode::OK,
            Err(code) => code,
        };
        (status, Bytes::new())
    })
}
/// Handler for the per-domain bridge endpoint.
///
/// Runs `check_ordinary_auth` for the `domain` path segment (third argument `7`) inside the
/// nested `app` -> `bridge` spans. The body is always empty: the status is the auth failure
/// code, or `200 OK` when the check passes.
pub async fn bridge(
    State(state): State<Arc<OrdinaryApiServerState>>,
    headers: HeaderMap,
    Path(domain): Path<String>,
    _body: Bytes,
) -> impl IntoResponse {
    let app_span = tracing::info_span!("app", %domain);
    let bridge_span = app_span.in_scope(|| tracing::info_span!("bridge"));

    bridge_span.in_scope(|| {
        let status = match crate::server::check_ordinary_auth(&state, &headers, 7, &domain) {
            Ok(_account) => StatusCode::OK,
            Err(code) => code,
        };
        (status, Bytes::new())
    })
}
// Query-string parameters shared by the per-application handlers in this file
// (`kill`, `restart`, `erase`, `accounts_list`, `invite`).
#[derive(Deserialize, IntoParams)]
pub struct Params {
// Domain of the application the request targets; handlers bind it as `domain`.
d: String,
}
// Handler for `PUT /app/kill`.
//
// After `check_ordinary_auth` succeeds for the domain (third argument `8`), and if the domain
// is registered in `state.apps`, this:
//   * writes a `killed` marker file in the app directory containing the current UTC time
//     formatted with `LOG_FILE_FORMAT`;
//   * calls `set_killed(true)` on the running app.
//
// A domain that is not registered is not an error: the handler still answers `200 OK`.
// Any failure while writing the marker yields `500`.
#[utoipa::path(
    put,
    path = "/app/kill",
    tag = APPLICATION,
    params(Params),
    responses(
        (status = 500, description = "internal error"),
        (status = 401, description = "unauthorized for operation"),
        (status = 200, description = "application successfully shut down"),
    ),
    security(
        ("access" = []),
    ),
)]
pub async fn kill(
    State(state): State<Arc<OrdinaryApiServerState>>,
    Query(Params { d }): Query<Params>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let domain = d;
    let span = tracing::info_span!("app", %domain);
    let span = span.in_scope(|| tracing::info_span!("kill"));
    async {
        if let Err(code) = crate::server::check_ordinary_auth(&state, &headers, 8, &domain) {
            return code.into_response();
        }

        let apps = state.apps.read().await;
        if let Some(wrapped_app) = apps.get(&domain) {
            let killed_path = state.apps_dir.join(&domain).join("killed");
            match tokio::fs::File::create(&killed_path).await {
                Ok(mut killed_file) => {
                    let Ok(content) = UtcDateTime::now().format(LOG_FILE_FORMAT) else {
                        tracing::error!("failed to format date");
                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
                    };
                    if let Err(err) = killed_file.write_all(content.as_bytes()).await {
                        tracing::error!(%err);
                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
                    }
                    if let Err(err) = killed_file.flush().await {
                        tracing::error!(%err);
                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
                    }
                }
                Err(err) => {
                    tracing::error!(%err);
                    return StatusCode::INTERNAL_SERVER_ERROR.into_response();
                }
            }

            // The marker is written first so the app is only flagged once it is on disk.
            wrapped_app.app.set_killed(true);
            tracing::info!(
                version = %wrapped_app.app.config.version,
                "killed"
            );
        }

        StatusCode::OK.into_response()
    }
    .instrument(span)
    .await
}
#[utoipa::path(
post,
path = "/app/restart",
tag = APPLICATION,
responses(
(status = 500, description = "internal error"),
(status = 401, description = "unauthorized for operation"),
(status = 200, description = "restart application success, return port as BE bytes", body = [u8]),
),
security(
("access" = []),
),
)]
pub async fn restart(
State(state): State<Arc<OrdinaryApiServerState>>,
Query(Params { d }): Query<Params>,
headers: HeaderMap,
) -> impl IntoResponse {
let domain = d;
let span = tracing::info_span!("app", %domain);
let span = span.in_scope(|| tracing::info_span!("restart"));
let Ok(shared_rt) = tokio::runtime::Handle::try_current() else {
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
};
async {
match crate::server::check_ordinary_auth(&state, &headers, 8, &domain) {
Ok(account) => account,
Err(code) => return code.into_response(),
};
let app_path = state.apps_dir.join(&domain);
let killed_path = app_path.join("killed");
if killed_path.exists()
&& let Err(err) = tokio::fs::remove_file(&killed_path).await
{
tracing::error!(%err);
}
let config_path = app_path.join("ordinary.json");
let mut apps = state.apps.write().await;
let dbs = state.dbs.lock().await;
if let Some((_env, auth, storage)) = dbs.get(&domain)
&& let Ok(mut config_bytes) = tokio::fs::read(&config_path).await
&& let Ok(config) = simd_json::from_slice::<OrdinaryConfig>(&mut config_bytes[..])
{
match start(
&state,
&config,
auth.clone(),
storage.clone(),
match state
.provision_mode
.as_ref()
.unwrap_or(&ProvisionMode::Localhost)
{
ProvisionMode::Localhost => ordinary_app::server::ProvisionMode::Localhost,
ProvisionMode::Staging => ordinary_app::server::ProvisionMode::Staging,
ProvisionMode::Production => ordinary_app::server::ProvisionMode::Production,
},
state.apps_dir.join(&domain).join("certs"),
shared_rt,
Some(
state
.server_span
.clone()
.in_scope(|| tracing::info_span!("app", domain = %domain)),
),
)
.await
{
Ok((port, app, terminate_tx, stream_tx)) => {
if let Some(wrapped_app) = apps.get(&domain) {
if let Err(err) = wrapped_app.terminate_tx.send(false) {
tracing::error!(%err, "terminate failed");
} else {
tracing::info!(
wrapped_app.port,
version = &wrapped_app.app.config.version,
"killed"
);
}
}
let cnames = app.config.cnames.clone();
let static_secret = StaticSecret::random_from_rng(OsRng);
let public_key = PublicKey::from(&static_secret);
let app = Arc::new(WrappedOrdinaryAppServer {
port,
app,
terminate_tx,
stream_tx,
dh_keypair: (static_secret, public_key),
});
if let Some(cnames) = cnames {
for cname in cnames {
apps.insert(cname, app.clone());
}
}
apps.insert(domain, app);
if port > 0 {
tracing::info!(port, "restarted");
} else {
tracing::info!("restarted");
}
return (StatusCode::OK, Bytes::copy_from_slice(&port.to_be_bytes()))
.into_response();
}
Err(err) => tracing::error!(%err, "restart failed"),
}
}
StatusCode::NOT_FOUND.into_response()
}
.instrument(span)
.await
}
// Handler for `DELETE /app/erase`.
//
// After `check_ordinary_auth` succeeds for the domain (third argument `9`):
//   * an unregistered domain yields `404`;
//   * the running app is signalled to terminate; if that signal cannot be sent the error is
//     logged and the handler answers `200` without erasing anything;
//   * otherwise an `erased` marker holding the current UTC time (formatted with
//     `LOG_FILE_FORMAT`) is written, the monitor is asked to erase the domain, the `killed`
//     and `ordinary.json` files and the `certs`, `data` and `keys` directories are removed
//     (removal errors are only logged), and the handler answers `200`.
//
// Failing to write the marker yields `500`.
#[utoipa::path(
    delete,
    path = "/app/erase",
    tag = APPLICATION,
    params(Params),
    responses(
        (status = 500, description = "internal error"),
        (status = 401, description = "unauthorized for operation"),
        (status = 200, description = "erase application success"),
    ),
    security(
        ("access" = []),
    ),
)]
pub async fn erase(
    State(state): State<Arc<OrdinaryApiServerState>>,
    Query(Params { d }): Query<Params>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let domain = d;
    let app_span = tracing::info_span!("app", %domain);
    let erase_span = app_span.in_scope(|| tracing::info_span!("erase"));

    let handler = async {
        if let Err(code) = crate::server::check_ordinary_auth(&state, &headers, 9, &domain) {
            return code.into_response();
        }

        let apps = state.apps.read().await;
        let Some(wrapped_app) = apps.get(&domain) else {
            return StatusCode::NOT_FOUND.into_response();
        };

        if let Err(err) = wrapped_app.terminate_tx.send(false) {
            tracing::error!(%err, "terminate failed");
            return StatusCode::OK.into_response();
        }

        let app_dir = state.apps_dir.join(&domain);

        // Write the `erased` marker; the file handle is scoped so it is closed before cleanup.
        {
            let erased_path = app_dir.join("erased");
            let mut erased_file = match tokio::fs::File::create(&erased_path).await {
                Ok(file) => file,
                Err(err) => {
                    tracing::error!(%err);
                    return StatusCode::INTERNAL_SERVER_ERROR.into_response();
                }
            };
            let Ok(content) = UtcDateTime::now().format(LOG_FILE_FORMAT) else {
                tracing::error!("failed to format date");
                return StatusCode::INTERNAL_SERVER_ERROR.into_response();
            };
            if let Err(err) = erased_file.write_all(content.as_bytes()).await {
                tracing::error!(%err);
                return StatusCode::INTERNAL_SERVER_ERROR.into_response();
            }
            if let Err(err) = erased_file.flush().await {
                tracing::error!(%err);
                return StatusCode::INTERNAL_SERVER_ERROR.into_response();
            }
        }

        if let Some(monitor) = &*state.monitor
            && let Err(err) = monitor.erase(
                domain.as_str(),
                &wrapped_app.app.config.cnames.clone().unwrap_or_default(),
            )
        {
            tracing::error!(%err);
        }

        // Best-effort cleanup: failures are logged and do not change the response.
        for file_name in ["killed", "ordinary.json"] {
            let file = app_dir.join(file_name);
            if file.exists()
                && let Err(err) = tokio::fs::remove_file(&file).await
            {
                tracing::error!(%err);
            }
        }
        for dir_name in ["certs", "data", "keys"] {
            let dir = app_dir.join(dir_name);
            if dir.exists()
                && let Err(err) = tokio::fs::remove_dir_all(&dir).await
            {
                tracing::error!(%err);
            }
        }

        tracing::info!(
            port = wrapped_app.port,
            version = &wrapped_app.app.config.version,
            "erased"
        );
        StatusCode::OK.into_response()
    };

    handler.instrument(erase_span).await
}
pub async fn accounts_list(
State(state): State<Arc<OrdinaryApiServerState>>,
Query(Params { d }): Query<Params>,
headers: HeaderMap,
) -> impl IntoResponse {
let domain = d;
let span = tracing::info_span!("app", %domain);
let span = span.in_scope(|| tracing::info_span!("accounts"));
let span = span.in_scope(|| tracing::info_span!("list"));
async {
match crate::server::check_ordinary_auth(&state, &headers, 1, &domain) {
Ok(account) => account,
Err(code) => return (code, Bytes::new()),
};
let apps = state.apps.read().await;
if let Some(wrapped_app) = apps.get(&domain) {
match wrapped_app.app.list_accounts() {
Ok(accounts) => return (StatusCode::OK, accounts),
Err(err) => tracing::error!(%err, "failed to list accounts"),
}
}
(StatusCode::INTERNAL_SERVER_ERROR, Bytes::new())
}
.instrument(span)
.await
}
/// Handler that produces an invite for an application.
///
/// Runs `check_ordinary_auth` for the domain (third argument `0`) and passes the account it
/// returns, together with `state.domain`, to the app's `get_invite`.
///
/// Responses (body is empty unless stated): the auth failure code, `404` when the domain is
/// not registered, `500` when `get_invite` fails (the error is logged), or `200` with the
/// bytes returned by `get_invite`.
pub async fn invite(
    State(state): State<Arc<OrdinaryApiServerState>>,
    Query(Params { d }): Query<Params>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let domain = d;
    let app_span = tracing::info_span!("app", %domain);
    let accounts_span = app_span.in_scope(|| tracing::info_span!("accounts"));
    let invite_span = accounts_span.in_scope(|| tracing::info_span!("invite"));

    let handler = async {
        let account = match crate::server::check_ordinary_auth(&state, &headers, 0, &domain) {
            Ok(account) => account,
            Err(code) => return (code, Bytes::new()),
        };

        let apps = state.apps.read().await;
        let Some(wrapped_app) = apps.get(&domain) else {
            return (StatusCode::NOT_FOUND, Bytes::new());
        };

        let issued = wrapped_app.app.get_invite(&state.domain, &account, None);
        match issued {
            Err(err) => {
                tracing::error!(%err);
                (StatusCode::INTERNAL_SERVER_ERROR, Bytes::new())
            }
            Ok(v) => (StatusCode::OK, v),
        }
    };

    handler.instrument(invite_span).await
}
/// Opens the `Auth` and `Storage` handles for an application by delegating to `inner_dbs`.
///
/// When `app_span` is provided, the work is instrumented with that span; otherwise it runs in
/// the caller's current span context.
///
/// # Errors
///
/// Propagates any error returned by `inner_dbs`.
pub async fn dbs(
    state: &Arc<OrdinaryApiServerState>,
    config: &OrdinaryConfig,
    app_path: &std::path::Path,
    app_span: Option<Span>,
) -> anyhow::Result<(Arc<Auth>, Arc<Storage>)> {
    // The future is lazy, so building it up front does no work until it is awaited below.
    let open = inner_dbs(state, config, app_path);
    match app_span {
        Some(span) => open.instrument(span).await,
        None => open.await,
    }
}
/// Creates the on-disk layout for an application and opens its `Auth` and `Storage`.
///
/// Steps:
/// 1. ensure `<app_path>/data` exists and open an LMDB environment there whose map size is
///    `config.storage_size` (or the configured default) rounded down to a page boundary plus
///    one extra page;
/// 2. ensure `<app_path>/keys` exists and load (or generate and persist) the 32-byte `auth`
///    and `storage` keys;
/// 3. build `Auth` and `Storage` on top of the environment and register
///    `(env, auth, storage)` in `state.dbs` under `config.domain`.
///
/// # Errors
///
/// Returns an error if any directory or key file cannot be created or read, if a key file
/// is not exactly 32 bytes, if the environment cannot be configured or opened, or if
/// `Auth::new` / `Storage::new` fail.
async fn inner_dbs(
    state: &Arc<OrdinaryApiServerState>,
    config: &OrdinaryConfig,
    app_path: &std::path::Path,
) -> anyhow::Result<(Arc<Auth>, Arc<Storage>)> {
    let data_dir = app_path.join("data");
    fs::create_dir_all(&data_dir)?;

    let ps = page_size::get() as u64;
    let storage_size = match &config.storage_size {
        Some(s) => *s,
        None => OrdinaryConfig::default_storage_size().unwrap_or_default(),
    };
    // Align down to a page boundary, then add one page so the map is never smaller than
    // the requested size.
    let remainder = storage_size % ps;
    let mapsize = (storage_size - remainder) + ps;
    tracing::info!(mapsize = %bytesize::ByteSize(mapsize).display().si_short());

    // NOTE(review): the `unsafe` block spans the whole builder sequence as in the original;
    // which of these calls actually require `unsafe` depends on the `saferlmdb` API — confirm
    // and narrow the block, and record the invariant that makes opening this path sound.
    let env = Arc::new(unsafe {
        let mut env_builder = EnvBuilder::new()?;
        env_builder.set_maxreaders(126)?;
        env_builder.set_mapsize(usize::try_from(mapsize)?)?;
        env_builder.set_maxdbs(13)?;
        env_builder.open(
            match data_dir.to_str() {
                Some(v) => v,
                None => bail!("data_dir not a str"),
            },
            &saferlmdb::open::Flags::empty(),
            0o600,
        )?
    });

    let keys_dir = app_path.join("keys");
    fs::create_dir_all(&keys_dir)?;

    let auth_key = load_or_create_key(&keys_dir.join("auth"))?;
    let auth = Arc::new(Auth::new(
        config.domain.clone(),
        config.auth.clone(),
        auth_key,
        env.clone(),
    )?);

    let storage_key = load_or_create_key(&keys_dir.join("storage"))?;
    let storage = Arc::new(Storage::new(
        state.config.limits.storage.clone(),
        if let Some(models) = &config.models {
            models.clone()
        } else {
            vec![]
        },
        if let Some(content) = &config.content {
            content.definitions.clone()
        } else {
            vec![]
        },
        storage_key,
        &env,
        data_dir.join("search"),
        state.log_size,
    )?);

    // Keep the environment and both handles reachable for later redeploys/restarts.
    state.dbs.lock().await.insert(
        config.domain.clone(),
        (env, Arc::clone(&auth), Arc::clone(&storage)),
    );
    Ok((auth, storage))
}

/// Loads the 32-byte key stored at `key_path`, or generates and persists a new one when no
/// regular file exists there.
///
/// New keys are drawn from a ChaCha20 generator seeded from `SysRng`. On Unix the key file is
/// created with mode `0o600`, the same mode the LMDB environment is opened with, so freshly
/// written secrets are not readable by other users.
fn load_or_create_key(key_path: &std::path::Path) -> anyhow::Result<[u8; 32]> {
    if key_path.is_file() {
        let stored = fs::read(key_path)?;
        // Fails when the file is not exactly 32 bytes long.
        let key: [u8; 32] = stored[..].try_into()?;
        return Ok(key);
    }

    let mut key = [0u8; 32];
    let mut rng = rand_chacha::ChaCha20Rng::try_from_rng(&mut SysRng)?;
    rng.fill_bytes(&mut key[..]);

    let mut options = fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        options.mode(0o600);
    }
    let mut key_file = options.open(key_path)?;
    key_file.write_all(&key)?;
    key_file.flush()?;
    Ok(key)
}
/// Builds an `OrdinaryAppServer` for `config` and launches it on its own OS thread.
///
/// On success returns `(port, app, terminate_tx, stream_tx)`:
/// * `port` — the port reported back by the spawned task: the locally bound port when a
///   dedicated listener is used, `0` otherwise;
/// * `app` — the shared server instance;
/// * `terminate_tx` — watch sender (initial value `true`); the spawned task stops with a
///   "terminated" error as soon as the watched value changes;
/// * `stream_tx` — unbounded sender for `(StartHandshake, SocketAddr, Span)` tuples that the
///   spawned task consumes.
///
/// # Errors
///
/// Returns a static message when contacts are missing for `Staging`/`Production`, when the
/// rustls protocol versions cannot be configured, or — after logging the cause — the generic
/// "failed at some point" when creating the app or receiving the port fails.
#[allow(
clippy::too_many_arguments,
clippy::too_many_lines,
clippy::fn_params_excessive_bools,
clippy::type_complexity
)]
pub async fn start(
state: &Arc<OrdinaryApiServerState>,
config: &OrdinaryConfig,
auth: Arc<Auth>,
storage: Arc<Storage>,
provision: ordinary_app::server::ProvisionMode,
cert_dir_path: PathBuf,
shared_rt: tokio::runtime::Handle,
app_span: Option<Span>,
) -> Result<
(
u16,
Arc<OrdinaryAppServer>,
tokio::sync::watch::Sender<bool>,
tokio::sync::mpsc::UnboundedSender<(StartHandshake<TcpStream>, SocketAddr, Span)>,
),
&'static str,
> {
// ACME state and the (challenge, default) rustls configs exist only for Staging/Production;
// Localhost runs without either.
let (acme_state, configs) = match provision {
ordinary_app::server::ProvisionMode::Localhost => (None, None),
ordinary_app::server::ProvisionMode::Staging
| ordinary_app::server::ProvisionMode::Production => {
let Some(contacts) = &config.contacts else {
return Err("contacts cannot be unset");
};
let contacts = contacts
.iter()
.map(|c| format!("mailto:{c}"))
.collect::<Vec<String>>();
let cache_path = cert_dir_path.join("cache");
// Certificates are requested for the primary domain plus every CNAME.
let mut domains = vec![&config.domain];
if let Some(cnames) = &config.cnames {
for cname in cnames {
domains.push(cname);
}
}
let state = AcmeConfig::new(domains)
.contact(contacts)
.cache_option(Some(DirCache::new(cache_path)))
.directory_lets_encrypt(
provision == ordinary_app::server::ProvisionMode::Production,
)
.state();
let challenge_rustls_config = state.challenge_rustls_config();
let mut default_rustls_config =
(match ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
{
Ok(v) => v,
Err(_) => return Err("failed to get default protocol versions"),
})
.with_no_client_auth()
.with_cert_resolver(state.resolver());
default_rustls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
(
Some(state),
Some((challenge_rustls_config, Arc::new(default_rustls_config))),
)
}
};
// Only domains listed in `state.privileged_domains` receive the privileged components.
let privileged_components =
state
.privileged_domains
.contains(&config.domain)
.then_some(PrivilegedComponents {
auth: state.auth.clone(),
app_domains: state.app_domains.clone(),
apps_dir: state.apps_dir.clone(),
});
match OrdinaryAppServer::new(
config.clone(),
state.config.limits.clone(),
auth,
storage,
state.secure,
state.secure_cookies,
state.log_headers,
state.log_ips,
Some(state.server_span.clone()),
app_span.clone(),
privileged_components,
)
.await
{
Ok(app) => {
let app = Arc::new(app);
let (stream_tx, mut stream_rx) = tokio::sync::mpsc::unbounded_channel::<(
StartHandshake<TcpStream>,
SocketAddr,
Span,
)>();
let app_clone = app.clone();
// `tx`/`rx` carry the port from the spawned task back to this function.
let (tx, rx) = oneshot::channel();
let (terminate_tx, mut terminate_rx) = tokio::sync::watch::channel(true);
let server_span = state.server_span.clone();
let domain = app_clone.config.domain.clone();
let cname_list = app_clone.config.cnames.clone().unwrap_or_default();
let domain_clone = domain.clone();
let provision_str = match provision {
ordinary_app::server::ProvisionMode::Production => "production",
ordinary_app::server::ProvisionMode::Staging => "staging",
ordinary_app::server::ProvisionMode::Localhost => "localhost",
};
// Localhost always binds its own listener; other modes only when configured to.
let dedicated_port = provision == ordinary_app::server::ProvisionMode::Localhost
|| state.dedicated_ports;
let mode = if state.secure {
SecurityMode::Secure(cert_dir_path, provision)
} else {
SecurityMode::Insecure
};
// Clone the shared signal/close handles inside a block so the read guards are released
// right away.
let (signal_tx, close_rx) = {
let signal_tx_read = state.signal_tx.read();
let close_rx_read = state.close_rx.read();
(signal_tx_read.clone(), close_rx_read.clone())
};
let app_span_clone = app_span.clone();
// Everything the app does at runtime lives in this future; it is driven by `block_on`
// on the thread spawned further down.
let app_fut = async move {
let domain = domain_clone;
let domain_clone = domain.clone();
let acme_span = server_span.in_scope(|| {
#[cfg(tracing_unstable)]
{
tracing::info_span!("acme", domain = %domain_clone, cnames = tracing::field::valuable(&cname_list), provision = %provision_str)
}
#[cfg(not(tracing_unstable))]
{
tracing::info_span!("acme", domain = %domain_clone, cnames = ?cname_list, provision = %provision_str)
}
});
if let Some(state) = acme_state
&& let Some(signal_tx) = &signal_tx
{
ordinary_utils::acme_task(acme_span.clone(), state, signal_tx.clone());
}
let channel_app = app_clone.clone();
let configs_clone = configs.clone();
let app_span_clone2 = app_span_clone.clone();
let terminate_rx_clone = terminate_rx.clone();
// Consumes handshakes forwarded through `stream_tx`. With TLS configs and signal/close
// handles available, each handshake is handled on its own task; otherwise incoming
// items are drained and discarded.
let channel_fut = async move {
if let Some((challenge_rustls_config, default_rustls_config)) = configs_clone
&& let Some(signal_tx) = &signal_tx
&& let Some(close_rx) = &close_rx
{
while let Some((start_handshake, addr, tls_span)) = tokio::select! {
message = stream_rx.recv() => {
message
}
() = signal_tx.closed() => {
if let Some(span) = &app_span_clone2 {
span.in_scope(|| {
tracing::warn!("handshake channel closed");
});
} else {
tracing::warn!(domain = %channel_app.config.domain, "handshake channel closed");
}
None
}
} {
let signal_tx = signal_tx.clone();
let close_rx = close_rx.clone();
let acme_span = acme_span.clone();
let app_service = channel_app.service.clone();
let challenge_rustls_config = challenge_rustls_config.clone();
let default_rustls_config = default_rustls_config.clone();
let tls_span = tls_span.clone();
let mut terminate_rx_clone = terminate_rx_clone.clone();
tokio::spawn(async move {
if let Err(err) = OrdinaryAppServer::handle_handshake(
&tls_span,
&acme_span,
start_handshake,
app_service,
addr,
challenge_rustls_config,
default_rustls_config,
&signal_tx,
&close_rx,
Some(&mut terminate_rx_clone),
)
.await
{
tls_span.in_scope(|| {
tracing::error!(%err, "handshake");
});
}
});
}
} else {
while let Some((_, _, _)) = stream_rx.recv().await {
}
}
drop(signal_tx);
drop(close_rx);
Ok(())
};
let res = if dedicated_port {
// Port 0 lets the OS pick a free port; the chosen port is reported through `tx`.
let Ok(listener) = TcpListener::bind((Ipv6Addr::UNSPECIFIED, 0)).await else {
tracing::error!(
"failed to assign port for domain {}",
app_clone.config.domain
);
// Returning here drops `tx` without sending, so the waiting `rx.recv()` below fails.
return;
};
let addr = listener.local_addr().expect("no local addr for listener");
if let Some(span) = &app_span_clone {
span.in_scope(|| {
tracing::info!(port = addr.port(), "running");
});
} else {
tracing::info!(
domain = app_clone.config.domain,
port = addr.port(),
"running"
);
}
if let Err(err) = tx.send(addr.port()) {
if let Some(span) = &app_span_clone {
span.in_scope(|| {
tracing::error!(%err);
});
} else {
tracing::error!(domain = app_clone.config.domain, %err);
}
}
let socket_fut =
app_clone.start(listener, None, mode, configs, shutdown_signal);
// Whichever finishes first ends the app: the handshake loop, the socket server, or a
// change on the terminate channel.
tokio::select! {
channel = channel_fut => channel,
socket = socket_fut => socket,
_ = terminate_rx.changed() => Err(anyhow!("terminated")),
}
} else {
// No dedicated listener: report port 0 and serve only forwarded handshakes.
if let Err(err) = tx.send(0) {
if let Some(span) = &app_span_clone {
span.in_scope(|| {
tracing::error!(%err);
});
} else {
tracing::error!(domain = app_clone.config.domain, %err);
}
}
tokio::select! {
channel = channel_fut => channel,
_ = terminate_rx.changed() => Err(anyhow!("terminated")),
}
};
match res {
Ok(()) => {
if let Some(span) = app_span_clone {
span.in_scope(|| tracing::info!("stopped"));
} else {
tracing::info!(domain, "stopped");
}
}
Err(err) => {
if let Some(span) = app_span_clone {
span.in_scope(|| tracing::error!(%err));
} else {
tracing::error!(domain, %err);
}
}
}
};
// Each runtime mode spawns an OS thread that blocks on `app_fut`: on the shared runtime
// handle, on a new current-thread runtime, or on a new multi-threaded runtime. An unset
// `config.runtime` behaves like `Shared`.
if let Some(runtime) = &config.runtime {
match runtime {
RuntimeMode::Shared => {
std::thread::spawn(move || {
if let Some(app_span) = app_span {
app_span.in_scope(|| {
tracing::info!(runtime = %"shared", "spawning");
});
} else {
tracing::info!(domain, runtime = %"shared", "spawning");
}
shared_rt.block_on(app_fut);
});
}
RuntimeMode::SingleThreaded => {
std::thread::spawn(move || {
if let Some(app_span) = app_span.clone() {
app_span.in_scope(|| {
tracing::info!(runtime = %"single-threaded", "spawning");
});
} else {
tracing::info!(domain, runtime = %"single-threaded", "spawning");
}
let Ok(rt) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
else {
if let Some(app_span) = app_span.clone() {
app_span.in_scope(|| {
tracing::error!("failed to start runtime");
});
} else {
tracing::error!(domain, "failed to start runtime");
}
return;
};
rt.block_on(app_fut);
});
}
RuntimeMode::MultiThreaded => {
std::thread::spawn(move || {
if let Some(app_span) = app_span.clone() {
app_span.in_scope(|| {
tracing::info!(runtime = %"multi-threaded", "spawning");
});
} else {
tracing::info!(domain, runtime = %"multi-threaded", "spawning");
}
let Ok(rt) = tokio::runtime::Runtime::new() else {
if let Some(app_span) = app_span {
app_span.in_scope(|| {
tracing::error!("failed to start runtime");
});
} else {
tracing::error!(domain, "failed to start runtime");
}
return;
};
rt.block_on(app_fut);
});
}
}
} else {
std::thread::spawn(move || {
if let Some(app_span) = app_span {
app_span.in_scope(|| {
tracing::info!(runtime = %"shared", "spawning");
});
} else {
tracing::info!(domain, runtime = %"shared", "spawning");
}
shared_rt.block_on(app_fut);
});
}
// NOTE(review): `rx.recv()` is not awaited, so this looks like a blocking wait inside an
// async fn (it would stall the calling runtime worker until the spawned thread reports the
// port) — confirm against the `oneshot` crate's `Receiver` API.
match rx.recv() {
Ok(port) => return Ok((port, app, terminate_tx, stream_tx)),
Err(err) => tracing::error!(?err, "failed to receive port"),
}
}
Err(err) => tracing::error!(%err, "failed to create app"),
}
// Reached only after one of the failures above has been logged.
Err("failed at some point")
}