//! embystream 0.0.25
//!
//! Another Emby streaming application (frontend/backend separation) written in Rust.
//!
//! Documentation
use std::{error::Error, fs, path::Path, process, str::FromStr, sync::Arc};

use clap::{CommandFactory, FromArgMatches};
use figlet_rs::FIGfont;
use hyper::{StatusCode, body::Incoming};
use tokio::signal as TokioSignal;

use embystream::{
    AppState, GATEWAY_LOGGER_DOMAIN, INIT_LOGGER_DOMAIN, debug_log, error_log,
    i18n::lookup, info_log,
};
use embystream::{
    backend::{
        service::AppStreamService, stream::StreamMiddleware,
        stream_relay::StreamRelayMiddleware,
    },
    cli::{Cli, Commands, RunArgs},
    cli_lang::{detect_lang_from_env_early, localize_cli_command},
    cli_wizard,
    config::{core::Config, general::StreamMode},
    frontend::{forward::ForwardMiddleware, service::AppForwardService},
    gateway::{
        CorsMiddleware, LoggerMiddleware, OptionsMiddleware,
        PlaylistMockMiddleware, ReverseProxyMiddleware, chain::Handler,
        client_filter::ClientAgentFilterMiddleware, context::Context,
        core::Gateway, filtered_routes::COMPILED_UA_FILTERS,
        response::ResponseBuilder,
        reverse_proxy_filter::ReverseProxyFilterMiddleware,
    },
    logger::{LogLevel, Logger, start_cleanup_task},
    system::SystemInfo,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let help_lang = detect_lang_from_env_early();
    let mut cmd = Cli::command();
    localize_cli_command(&mut cmd, help_lang);
    let matches = cmd.get_matches();
    let cli = Cli::from_arg_matches(&matches).unwrap_or_else(|e| e.exit());

    match cli.command {
        Some(Commands::Run(run_args)) => {
            run_app(&run_args).await?;
        }
        Some(Commands::Config(ref cfg_args)) => {
            if let Err(e) = cli_wizard::run(cfg_args, cli.lang) {
                let prefix = lookup(cli.lang, "error.wizard_prefix");
                eprintln!("{prefix}: {e}");
                process::exit(1);
            }
        }
        None => {}
    }
    Ok(())
}

async fn run_app(
    run_args: &RunArgs,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    setup_figlet();

    let config = setup_load_config(run_args);

    setup_logger(&config)?;
    setup_print_info(&config);

    if let Err(e) = validate_dual_mode_ports(&config) {
        error_log!(INIT_LOGGER_DOMAIN, "{}", e);
        process::exit(1);
    }

    setup_crypto_provider()?;

    let app_state = setup_cache(&config).await;

    setup_rate_limiters(&app_state).await;

    let mode = {
        let config_guard = app_state.get_config().await;
        config_guard.general.stream_mode
    };

    if matches!(mode, StreamMode::Frontend | StreamMode::Dual) {
        let frontend_state = app_state.clone();
        tokio::spawn(async move {
            if let Err(e) = setup_frontend_gateway(&frontend_state).await {
                error_log!(
                    INIT_LOGGER_DOMAIN,
                    "Frontend gateway failed: {}",
                    e
                );
            }
        });
    }

    if matches!(mode, StreamMode::Backend | StreamMode::Dual) {
        let backend_state = app_state.clone();
        tokio::spawn(async move {
            if let Err(e) = setup_backend_gateway(&backend_state).await {
                error_log!(INIT_LOGGER_DOMAIN, "Backend gateway failed: {}", e);
            }
        });
    }

    TokioSignal::ctrl_c().await?;
    info_log!(INIT_LOGGER_DOMAIN, "Shutting down EmbyStream...");

    Ok(())
}

/// Prints the "EMBYSTREAM" banner rendered with the standard FIGlet font.
///
/// Nothing is printed when the font cannot be loaded or the text cannot be
/// converted; neither case is treated as an error.
fn setup_figlet() {
    let Ok(font) = FIGfont::standard() else {
        return;
    };
    let Some(banner) = font.convert("EMBYSTREAM") else {
        return;
    };
    println!("{banner}");
}

/// Logs a startup summary: environment, build profile, version, log level,
/// memory mode, stream mode and user agent.
///
/// The build profile is "Development" when compiled with debug assertions and
/// "Production" otherwise.
fn setup_print_info(config: &Config) {
    info_log!(INIT_LOGGER_DOMAIN, "Initializing EmbyStream...");

    let system_info = SystemInfo::new();
    let build_profile = match cfg!(debug_assertions) {
        true => "Development",
        false => "Production",
    };

    info_log!(
        INIT_LOGGER_DOMAIN,
        "Environment: {:?} [{:?}], Version: {:?}",
        system_info.environment,
        build_profile,
        system_info.version
    );

    let general = &config.general;
    info_log!(INIT_LOGGER_DOMAIN, "Log level: {}", config.log.level.as_str());
    info_log!(
        INIT_LOGGER_DOMAIN,
        "Memory mode: {}",
        general.memory_mode.as_str()
    );
    info_log!(INIT_LOGGER_DOMAIN, "Stream mode: {}", general.stream_mode);
    info_log!(INIT_LOGGER_DOMAIN, "User agent: {}", config.user_agent);
}

/// Loads (or initializes) the configuration for the given run arguments.
///
/// On failure the error is logged and the process exits with status 1, so
/// this function only ever returns a successfully loaded [`Config`].
fn setup_load_config(run_args: &RunArgs) -> Config {
    let config = Config::load_or_init(run_args).unwrap_or_else(|load_err| {
        error_log!(
            INIT_LOGGER_DOMAIN,
            "Configuration initialization failed: {}",
            load_err
        );
        process::exit(1)
    });

    info_log!(INIT_LOGGER_DOMAIN, "Configuration loaded successfully.");
    config
}

/// Creates the log directory, builds the logger from the `log` section of the
/// configuration and starts the log cleanup task.
///
/// A log level string that cannot be parsed falls back to `LogLevel::Info`.
///
/// # Errors
///
/// Returns the I/O error raised when the log directory cannot be created.
fn setup_logger(config: &Config) -> Result<(), Box<dyn Error + Send + Sync>> {
    let log_cfg = &config.log;

    // The directory must exist before the logger is built on top of it.
    fs::create_dir_all(Path::new(&log_cfg.root_path))?;

    let parsed_level = LogLevel::from_str(&log_cfg.level);
    Logger::builder()
        .with_level(parsed_level.unwrap_or(LogLevel::Info))
        .with_directory(&log_cfg.root_path)
        .with_file_prefix(&log_cfg.prefix)
        .build();

    start_cleanup_task(log_cfg.root_path.clone());
    info_log!(
        INIT_LOGGER_DOMAIN,
        "Log cleanup task started (retention: 7 days)"
    );

    Ok(())
}

async fn setup_cache(config: &Config) -> Arc<AppState> {
    let app_state = AppState::new(config.clone()).await;

    let problematic_clients = app_state.get_problematic_clients().await;
    info_log!(
        INIT_LOGGER_DOMAIN,
        "Problematic clients: {:?}",
        problematic_clients
    );

    Arc::new(app_state)
}

/// Checks that the frontend and backend do not share a listen port in dual
/// mode.
///
/// The check only applies when the stream mode is `StreamMode::Dual` and both
/// the frontend and backend sections are present; every other combination is
/// accepted.
///
/// # Errors
///
/// Returns a human-readable message naming the conflicting port.
fn validate_dual_mode_ports(config: &Config) -> Result<(), String> {
    if config.general.stream_mode != StreamMode::Dual {
        return Ok(());
    }

    let (Some(frontend), Some(backend)) = (&config.frontend, &config.backend)
    else {
        return Ok(());
    };

    if frontend.listen_port != backend.listen_port {
        return Ok(());
    }

    Err(format!(
        "Dual mode port conflict: frontend & backend cannot both use {}.",
        frontend.listen_port
    ))
}

/// Installs the gateway's crypto provider.
///
/// # Errors
///
/// Passes through the error from `Gateway::setup_crypto_provider` unchanged,
/// after logging it.
fn setup_crypto_provider() -> Result<(), Box<dyn Error + Send + Sync>> {
    let outcome = Gateway::setup_crypto_provider();

    if let Err(provider_err) = &outcome {
        error_log!(
            INIT_LOGGER_DOMAIN,
            "Setup crypto-provider failed: {:?}",
            provider_err
        );
    }

    outcome
}

/// Initializes the rate limiters held by the shared application state and
/// logs that the refill task has been started.
// NOTE(review): the refill task is presumably spawned inside
// `init_rate_limiters`; confirm against `AppState`.
async fn setup_rate_limiters(app_state: &Arc<AppState>) {
    let limiter_init = app_state.init_rate_limiters();
    limiter_init.await;

    info_log!(INIT_LOGGER_DOMAIN, "Rate limiter refill task started.");
}

/// Builds the frontend gateway and serves it until the listener returns.
///
/// The gateway binds `0.0.0.0:<frontend.listen_port>` and registers, in this
/// order: request logging, client user-agent filtering (restricted to
/// `COMPILED_UA_FILTERS`), reverse-proxy filtering, CORS, OPTIONS handling,
/// playlist mocking, stream forwarding and finally the reverse proxy to the
/// Emby base URL. Requests that reach the end of the chain are answered by
/// [`default_handler`].
///
/// Returns `Ok(())` without doing anything when the configured stream mode is
/// neither `Frontend` nor `Dual`.
///
/// # Errors
///
/// Fails when the frontend section is missing from the configuration or when
/// `Gateway::listen` returns an error.
async fn setup_frontend_gateway(
    app_state: &Arc<AppState>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    // Work on a private snapshot so nothing returned by `get_config` is held
    // while the gateway is running.
    let config = app_state.get_config().await.clone();

    // Match on the field in place: no copy or clone of the mode is needed.
    if !matches!(
        config.general.stream_mode,
        StreamMode::Frontend | StreamMode::Dual
    ) {
        debug_log!(
            INIT_LOGGER_DOMAIN,
            "Skipping frontend gateway setup - stream mode not enabled"
        );
        return Ok(());
    }

    debug_log!(INIT_LOGGER_DOMAIN, "Successfully start frontend listener");

    let frontend = config.frontend.as_ref().ok_or_else(|| {
        error_log!(
            INIT_LOGGER_DOMAIN,
            "Error: Frontend configuration not exist"
        );
        "Frontend config missing"
    })?;

    let addr = format!("0.0.0.0:{}", frontend.listen_port);
    let service = Arc::new(AppForwardService::new(app_state.clone()));

    let emby_base_url = config.emby.get_uri().to_string();
    let api_cache = app_state.get_api_response_cache().await.clone();

    info_log!(
        INIT_LOGGER_DOMAIN,
        "Frontend reverse proxy target: {}",
        emby_base_url
    );

    let mut gateway = Gateway::new(&addr)
        .add_middleware(Box::new(LoggerMiddleware))
        .add_middleware(Box::new(
            ClientAgentFilterMiddleware::new(app_state.clone())
                .with_filter_paths(COMPILED_UA_FILTERS.clone()),
        ))
        // Clone only the field that is handed over, not the whole frontend
        // configuration.
        .add_middleware(Box::new(ReverseProxyFilterMiddleware::new(
            frontend.anti_reverse_proxy.clone(),
        )))
        .add_middleware(Box::new(CorsMiddleware))
        .add_middleware(Box::new(OptionsMiddleware))
        .add_middleware(Box::new(PlaylistMockMiddleware))
        .add_middleware(Box::new(ForwardMiddleware::new(service)))
        .add_middleware(Box::new(ReverseProxyMiddleware::new(
            emby_base_url,
            api_cache,
            app_state.clone(),
        )));

    gateway.set_handler(default_handler());
    gateway.listen().await?;

    Ok(())
}

/// Builds the backend gateway and serves it until the listener returns.
///
/// The gateway binds `0.0.0.0:<backend.listen_port>`, is configured with the
/// SSL certificate and key paths from the configuration, and registers, in
/// this order: request logging, client user-agent filtering, CORS, OPTIONS
/// handling, stream relaying and streaming. Requests that reach the end of
/// the chain are answered by [`default_handler`].
///
/// Returns `Ok(())` without doing anything when the configured stream mode is
/// neither `Backend` nor `Dual`.
///
/// # Errors
///
/// Fails when the backend section is missing from the configuration or when
/// `Gateway::listen` returns an error.
async fn setup_backend_gateway(
    app_state: &Arc<AppState>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    // Work on a private snapshot so nothing returned by `get_config` is held
    // while the gateway is running.
    let config = app_state.get_config().await.clone();

    // Match on the field in place instead of cloning the whole `general`
    // section just to read the mode.
    if !matches!(
        config.general.stream_mode,
        StreamMode::Backend | StreamMode::Dual
    ) {
        debug_log!(
            INIT_LOGGER_DOMAIN,
            "Skipping backend gateway setup - stream mode not enabled"
        );
        return Ok(());
    }

    debug_log!(INIT_LOGGER_DOMAIN, "Successfully start backend listener");

    let backend = config.backend.as_ref().ok_or_else(|| {
        error_log!(
            INIT_LOGGER_DOMAIN,
            "Error: Backend configuration not exist"
        );
        "Backend config missing"
    })?;

    let addr = format!("0.0.0.0:{}", backend.listen_port);
    let service = Arc::new(AppStreamService::new(app_state.clone()));

    let mut gateway = Gateway::new(&addr)
        .with_tls(config.get_ssl_cert_path(), config.get_ssl_key_path())
        .add_middleware(Box::new(LoggerMiddleware))
        .add_middleware(Box::new(ClientAgentFilterMiddleware::new(
            app_state.clone(),
        )))
        .add_middleware(Box::new(CorsMiddleware))
        .add_middleware(Box::new(OptionsMiddleware))
        // Both middlewares take their own copy of the node list.
        .add_middleware(Box::new(StreamRelayMiddleware::new(
            config.backend_nodes.clone(),
        )))
        .add_middleware(Box::new(StreamMiddleware::new(
            config.backend_nodes.clone(),
            service,
            app_state.clone(),
        )));

    gateway.set_handler(default_handler());
    gateway.listen().await?;

    Ok(())
}

/// Creates the fallback handler installed on both gateways via
/// `Gateway::set_handler`.
///
/// The handler ignores the request context and body, emits a debug log entry
/// and answers with HTTP 503 (Service Unavailable).
fn default_handler() -> Handler {
    Arc::new(|_ctx: Context, _body: Option<Incoming>| {
        debug_log!(GATEWAY_LOGGER_DOMAIN, "Fallback to default middleware...");
        ResponseBuilder::with_status_code(StatusCode::SERVICE_UNAVAILABLE)
    })
}