// mocra 0.3.0
//
// A distributed, event-driven crawling and data collection framework.
use crate::engine::api::state::ApiState;
use axum::http::StatusCode;
use axum::{
    Json,
    extract::{Path, State},
};
use serde::Serialize;
use std::time::Duration;
use std::time::Instant;

use crate::common::registry::NodeInfo;
use crate::sync::RaftLeaderView;

/// Serializable body returned by [`get_cluster_leader`].
///
/// Instances are built by `ClusterLeaderResponse::from_raft_view` (raft
/// runtime configured) or `ClusterLeaderResponse::local` (no raft runtime).
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct ClusterLeaderResponse {
    /// `"raft"` when built from a raft leader view, `"local"` otherwise.
    pub mode: String,
    /// Id of the node that served the request; `None` in local mode.
    pub local_node_id: Option<u64>,
    /// Address of the node that served the request; `None` in local mode.
    pub local_node_addr: Option<String>,
    /// Leader id copied from the raft view; `None` in local mode.
    pub leader_id: Option<u64>,
    /// Leader address copied from the raft view; `None` in local mode.
    pub leader_addr: Option<String>,
    /// Copied from the raft view in raft mode; always `true` in local mode.
    pub is_local_leader: bool,
}

/// Serializable view of the gate configuration embedded in
/// [`ModuleFallbackGateResponse`].
///
/// In this file the values are copied field-by-field from
/// `config.crawler.scheduler_ingress_cutover_gate_config()`.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FallbackGateConfigResponse {
    /// Passed to `module_dag_cutover_gate_state` when the gate state is
    /// queried. NOTE(review): presumably the failure streak at which the gate
    /// blocks — confirm against the task manager.
    pub failure_threshold: usize,
    /// Passed to `module_dag_cutover_gate_state` alongside the threshold.
    /// NOTE(review): unit is seconds per the field name — confirm.
    pub recovery_window_secs: u64,
    /// Reported as configured; not used by any handler in this file.
    /// NOTE(review): presumably a fraction in `0.0..=1.0` — confirm.
    pub gray_ratio: f64,
}

/// Serializable body returned by [`get_module_fallback_gate`]: the configured
/// gate plus the gate state reported for one module.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct ModuleFallbackGateResponse {
    /// Module name exactly as received in the request path.
    pub module: String,
    /// Gate configuration that was used for the state lookup.
    pub gate: FallbackGateConfigResponse,
    /// `blocked` flag copied from the task manager's gate-state snapshot.
    pub blocked: bool,
    /// `failure_streak` copied from the task manager's gate-state snapshot.
    pub failure_streak: usize,
    /// `last_failure_ms` copied from the snapshot.
    /// NOTE(review): presumably a millisecond timestamp of the most recent
    /// failure, `None` when there has been none — confirm.
    pub last_failure_ms: Option<u64>,
}

impl ClusterLeaderResponse {
    fn from_raft_view(view: RaftLeaderView) -> Self {
        Self {
            mode: "raft".to_string(),
            local_node_id: Some(view.local_node_id),
            local_node_addr: Some(view.local_node_addr),
            leader_id: view.leader_id,
            leader_addr: view.leader_addr,
            is_local_leader: view.is_local_leader,
        }
    }

    fn local() -> Self {
        Self {
            mode: "local".to_string(),
            local_node_id: None,
            local_node_addr: None,
            leader_id: None,
            leader_addr: None,
            is_local_leader: true,
        }
    }
}

/// Handler that returns the nodes reported by the profile store.
///
/// Asks `profile_store.list_active_nodes` for nodes using a 30-second window,
/// records one `success` throughput sample and one latency sample under the
/// `control_plane` / `cluster` / `get_nodes` labels, and returns the list as
/// JSON. This handler has no failure path.
pub async fn get_nodes(State(state): State<ApiState>) -> Json<Vec<NodeInfo>> {
    // NOTE(review): presumably the maximum age of a node's last heartbeat for
    // it to count as active — confirm against `list_active_nodes`.
    const ACTIVE_WINDOW: Duration = Duration::from_secs(30);

    let started = Instant::now();
    let active_nodes = state.state.profile_store.list_active_nodes(ACTIVE_WINDOW);

    crate::common::metrics::inc_throughput("control_plane", "cluster", "get_nodes", "success", 1);
    let elapsed_secs = started.elapsed().as_secs_f64();
    crate::common::metrics::observe_latency(
        "control_plane",
        "cluster",
        "get_nodes",
        "success",
        elapsed_secs,
    );

    Json(active_nodes)
}

/// Handler that reports which node currently leads the cluster.
///
/// * No raft runtime configured: responds `200` with
///   `ClusterLeaderResponse::local()`.
/// * Raft runtime present and a leader is known: responds `200` with the
///   runtime's leader view.
/// * Raft runtime present but `leader_id` is `None`: responds
///   `503 Service Unavailable`.
///
/// Every outcome is reported through `record_control_status` before the
/// response value is produced.
pub async fn get_cluster_leader(
    State(state): State<ApiState>,
) -> Result<Json<ClusterLeaderResponse>, StatusCode> {
    const ACTION: &str = "get_cluster_leader";
    let started = Instant::now();

    // Without a raft runtime the node answers for itself.
    let Some(runtime) = state.state.raft_runtime.as_ref() else {
        record_control_status(ACTION, StatusCode::OK, started);
        return Ok(Json(ClusterLeaderResponse::local()));
    };

    let view = runtime.leader_view();
    match view.leader_id {
        Some(_) => {
            record_control_status(ACTION, StatusCode::OK, started);
            Ok(Json(ClusterLeaderResponse::from_raft_view(view)))
        }
        None => {
            // No leader is known in the current view.
            record_control_status(ACTION, StatusCode::SERVICE_UNAVAILABLE, started);
            Err(StatusCode::SERVICE_UNAVAILABLE)
        }
    }
}

pub async fn pause_engine(State(state): State<ApiState>) -> StatusCode {
    let started = Instant::now();
    if let Err(e) = state.state.profile_store.set_pause_state(true).await {
        log::error!("Failed to set pause flag: {}", e);
        record_control_status("pause_engine", StatusCode::INTERNAL_SERVER_ERROR, started);
        return StatusCode::INTERNAL_SERVER_ERROR;
    }
    record_control_status("pause_engine", StatusCode::OK, started);
    StatusCode::OK
}

pub async fn resume_engine(State(state): State<ApiState>) -> StatusCode {
    let started = Instant::now();
    if let Err(e) = state.state.profile_store.set_pause_state(false).await {
        log::error!("Failed to remove pause flag: {}", e);
        record_control_status("resume_engine", StatusCode::INTERNAL_SERVER_ERROR, started);
        return StatusCode::INTERNAL_SERVER_ERROR;
    }
    record_control_status("resume_engine", StatusCode::OK, started);
    StatusCode::OK
}

/// Handler that reports the fallback gate configuration and current gate
/// state for the module named in the request path.
///
/// The gate configuration is read from
/// `config.crawler.scheduler_ingress_cutover_gate_config()`. Its
/// `failure_threshold` and `recovery_window_secs` are then passed to
/// `task_manager.module_dag_cutover_gate_state` to obtain the module's state.
/// The handler always responds `200` and reports that through
/// `record_control_status`.
pub async fn get_module_fallback_gate(
    State(state): State<ApiState>,
    Path(module): Path<String>,
) -> Json<ModuleFallbackGateResponse> {
    let started = Instant::now();

    // The read guard is a temporary of this statement, so the config lock is
    // released before the task manager is queried.
    let gate_config = state
        .state
        .config
        .read()
        .await
        .crawler
        .scheduler_ingress_cutover_gate_config();

    let gate_state = state.task_manager.module_dag_cutover_gate_state(
        module.as_str(),
        gate_config.failure_threshold,
        gate_config.recovery_window_secs,
    );

    record_control_status("get_module_fallback_gate", StatusCode::OK, started);

    let gate = FallbackGateConfigResponse {
        failure_threshold: gate_config.failure_threshold,
        recovery_window_secs: gate_config.recovery_window_secs,
        gray_ratio: gate_config.gray_ratio,
    };

    Json(ModuleFallbackGateResponse {
        module,
        gate,
        blocked: gate_state.blocked,
        failure_streak: gate_state.failure_streak,
        last_failure_ms: gate_state.last_failure_ms,
    })
}

/// Records control-plane metrics for one finished cluster API call.
///
/// * `action` — metric label identifying the handler (e.g. `"pause_engine"`).
/// * `status` — HTTP status the handler is about to return.
/// * `started` — instant captured when the handler began; used for latency.
///
/// Any 2xx status is labelled `"success"`; everything else is labelled
/// `"error"` and additionally bumps the error counter, keyed by the numeric
/// status code.
fn record_control_status(action: &str, status: StatusCode, started: Instant) {
    // Classify by status class rather than equality with 200, so other 2xx
    // responses (201, 202, 204, ...) are not counted as failures.
    let succeeded = status.is_success();
    let result = if succeeded { "success" } else { "error" };

    crate::common::metrics::inc_throughput("control_plane", "cluster", action, result, 1);
    crate::common::metrics::observe_latency(
        "control_plane",
        "cluster",
        action,
        result,
        started.elapsed().as_secs_f64(),
    );

    if !succeeded {
        crate::common::metrics::inc_error("control_plane", "cluster", "http", status.as_str(), 1);
    }
}

#[cfg(test)]
mod tests {
    use super::{ClusterLeaderResponse, FallbackGateConfigResponse};
    use crate::sync::RaftLeaderView;

    /// Every field of the raft view must reach the response, tagged with mode
    /// `"raft"`. Comparing the whole struct also covers `local_node_addr`.
    #[test]
    fn cluster_leader_response_maps_raft_view() {
        let response = ClusterLeaderResponse::from_raft_view(RaftLeaderView {
            local_node_id: 11,
            local_node_addr: "127.0.0.1:3101".to_string(),
            leader_id: Some(12),
            leader_addr: Some("127.0.0.1:3102".to_string()),
            is_local_leader: false,
        });

        assert_eq!(
            response,
            ClusterLeaderResponse {
                mode: "raft".to_string(),
                local_node_id: Some(11),
                local_node_addr: Some("127.0.0.1:3101".to_string()),
                leader_id: Some(12),
                leader_addr: Some("127.0.0.1:3102".to_string()),
                is_local_leader: false,
            }
        );
    }

    /// Local mode reports no ids or addresses and marks the node as its own
    /// leader.
    #[test]
    fn cluster_leader_response_local_mode_is_self_led() {
        let response = ClusterLeaderResponse::local();

        assert_eq!(
            response,
            ClusterLeaderResponse {
                mode: "local".to_string(),
                local_node_id: None,
                local_node_addr: None,
                leader_id: None,
                leader_addr: None,
                is_local_leader: true,
            }
        );
    }

    /// The gate config response stores its values verbatim, and a clone
    /// compares equal to the original.
    #[test]
    fn fallback_gate_config_response_keeps_values() {
        let response = FallbackGateConfigResponse {
            failure_threshold: 3,
            recovery_window_secs: 60,
            gray_ratio: 0.25,
        };

        assert_eq!(response.failure_threshold, 3);
        assert_eq!(response.recovery_window_secs, 60);
        assert_eq!(response.gray_ratio, 0.25);
        assert_eq!(response.clone(), response);
    }
}