use crate::common::model::login_info::LoginInfo;
use crate::common::model::{
Cookies, CronConfig, ExecutionMeta, Headers, NodeInput, NodeParseOutput, Priority, Request,
ResolvedCommonConfig, ResolvedNodeConfig, Response, RoutingMeta, TaskProfileSnapshot,
};
use crate::engine::task::module_dag_compiler::ModuleDagDefinition;
use crate::errors::Result;
use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
use std::sync::Arc;
pub type SyncBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;
#[async_trait]
pub trait ModuleTrait: Send + Sync {
fn should_login(&self) -> bool {
true
}
fn name(&self) -> &'static str;
fn version(&self) -> i32;
fn rate_limit(&self) -> Option<f32> {
None
}
fn proxy_pool(&self) -> Option<&str> {
None
}
fn timeout_secs(&self) -> Option<u64> {
None
}
fn priority(&self) -> Priority {
Priority::default()
}
fn serial_execution(&self) -> bool {
false
}
fn module_locker(&self) -> bool {
false
}
fn enable_session(&self) -> bool {
false
}
fn downloader(&self) -> &str {
"request_downloader"
}
fn rate_limit_group(&self) -> Option<&str> {
None
}
fn response_cache_enabled(&self) -> bool {
false
}
fn response_cache_ttl_secs(&self) -> Option<u64> {
None
}
async fn headers(&self) -> Headers {
Headers::default()
}
async fn cookies(&self) -> Cookies {
Cookies::default()
}
fn default_arc() -> Arc<dyn ModuleTrait>
where
Self: Sized;
async fn dag_definition(&self) -> Option<ModuleDagDefinition> {
None
}
async fn add_step(&self) -> Vec<Arc<dyn ModuleNodeTrait>> {
vec![]
}
fn default_common_config(&self) -> ResolvedCommonConfig {
ResolvedCommonConfig {
timeout_secs: self.timeout_secs().unwrap_or(30),
rate_limit: self.rate_limit(),
priority: self.priority(),
proxy_pool: self.proxy_pool().map(str::to_string),
downloader: self.downloader().to_string(),
enable_session: self.enable_session(),
serial_execution: self.serial_execution(),
module_locker: self.module_locker(),
rate_limit_group: self.rate_limit_group().map(str::to_string),
response_cache_enabled: self.response_cache_enabled(),
response_cache_ttl_secs: self.response_cache_ttl_secs(),
}
}
async fn pre_process(&self, _profile: Option<Arc<TaskProfileSnapshot>>) -> Result<()> {
Ok(())
}
async fn post_process(&self, _profile: Option<Arc<TaskProfileSnapshot>>) -> Result<()> {
Ok(())
}
fn cron(&self) -> Option<CronConfig> {
None
}
}
pub struct NodeGenerateContext<'a> {
pub routing: &'a RoutingMeta,
pub exec: &'a ExecutionMeta,
pub config: &'a ResolvedNodeConfig,
pub input: &'a NodeInput,
pub login_info: Option<&'a LoginInfo>,
}
pub struct NodeParseContext<'a> {
pub routing: &'a RoutingMeta,
pub exec: &'a ExecutionMeta,
pub config: &'a ResolvedNodeConfig,
pub login_info: Option<&'a LoginInfo>,
}
#[async_trait]
pub trait ModuleNodeTrait: Send + Sync {
async fn generate(
&self,
_ctx: NodeGenerateContext<'_>,
) -> Result<SyncBoxStream<'static, Request>>;
async fn parser(
&self,
response: Response,
_ctx: NodeParseContext<'_>,
) -> Result<NodeParseOutput>;
fn retryable(&self) -> bool {
true
}
fn stable_node_key(&self) -> &'static str {
""
}
}
pub trait ToSyncBoxStream<T> {
fn to_stream(self) -> SyncBoxStream<'static, T>;
fn into_stream_ok(self) -> Result<SyncBoxStream<'static, T>>;
}
impl<T> ToSyncBoxStream<T> for Vec<T>
where
T: Send + Sync + 'static,
{
fn to_stream(self) -> SyncBoxStream<'static, T> {
Box::pin(futures::stream::iter(self))
}
fn into_stream_ok(self) -> Result<SyncBoxStream<'static, T>> {
Ok(Box::pin(futures::stream::iter(self)))
}
}
#[cfg(test)]
mod tests {
use super::*;
struct DefaultConfigModule;
#[async_trait]
impl ModuleTrait for DefaultConfigModule {
fn name(&self) -> &'static str {
"default-config-module"
}
fn version(&self) -> i32 {
1
}
fn default_arc() -> Arc<dyn ModuleTrait>
where
Self: Sized,
{
Arc::new(Self)
}
fn timeout_secs(&self) -> Option<u64> {
Some(45)
}
fn rate_limit(&self) -> Option<f32> {
Some(2.5)
}
fn priority(&self) -> Priority {
Priority::High
}
fn proxy_pool(&self) -> Option<&str> {
Some("pool-a")
}
fn serial_execution(&self) -> bool {
true
}
fn module_locker(&self) -> bool {
true
}
fn enable_session(&self) -> bool {
true
}
fn rate_limit_group(&self) -> Option<&str> {
Some("group-a")
}
fn response_cache_enabled(&self) -> bool {
true
}
fn response_cache_ttl_secs(&self) -> Option<u64> {
Some(60)
}
}
#[test]
fn module_trait_default_common_config_assembles_static_defaults() {
let module = DefaultConfigModule;
let config = module.default_common_config();
assert_eq!(config.timeout_secs, 45);
assert_eq!(config.rate_limit, Some(2.5));
assert_eq!(config.priority, Priority::High);
assert_eq!(config.proxy_pool.as_deref(), Some("pool-a"));
assert_eq!(config.downloader, "request_downloader");
assert!(config.enable_session);
assert!(config.serial_execution);
assert_eq!(config.rate_limit_group.as_deref(), Some("group-a"));
assert!(config.response_cache_enabled);
assert_eq!(config.response_cache_ttl_secs, Some(60));
}
}