// mocra 0.3.0
//
// A distributed, event-driven crawling and data collection framework.
use crate::cacheable::{CacheAble, CacheService};
use crate::common::interface::MiddlewareManager;
use crate::common::model::Request;
use crate::common::model::workflow_profile::TaskProfileSnapshot;
use crate::common::processors::processor::{
    ProcessorContext, ProcessorResult, ProcessorTrait, RetryPolicy,
};
use crate::common::state::State;
use crate::common::stream_stats::StreamStats;
use crate::downloader::{DownloaderManager, WebSocketDownloader};
use crate::engine::chain::{ConfigProcessor, ProxyMiddlewareProcessor, RequestMiddlewareProcessor};
use crate::engine::events::{DownloadEvent, EventBus, EventEnvelope, EventPhase, EventType};
use crate::engine::processors::event_processor::{EventAwareTypedChain, EventProcessorTrait};
use crate::engine::task::request_response_adapter::build_response_dispatch;
use crate::errors::Error;
use crate::proxy::ProxyManager;
use crate::queue::{QueueManager, QueuedItem};
use async_trait::async_trait;
use log::{error, warn};
use serde_json::json;
use std::sync::Arc;

/// Download stage for websocket-backed modules.
///
/// `process` only pushes the request over the websocket and yields `()`; responses are
/// not returned from it. Instead, `post_process` spawns a per-module subscription loop that
/// receives websocket responses, runs them through response middleware, and publishes them
/// to the queue.
struct WebSocketDownloadProcessor {
    /// Websocket transport: sends requests and delivers module-scoped responses.
    wss_downloader: Arc<WebSocketDownloader>,
    /// Response middleware applied to each websocket response before publishing.
    middleware_manager: Arc<MiddlewareManager>,
    /// Destination for processed responses (local fast path or response push channel).
    queue_manager: Arc<QueueManager>,
    /// Cache backend consulted for the distributed run/module stop flag.
    cache_service: Arc<CacheService>,
}

#[async_trait]
impl ProcessorTrait<(Option<Request>, Arc<TaskProfileSnapshot>), ()>
    for WebSocketDownloadProcessor
{
    fn name(&self) -> &'static str {
        "WebSocketDownloadProcessor"
    }

    async fn process(
        &self,
        input: (Option<Request>, Arc<TaskProfileSnapshot>),
        context: ProcessorContext,
    ) -> ProcessorResult<()> {
        let request = match input.0 {
            Some(request) => request,
            None => return ProcessorResult::Success(()),
        };
        let request_id = request.id;
        let module_id = request.module_id();
        let response = self.wss_downloader.send(request).await;
        match response {
            // For websocket mode, response forwarding happens asynchronously in post-process.
            Ok(_resp) => ProcessorResult::Success(()),
            Err(e) => {
                warn!(
                    "[WebSocketDownloadProcessor] download failed, will retry: request_id={} module_id={} error={}",
                    request_id, module_id, e
                );
                ProcessorResult::RetryableFailure(context.retry_policy.unwrap_or_default())
            }
        }
    }
    async fn post_process(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        _output: &(),
        _context: &ProcessorContext,
    ) -> crate::errors::Result<()> {
        let request = match &input.0 {
            Some(request) => request,
            None => return Ok(()),
        };
        // Read responses from websocket receiver and publish through queue manager.
        let (tx, mut rx) = tokio::sync::mpsc::channel(100);
        let timeout = input.1.common.timeout_secs;
        let module_id = request.module_id();

        // Register module-scoped subscription.
        self.wss_downloader.subscribe(module_id.clone(), tx).await;

        let sender = self.queue_manager.get_response_push_channel().clone();
        let queue_manager = self.queue_manager.clone();
        let middleware_manager = self.middleware_manager.clone();
        let config = input.1.clone();
        let wss_downloader = self.wss_downloader.clone();
        let module_id_clone = module_id.clone();
        let run_id = request.run_id;
        let cache_service = self.cache_service.clone();
        tokio::spawn(async move {
            use tokio::time::{Duration, interval};
            let mut stop_check = interval(Duration::from_secs(5));
            let mut last_activity = tokio::time::Instant::now();
            loop {
                tokio::select! {
                    _ = stop_check.tick() => {
                        let key = format!("run:{}:module:{}", run_id, module_id_clone);
                        // Check distributed stop flag.
                        let stream_stats = StreamStats::sync(&key,&cache_service).await;
                        if let Ok(Some(val)) = stream_stats
                             && val.0{
                                 log::info!("[ResponsePublish] Module {} stopped, closing connection...", module_id_clone);
                                 wss_downloader.close(&module_id_clone).await;
                                 break;
                             }

                        // Check idle timeout.
                        if last_activity.elapsed() > Duration::from_secs(timeout) {
                             let active = wss_downloader.active_count().await;
                             if active == 0 {
                                log::info!("[ResponsePublish] No active WebSocket connections and idle for 60s, exiting...");
                                break;
                             }
                        }
                    }
                    res = rx.recv() => {
                        last_activity = tokio::time::Instant::now();
                        match res {
                            Some(response) => {
                                // Handle middleware + queue publish.
                                let modified_response = middleware_manager.handle_response(response, &config).await;
                                let Some(modified_response) = modified_response else {
                                    continue;
                                };
                                let dispatch = match build_response_dispatch(
                                    &modified_response,
                                    queue_manager.namespace.clone(),
                                ) {
                                    Ok(dispatch) => dispatch,
                                    Err(err) => {
                                        error!(
                                            "Failed to build response envelope for websocket response {}: {}",
                                            modified_response.id,
                                            err
                                        );
                                        continue;
                                    }
                                };
                                let use_local_fast_path = queue_manager
                                    .should_use_local_response_fast_path(&dispatch);
                                let item = match queue_manager.response_namespace_override(&dispatch) {
                                    Some(namespace) => QueuedItem::new(dispatch).with_namespace(namespace),
                                    None => QueuedItem::new(dispatch),
                                };
                                if let Err(e) = if use_local_fast_path {
                                    match queue_manager.try_send_local_response(item) {
                                        Ok(_) => Ok(()),
                                        Err(tokio::sync::mpsc::error::TrySendError::Full(returned_item))
                                        | Err(tokio::sync::mpsc::error::TrySendError::Closed(returned_item)) => {
                                            sender.send(returned_item).await.map_err(|e| e.to_string())
                                        }
                                    }
                                } else {
                                    sender.send(item).await.map_err(|e| e.to_string())
                                } {
                                    error!("Failed to send response to queue: {e}");
                                    warn!("[ResponsePublish] will retry due to queue send error");
                                }
                            }
                            None => {
                                // Channel closed, exit loop gracefully.
                                log::info!("[ResponsePublish] WebSocket response channel closed for module {}, exiting...", module_id_clone);
                                break;
                            }
                        }
                    }
                }
            }
            // Always remove subscription on task exit.
            wss_downloader.unsubscribe(&module_id_clone).await;
            log::info!(
                "[ResponsePublish] Task completed for module {}",
                module_id_clone
            );
        });

        Ok(())
    }
}
impl EventProcessorTrait<(Option<Request>, Arc<TaskProfileSnapshot>), ()>
    for WebSocketDownloadProcessor
{
    /// `Download`/`Started` before the websocket send; a "skipped" system event when the
    /// middleware stage produced no request.
    fn pre_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_without_request",
                EventPhase::Completed,
            ));
        };
        Some(wss_download_envelope(request, EventPhase::Started))
    }

    /// `Download`/`Completed` once the send succeeded; "skipped" system event otherwise.
    fn finish_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        _output: &(),
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_without_request",
                EventPhase::Completed,
            ));
        };
        Some(wss_download_envelope(request, EventPhase::Completed))
    }

    /// In-progress status; reported with the `Started` phase, same as `pre_status`.
    fn working_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_without_request",
                EventPhase::Completed,
            ));
        };
        Some(wss_download_envelope(request, EventPhase::Started))
    }

    /// `Download`/`Failed` carrying `err`; without a request, a `Failed` system event that
    /// embeds the error text.
    fn error_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        err: &Error,
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                format!("wss_download_skipped_with_error: {err}"),
                EventPhase::Failed,
            ));
        };
        let event: DownloadEvent = request.into();
        Some(EventEnvelope::engine_error(
            EventType::Download,
            EventPhase::Failed,
            event,
            err,
        ))
    }

    /// `Download`/`Retry` whose payload wraps the download event with the retry count and
    /// the retry reason (empty string when none was recorded).
    fn retry_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        retry_policy: &RetryPolicy,
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_retry_without_request",
                EventPhase::Completed,
            ));
        };
        let event: DownloadEvent = request.into();
        let payload = json!({
            "data": event,
            "retry_count": retry_policy.current_retry,
            "reason": retry_policy.reason.clone().unwrap_or_default(),
        });
        Some(EventEnvelope::engine(
            EventType::Download,
            EventPhase::Retry,
            payload,
        ))
    }
}

/// Builds an engine envelope of type `Download` for `request` at the given `phase`.
fn wss_download_envelope(request: &Request, phase: EventPhase) -> EventEnvelope {
    let event: DownloadEvent = request.into();
    EventEnvelope::engine(EventType::Download, phase, event)
}

/// Assembles the websocket download pipeline.
///
/// Stages run in this order:
/// 1. `ConfigProcessor` pairs the request with its `TaskProfileSnapshot`. It is given the
///    configured `name` as its namespace.
/// 2. `ProxyMiddlewareProcessor` applies proxy middleware; `proxy_manager` is optional.
/// 3. `RequestMiddlewareProcessor` runs request middleware and may yield no request.
/// 4. `WebSocketDownloadProcessor` sends over the websocket and starts the response publisher.
///
/// The optional `event_bus` is handed to the event-aware chain.
pub async fn create_wss_download_chain(
    state: Arc<State>,
    downloader_manager: Arc<DownloaderManager>,
    queue_manager: Arc<QueueManager>,
    middleware_manager: Arc<MiddlewareManager>,
    cache_service: Arc<CacheService>,
    event_bus: Option<Arc<EventBus>>,
    proxy_manager: Option<Arc<ProxyManager>>,
) -> EventAwareTypedChain<Request, ()> {
    // Snapshot the namespace; the config read guard is released at the end of this statement.
    let namespace = state.config.read().await.name.clone();

    let config_stage = ConfigProcessor { state, namespace };
    let proxy_stage = ProxyMiddlewareProcessor { proxy_manager };
    let request_stage = RequestMiddlewareProcessor {
        middleware_manager: Arc::clone(&middleware_manager),
    };
    let download_stage = WebSocketDownloadProcessor {
        queue_manager,
        wss_downloader: Arc::clone(&downloader_manager.wss_downloader),
        middleware_manager,
        cache_service,
    };

    EventAwareTypedChain::<Request, Request>::new(event_bus)
        .then::<(Request, Arc<TaskProfileSnapshot>), _>(config_stage)
        .then::<(Request, Arc<TaskProfileSnapshot>), _>(proxy_stage)
        .then::<(Option<Request>, Arc<TaskProfileSnapshot>), _>(request_stage)
        .then::<(), _>(download_stage)
}