//! righvalor 0.1.0
//!
//! RighValor: AI Infrastructure and Applications Framework for the Far Edge
use std::{str::FromStr, sync::Arc};

use righ_dm_rs::RighIpv4Addr;
use tokio::{signal, sync::RwLock};
use tracing::Instrument;

use crate::{
    config::{ValorCommand, ValorConfig},
    logging::init_logging_with_reload,
    master::{ValorTaskManager, ValorWorkerRegistry},
    northbound::{ApiServerManager, ValorApiServerConfig},
    service::ValorServiceRegistry,
    types::{ValorID, ValorIdExt},
    uds::{UdsClientTool, UdsServer},
};

mod common;
mod config;

mod logging;
mod master;
mod northbound;
mod righgravity_client;
mod runtime;
mod service;
mod types;
mod uds;
mod worker;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize logging first
    init_logging_with_reload();

    // Load Valor configuration from CLI and compile-time TOML
    let config = ValorConfig::new()?;

    tracing::info!("RighValor starting with config: {:?}", config);

    // Execute command based on CLI input
    match config.cli.command {
        ValorCommand::Master => {
            let port = config.app.master_port();
            tracing::info!(
                "Starting Valor Master node '{}' on port {}",
                config.cli.id,
                port
            );

            // Shared registry instance for master and API
            let tuning = config.app.tuning().clone();
            let registry = Arc::new(RwLock::new(ValorWorkerRegistry::new(
                tuning.unreachable_after_secs,
                tuning.eviction_after_secs,
            )));
            // Apply tuning to registry
            {
                let mut guard = registry.write().await;
                guard.apply_tuning(&tuning);
            }
            // Shared task manager instance
            let task_manager = Arc::new(ValorTaskManager::new());
            // Result aggregator plugged into task manager
            let result_aggregator = Arc::new(crate::master::ValorResultAggregator::new());
            task_manager.set_result_aggregator(result_aggregator).await;
            // Deprecated: Master no longer loads service.toml. Keep an empty registry for API constructor signature.
            let service_registry = Arc::new(RwLock::new(ValorServiceRegistry::new()));
            let master_actor = master::startup_master_node_with_registry(
                port,
                &config,
                registry.clone(),
                task_manager.clone(),
            )
            .await;

            // Start UDS server for master
            let uds_server =
                UdsServer::new(config.cli.uds.clone(), &config.cli.id, master_actor.clone());

            // Start UDS server in background
            let uds_span = tracing::info_span!(
                "flow.master.uds_server",
                master_id = %config.cli.id,
                uds_path = %config.cli.uds.display()
            );
            tokio::spawn({
                let uds_server = uds_server.clone();
                let uds_span = uds_span.clone();
                async move {
                    let _e = uds_span.enter();
                    if let Err(e) = uds_server.start().await {
                        tracing::error!("Failed to start UDS server: {}", e);
                    }
                }
            });

            // Create RighGravity client (for fallback/manual queries if needed)
            // Note: Node status is now pushed from RighGravity, not polled
            tracing::info!("RighGravity client initialized");
            let righgravity_client = Arc::new(righgravity_client::RighGravityClient::new(
                "http://localhost:55100",
            ));

            // Initialize online nodes counter with current state from RighGravity
            let online_nodes_counter = Arc::new(tokio::sync::RwLock::new(
                northbound::ValorOnlineNodesInfo::default(),
            ));

            // Query initial node states from RighGravity (only once at startup)
            tracing::info!("Querying initial node states from RighGravity...");
            match righgravity_client.fetch_node_states().await {
                Ok(nodes) => {
                    let mut counter = online_nodes_counter.write().await;
                    for node in &nodes {
                        if righgravity_client::RighGravityClient::is_node_online(node) {
                            counter.online_nodes.insert(node.id.clone());
                        }
                    }
                    counter.total_seen = nodes.len();
                    let online_count = counter.online_nodes.len();
                    tracing::info!(
                        "Initial node status: {}/{} online (including controller)",
                        online_count,
                        counter.total_seen
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to query initial node states from RighGravity: {}. Starting with empty state.",
                        e
                    );
                }
            }

            // Start Northbound API server
            let api_config = ValorApiServerConfig {
                host: "0.0.0.0".to_string(),
                port: 8080,
            };
            let api_manager = ApiServerManager::new(
                api_config,
                registry.clone(),
                service_registry.clone(),
                task_manager.clone(),
                master_actor.clone(),
                righgravity_client.clone(),
                online_nodes_counter.clone(),
            );

            // Start API server in background
            let api_span =
                tracing::info_span!("flow.master.api_server", host = "0.0.0.0", port = 8080);
            tokio::spawn({
                let api_span = api_span.clone();
                async move {
                    let _e = api_span.enter();
                    if let Err(e) = api_manager.start().await {
                        tracing::error!("Failed to start northbound API server: {}", e);
                    }
                }
            });

            // Periodic cleanup & fallback tick
            tokio::spawn({
                let registry = registry.clone();
                let task_manager = task_manager.clone();
                async move {
                    let mut interval = tokio::time::interval(std::time::Duration::from_secs(2));
                    loop {
                        interval.tick().await;
                        // Detect workers that are unreachable (but not yet evicted) and requeue
                        let to_requeue: Vec<_> = {
                            let guard = registry.read().await;
                            guard
                                .iter()
                                .filter_map(|(id, rec)| {
                                    if rec.state == crate::master::ValorWorkerState::Unreachable {
                                        Some(id.clone())
                                    } else {
                                        None
                                    }
                                })
                                .collect()
                        };
                        for wid in to_requeue {
                            let n = task_manager
                                .requeue_tasks_for_unreachable_worker(&wid)
                                .await;
                            if n > 0 {
                                tracing::warn!(worker = %wid, n, "Worker unreachable; tasks requeued");
                            }
                        }
                    }
                }
            });
        }
        ValorCommand::Worker => {
            let port = config.app.worker_port();
            tracing::info!(
                "Starting Valor Worker node '{}' on port {}",
                config.cli.id,
                port
            );

            // Propagate service.toml path to worker via env if provided
            if let Some(path) = config.cli.service_toml.as_ref() {
                std::env::set_var("VALOR_SERVICE_TOML", path);
                tracing::info!("Worker service.toml set from CLI: {}", path.display());
            }

            worker::startup_worker_node(port, &config).await;
        }
        ValorCommand::RegisterWorker {
            worker_id,
            worker_ip,
        } => {
            tracing::info!(
                "Registering worker '{}@{}' with master '{}'",
                worker_id,
                worker_ip,
                config.cli.id
            );

            // Register worker with master (client role)
            UdsClientTool::register_worker(
                &ValorID::new_worker(&worker_id),
                RighIpv4Addr::from_str(&worker_ip)?,
                &config.cli.uds,
            )
            .await?;
        }
    }

    // Keep the main function running until Ctrl+C is received
    tracing::info!("RighValor is running. Press Ctrl+C to exit.");
    signal::ctrl_c().await?;

    tracing::info!("RighValor shutdown signal received, exiting...");
    Ok(())
}