mocra 0.3.0

A distributed, event-driven crawling and data collection framework
use std::time::{SystemTime, UNIX_EPOCH};

use serde_json::{Map, Value, json};
use uuid::Uuid;

use crate::common::interface::module::{NodeGenerateContext, NodeParseContext};
use crate::common::model::login_info::LoginInfo;
use crate::common::model::{
    ExecutionMeta, ModuleConfig, NodeInput, PayloadCodec, Priority, ResolvedCommonConfig,
    ResolvedNodeConfig, Response, RoutingMeta, TypedEnvelope,
};

pub(crate) struct OwnedNodeGenerateContext {
    pub(crate) routing: RoutingMeta,
    pub(crate) exec: ExecutionMeta,
    pub(crate) config: ResolvedNodeConfig,
    pub(crate) input: NodeInput,
    pub(crate) login_info: Option<LoginInfo>,
}

impl OwnedNodeGenerateContext {
    pub(crate) fn borrowed(&self) -> NodeGenerateContext<'_> {
        NodeGenerateContext {
            routing: &self.routing,
            exec: &self.exec,
            config: &self.config,
            input: &self.input,
            login_info: self.login_info.as_ref(),
        }
    }
}

pub(crate) struct OwnedNodeParseContext {
    pub(crate) routing: RoutingMeta,
    pub(crate) exec: ExecutionMeta,
    pub(crate) config: ResolvedNodeConfig,
    pub(crate) login_info: Option<LoginInfo>,
}

impl OwnedNodeParseContext {
    pub(crate) fn borrowed(&self) -> NodeParseContext<'_> {
        NodeParseContext {
            routing: &self.routing,
            exec: &self.exec,
            config: &self.config,
            login_info: self.login_info.as_ref(),
        }
    }
}

pub(crate) struct ModuleConfigGenerateContextInput<'a> {
    pub(crate) module_id: &'a str,
    pub(crate) run_id: Uuid,
    pub(crate) node_key: &'a str,
    pub(crate) base_common: ResolvedCommonConfig,
    pub(crate) config: &'a ModuleConfig,
    pub(crate) params: Map<String, Value>,
    pub(crate) login_info: Option<LoginInfo>,
    pub(crate) parent_request_id: Option<Uuid>,
}

pub(crate) fn build_module_config_generate_context(
    input: ModuleConfigGenerateContextInput<'_>,
) -> OwnedNodeGenerateContext {
    let (account, platform, module) = split_module_id(input.module_id);
    let now_ms = now_ms();

    OwnedNodeGenerateContext {
        routing: RoutingMeta {
            namespace: String::new(),
            account,
            platform,
            module,
            node_key: input.node_key.to_string(),
            run_id: input.run_id,
            request_id: Uuid::now_v7(),
            parent_request_id: input.parent_request_id.filter(|id| !id.is_nil()),
            priority: Priority::default(),
        },
        exec: ExecutionMeta {
            created_at_ms: now_ms,
            updated_at_ms: now_ms,
            ..ExecutionMeta::default()
        },
        config: build_module_config_node_config(
            input.module_id,
            input.node_key,
            input.base_common,
            Some(input.config),
        ),
        input: build_module_config_input(input.node_key, input.params),
        login_info: input.login_info,
    }
}

pub(crate) fn build_module_config_parse_context(
    module_id: &str,
    node_key: &str,
    base_common: ResolvedCommonConfig,
    config: Option<&ModuleConfig>,
    response: &Response,
) -> OwnedNodeParseContext {
    let now_ms = now_ms();

    OwnedNodeParseContext {
        routing: RoutingMeta {
            namespace: String::new(),
            account: response.account.clone(),
            platform: response.platform.clone(),
            module: response.module.clone(),
            node_key: node_key.to_string(),
            run_id: response.run_id,
            request_id: response.id,
            parent_request_id: (!response.prefix_request.is_nil())
                .then_some(response.prefix_request),
            priority: response.priority,
        },
        exec: ExecutionMeta {
            task_retry_count: response.task_retry_times as u32,
            created_at_ms: now_ms,
            updated_at_ms: now_ms,
            ..ExecutionMeta::default()
        },
        config: build_module_config_node_config(module_id, node_key, base_common, config),
        login_info: None,
    }
}

fn build_module_config_node_config(
    module_id: &str,
    node_key: &str,
    base_common: ResolvedCommonConfig,
    config: Option<&ModuleConfig>,
) -> ResolvedNodeConfig {
    let (account, platform, module) = split_module_id(module_id);
    let merged_value = config
        .map(ModuleConfig::get_merged_config)
        .unwrap_or_else(|| json!({}));

    ResolvedNodeConfig {
        profile_key: format!("mocra.profile.v1:{}:{}:{}", account, platform, module),
        profile_version: 0,
        common: build_module_config_common_config(base_common, config),
        node_config: TypedEnvelope::new(
            format!("mocra.node_config.v1.{}", node_key),
            1,
            PayloadCodec::Json,
            serde_json::to_vec(&merged_value).unwrap_or_default(),
        ),
    }
}

pub(crate) fn apply_module_config_common_overrides(
    mut common: ResolvedCommonConfig,
    config: Option<&ModuleConfig>,
) -> ResolvedCommonConfig {
    if let Some(config) = config {
        common.timeout_secs = config
            .get_config::<u64>("timeout_secs")
            .or_else(|| config.get_config::<u64>("timeout"))
            .unwrap_or(common.timeout_secs);
        common.rate_limit = config.get_config::<f32>("rate_limit").or(common.rate_limit);
        common.proxy_pool = config
            .get_config::<String>("proxy_pool")
            .or(common.proxy_pool.clone());
        common.downloader = config
            .get_config::<String>("downloader")
            .unwrap_or_else(|| common.downloader.clone());
        common.enable_session = config
            .get_config::<bool>("enable_session")
            .unwrap_or(common.enable_session);
        common.serial_execution = config
            .get_config::<bool>("serial_execution")
            .unwrap_or(common.serial_execution);
        common.rate_limit_group = config
            .get_config::<String>("rate_limit_group")
            .or(common.rate_limit_group.clone());
        common.response_cache_enabled = config
            .get_config::<bool>("response_cache_enabled")
            .or_else(|| config.get_config::<bool>("enable_response_cache"))
            .unwrap_or(common.response_cache_enabled);
        common.response_cache_ttl_secs = config
            .get_config::<u64>("response_cache_ttl_secs")
            .or(common.response_cache_ttl_secs);
        if let Some(priority) = config.get_config::<Priority>("priority") {
            common.priority = priority;
        }
        common.module_locker = config
            .get_config::<bool>("module_locker")
            .unwrap_or(common.module_locker);
    }

    common
}

fn build_module_config_common_config(
    base_common: ResolvedCommonConfig,
    config: Option<&ModuleConfig>,
) -> ResolvedCommonConfig {
    apply_module_config_common_overrides(base_common, config)
}

fn build_module_config_input(target_node: &str, params: Map<String, Value>) -> NodeInput {
    NodeInput::new(
        target_node,
        TypedEnvelope::new(
            "mocra.node_input.v1",
            1,
            PayloadCodec::Json,
            serde_json::to_vec(&Value::Object(params)).unwrap_or_default(),
        ),
    )
}

fn split_module_id(module_id: &str) -> (String, String, String) {
    let mut parts = module_id.splitn(3, '-');
    let account = parts.next().unwrap_or_default().to_string();
    let platform = parts.next().unwrap_or_default().to_string();
    let module = parts.next().unwrap_or(module_id).to_string();
    (account, platform, module)
}

fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_millis() as i64)
        .unwrap_or_default()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::interface::ModuleTrait;
    use crate::common::model::Priority;
    use std::sync::Arc;

    struct CommonDefaultsTestModule;

    #[async_trait::async_trait]
    impl ModuleTrait for CommonDefaultsTestModule {
        fn should_login(&self) -> bool {
            false
        }

        fn name(&self) -> &'static str {
            "common_defaults_test"
        }

        fn version(&self) -> i32 {
            1
        }

        fn default_arc() -> Arc<dyn ModuleTrait>
        where
            Self: Sized,
        {
            Arc::new(Self)
        }

        fn rate_limit(&self) -> Option<f32> {
            Some(2.5)
        }

        fn proxy_pool(&self) -> Option<&str> {
            Some("pool-a")
        }

        fn rate_limit_group(&self) -> Option<&str> {
            Some("group-a")
        }

        fn response_cache_enabled(&self) -> bool {
            true
        }

        fn response_cache_ttl_secs(&self) -> Option<u64> {
            Some(60)
        }

        fn priority(&self) -> Priority {
            Priority::High
        }
    }

    #[test]
    fn apply_module_config_common_overrides_preserves_inherited_option_defaults() {
        let default_common = CommonDefaultsTestModule.default_common_config();
        let resolved =
            apply_module_config_common_overrides(default_common, Some(&ModuleConfig::default()));

        assert_eq!(resolved.rate_limit, Some(2.5));
        assert_eq!(resolved.proxy_pool.as_deref(), Some("pool-a"));
        assert_eq!(resolved.rate_limit_group.as_deref(), Some("group-a"));
        assert!(resolved.response_cache_enabled);
        assert_eq!(resolved.response_cache_ttl_secs, Some(60));
        assert_eq!(resolved.priority, Priority::High);
    }
}