use std::time::Duration;
use fdlimit::Outcome;
use synd_o11y::{
metric,
opentelemetry::OpenTelemetryGuard,
tracing_subscriber::otel_metrics::{self, metrics_event_filter},
};
use tokio_metrics::RuntimeMonitor;
use tracing::{error, info};
use synd_api::{
args::{self, Args, ObservabilityOptions},
config,
dependency::Dependency,
repository::kvsd::ConnectKvsdFailed,
serve::listen_and_serve,
shutdown::Shutdown,
};
fn init_tracing(options: &ObservabilityOptions) -> Option<OpenTelemetryGuard> {
use synd_o11y::{
opentelemetry::init_propagation,
tracing_subscriber::{audit, otel_log, otel_trace},
};
use tracing_subscriber::{
filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt as _, Layer as _,
Registry,
};
let color = {
use supports_color::Stream;
supports_color::on(Stream::Stdout).is_some()
};
let show_src = options.show_code_location;
let show_target = options.show_target;
let (opentelemetry_layers, guard) = {
match options.otlp_endpoint.as_deref() {
None | Some("") => (None, None),
Some(endpoint) => {
let resource = synd_o11y::opentelemetry::resource(config::NAME, config::VERSION);
let trace_layer =
otel_trace::layer(endpoint, resource.clone(), options.trace_sampler_ratio);
let log_layer = otel_log::layer(endpoint, resource.clone());
let metrics_layer = otel_metrics::layer(endpoint, resource);
(
Some(trace_layer.and_then(log_layer).and_then(metrics_layer)),
Some(synd_o11y::opentelemetry::OpenTelemetryGuard),
)
}
}
};
Registry::default()
.with(
fmt::Layer::new()
.with_ansi(color)
.with_timer(fmt::time::UtcTime::rfc_3339())
.with_file(show_src)
.with_line_number(show_src)
.with_target(show_target)
.with_filter(metrics_event_filter())
.and_then(opentelemetry_layers)
.with_filter(
EnvFilter::try_from_env("SYND_LOG")
.or_else(|_| EnvFilter::try_new("info"))
.unwrap()
.add_directive(audit::Audit::directive()),
),
)
.with(audit::layer())
.init();
init_propagation();
guard
}
async fn run(
Args {
kvsd,
bind,
serve,
tls,
o11y,
}: Args,
shutdown: Shutdown,
) -> anyhow::Result<()> {
let dep = Dependency::new(kvsd, tls, serve).await?;
info!(
version = config::VERSION,
otlp_endpoint=?o11y.otlp_endpoint,
request_timeout=?dep.serve_options.timeout,
request_body_limit_bytes=dep.serve_options.body_limit_bytes,
concurrency_limit=?dep.serve_options.concurrency_limit,
"Runinng...",
);
listen_and_serve(dep, bind.into(), shutdown).await
}
fn init_file_descriptor_limit() {
fdlimit::raise_fd_limit()
.inspect(|outcome| {
match outcome {
Outcome::LimitRaised { from, to } => {
tracing::info!("Raise fd limit {from} to {to}");
}
Outcome::Unsupported => tracing::info!("Raise fd limit unsupported"),
};
})
.ok();
}
fn init_runtime_monitor() {
let handle = tokio::runtime::Handle::current();
let runtime_monitor = RuntimeMonitor::new(&handle);
tokio::spawn(async move {
for interval in runtime_monitor.intervals() {
metric!(counter.runtime.poll = interval.total_polls_count);
metric!(counter.runtime.busy_duration = interval.total_busy_duration.as_secs_f64());
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}
#[tokio::main]
async fn main() {
let args = args::parse();
let _guard = init_tracing(&args.o11y);
let shutdown = Shutdown::watch_signal();
init_file_descriptor_limit();
init_runtime_monitor();
if let Err(err) = run(args, shutdown).await {
if let Some(err) = err.downcast_ref::<ConnectKvsdFailed>() {
error!("{err}: make sure kvsd is running");
} else {
error!("{err:?}");
}
std::process::exit(1);
}
}