use crate::cacheable::{CacheAble, CacheService};
use crate::common::interface::MiddlewareManager;
use crate::common::model::Request;
use crate::common::model::workflow_profile::TaskProfileSnapshot;
use crate::common::processors::processor::{
ProcessorContext, ProcessorResult, ProcessorTrait, RetryPolicy,
};
use crate::common::state::State;
use crate::common::stream_stats::StreamStats;
use crate::downloader::{DownloaderManager, WebSocketDownloader};
use crate::engine::chain::{ConfigProcessor, ProxyMiddlewareProcessor, RequestMiddlewareProcessor};
use crate::engine::events::{DownloadEvent, EventBus, EventEnvelope, EventPhase, EventType};
use crate::engine::processors::event_processor::{EventAwareTypedChain, EventProcessorTrait};
use crate::engine::task::request_response_adapter::build_response_dispatch;
use crate::errors::Error;
use crate::proxy::ProxyManager;
use crate::queue::{QueueManager, QueuedItem};
use async_trait::async_trait;
use log::{error, warn};
use serde_json::json;
use std::sync::Arc;
/// Chain stage that sends a request through the WebSocket downloader and then
/// forwards the responses arriving for the request's module to the response queue.
struct WebSocketDownloadProcessor {
/// Supplies the response push channel, the queue namespace and the local
/// fast-path decisions used when forwarding responses.
queue_manager: Arc<QueueManager>,
/// Downloader used to send requests and to subscribe to, close and
/// unsubscribe from per-module response streams.
wss_downloader: Arc<WebSocketDownloader>,
/// Runs `handle_response` on every received response before it is queued.
middleware_manager: Arc<MiddlewareManager>,
/// Passed to `StreamStats::sync` when the forwarding task polls the module's stream stats.
cache_service: Arc<CacheService>,
}
#[async_trait]
impl ProcessorTrait<(Option<Request>, Arc<TaskProfileSnapshot>), ()>
    for WebSocketDownloadProcessor
{
    /// Name under which this processor identifies itself.
    fn name(&self) -> &'static str {
        "WebSocketDownloadProcessor"
    }

    /// Sends the request through the WebSocket downloader.
    ///
    /// Returns `Success(())` when `input.0` is `None` (nothing to send) or when the
    /// send succeeds. When the send fails, a warning is logged and a
    /// `RetryableFailure` is returned carrying the context's retry policy, or the
    /// default policy when the context has none.
    async fn process(
        &self,
        input: (Option<Request>, Arc<TaskProfileSnapshot>),
        context: ProcessorContext,
    ) -> ProcessorResult<()> {
        let Some(request) = input.0 else {
            return ProcessorResult::Success(());
        };
        // Captured before `request` is moved into `send` so the failure log can name it.
        let request_id = request.id;
        let module_id = request.module_id();
        let response = self.wss_downloader.send(request).await;
        match response {
            Ok(_resp) => ProcessorResult::Success(()),
            Err(e) => {
                warn!(
                    "[WebSocketDownloadProcessor] download failed, will retry: request_id={} module_id={} error={}",
                    request_id, module_id, e
                );
                ProcessorResult::RetryableFailure(context.retry_policy.unwrap_or_default())
            }
        }
    }

    /// Subscribes to the module's WebSocket responses and spawns a detached task
    /// that forwards them to the response queue.
    ///
    /// Returns `Ok(())` immediately; when `input.0` is `None` nothing is subscribed
    /// or spawned. The spawned task runs until one of the following happens:
    ///
    /// * the periodic check (every 5 seconds) finds the stream stats for
    ///   `run:{run_id}:module:{module_id}` with their first field set, in which case
    ///   the module's connection is closed;
    /// * no response has arrived for longer than the profile's `timeout_secs` and the
    ///   downloader reports zero active connections;
    /// * the response channel is closed.
    ///
    /// On exit the task unsubscribes the module from the downloader.
    ///
    /// NOTE(review): every call subscribes again under the same module id and spawns
    /// another task. How `subscribe` treats an existing subscriber, and whether an
    /// older task's final `unsubscribe` can remove a newer subscription, is not
    /// visible here — confirm against `WebSocketDownloader`.
    async fn post_process(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        _output: &(),
        _context: &ProcessorContext,
    ) -> crate::errors::Result<()> {
        let Some(request) = &input.0 else {
            return Ok(());
        };
        let (tx, mut rx) = tokio::sync::mpsc::channel(100);
        let timeout = input.1.common.timeout_secs;
        let module_id = request.module_id();
        self.wss_downloader.subscribe(module_id.clone(), tx).await;

        // Everything the task needs is cloned or copied up front because the task
        // outlives this call.
        let sender = self.queue_manager.get_response_push_channel().clone();
        let queue_manager = self.queue_manager.clone();
        let middleware_manager = self.middleware_manager.clone();
        let config = input.1.clone();
        let wss_downloader = self.wss_downloader.clone();
        let run_id = request.run_id;
        let cache_service = self.cache_service.clone();

        // The join handle is discarded on purpose: the task manages its own lifetime.
        tokio::spawn(async move {
            use tokio::time::{Duration, interval};

            let mut stop_check = interval(Duration::from_secs(5));
            // Loop-invariant values, computed once instead of on every tick.
            let idle_limit = Duration::from_secs(timeout);
            let stats_key = format!("run:{}:module:{}", run_id, module_id);
            let mut last_activity = tokio::time::Instant::now();

            loop {
                tokio::select! {
                    _ = stop_check.tick() => {
                        // NOTE(review): lookup errors and missing stats are ignored here, so a
                        // failing cache never stops the task by itself — confirm this is intended.
                        let stream_stats = StreamStats::sync(&stats_key, &cache_service).await;
                        if let Ok(Some(val)) = stream_stats
                            && val.0
                        {
                            log::info!("[ResponsePublish] Module {} stopped, closing connection...", module_id);
                            wss_downloader.close(&module_id).await;
                            break;
                        }
                        if last_activity.elapsed() > idle_limit {
                            let active = wss_downloader.active_count().await;
                            if active == 0 {
                                // Report the configured threshold rather than a fixed number.
                                log::info!(
                                    "[ResponsePublish] No active WebSocket connections and idle for more than {}s, exiting...",
                                    timeout
                                );
                                break;
                            }
                        }
                    }
                    res = rx.recv() => {
                        last_activity = tokio::time::Instant::now();
                        match res {
                            Some(response) => {
                                // Middleware may drop the response by returning `None`.
                                let modified_response = middleware_manager.handle_response(response, &config).await;
                                let Some(modified_response) = modified_response else {
                                    continue;
                                };
                                let dispatch = match build_response_dispatch(
                                    &modified_response,
                                    queue_manager.namespace.clone(),
                                ) {
                                    Ok(dispatch) => dispatch,
                                    Err(err) => {
                                        error!(
                                            "Failed to build response envelope for websocket response {}: {}",
                                            modified_response.id,
                                            err
                                        );
                                        continue;
                                    }
                                };
                                let use_local_fast_path = queue_manager
                                    .should_use_local_response_fast_path(&dispatch);
                                let item = match queue_manager.response_namespace_override(&dispatch) {
                                    Some(namespace) => QueuedItem::new(dispatch).with_namespace(namespace),
                                    None => QueuedItem::new(dispatch),
                                };
                                let send_result = if use_local_fast_path {
                                    match queue_manager.try_send_local_response(item) {
                                        Ok(_) => Ok(()),
                                        // The local queue handed the item back; fall back to the
                                        // regular push channel instead of losing it.
                                        Err(tokio::sync::mpsc::error::TrySendError::Full(returned_item))
                                        | Err(tokio::sync::mpsc::error::TrySendError::Closed(returned_item)) => {
                                            sender.send(returned_item).await.map_err(|e| e.to_string())
                                        }
                                    }
                                } else {
                                    sender.send(item).await.map_err(|e| e.to_string())
                                };
                                if let Err(e) = send_result {
                                    error!("Failed to send response to queue: {e}");
                                    // Nothing re-sends the item from here, so say so instead of
                                    // promising a retry.
                                    warn!("[ResponsePublish] response dropped due to queue send error; no retry is attempted here");
                                }
                            }
                            None => {
                                log::info!("[ResponsePublish] WebSocket response channel closed for module {}, exiting...", module_id);
                                break;
                            }
                        }
                    }
                }
            }

            wss_downloader.unsubscribe(&module_id).await;
            log::info!(
                "[ResponsePublish] Task completed for module {}",
                module_id
            );
        });
        Ok(())
    }
}
impl EventProcessorTrait<(Option<Request>, Arc<TaskProfileSnapshot>), ()>
    for WebSocketDownloadProcessor
{
    /// Event emitted before processing: a `Download`/`Started` engine event built
    /// from the request, or a system error envelope when there is no request.
    fn pre_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_without_request",
                EventPhase::Completed,
            ));
        };
        let download_event: DownloadEvent = request.into();
        Some(EventEnvelope::engine(
            EventType::Download,
            EventPhase::Started,
            download_event,
        ))
    }

    /// Event emitted after successful processing: a `Download`/`Completed` engine
    /// event, or a system error envelope when there is no request.
    fn finish_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        _output: &(),
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_without_request",
                EventPhase::Completed,
            ));
        };
        let download_event: DownloadEvent = request.into();
        Some(EventEnvelope::engine(
            EventType::Download,
            EventPhase::Completed,
            download_event,
        ))
    }

    /// Event emitted while processing is in progress. It carries the same
    /// `Download`/`Started` phase as `pre_status`.
    fn working_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_without_request",
                EventPhase::Completed,
            ));
        };
        let download_event: DownloadEvent = request.into();
        Some(EventEnvelope::engine(
            EventType::Download,
            EventPhase::Started,
            download_event,
        ))
    }

    /// Event emitted on failure: a `Download`/`Failed` engine error event carrying
    /// `err`, or a `Failed` system error whose message embeds `err` when there is
    /// no request.
    fn error_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        err: &Error,
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                format!("wss_download_skipped_with_error: {err}"),
                EventPhase::Failed,
            ));
        };
        let download_event: DownloadEvent = request.into();
        Some(EventEnvelope::engine_error(
            EventType::Download,
            EventPhase::Failed,
            download_event,
            err,
        ))
    }

    /// Event emitted when a retry is scheduled: a `Download`/`Retry` engine event
    /// whose JSON payload holds the download event, the current retry count and the
    /// retry reason (empty when the policy has none).
    fn retry_status(
        &self,
        input: &(Option<Request>, Arc<TaskProfileSnapshot>),
        retry_policy: &RetryPolicy,
    ) -> Option<EventEnvelope> {
        let Some(request) = &input.0 else {
            return Some(EventEnvelope::system_error(
                "wss_download_skipped_retry_without_request",
                EventPhase::Completed,
            ));
        };
        let download_event: DownloadEvent = request.into();
        let payload = json!({
            "data": download_event,
            "retry_count": retry_policy.current_retry,
            "reason": retry_policy.reason.clone().unwrap_or_default(),
        });
        Some(EventEnvelope::engine(
            EventType::Download,
            EventPhase::Retry,
            payload,
        ))
    }
}
/// Builds the event-aware processing chain for WebSocket downloads.
///
/// The chain runs four stages in order: `ConfigProcessor`,
/// `ProxyMiddlewareProcessor`, `RequestMiddlewareProcessor` and finally
/// `WebSocketDownloadProcessor`. The `ConfigProcessor` namespace is the `name`
/// read from the state's config at construction time. `event_bus` is handed to
/// the chain and `proxy_manager` to the proxy stage; both are optional.
pub async fn create_wss_download_chain(
    state: Arc<State>,
    downloader_manager: Arc<DownloaderManager>,
    queue_manager: Arc<QueueManager>,
    middleware_manager: Arc<MiddlewareManager>,
    cache_service: Arc<CacheService>,
    event_bus: Option<Arc<EventBus>>,
    proxy_manager: Option<Arc<ProxyManager>>,
) -> EventAwareTypedChain<Request, ()> {
    // Stage 1: resolve the task profile for the incoming request.
    let namespace = state.config.read().await.name.clone();
    let config_stage = ConfigProcessor {
        state: Arc::clone(&state),
        namespace,
    };

    // Stage 2: proxy handling.
    let proxy_stage = ProxyMiddlewareProcessor { proxy_manager };

    // Stage 3: request middleware.
    let request_stage = RequestMiddlewareProcessor {
        middleware_manager: Arc::clone(&middleware_manager),
    };

    // Stage 4: the WebSocket send plus response forwarding.
    let download_stage = WebSocketDownloadProcessor {
        queue_manager: Arc::clone(&queue_manager),
        wss_downloader: Arc::clone(&downloader_manager.wss_downloader),
        middleware_manager: Arc::clone(&middleware_manager),
        cache_service: Arc::clone(&cache_service),
    };

    EventAwareTypedChain::<Request, Request>::new(event_bus)
        .then::<(Request, Arc<TaskProfileSnapshot>), _>(config_stage)
        .then::<(Request, Arc<TaskProfileSnapshot>), _>(proxy_stage)
        .then::<(Option<Request>, Arc<TaskProfileSnapshot>), _>(request_stage)
        .then::<(), _>(download_stage)
}