pict-rs-aggregator 0.2.0-beta.2

A simple image aggregation service for pict-rs
Documentation
use actix_web::{App, HttpServer};
use awc::Client;
use clap::Parser;
use console_subscriber::ConsoleLayer;
use opentelemetry::{
    sdk::{propagation::TraceContextPropagator, Resource},
    KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use std::time::Duration;
use tracing::subscriber::set_global_default;
use tracing_actix_web::TracingLogger;
use tracing_awc::Tracing;
use tracing_error::ErrorLayer;
use tracing_log::LogTracer;
use tracing_subscriber::{
    filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, registry::LookupSpan, Layer,
    Registry,
};
use url::Url;

/// Program entry point.
///
/// Parses the command-line configuration, initialises logging, opens the sled
/// database under `<db_path>/sled/db-0-34`, builds the aggregator state and
/// then runs the HTTP server on the configured bind address until it exits.
///
/// # Errors
///
/// Propagates any error from logger initialisation, opening the database,
/// building the application state, binding the address, or running the server.
#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = pict_rs_aggregator::Config::parse();

    let otel_url = config.opentelemetry_url();
    let console_buffer = config.console_event_buffer_size();
    init_logger(otel_url, console_buffer)?;

    // The database lives in a fixed subdirectory of the configured path.
    let sled_dir = {
        let mut dir = config.db_path().to_owned();
        dir.push("sled");
        dir.push("db-0-34");
        dir
    };

    let sled_config = sled::Config::new()
        .path(sled_dir)
        .cache_capacity(config.sled_cache_capacity());
    let db = sled_config.open()?;

    // Read the bind address now: `config` is moved into `state` on the next line.
    let bind_address = config.bind_address();
    let state = pict_rs_aggregator::state(config, "", db)?;

    // The factory closure is invoked by the server to build each `App`, so
    // every invocation creates its own client and its own clone of the state.
    let server = HttpServer::new(move || {
        // NOTE(review): the User-Agent version below is hard-coded as v0.1.0 —
        // confirm whether it is meant to track the crate version.
        let http_client = Client::builder()
            .wrap(Tracing)
            .timeout(Duration::from_secs(30))
            .add_default_header(("User-Agent", "pict_rs_aggregator-v0.1.0"))
            .finish();
        let app_state = state.clone();

        App::new()
            .wrap(TracingLogger::default())
            .configure(move |cfg| pict_rs_aggregator::configure(cfg, app_state, http_client))
    });

    server.bind(bind_address)?.run().await?;

    Ok(())
}

/// Builds the logging/tracing stack and hands it to [`init_subscriber`].
///
/// The stack always contains a formatting layer (filtered by the targets
/// parsed from the `RUST_LOG` environment variable, falling back to `"info"`
/// when the variable cannot be read) and an [`ErrorLayer`]. When
/// `console_event_buffer_size` is `Some`, a console layer listening on
/// `0.0.0.0:6669` with that event buffer capacity is added as well.
///
/// `opentelemetry_url` is forwarded unchanged to [`init_subscriber`].
///
/// # Errors
///
/// Returns an error if the `log` bridge cannot be initialised, if the filter
/// directives fail to parse, or if [`init_subscriber`] fails.
fn init_logger(
    opentelemetry_url: Option<&Url>,
    console_event_buffer_size: Option<usize>,
) -> Result<(), Box<dyn std::error::Error>> {
    LogTracer::init()?;

    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

    let directives = std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("info"));
    let targets: Targets = directives.parse()?;

    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
        .with_filter(targets.clone());

    let base = Registry::default()
        .with(fmt_layer)
        .with(ErrorLayer::default());

    // The two arms produce different concrete subscriber types, so each one
    // has to call the generic `init_subscriber` itself.
    match console_event_buffer_size {
        Some(capacity) => {
            let console_layer = ConsoleLayer::builder()
                .with_default_env()
                .server_addr(([0, 0, 0, 0], 6669))
                .event_buffer_capacity(capacity)
                .spawn();

            init_subscriber(base.with(console_layer), targets, opentelemetry_url)
        }
        None => init_subscriber(base, targets, opentelemetry_url),
    }
}

/// Installs `subscriber` as the global default tracing subscriber.
///
/// When `opentelemetry_url` is `Some`, an OTLP batch pipeline exporting over
/// tonic to that endpoint is installed first (with the `service.name`
/// resource set to `pict-rs-aggregator`), and an OpenTelemetry layer filtered
/// by `targets` is stacked on top of `subscriber` before it is registered.
/// When it is `None`, `subscriber` is registered as-is and `targets` is unused.
///
/// # Errors
///
/// Returns an error if the OTLP pipeline cannot be installed or if setting
/// the global default subscriber fails.
fn init_subscriber<S>(
    subscriber: S,
    targets: Targets,
    opentelemetry_url: Option<&Url>,
) -> Result<(), Box<dyn std::error::Error>>
where
    S: SubscriberExt + Send + Sync,
    for<'a> S: LookupSpan<'a>,
{
    match opentelemetry_url {
        None => set_global_default(subscriber)?,
        Some(url) => {
            let resource = Resource::new(vec![KeyValue::new("service.name", "pict-rs-aggregator")]);
            let trace_config = opentelemetry::sdk::trace::config().with_resource(resource);

            let exporter = opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint(url.as_str());

            let tracer = opentelemetry_otlp::new_pipeline()
                .tracing()
                .with_trace_config(trace_config)
                .with_exporter(exporter)
                .install_batch(opentelemetry::runtime::Tokio)?;

            let otel_layer = tracing_opentelemetry::layer()
                .with_tracer(tracer)
                .with_filter(targets);

            set_global_default(subscriber.with(otel_layer))?;
        }
    }

    Ok(())
}