// mocra 0.3.0
//
// A distributed, event-driven crawling and data collection framework.
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::common::model::data::DataEvent;
use crate::common::model::{Priority, ResolvedNodeConfig, TypedEnvelope};

/// Alias for [`DataEvent`]; this is the element type of [`NodeParseOutput::data`].
pub type ParsedData = DataEvent;

/// Routing fields carried with a request (see `RequestContextRef::routing`).
///
/// `account`, `platform`, `module` and `run_id` are the inputs of the id
/// helpers [`RoutingMeta::task_id`], [`RoutingMeta::module_id`] and
/// [`RoutingMeta::module_runtime_id`]; the remaining fields are plain data as
/// far as this file is concerned.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoutingMeta {
    /// Namespace string. It is not part of any id built by the helper methods.
    pub namespace: String,
    /// Account component; first segment of every id built by the helpers.
    pub account: String,
    /// Platform component; second segment of every id built by the helpers.
    pub platform: String,
    /// Module component; used by `module_id` and `module_runtime_id`.
    pub module: String,
    /// Key of a node. NOTE(review): presumably the node this request is routed
    /// to — confirm against callers.
    pub node_key: String,
    /// Run identifier; last segment of `module_runtime_id`.
    pub run_id: Uuid,
    /// Identifier of this request.
    pub request_id: Uuid,
    /// Identifier of a parent request, when there is one.
    /// NOTE(review): presumably the request that produced this one — confirm.
    pub parent_request_id: Option<Uuid>,
    /// Priority value; its semantics are defined by [`Priority`].
    pub priority: Priority,
}

impl RoutingMeta {
    /// Returns the task identifier, formatted as `<account>-<platform>`.
    pub fn task_id(&self) -> String {
        let Self {
            account, platform, ..
        } = self;
        format!("{account}-{platform}")
    }

    /// Returns the module identifier, formatted as
    /// `<account>-<platform>-<module>`.
    pub fn module_id(&self) -> String {
        let Self {
            account,
            platform,
            module,
            ..
        } = self;
        format!("{account}-{platform}-{module}")
    }

    /// Returns the per-run module identifier, formatted as
    /// `<account>-<platform>-<module>-<run_id>`.
    pub fn module_runtime_id(&self) -> String {
        let Self {
            account,
            platform,
            module,
            run_id,
            ..
        } = self;
        format!("{account}-{platform}-{module}-{run_id}")
    }
}

/// Execution bookkeeping carried with a request (see `RequestContextRef::exec`).
///
/// `Default` is derived, so a default value has every counter and timestamp
/// at `0` and every optional field at `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExecutionMeta {
    /// Retry counter. NOTE(review): presumably per request — confirm scope.
    pub retry_count: u32,
    /// Second retry counter. NOTE(review): presumably counted at task level;
    /// confirm how it differs from `retry_count`.
    pub task_retry_count: u32,
    /// Profile version recorded alongside the execution state.
    pub profile_version: u64,
    /// Optional trace identifier.
    pub trace_id: Option<String>,
    /// Optional fence token. NOTE(review): looks like a fencing token used to
    /// reject stale writers — confirm against the code that sets it.
    pub fence_token: Option<u64>,
    /// Creation timestamp; the `_ms` suffix suggests milliseconds (epoch not
    /// shown in this file).
    pub created_at_ms: i64,
    /// Last-update timestamp; overwritten by [`ExecutionMeta::touch`].
    pub updated_at_ms: i64,
}

impl ExecutionMeta {
    /// Consumes `self` and returns it with `updated_at_ms` replaced by the
    /// given value; every other field is carried over unchanged.
    ///
    /// The method takes `self` by value, so the change exists only in the
    /// returned value. `#[must_use]` makes the compiler warn when a caller
    /// drops that value and thereby loses the update.
    #[must_use = "`touch` returns the updated metadata; it does not modify anything in place"]
    pub fn touch(self, updated_at_ms: i64) -> Self {
        Self {
            updated_at_ms,
            ..self
        }
    }
}

/// Input handed to a node: the payload plus the names of the node it is
/// addressed to and, optionally, the node it came from.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeInput {
    /// Name of the originating node; `None` until set via
    /// [`NodeInput::from_source`].
    pub source_node: Option<String>,
    /// Name of the node this input is addressed to.
    pub target_node: String,
    /// The payload, wrapped in a [`TypedEnvelope`].
    pub payload: TypedEnvelope,
}

impl NodeInput {
    /// Creates an input addressed to `target_node` that carries `payload`.
    ///
    /// `source_node` starts as `None`; chain [`NodeInput::from_source`] to
    /// record where the input came from.
    pub fn new(target_node: impl Into<String>, payload: TypedEnvelope) -> Self {
        Self {
            source_node: None,
            target_node: target_node.into(),
            payload,
        }
    }

    /// Consumes `self` and returns it with `source_node` set to
    /// `Some(source_node)`; `target_node` and `payload` are unchanged.
    ///
    /// Despite the `from_` prefix this is a builder-style step on an existing
    /// value, not a constructor. The result must be kept, otherwise the source
    /// node is lost together with the input, so the method is `#[must_use]`.
    #[must_use = "`from_source` returns the updated input; it does not modify anything in place"]
    pub fn from_source(self, source_node: impl Into<String>) -> Self {
        Self {
            source_node: Some(source_node.into()),
            ..self
        }
    }
}

/// A request to hand `input` to the node named `target_node`.
///
/// NOTE(review): `input` carries its own `target_node` field as well. Nothing
/// in this file keeps the two in sync, so callers are presumably expected to
/// pass matching values — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeDispatch {
    /// Name of the node to dispatch to.
    pub target_node: String,
    /// The input to deliver.
    pub input: NodeInput,
}

impl NodeDispatch {
    /// Builds a dispatch of `input` to the node named `target_node`.
    ///
    /// `target_node` is converted to an owned `String`; `input` is stored
    /// as given, with no check against its own `target_node` field.
    pub fn new(target_node: impl Into<String>, input: NodeInput) -> Self {
        let target_node: String = target_node.into();
        Self { target_node, input }
    }
}

/// Accumulator for what a node's parse step produced: follow-up dispatches,
/// parsed data items and a `finished` flag.
///
/// `Default` is derived, so a default value has two empty vectors and
/// `finished == false`; the `with_*` / `finish` methods build it up from there.
#[derive(Debug, Clone, Default)]
pub struct NodeParseOutput {
    /// Follow-up dispatches, in the order they were added.
    pub next: Vec<NodeDispatch>,
    /// Parsed data items, in the order they were added.
    pub data: Vec<ParsedData>,
    /// Set to `true` by [`NodeParseOutput::finish`].
    /// NOTE(review): what the consumer does with a finished output is not
    /// visible in this file.
    pub finished: bool,
}

impl NodeParseOutput {
    /// Consumes `self` and returns it with `dispatch` appended to `next`.
    ///
    /// The result must be kept: the method does not mutate in place, so a
    /// dropped return value loses the dispatch along with the whole output.
    #[must_use = "builder methods return the updated output; they do not modify anything in place"]
    pub fn with_next(mut self, dispatch: NodeDispatch) -> Self {
        self.next.push(dispatch);
        self
    }

    /// Consumes `self` and returns it with `data` appended to the `data` list.
    #[must_use = "builder methods return the updated output; they do not modify anything in place"]
    pub fn with_data(mut self, data: ParsedData) -> Self {
        self.data.push(data);
        self
    }

    /// Consumes `self` and returns it with `finished` set to `true`;
    /// `next` and `data` are left untouched.
    #[must_use = "builder methods return the updated output; they do not modify anything in place"]
    pub fn finish(self) -> Self {
        Self {
            finished: true,
            ..self
        }
    }
}

/// Bundle of everything attached to one request: routing fields, execution
/// bookkeeping, the node input and the resolved node configuration.
///
/// NOTE(review): a profile version is stored both in `exec.profile_version`
/// and in `node_config.profile_version`. [`RequestContextRef::profile_version`]
/// reads the latter; nothing in this file keeps the two equal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RequestContextRef {
    /// Routing fields for the request.
    pub routing: RoutingMeta,
    /// Execution bookkeeping for the request.
    pub exec: ExecutionMeta,
    /// The input delivered to the node.
    pub input: NodeInput,
    /// Resolved node configuration; source of `profile_key` and
    /// `profile_version` for the accessor methods.
    pub node_config: ResolvedNodeConfig,
}

impl RequestContextRef {
    /// Borrows the profile key stored in `node_config`.
    pub fn profile_key(&self) -> &str {
        let Self { node_config, .. } = self;
        &node_config.profile_key
    }

    /// Returns the profile version stored in `node_config`
    /// (not the one in `exec`).
    pub fn profile_version(&self) -> u64 {
        let Self { node_config, .. } = self;
        node_config.profile_version
    }
}

// Unit tests for the id helpers, the builder-style methods and the
// `RequestContextRef` accessors defined in this module.
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::*;
    use crate::common::model::{PayloadCodec, ResolvedCommonConfig, TaskProfileSnapshot};

    /// Small fixed envelope shared by the tests; the payload bytes are arbitrary.
    fn sample_envelope() -> TypedEnvelope {
        TypedEnvelope::new(
            "node.payload",
            1,
            PayloadCodec::MsgPack,
            vec![1_u8, 2_u8, 3_u8],
        )
    }

    /// Routing fixture with fixed string fields and freshly generated ids.
    fn sample_routing() -> RoutingMeta {
        RoutingMeta {
            namespace: "demo".to_string(),
            account: "account-a".to_string(),
            platform: "platform-x".to_string(),
            module: "catalog".to_string(),
            node_key: "page".to_string(),
            // Generated per call, so tests must not assert on the exact id values.
            run_id: Uuid::now_v7(),
            request_id: Uuid::now_v7(),
            parent_request_id: None,
            priority: Priority::High,
        }
    }

    /// Pins the exact string layout of the id helpers.
    #[test]
    fn routing_meta_helpers_match_existing_runtime_naming() {
        let routing = sample_routing();

        assert_eq!(routing.task_id(), "account-a-platform-x");
        assert_eq!(routing.module_id(), "account-a-platform-x-catalog");
        // `run_id` is random, so only the fixed prefix is checked.
        assert!(
            routing
                .module_runtime_id()
                .starts_with("account-a-platform-x-catalog-")
        );
    }

    /// `touch` must change `updated_at_ms` and leave `created_at_ms` alone.
    #[test]
    fn execution_meta_touch_updates_timestamp() {
        let exec = ExecutionMeta {
            created_at_ms: 100,
            updated_at_ms: 100,
            ..ExecutionMeta::default()
        }
        .touch(250);

        assert_eq!(exec.created_at_ms, 100);
        assert_eq!(exec.updated_at_ms, 250);
    }

    /// Chains all three builder methods and checks each one took effect.
    #[test]
    fn node_parse_output_collects_dispatches_and_data() {
        let input = NodeInput::new("detail", sample_envelope()).from_source("page");
        // Cloned because `input` is inspected again at the end of the test.
        let dispatch = NodeDispatch::new("detail", input.clone());
        let output = NodeParseOutput::default()
            .with_next(dispatch.clone())
            .with_data(ParsedData::default())
            .finish();

        assert_eq!(output.next.len(), 1);
        assert_eq!(output.next[0], dispatch);
        assert_eq!(output.data.len(), 1);
        assert!(output.finished);
        assert_eq!(input.source_node.as_deref(), Some("page"));
    }

    /// Resolves a node config from a snapshot and checks that the context
    /// accessors expose its profile key and version.
    #[test]
    fn request_context_ref_exposes_profile_identity() {
        let snapshot = TaskProfileSnapshot {
            namespace: "demo".to_string(),
            account: "account-a".to_string(),
            platform: "platform-x".to_string(),
            module: "catalog".to_string(),
            version: 11,
            common: ResolvedCommonConfig::default(),
            node_configs: BTreeMap::from([("page".to_string(), sample_envelope())]),
            ..TaskProfileSnapshot::default()
        };
        let node_config = snapshot
            .resolve_node_config("page")
            .expect("node config should resolve");
        let context = RequestContextRef {
            routing: sample_routing(),
            exec: ExecutionMeta {
                profile_version: 11,
                ..ExecutionMeta::default()
            },
            input: NodeInput::new("page", sample_envelope()),
            node_config,
        };

        // The key string is produced by `resolve_node_config`, not by this module.
        assert_eq!(
            context.profile_key(),
            "demo:profile:account-a:platform-x:catalog"
        );
        assert_eq!(context.profile_version(), 11);
        assert_eq!(context.exec.profile_version, 11);
    }
}