use std::sync::Arc;
use metrics::gauge;
use talea_core::store::{Store, StoreError};
use crate::config::Config;
use crate::http::auth::AuthConfig;
use crate::service::LedgerService;
pub async fn run(config: Config) -> Result<(), Box<dyn std::error::Error>> {
let _metrics_handle = crate::metrics::install()?;
let (store, pool_sampler, backend) = connect_store(&config).await?;
let mut entries = AuthConfig::single(config.api_token.clone()).entries;
if let Some(path) = &config.tokens_file {
let text = std::fs::read_to_string(path)
.map_err(|e| format!("reading TALEA_TOKENS_FILE {path}: {e}"))?;
let file_entries = crate::http::auth::parse_tokens(&text)
.map_err(|e| format!("TALEA_TOKENS_FILE {path}: {e}"))?;
if file_entries.is_empty() {
return Err(
format!("TALEA_TOKENS_FILE {path}: no [tokens.<name>] entries found").into(),
);
}
for (secret, scope) in &file_entries {
if entries.iter().any(|(s, _)| s == secret) {
return Err(format!(
"TALEA_TOKENS_FILE {path}: entry '{}' duplicates TALEA_API_TOKEN",
scope.name
)
.into());
}
}
entries.extend(file_entries);
}
if entries.is_empty() {
tracing::warn!(
"TALEA_API_TOKEN and TALEA_TOKENS_FILE not set - the API is OPEN (dev mode)"
);
} else {
tracing::info!(tokens = entries.len(), "auth: scoped bearer tokens active");
}
if let Some(bind) = config.metrics_bind {
let metrics_app = crate::metrics::router(_metrics_handle.clone());
let listener = tokio::net::TcpListener::bind(bind).await?;
tracing::info!(bind = %bind, "metrics listener up");
tokio::spawn(async move {
axum::serve(listener, metrics_app).await.ok();
});
}
let service = Arc::new(LedgerService::with_write_config(
store,
crate::write_router::WriteConfig {
queue_depth: config.write_queue_depth,
batch_max: config.write_batch_max,
..Default::default()
},
));
let sampler_service = Arc::clone(&service);
tokio::spawn(async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
tick.tick().await;
let (size, idle) = pool_sampler();
gauge!("talea_db_pool_connections", "state" => "size").set(size as f64);
gauge!("talea_db_pool_connections", "state" => "idle").set(idle as f64);
let (books, queued) = sampler_service.write_queue_stats();
gauge!("talea_write_active_books").set(books as f64);
gauge!("talea_write_queue_depth").set(queued as f64);
}
});
let app = crate::http::routes::router_with_batch_max(
service,
AuthConfig { entries },
config.max_inflight,
backend,
config.http_batch_max,
);
let listener = tokio::net::TcpListener::bind(config.bind).await?;
tracing::info!(bind = %config.bind, "talea-server listening");
axum::serve(listener, app)
.with_graceful_shutdown(async {
tokio::signal::ctrl_c().await.ok();
tracing::info!("shutting down");
})
.await?;
Ok(())
}
/// Opens the storage backend selected by the scheme of `config.db_url`.
///
/// Supported schemes:
/// - `postgres://` / `postgresql://` - a Postgres pool, migrated on startup;
/// - `sqlite:` - a file-backed SQLite pool (WAL journal, `synchronous = NORMAL`,
///   5 s busy timeout, foreign keys on), migrated on startup; URLs containing
///   `:memory:` are rejected;
/// - `log://<dir>` - the log store rooted at `<dir>`, with the `log_*` config
///   overrides applied on top of `LogStoreOptions::default()`.
///
/// Returns the store, a sampler closure yielding `(pool size, idle
/// connections)` (always `(0, 0)` for the log backend), and the backend name
/// (`"postgres"`, `"sqlite"` or `"log"`).
///
/// # Errors
///
/// Fails on an unsupported scheme, an in-memory SQLite URL, a connection or
/// option-parsing error, or a failed migration / log-store open.
async fn connect_store(
    config: &Config,
) -> Result<
    (
        Arc<dyn Store>,
        Box<dyn Fn() -> (u32, usize) + Send>,
        &'static str,
    ),
    Box<dyn std::error::Error>,
> {
    if config.db_url.starts_with("postgres://") || config.db_url.starts_with("postgresql://") {
        // Saturate rather than `as`-cast: a value above `u32::MAX` would
        // otherwise wrap around and could silently suppress this warning.
        // NOTE(review): the message says "far below" but the check fires on
        // any shortfall - confirm the intended threshold.
        let max_inflight = u32::try_from(config.max_inflight).unwrap_or(u32::MAX);
        if max_inflight > config.db_pool {
            tracing::warn!(
                db_pool = config.db_pool,
                max_inflight = config.max_inflight,
                "TALEA_DB_POOL is far below TALEA_MAX_INFLIGHT; sustained SSE \
                 subscriber fan-out on Postgres can starve the pool (each \
                 subscription holds one connection)"
            );
        }
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(config.db_pool)
            .acquire_timeout(Config::DB_ACQUIRE_TIMEOUT)
            .connect(&config.db_url)
            .await?;
        // The sampler keeps its own handle to the pool so it can report
        // statistics after the pool itself is moved into the store.
        let sampler_pool = pool.clone();
        let sampler: Box<dyn Fn() -> (u32, usize) + Send> =
            Box::new(move || (sampler_pool.size(), sampler_pool.num_idle()));
        let store = talea_store_postgres::PgTaleaStore::new(pool);
        store.migrate().await.map_err(box_store_err)?;
        Ok((Arc::new(store), sampler, "postgres"))
    } else if config.db_url.starts_with("sqlite:") {
        use sqlx::sqlite::{
            SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous,
        };
        use std::str::FromStr;
        if config.db_url.contains(":memory:") {
            return Err(
                "sqlite::memory: is not supported by the server (each pooled \
                 connection would get its own database); use a file path"
                    .into(),
            );
        }
        let opts = SqliteConnectOptions::from_str(&config.db_url)?
            .create_if_missing(true)
            .journal_mode(SqliteJournalMode::Wal)
            .synchronous(SqliteSynchronous::Normal)
            .busy_timeout(std::time::Duration::from_secs(5))
            .foreign_keys(true);
        let pool = SqlitePoolOptions::new()
            .max_connections(config.db_pool)
            .acquire_timeout(Config::DB_ACQUIRE_TIMEOUT)
            .connect_with(opts)
            .await?;
        let sampler_pool = pool.clone();
        let sampler: Box<dyn Fn() -> (u32, usize) + Send> =
            Box::new(move || (sampler_pool.size(), sampler_pool.num_idle()));
        let store = talea_store_sqlite::SqliteTaleaStore::new(pool);
        store.migrate().await.map_err(box_store_err)?;
        Ok((Arc::new(store), sampler, "sqlite"))
    } else if let Some(path) = config.db_url.strip_prefix("log://") {
        // Only build a custom options struct when at least one override is
        // set; each unset field falls back to the library default.
        let log_opts = if config.log_snapshot_every.is_some()
            || config.log_idem_hot_cap.is_some()
            || config.log_segment_max.is_some()
        {
            let defaults = talea_store_log::LogStoreOptions::default();
            talea_store_log::LogStoreOptions {
                snapshot_every: config.log_snapshot_every.unwrap_or(defaults.snapshot_every),
                idem_hot_cap: config.log_idem_hot_cap.unwrap_or(defaults.idem_hot_cap),
                segment_max: config.log_segment_max.unwrap_or(defaults.segment_max),
            }
        } else {
            talea_store_log::LogStoreOptions::default()
        };
        let store = talea_store_log::LogTaleaStore::open_with(std::path::Path::new(path), log_opts)
            .await
            .map_err(box_store_err)?;
        // No pool is created in this branch, so the sampler reports zeros.
        let sampler: Box<dyn Fn() -> (u32, usize) + Send> = Box::new(|| (0, 0));
        Ok((Arc::new(store), sampler, "log"))
    } else {
        Err(format!(
            "unsupported TALEA_DB_URL scheme: {} (expected postgres://..., sqlite://..., or log://<dir>)",
            config.db_url
        )
        .into())
    }
}
/// Converts a [`StoreError`] into a boxed `dyn Error`, so it can be passed
/// point-free to `map_err` in functions returning `Box<dyn std::error::Error>`.
fn box_store_err(e: StoreError) -> Box<dyn std::error::Error> {
    e.into()
}