use crate::downloader::DownloaderManager;
use crate::engine::events::EventBus;
use crate::engine::monitor::SystemMonitor;
use crate::engine::zombie;
use crate::queue::Identifiable;
use crate::utils::device_info::get_primary_local_ip;
use crate::common::policy::{DlqPolicy, PolicyResolver};
use crate::engine::chain::{
UnifiedTaskIngressChain, create_download_chain, create_parser_chain,
create_unified_task_ingress_chain,
};
use crate::engine::events::{EventEnvelope, EventPhase, EventType, HealthCheckEvent};
use metrics::counter;
use crate::common::state::State;
use crate::engine::chain::stream_chain::create_wss_download_chain;
use crate::queue::{QueueManager, QueuedItem};
use futures::{FutureExt, StreamExt};
use log::{error, info, warn};
use crate::common::interface::{
DataMiddlewareHandle, DataStoreMiddlewareHandle, DownloadMiddlewareHandle, MiddlewareManager,
ModuleTrait,
};
use crate::common::model::message::UnifiedTaskInput;
use crate::common::processors::processor::{ProcessorContext, RetryPolicy};
use crate::engine::runner::{ProcessorRunner, ProcessorRunnerConfig};
use crate::engine::scheduler::{CronScheduler, CronSchedulerConfig};
use crate::engine::task::TaskManager;
use crate::proxy::ProxyManager;
use crate::schedule::dag::Dag;
use crate::sync::{LeadershipGate, build_leadership_gate};
use crate::utils::logger as app_logger;
use crate::utils::logger::{
LogOutputConfig as AppLogOutputConfig, LogSender as AppLogSender,
LoggerConfig as AppLoggerConfig, PrometheusConfig as AppPrometheusConfig,
};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, watch};
use uuid::Uuid;
mod processors;
mod runtime;
#[cfg(test)]
mod tests;
/// The engine: owns the queue, downloader, task, proxy and middleware managers, the cron
/// scheduler, and the shutdown/pause control channels shared with its background tasks.
///
/// Built by `Engine::new`.
pub struct Engine {
    /// Queue manager, either injected into `new` or built from the configuration; also used
    /// for log forwarding and DLQ sends.
    pub queue_manager: Arc<QueueManager>,
    /// Downloader manager created from the shared state in `new`.
    pub downloader_manager: Arc<DownloaderManager>,
    /// Module registry (see `register_module` / `get_module_dag`).
    pub task_manager: Arc<TaskManager>,
    /// `Some` only when the configuration contains a proxy section.
    pub proxy_manager: Option<Arc<ProxyManager>>,
    /// Registry for download, data and data-store middleware.
    pub middleware_manager: Arc<MiddlewareManager>,
    /// `Some` only when the configuration contains an `event_bus` section.
    pub event_bus: Option<Arc<EventBus>>,
    /// Shared application state (configuration, profile store, raft runtime, ...).
    pub state: Arc<State>,
    /// Shutdown broadcast; the pause watcher and the cron scheduler subscribe to it.
    shutdown_tx: broadcast::Sender<()>,
    /// Current pause flag, seeded from the profile store and kept in sync by the pause
    /// watcher spawned in `new`.
    pause_tx: watch::Sender<bool>,
    /// Handle of the Prometheus recorder installed in `new`; `None` if installation failed.
    pub prometheus_handle: Option<PrometheusHandle>,
    /// Node identifier from `crawler.node_id`, or a random UUID v4 when unset.
    pub node_id: String,
    /// Cron scheduler built in `new`.
    pub cron_scheduler: Arc<CronScheduler>,
    /// Starts at 0. NOTE(review): presumably counts in-flight work items; confirm against
    /// the runtime/processor modules that update it.
    pub inflight_counter: Arc<std::sync::atomic::AtomicUsize>,
}
struct PolicyFailureRequest<'a, T> {
policy_resolver: &'a PolicyResolver,
queue_manager: &'a QueueManager,
topic: &'a str,
event_type: &'a str,
item: &'a T,
err: &'a crate::errors::Error,
ack_fn: &'a mut Option<crate::queue::AckFn>,
nack_fn: &'a mut Option<crate::queue::NackFn>,
}
/// Arguments for `Engine::handle_policy_retry`.
///
/// Carries an item whose processor asked for a retry, plus everything needed to resolve
/// the engine policy and then retry (nack), dead-letter, or acknowledge the item.
struct PolicyRetryRequest<'r, Item> {
    /// Retry request from the processor; its `reason` (if any) is reported downstream.
    retry_policy: &'r RetryPolicy,
    /// The item being processed; forwarded to `send_to_dlq` when the policy says so.
    item: &'r Item,
    /// Event-type name used for policy lookup and the metrics label.
    event_type: &'r str,
    /// Topic passed to `send_to_dlq`.
    topic: &'r str,
    /// Resolves the policy decision for this retry.
    policy_resolver: &'r PolicyResolver,
    /// Queue manager used for dead-lettering.
    queue_manager: &'r QueueManager,
    /// Acknowledge callback; taken (left as `None`) once it has been invoked.
    ack_fn: &'r mut Option<crate::queue::AckFn>,
    /// Negative-acknowledge callback; taken (left as `None`) once it has been invoked.
    nack_fn: &'r mut Option<crate::queue::NackFn>,
}
impl Engine {
const NODE_HEARTBEAT_INTERVAL_SECS: u64 = 10;
const LEADERSHIP_TTL_MS: u64 = 5000;
fn init_leadership_gate(state: &Arc<State>, namespace: &str) -> Arc<dyn LeadershipGate> {
build_leadership_gate(
state.raft_runtime.clone(),
None,
namespace,
Self::LEADERSHIP_TTL_MS,
)
}
fn apply_pause_state_update(pause_tx: &watch::Sender<bool>, is_paused: bool) -> bool {
if *pause_tx.borrow() == is_paused {
return false;
}
let _ = pause_tx.send(is_paused);
true
}
fn spawn_pause_state_watcher(
profile_store: Arc<crate::engine::api::profile_store::ProfileControlPlaneStore>,
pause_tx: watch::Sender<bool>,
shutdown_tx: &broadcast::Sender<()>,
) {
let mut shutdown_rx = shutdown_tx.subscribe();
let mut pause_state_rx = profile_store.subscribe_pause_state();
tokio::spawn(async move {
loop {
tokio::select! {
changed = pause_state_rx.changed() => {
if changed.is_err() {
info!("Engine pause watcher stopping because the control-plane pause channel closed");
break;
}
let is_paused = *pause_state_rx.borrow_and_update();
if Self::apply_pause_state_update(&pause_tx, is_paused) {
if is_paused {
info!("Engine paused by global signal");
} else {
info!("Engine resumed by global signal");
}
}
}
_ = shutdown_rx.recv() => {
info!("Engine pause watcher shutting down");
break;
}
}
}
});
}
fn policy_event_label(event_type: &str) -> &'static str {
match event_type {
"task_model" => "task_model",
"download" => "download",
"parser_dispatch" => "parser_dispatch",
"system_error" => "system_error",
"parser" => "parser",
_ => "unknown",
}
}
fn policy_kind_label(kind: &crate::errors::ErrorKind) -> &'static str {
match kind {
crate::errors::ErrorKind::Request => "request",
crate::errors::ErrorKind::Response => "response",
crate::errors::ErrorKind::Command => "command",
crate::errors::ErrorKind::Service => "service",
crate::errors::ErrorKind::Proxy => "proxy",
crate::errors::ErrorKind::Download => "download",
crate::errors::ErrorKind::Queue => "queue",
crate::errors::ErrorKind::Orm => "orm",
crate::errors::ErrorKind::Task => "task",
crate::errors::ErrorKind::Module => "module",
crate::errors::ErrorKind::RateLimit => "rate_limit",
crate::errors::ErrorKind::ProcessorChain => "processor_chain",
crate::errors::ErrorKind::Parser => "parser",
crate::errors::ErrorKind::DataMiddleware => "data_middleware",
crate::errors::ErrorKind::DataStore => "data_store",
crate::errors::ErrorKind::DynamicLibrary => "dynamic_library",
crate::errors::ErrorKind::CacheService => "cache_service",
}
}
async fn handle_policy_failure<T>(request: PolicyFailureRequest<'_, T>)
where
T: serde::Serialize + Identifiable + Send + Sync,
{
let decision = request.policy_resolver.resolve_with_error(
"engine",
Some(request.event_type),
Some("failed"),
request.err,
);
let action = if decision.policy.retryable {
"retry"
} else if decision.policy.dlq == DlqPolicy::Never {
"ack"
} else {
"dlq"
};
let event_label = Self::policy_event_label(request.event_type);
let kind_label = Self::policy_kind_label(request.err.kind());
counter!(
"mocra_policy_decisions_total",
"domain" => "engine",
"event_type" => event_label,
"phase" => "failed",
"kind" => kind_label,
"action" => action
)
.increment(1);
let reason = format!("{}: {}", decision.reason, request.err);
match action {
"retry" => {
if let Some(f) = request.nack_fn.take() {
let _ = f(reason).await;
}
}
"dlq" => {
let _ = request
.queue_manager
.send_to_dlq(request.topic, request.item, &reason)
.await;
if let Some(f) = request.ack_fn.take() {
let _ = f().await;
}
}
_ => {
if let Some(f) = request.ack_fn.take() {
let _ = f().await;
}
}
}
}
async fn handle_policy_retry<T>(request: PolicyRetryRequest<'_, T>)
where
T: serde::Serialize + Identifiable + Send + Sync,
{
let reason = request
.retry_policy
.reason
.clone()
.unwrap_or_else(|| "retryable failure".to_string());
let err = crate::errors::Error::new(
crate::errors::ErrorKind::ProcessorChain,
Some(std::io::Error::other(reason.clone())),
);
let decision = request.policy_resolver.resolve_with_error(
"engine",
Some(request.event_type),
Some("retry"),
&err,
);
let action = if decision.policy.retryable {
"retry"
} else if decision.policy.dlq == DlqPolicy::Never {
"ack"
} else {
"dlq"
};
let event_label = Self::policy_event_label(request.event_type);
let kind_label = Self::policy_kind_label(err.kind());
counter!(
"mocra_policy_decisions_total",
"domain" => "engine",
"event_type" => event_label,
"phase" => "retry",
"kind" => kind_label,
"action" => action
)
.increment(1);
let reason = format!("{}: {}", decision.reason, reason);
match action {
"retry" => {
if let Some(f) = request.nack_fn.take() {
let _ = f(reason).await;
}
}
"dlq" => {
let _ = request
.queue_manager
.send_to_dlq(request.topic, request.item, &reason)
.await;
if let Some(f) = request.ack_fn.take() {
let _ = f().await;
}
}
_ => {
if let Some(f) = request.ack_fn.take() {
let _ = f().await;
}
}
}
}
fn init_queue_manager(cfg: &crate::common::model::config::Config) -> Arc<QueueManager> {
let log_topic = cfg.logger.as_ref().and_then(Self::first_mq_topic);
QueueManager::from_config_with_log_topic(cfg, log_topic.as_deref())
}
fn first_mq_topic(
logger: &crate::common::model::logger_config::LoggerConfig,
) -> Option<String> {
logger.outputs.iter().find_map(|output| match output {
crate::common::model::logger_config::LogOutputConfig::Mq { topic, .. } => {
Some(topic.clone())
}
_ => None,
})
}
fn build_app_logger_config(
logger: &crate::common::model::logger_config::LoggerConfig,
namespace: &str,
) -> AppLoggerConfig {
let mut config = AppLoggerConfig::for_app(namespace);
if let Some(enabled) = logger.enabled {
config.enabled = enabled;
}
if let Some(level) = &logger.level {
config.level = level.clone();
}
if let Some(format) = &logger.format {
if format.to_lowercase() != "text" {
eprintln!("logger.format only supports text for console/file, got {format}");
}
config.format = "text".to_string();
}
if let Some(include) = &logger.include {
config.include = include.clone();
}
if let Some(buffer) = logger.buffer {
config.buffer = buffer;
}
if let Some(interval) = logger.flush_interval_ms {
config.flush_interval_ms = interval;
}
config.outputs = logger
.outputs
.iter()
.map(|output| match output {
crate::common::model::logger_config::LogOutputConfig::Console => {
AppLogOutputConfig::Console
}
crate::common::model::logger_config::LogOutputConfig::File {
path,
rotation,
..
} => AppLogOutputConfig::File {
path: PathBuf::from(path),
rotation: rotation.clone(),
},
crate::common::model::logger_config::LogOutputConfig::Mq { format, .. } => {
if let Some(format) = format
&& format.to_lowercase() != "json"
{
eprintln!("logger.outputs.mq.format only supports json, got {format}");
}
AppLogOutputConfig::Mq
}
})
.collect();
if config.outputs.is_empty() {
config.outputs = AppLoggerConfig::default().outputs;
}
if let Some(prometheus) = &logger.prometheus
&& prometheus.enabled
{
config.prometheus = Some(AppPrometheusConfig { enabled: true });
}
config
}
fn base_level_from_filter(level: &str) -> Option<&str> {
level
.split([',', ';'])
.map(|value| value.trim())
.find(|value| !value.is_empty())
}
async fn setup_mq_log_sender(
logger: &crate::common::model::logger_config::LoggerConfig,
queue_manager: Arc<QueueManager>,
) -> Option<AppLogSender> {
let mq_output = logger.outputs.iter().find_map(|output| match output {
crate::common::model::logger_config::LogOutputConfig::Mq { buffer, .. } => {
Some(*buffer)
}
_ => None,
})?;
let buffer = mq_output.or(logger.buffer).unwrap_or(10000);
let level = logger
.level
.as_deref()
.and_then(Self::base_level_from_filter)
.unwrap_or("info")
.to_string();
let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer);
let log_sender = AppLogSender::with_capacity(sender, level, buffer);
let queue_sender = queue_manager.get_log_push_channel();
tokio::spawn(async move {
while let Some(log) = receiver.recv().await {
let item = QueuedItem::new(log);
if let Err(e) = queue_sender.send(item).await {
eprintln!("Failed to forward log to queue: {e}");
}
}
});
Some(log_sender)
}
pub async fn new(
state: Arc<State>,
queue_manager: Option<Arc<QueueManager>>,
) -> crate::errors::Result<Self> {
let builder = PrometheusBuilder::new();
let prometheus_handle = builder.install_recorder().ok();
let event_bus = state
.config
.read()
.await
.event_bus
.as_ref()
.map(|conf| Arc::new(EventBus::new(conf.capacity, conf.concurrency)));
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
let (pause_tx, _) = watch::channel(state.profile_store.is_paused());
let task_manager = Arc::new(TaskManager::new(Arc::clone(&state)));
let cfg = state.config.read().await.clone();
let _channel_config = cfg.channel_config.clone();
let namespace = cfg.name.clone();
let node_id = cfg
.crawler
.node_id
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string());
crate::common::metrics::init_metrics(crate::common::metrics::MetricsScope::new(
namespace.clone(),
node_id.clone(),
state.is_single_node_deployment(),
));
crate::common::metrics::set_node_up(true);
crate::common::metrics::set_component_health("engine", true);
let queue_manager = if let Some(qm) = queue_manager {
qm
} else {
Self::init_queue_manager(&cfg)
};
if let Some(logger_config) = &cfg.logger
&& logger_config.enabled.unwrap_or(true)
{
let app_config = Self::build_app_logger_config(logger_config, &namespace);
let log_sender = Self::setup_mq_log_sender(logger_config, queue_manager.clone()).await;
let _ = app_logger::init_logger(app_config).await;
if let Some(sender) = log_sender {
let _ = app_logger::set_log_sender(sender);
}
}
if let (Some(log_config), Some(event_bus)) = (&cfg.logger, event_bus.as_ref()) {
if log_config.enabled == Some(false) {
info!("Logger disabled; skipping EventBus log handlers");
} else {
use crate::common::model::logger_config::LogOutputConfig;
use crate::engine::events::handlers::{
console_handler::ConsoleLogHandler, queue_handler::QueueLogHandler,
};
for output in &log_config.outputs {
match output {
LogOutputConfig::Mq { .. } => {
let rx = event_bus.subscribe("*".to_string()).await;
QueueLogHandler::start(rx, queue_manager.clone(), "mq".to_string())
.await;
info!("Registered MQ Logger for EventBus");
}
LogOutputConfig::Console => {
let rx = event_bus.subscribe("*".to_string()).await;
let level = log_config
.level
.as_deref()
.and_then(Self::base_level_from_filter)
.unwrap_or("info")
.to_string();
ConsoleLogHandler::start(rx, level).await;
info!("Registered Console Logger for EventBus");
}
LogOutputConfig::File { .. } => {
info!(
"Registered File Logger for EventBus (Handled by Global Tracing)"
);
}
}
}
}
} else if cfg.logger.is_some() {
info!("EventBus disabled; skipping logger EventBus handlers");
}
let downloader_manager = DownloaderManager::new(Arc::clone(&state)).await;
let proxy_manager = if let Some(proxy_config) = state.config.read().await.proxy.clone() {
Some(Arc::new(
ProxyManager::from_proxy_config(&proxy_config)
.await
.map_err(|e| {
crate::errors::Error::new(
crate::errors::ErrorKind::Service,
Some(format!("Failed to create ProxyManager: {}", e)),
)
})?,
))
} else {
None
};
let middleware_manager = MiddlewareManager::new(state.clone());
let leadership_gate = Self::init_leadership_gate(&state, &namespace);
let cron_scheduler = Arc::new(
CronScheduler::new(CronSchedulerConfig {
task_manager: task_manager.clone(),
state: state.clone(),
queue_manager: queue_manager.clone(),
shutdown_rx: shutdown_tx.subscribe(),
leadership_gate,
})
.await,
);
Self::spawn_pause_state_watcher(
Arc::clone(&state.profile_store),
pause_tx.clone(),
&shutdown_tx,
);
Ok(Self {
queue_manager,
downloader_manager: Arc::new(downloader_manager),
task_manager,
proxy_manager,
middleware_manager: Arc::new(middleware_manager),
event_bus,
state,
shutdown_tx,
pause_tx,
prometheus_handle,
node_id,
cron_scheduler,
inflight_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
})
}
pub async fn register_download_middleware(&self, middleware: DownloadMiddlewareHandle) {
self.middleware_manager
.register_download_middleware(middleware)
.await;
}
pub async fn register_data_middleware(&self, middleware: DataMiddlewareHandle) {
self.middleware_manager
.register_data_middleware(middleware)
.await;
}
pub async fn register_store_middleware(&self, middleware: DataStoreMiddlewareHandle) {
self.middleware_manager
.register_store_middleware(middleware)
.await;
}
pub async fn register_module(&self, module: Arc<dyn ModuleTrait>) {
self.task_manager.add_module(module).await;
}
pub fn get_module_dag(&self, module_name: &str) -> Option<Dag> {
self.task_manager.get_module_dag(module_name)
}
}