use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use clap::Args;
use daemon::{Daemon, DaemonOptions};
use schema::{Config, IndexName};
use tokio::net::TcpListener;
use tokio::sync::{mpsc, oneshot};
use crate::DEFAULT_LOCK;
use crate::backends::FlussoBackends;
use crate::http::{self, BasicAuth, DEFAULT_ADMIN_PASSWORD, DEFAULT_ADMIN_USER};
use crate::telemetry::observer::OtelObserver;
use crate::telemetry::{self, metrics};
const DEFAULT_PUBLIC_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9464);
const DEFAULT_PRIVATE_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9465);
#[derive(Debug, Args)]
pub(crate) struct RunArgs {
#[arg(short, long, env = "FLUSSO_CONFIG")]
config: Option<PathBuf>,
#[arg(long, env = "FLUSSO_LOCK", default_value = DEFAULT_LOCK)]
lock: PathBuf,
#[arg(long, env = "FLUSSO_LOCKED")]
locked: bool,
#[arg(long, env = "FLUSSO_SLOT", default_value = "flusso")]
slot: String,
#[arg(long, env = "FLUSSO_PUBLICATION", default_value = "flusso")]
publication: String,
#[arg(long, env = "FLUSSO_MANAGE_PUBLICATION")]
manage_publication: Option<bool>,
#[arg(long, env = "FLUSSO_SKIP_BACKFILL")]
skip_backfill: bool,
#[arg(long, env = "FLUSSO_PRETTY")]
pretty: bool,
#[arg(long, env = "FLUSSO_QUEUE_CAPACITY", default_value_t = 1024)]
queue_capacity: usize,
#[arg(long, env = "FLUSSO_PUBLIC_ADDRESS")]
public_address: Option<SocketAddr>,
#[arg(long, env = "FLUSSO_PRIVATE_ADDRESS")]
private_address: Option<SocketAddr>,
#[arg(long, env = "FLUSSO_ADMIN_USER", default_value = DEFAULT_ADMIN_USER)]
admin_user: String,
#[arg(long, env = "FLUSSO_ADMIN_PASSWORD", default_value = DEFAULT_ADMIN_PASSWORD)]
admin_password: String,
#[arg(long, env = "FLUSSO_LAG_POLL_SECS", default_value_t = 15)]
lag_poll_secs: u64,
#[arg(long, env = "FLUSSO_INDEX_PREFIX")]
index_prefix: Option<String>,
}
pub(crate) async fn execute(args: RunArgs) -> anyhow::Result<()> {
let tracer_provider = telemetry::init_tracing();
let mut config = resolve_config(&args)?;
if let Some(prefix) = args.index_prefix.clone() {
config.prefix = prefix;
}
schema::validate_index_prefix(&config.prefix)
.map_err(|reason| anyhow::anyhow!("invalid index prefix: {reason}"))?;
let public_addr = args
.public_address
.or(config.server.public_address)
.unwrap_or(DEFAULT_PUBLIC_ADDRESS);
let private_addr = args
.private_address
.or(config.server.private_address)
.unwrap_or(DEFAULT_PRIVATE_ADDRESS);
let public_listener = TcpListener::bind(public_addr)
.await
.with_context(|| format!("binding public HTTP surface to {public_addr}"))?;
let private_listener = TcpListener::bind(private_addr)
.await
.with_context(|| format!("binding private HTTP surface to {private_addr}"))?;
let basic_auth = Arc::new(BasicAuth::new(args.admin_user, args.admin_password));
if basic_auth.uses_default_password() {
tracing::warn!(
"the private control surface is using the DEFAULT admin password ({DEFAULT_ADMIN_PASSWORD:?}); \
set --admin-password / FLUSSO_ADMIN_PASSWORD before exposing it"
);
}
let metrics = metrics::init(true)?;
let registry = metrics.registry.clone();
let options = DaemonOptions {
slot: args.slot,
publication: args.publication,
manage_publication: args
.manage_publication
.unwrap_or(config.source.manage_publication),
skip_backfill: args.skip_backfill,
queue_capacity: args.queue_capacity,
pretty: args.pretty,
lag_poll_interval: Duration::from_secs(args.lag_poll_secs),
};
let status = Arc::new(daemon::Status::new(
config.indexes.keys().cloned(),
std::time::Instant::now(),
));
let _in_flight_gauge = metrics::register_in_flight_gauge(Arc::clone(&status));
let (reindex_tx, mut reindex_rx) = mpsc::channel::<IndexName>(8);
let (public_shutdown, public_rx) = oneshot::channel::<()>();
let (private_shutdown, private_rx) = oneshot::channel::<()>();
let public = tokio::spawn(http::serve(
"public",
public_listener,
http::public_router(http::PublicState {
status: Arc::clone(&status),
registry,
}),
public_rx,
));
let private = tokio::spawn(http::serve(
"private",
private_listener,
http::private_router(
http::PrivateState {
status: Arc::clone(&status),
reindex: reindex_tx,
},
Arc::clone(&basic_auth),
),
private_rx,
));
let backends: Arc<dyn daemon::Backends> = Arc::new(FlussoBackends);
let otel_observer: Arc<dyn daemon::Observer> = Arc::new(OtelObserver::new());
let result = loop {
let running = Daemon::new(config.clone(), Arc::clone(&backends))
.with_options(options.clone())
.with_observer(Arc::clone(&otel_observer))
.with_status(Arc::clone(&status))
.start()
.await?;
let index = tokio::select! {
outcome = running.run(shutdown_signal()) => break outcome,
Some(index) = reindex_rx.recv() => index,
};
tracing::info!(
index = index.as_ref(),
"reindex requested; staging a fresh generation and restarting"
);
if let Err(error) = stage_reindex(&config, &options, backends.as_ref(), &index).await {
tracing::error!(%error, index = index.as_ref(), "failed to stage reindex; restarting without it");
}
};
let _ = public_shutdown.send(());
let _ = private_shutdown.send(());
for (task, surface) in [(public, "public"), (private, "private")] {
if let Err(error) = task.await {
tracing::warn!(%error, surface, "HTTP server task did not shut down cleanly");
}
}
metrics.shutdown();
if let Some(provider) = tracer_provider
&& let Err(error) = provider.shutdown()
{
tracing::warn!(%error, "failed to flush OTLP tracer on shutdown");
}
result
}
async fn stage_reindex(
config: &Config,
options: &DaemonOptions,
backends: &dyn daemon::Backends,
index: &IndexName,
) -> anyhow::Result<()> {
let mapping = config
.resolve_mappings()
.into_iter()
.find(|mapping| &mapping.index == index)
.with_context(|| format!("no such index {}", index.as_ref()))?;
let sink = backends.sink(config, options).await?;
sink.reindex(&mapping).await?;
Ok(())
}
#[derive(Debug, PartialEq, Eq)]
enum ConfigPlan {
UseLock,
Compile(PathBuf),
Fallback,
Missing(PathBuf),
}
fn plan_config(
locked: bool,
config: Option<&Path>,
default_config: &Path,
exists: impl Fn(&Path) -> bool,
) -> ConfigPlan {
if locked {
return ConfigPlan::UseLock;
}
match config {
Some(path) if exists(path) => ConfigPlan::Compile(path.to_path_buf()),
Some(path) => ConfigPlan::Missing(path.to_path_buf()),
None if exists(default_config) => ConfigPlan::Compile(default_config.to_path_buf()),
None => ConfigPlan::Fallback,
}
}
fn resolve_config(args: &RunArgs) -> anyhow::Result<Config> {
let default_config = PathBuf::from(crate::DEFAULT_CONFIG);
match plan_config(
args.locked,
args.config.as_deref(),
&default_config,
|path| path.exists(),
) {
ConfigPlan::UseLock => load_lock(&args.lock),
ConfigPlan::Compile(config_path) => {
let compiled = schema::compile(&config_path)
.with_context(|| format!("compiling config from {}", config_path.display()))?;
schema::write(&compiled, &args.lock)
.with_context(|| format!("writing compiled lock to {}", args.lock.display()))?;
tracing::info!(
indexes = compiled.config.indexes.len(),
lock = %args.lock.display(),
"compiled config and wrote lock"
);
Ok(compiled.config)
}
ConfigPlan::Fallback => load_lock(&args.lock).with_context(|| {
format!(
"no {} to compile and no compiled lock at {}; create a config or build a lock first",
crate::DEFAULT_CONFIG,
args.lock.display()
)
}),
ConfigPlan::Missing(config_path) => {
anyhow::bail!("config file {} not found", config_path.display())
}
}
}
fn load_lock(path: &Path) -> anyhow::Result<Config> {
schema::load_compiled(path)
.with_context(|| format!("loading compiled lock from {}", path.display()))
}
#[cfg(test)]
mod tests;
async fn shutdown_signal() {
let ctrl_c = async {
if let Err(error) = tokio::signal::ctrl_c().await {
tracing::warn!(%error, "failed to listen for Ctrl-C");
std::future::pending::<()>().await;
}
};
#[cfg(unix)]
let terminate = async {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut signal) => {
signal.recv().await;
}
Err(error) => {
tracing::warn!(%error, "failed to install SIGTERM handler");
std::future::pending::<()>().await;
}
}
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
() = ctrl_c => {},
() = terminate => {},
}
}