Skip to main content

aster/scheduler/
executor.rs

1//! 任务执行器模块
2//!
3//! 本模块定义任务执行器的 trait 和实现,包括:
4//! - `TaskExecutor`: 任务执行器 trait
5//! - `ExecutionResult`: 执行结果结构体
6//! - `MainSessionExecutor`: 主会话执行器
7//! - `IsolatedSessionExecutor`: 隔离会话执行器
8//!
9//! ## 需求映射
10//!
11//! - **Requirement 7.7**: 任务执行器 trait 定义
12//! - **Requirement 7.8**: 执行结果结构体
13//! - **Requirement 7.9**: 状态更新逻辑
14//! - **Requirement 4.4**: 隔离会话创建
15//! - **Requirement 4.5**: 隔离会话执行
16//! - **Requirement 4.6**: 结果回传逻辑
17//! - **Requirement 4.7**: 输出截断
18
19use anyhow::Result;
20use async_trait::async_trait;
21use chrono::Utc;
22use std::sync::Arc;
23use tokio_util::sync::CancellationToken;
24
25use super::types::{
26    IsolationConfig, JobStatus, PostToMainMode, ScheduledJob as NewScheduledJob, SessionTarget,
27};
28
29// ============================================================================
30// ExecutionResult 结构体 (Task 7.1)
31// ============================================================================
32
/// Outcome of a single scheduled-job execution.
///
/// Bundles everything reported about one run so it can be used for status
/// updates and result delivery.
///
/// # Fields
///
/// - `session_id`: ID of the session the job ran in
/// - `output`: output text produced by the run (optional)
/// - `duration_ms`: how long the run took, in milliseconds
/// - `status`: final status of the run
/// - `error`: error message (only when `status` is `Error`)
///
/// # Requirements
///
/// - **Requirement 7.8**: execution result struct
///
/// # Examples
///
/// ```rust
/// use aster::scheduler::executor::ExecutionResult;
/// use aster::scheduler::types::JobStatus;
///
/// // A successful result
/// let success = ExecutionResult {
///     session_id: "session-123".to_string(),
///     output: Some("Task completed successfully".to_string()),
///     duration_ms: 1500,
///     status: JobStatus::Ok,
///     error: None,
/// };
///
/// // A failed result
/// let failure = ExecutionResult {
///     session_id: "session-456".to_string(),
///     output: None,
///     duration_ms: 500,
///     status: JobStatus::Error,
///     error: Some("Connection timeout".to_string()),
/// };
/// ```
#[derive(Clone, Debug)]
pub struct ExecutionResult {
    /// ID of the session the job ran in.
    pub session_id: String,

    /// Output text produced by the run.
    ///
    /// For AgentTurn jobs this is the agent's final output.
    /// For SystemEvent jobs it may be the result of handling the event.
    pub output: Option<String>,

    /// Run time in milliseconds.
    pub duration_ms: u64,

    /// Final status of the run.
    pub status: JobStatus,

    /// Error message.
    ///
    /// Populated only when `status` is `Error`.
    pub error: Option<String>,
}
95
96impl ExecutionResult {
97    /// 创建成功的执行结果
98    ///
99    /// # 参数
100    /// - `session_id`: 会话 ID
101    /// - `output`: 输出内容
102    /// - `duration_ms`: 执行耗时
103    pub fn success(
104        session_id: impl Into<String>,
105        output: Option<String>,
106        duration_ms: u64,
107    ) -> Self {
108        Self {
109            session_id: session_id.into(),
110            output,
111            duration_ms,
112            status: JobStatus::Ok,
113            error: None,
114        }
115    }
116
117    /// 创建失败的执行结果
118    ///
119    /// # 参数
120    /// - `session_id`: 会话 ID
121    /// - `error`: 错误信息
122    /// - `duration_ms`: 执行耗时
123    pub fn failure(
124        session_id: impl Into<String>,
125        error: impl Into<String>,
126        duration_ms: u64,
127    ) -> Self {
128        Self {
129            session_id: session_id.into(),
130            output: None,
131            duration_ms,
132            status: JobStatus::Error,
133            error: Some(error.into()),
134        }
135    }
136
137    /// 创建跳过的执行结果
138    ///
139    /// # 参数
140    /// - `session_id`: 会话 ID
141    /// - `reason`: 跳过原因
142    pub fn skipped(session_id: impl Into<String>, reason: impl Into<String>) -> Self {
143        Self {
144            session_id: session_id.into(),
145            output: Some(reason.into()),
146            duration_ms: 0,
147            status: JobStatus::Skipped,
148            error: None,
149        }
150    }
151
152    /// 检查是否成功
153    pub fn is_success(&self) -> bool {
154        self.status.is_ok()
155    }
156
157    /// 检查是否失败
158    pub fn is_failure(&self) -> bool {
159        self.status.is_error()
160    }
161
162    /// 检查是否跳过
163    pub fn is_skipped(&self) -> bool {
164        self.status.is_skipped()
165    }
166
167    /// 获取格式化的输出(用于回传)
168    ///
169    /// 根据隔离配置格式化输出内容。
170    ///
171    /// # 参数
172    /// - `config`: 隔离配置
173    pub fn format_output(&self, config: &IsolationConfig) -> String {
174        match &self.output {
175            Some(output) => config.format_message(output),
176            None => match &self.error {
177                Some(err) => config.format_message(&format!("Error: {}", err)),
178                None => config.format_message("Task completed"),
179            },
180        }
181    }
182}
183
184// ============================================================================
185// ExecutionContext 结构体
186// ============================================================================
187
/// Execution context.
///
/// Holds the per-run information an executor needs: a cancellation token
/// and the time the run started.
#[derive(Clone, Debug)]
pub struct ExecutionContext {
    /// Token used to signal that the run should be cancelled.
    pub cancel_token: CancellationToken,

    /// Start time of the run, as a millisecond timestamp.
    pub start_time_ms: i64,
}
199
200impl ExecutionContext {
201    /// 创建新的执行上下文
202    pub fn new() -> Self {
203        Self {
204            cancel_token: CancellationToken::new(),
205            start_time_ms: Utc::now().timestamp_millis(),
206        }
207    }
208
209    /// 使用指定的取消令牌创建执行上下文
210    pub fn with_cancel_token(cancel_token: CancellationToken) -> Self {
211        Self {
212            cancel_token,
213            start_time_ms: Utc::now().timestamp_millis(),
214        }
215    }
216
217    /// 计算已执行时间(毫秒)
218    pub fn elapsed_ms(&self) -> u64 {
219        let now = Utc::now().timestamp_millis();
220        (now - self.start_time_ms).max(0) as u64
221    }
222
223    /// 检查是否已取消
224    pub fn is_cancelled(&self) -> bool {
225        self.cancel_token.is_cancelled()
226    }
227}
228
229impl Default for ExecutionContext {
230    fn default() -> Self {
231        Self::new()
232    }
233}
234
235// ============================================================================
236// TaskExecutor Trait (Task 7.1)
237// ============================================================================
238
/// Task executor trait.
///
/// Defines the standard interface for running a scheduled job, so that
/// different execution strategies (main session, isolated session, ...) can
/// be used interchangeably.
///
/// # Requirements
///
/// - **Requirement 7.7**: task executor trait definition
///
/// # Implementors
///
/// - `MainSessionExecutor`: runs jobs in the main session
/// - `IsolatedSessionExecutor`: runs jobs in an isolated session
///
/// # Examples
///
/// ```rust,ignore
/// use aster::scheduler::executor::{TaskExecutor, ExecutionResult, ExecutionContext};
///
/// struct MyExecutor;
///
/// #[async_trait]
/// impl TaskExecutor for MyExecutor {
///     async fn execute(
///         &self,
///         job: &ScheduledJob,
///         ctx: &ExecutionContext,
///     ) -> Result<ExecutionResult> {
///         // Job execution logic goes here
///         Ok(ExecutionResult::success("session-id", None, 100))
///     }
///
///     async fn cancel(&self, job_id: &str) -> Result<()> {
///         // Cancellation logic goes here
///         Ok(())
///     }
///
///     fn name(&self) -> &str {
///         "my_executor"
///     }
/// }
/// ```
#[async_trait]
pub trait TaskExecutor: Send + Sync {
    /// Runs a job.
    ///
    /// # Arguments
    /// - `job`: the scheduled job to run
    /// - `ctx`: execution context (cancellation token and start time)
    ///
    /// # Returns
    /// - `Ok(ExecutionResult)`: the outcome of the run
    /// - `Err`: an error raised while running the job
    async fn execute(
        &self,
        job: &NewScheduledJob,
        ctx: &ExecutionContext,
    ) -> Result<ExecutionResult>;

    /// Cancels a running job.
    ///
    /// # Arguments
    /// - `job_id`: ID of the job to cancel
    ///
    /// # Returns
    /// - `Ok(())`: cancellation succeeded
    /// - `Err`: cancellation failed
    async fn cancel(&self, job_id: &str) -> Result<()>;

    /// Returns the executor's name.
    fn name(&self) -> &str;
}
306
307// ============================================================================
308// MainSessionExecutor (Task 7.2)
309// ============================================================================
310
/// Main-session executor.
///
/// Intended to run scheduled jobs in the main session, where the outcome
/// directly affects main-session state.
///
/// # Requirements
///
/// - **Requirement 7.7**: main-session executor implementation
/// - **Requirement 7.8**: returning the execution result
/// - **Requirement 7.9**: status updates
///
/// # Use cases
///
/// Suited to jobs that interact with the user's current session, such as:
/// - timed reminders
/// - status reports
/// - jobs that need a response from the user
///
/// # Examples
///
/// ```rust,ignore
/// use aster::scheduler::executor::{MainSessionExecutor, ExecutionContext};
///
/// let executor = MainSessionExecutor::new();
/// let ctx = ExecutionContext::new();
/// let result = executor.execute(&job, &ctx).await?;
/// ```
pub struct MainSessionExecutor {
    /// Executor name, as returned by `TaskExecutor::name`.
    name: String,
}
341
342impl MainSessionExecutor {
343    /// 创建新的主会话执行器
344    pub fn new() -> Self {
345        Self {
346            name: "main_session".to_string(),
347        }
348    }
349}
350
351impl Default for MainSessionExecutor {
352    fn default() -> Self {
353        Self::new()
354    }
355}
356
357#[async_trait]
358impl TaskExecutor for MainSessionExecutor {
359    async fn execute(
360        &self,
361        job: &NewScheduledJob,
362        ctx: &ExecutionContext,
363    ) -> Result<ExecutionResult> {
364        // 检查任务是否启用
365        if !job.enabled {
366            return Ok(ExecutionResult::skipped(
367                "main",
368                format!("Job '{}' is disabled", job.id),
369            ));
370        }
371
372        // 检查是否已取消
373        if ctx.is_cancelled() {
374            return Ok(ExecutionResult::skipped(
375                "main",
376                format!("Job '{}' was cancelled before execution", job.id),
377            ));
378        }
379
380        // 获取任务文本
381        let task_text = job.payload.get_text();
382
383        // 模拟执行(实际实现需要集成 Agent)
384        // TODO: 集成实际的 Agent 执行逻辑
385        tracing::info!(
386            "MainSessionExecutor: Executing job '{}' with payload: {}",
387            job.id,
388            task_text
389        );
390
391        let duration_ms = ctx.elapsed_ms();
392
393        // 返回成功结果
394        Ok(ExecutionResult::success(
395            "main",
396            Some(format!("Executed: {}", task_text)),
397            duration_ms,
398        ))
399    }
400
401    async fn cancel(&self, job_id: &str) -> Result<()> {
402        tracing::info!("MainSessionExecutor: Cancelling job '{}'", job_id);
403        // 主会话取消逻辑
404        // TODO: 实现实际的取消逻辑
405        Ok(())
406    }
407
408    fn name(&self) -> &str {
409        &self.name
410    }
411}
412
413// ============================================================================
414// IsolatedSessionExecutor (Task 7.3)
415// ============================================================================
416
/// Isolated-session executor.
///
/// Intended to run scheduled jobs in a separate, isolated session so that
/// main-session state is not affected. Once a run finishes, its result can
/// be posted back to the main session.
///
/// # Requirements
///
/// - **Requirement 4.4**: isolated session creation
/// - **Requirement 4.5**: isolated session execution
/// - **Requirement 4.6**: posting results back
/// - **Requirement 4.7**: output truncation
///
/// # Use cases
///
/// Suited to jobs that should run on their own, such as:
/// - long-running jobs
/// - jobs that may produce a lot of output
/// - jobs that should not affect main-session state
///
/// # Examples
///
/// ```rust,ignore
/// use aster::scheduler::executor::{IsolatedSessionExecutor, ExecutionContext};
///
/// let executor = IsolatedSessionExecutor::new();
/// let ctx = ExecutionContext::new();
/// let result = executor.execute(&job, &ctx).await?;
/// ```
pub struct IsolatedSessionExecutor {
    /// Executor name, as returned by `TaskExecutor::name`.
    name: String,
}
449
450impl IsolatedSessionExecutor {
451    /// 创建新的隔离会话执行器
452    pub fn new() -> Self {
453        Self {
454            name: "isolated_session".to_string(),
455        }
456    }
457
458    /// 生成隔离会话 ID
459    fn generate_session_id(&self, job_id: &str) -> String {
460        let timestamp = Utc::now().timestamp_millis();
461        format!("isolated-{}-{}", job_id, timestamp)
462    }
463
464    /// 处理执行结果,应用隔离配置
465    fn process_result(
466        &self,
467        result: ExecutionResult,
468        isolation: &IsolationConfig,
469    ) -> ExecutionResult {
470        if !isolation.enabled {
471            return result;
472        }
473
474        // 根据配置处理输出
475        let processed_output = result.output.map(|output| {
476            match isolation.post_to_main_mode {
477                PostToMainMode::Summary => {
478                    // 摘要模式:生成简短状态
479                    if result.status.is_ok() {
480                        "Task completed successfully".to_string()
481                    } else if result.status.is_error() {
482                        format!(
483                            "Task failed: {}",
484                            result.error.as_deref().unwrap_or("Unknown error")
485                        )
486                    } else {
487                        "Task skipped".to_string()
488                    }
489                }
490                PostToMainMode::Full => {
491                    // 完整模式:截断输出
492                    isolation.truncate_output(&output)
493                }
494            }
495        });
496
497        ExecutionResult {
498            output: processed_output,
499            ..result
500        }
501    }
502}
503
504impl Default for IsolatedSessionExecutor {
505    fn default() -> Self {
506        Self::new()
507    }
508}
509
510#[async_trait]
511impl TaskExecutor for IsolatedSessionExecutor {
512    async fn execute(
513        &self,
514        job: &NewScheduledJob,
515        ctx: &ExecutionContext,
516    ) -> Result<ExecutionResult> {
517        // 检查任务是否启用
518        if !job.enabled {
519            return Ok(ExecutionResult::skipped(
520                self.generate_session_id(&job.id),
521                format!("Job '{}' is disabled", job.id),
522            ));
523        }
524
525        // 检查是否已取消
526        if ctx.is_cancelled() {
527            return Ok(ExecutionResult::skipped(
528                self.generate_session_id(&job.id),
529                format!("Job '{}' was cancelled before execution", job.id),
530            ));
531        }
532
533        // 生成隔离会话 ID
534        let session_id = self.generate_session_id(&job.id);
535
536        tracing::info!(
537            "IsolatedSessionExecutor: Creating isolated session '{}' for job '{}'",
538            session_id,
539            job.id
540        );
541
542        // 获取任务文本
543        let task_text = job.payload.get_text();
544
545        // 模拟执行(实际实现需要集成 Agent 和 SessionManager)
546        // TODO: 集成实际的隔离会话创建和 Agent 执行逻辑
547        tracing::info!(
548            "IsolatedSessionExecutor: Executing job '{}' in session '{}' with payload: {}",
549            job.id,
550            session_id,
551            task_text
552        );
553
554        let duration_ms = ctx.elapsed_ms();
555
556        // 创建执行结果
557        let result = ExecutionResult::success(
558            session_id,
559            Some(format!("Isolated execution: {}", task_text)),
560            duration_ms,
561        );
562
563        // 应用隔离配置处理结果
564        let isolation = job.isolation.as_ref().cloned().unwrap_or_default();
565        Ok(self.process_result(result, &isolation))
566    }
567
568    async fn cancel(&self, job_id: &str) -> Result<()> {
569        tracing::info!("IsolatedSessionExecutor: Cancelling job '{}'", job_id);
570        // 隔离会话取消逻辑
571        // TODO: 实现实际的取消逻辑
572        Ok(())
573    }
574
575    fn name(&self) -> &str {
576        &self.name
577    }
578}
579
580// ============================================================================
581// ExecutorFactory
582// ============================================================================
583
584/// 执行器工厂
585///
586/// 根据任务配置创建合适的执行器。
587pub struct ExecutorFactory;
588
589impl ExecutorFactory {
590    /// 根据会话目标创建执行器
591    ///
592    /// # 参数
593    /// - `target`: 会话目标
594    ///
595    /// # 返回值
596    /// 返回对应的执行器实例
597    pub fn create(target: &SessionTarget) -> Arc<dyn TaskExecutor> {
598        match target {
599            SessionTarget::Main => Arc::new(MainSessionExecutor::new()),
600            SessionTarget::Isolated => Arc::new(IsolatedSessionExecutor::new()),
601        }
602    }
603
604    /// 为任务创建执行器
605    ///
606    /// # 参数
607    /// - `job`: 调度任务
608    ///
609    /// # 返回值
610    /// 返回对应的执行器实例
611    pub fn create_for_job(job: &NewScheduledJob) -> Arc<dyn TaskExecutor> {
612        Self::create(&job.session_target)
613    }
614}
615
616// ============================================================================
617// 单元测试
618// ============================================================================
619
// Unit tests for the result type, the execution context, both executors
// and the executor factory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::types::{CronPayload, JobState, ScheduleType, WakeMode};

    // Builds a ScheduledJob fixture with the given ID, enabled flag and
    // session target; every other field uses a fixed test value.
    fn create_test_job(id: &str, enabled: bool, target: SessionTarget) -> NewScheduledJob {
        NewScheduledJob {
            id: id.to_string(),
            agent_id: None,
            name: id.to_string(),
            description: None,
            enabled,
            delete_after_run: false,
            created_at_ms: Utc::now().timestamp_millis(),
            updated_at_ms: Utc::now().timestamp_millis(),
            schedule: ScheduleType::Cron {
                expr: "0 0 9 * * *".to_string(),
                tz: None,
            },
            session_target: target,
            wake_mode: WakeMode::Now,
            payload: CronPayload::agent_turn("Test task"),
            isolation: None,
            delivery: None,
            state: JobState::default(),
            source: None,
            cron: None,
        }
    }

    // ------------------------------------------------------------------------
    // ExecutionResult tests
    // ------------------------------------------------------------------------

    #[test]
    fn test_execution_result_success() {
        let result = ExecutionResult::success("session-1", Some("output".to_string()), 100);

        assert_eq!(result.session_id, "session-1");
        assert_eq!(result.output, Some("output".to_string()));
        assert_eq!(result.duration_ms, 100);
        assert!(result.is_success());
        assert!(!result.is_failure());
        assert!(!result.is_skipped());
        assert!(result.error.is_none());
    }

    #[test]
    fn test_execution_result_failure() {
        let result = ExecutionResult::failure("session-2", "Connection error", 50);

        assert_eq!(result.session_id, "session-2");
        assert!(result.output.is_none());
        assert_eq!(result.duration_ms, 50);
        assert!(!result.is_success());
        assert!(result.is_failure());
        assert!(!result.is_skipped());
        assert_eq!(result.error, Some("Connection error".to_string()));
    }

    #[test]
    fn test_execution_result_skipped() {
        let result = ExecutionResult::skipped("session-3", "Job disabled");

        assert_eq!(result.session_id, "session-3");
        assert_eq!(result.output, Some("Job disabled".to_string()));
        assert_eq!(result.duration_ms, 0);
        assert!(!result.is_success());
        assert!(!result.is_failure());
        assert!(result.is_skipped());
        assert!(result.error.is_none());
    }

    #[test]
    fn test_execution_result_format_output_with_output() {
        let result = ExecutionResult::success("session", Some("Task output".to_string()), 100);
        let config = IsolationConfig {
            enabled: true,
            post_to_main_prefix: Some("[Task]".to_string()),
            post_to_main_mode: PostToMainMode::Full,
            post_to_main_max_chars: 1000,
        };

        let formatted = result.format_output(&config);
        assert!(formatted.starts_with("[Task]"));
        assert!(formatted.contains("Task output"));
    }

    #[test]
    fn test_execution_result_format_output_with_error() {
        let result = ExecutionResult::failure("session", "Some error", 100);
        let config = IsolationConfig::default();

        let formatted = result.format_output(&config);
        assert!(formatted.contains("Error:"));
        assert!(formatted.contains("Some error"));
    }

    #[test]
    fn test_execution_result_format_output_no_output() {
        let result = ExecutionResult {
            session_id: "session".to_string(),
            output: None,
            duration_ms: 100,
            status: JobStatus::Ok,
            error: None,
        };
        let config = IsolationConfig::default();

        let formatted = result.format_output(&config);
        assert!(formatted.contains("Task completed"));
    }

    // ------------------------------------------------------------------------
    // ExecutionContext tests
    // ------------------------------------------------------------------------

    #[test]
    fn test_execution_context_new() {
        let ctx = ExecutionContext::new();

        assert!(!ctx.is_cancelled());
        assert!(ctx.start_time_ms > 0);
    }

    #[test]
    fn test_execution_context_with_cancel_token() {
        let token = CancellationToken::new();
        let ctx = ExecutionContext::with_cancel_token(token.clone());

        assert!(!ctx.is_cancelled());

        token.cancel();
        assert!(ctx.is_cancelled());
    }

    #[test]
    fn test_execution_context_elapsed_ms() {
        let ctx = ExecutionContext::new();

        // Should return a non-negative value
        let elapsed = ctx.elapsed_ms();
        assert!(elapsed < 1000); // should be fast
    }

    #[test]
    fn test_execution_context_default() {
        let ctx = ExecutionContext::default();

        assert!(!ctx.is_cancelled());
        assert!(ctx.start_time_ms > 0);
    }

    // ------------------------------------------------------------------------
    // MainSessionExecutor tests
    // ------------------------------------------------------------------------

    #[tokio::test]
    async fn test_main_session_executor_execute_enabled() {
        let executor = MainSessionExecutor::new();
        let job = create_test_job("test-job", true, SessionTarget::Main);
        let ctx = ExecutionContext::new();

        let result = executor.execute(&job, &ctx).await.unwrap();

        assert!(result.is_success());
        assert_eq!(result.session_id, "main");
        assert!(result.output.is_some());
    }

    #[tokio::test]
    async fn test_main_session_executor_execute_disabled() {
        let executor = MainSessionExecutor::new();
        let job = create_test_job("disabled-job", false, SessionTarget::Main);
        let ctx = ExecutionContext::new();

        let result = executor.execute(&job, &ctx).await.unwrap();

        assert!(result.is_skipped());
        assert!(result.output.unwrap().contains("disabled"));
    }

    #[tokio::test]
    async fn test_main_session_executor_execute_cancelled() {
        let executor = MainSessionExecutor::new();
        let job = create_test_job("cancelled-job", true, SessionTarget::Main);
        let token = CancellationToken::new();
        token.cancel();
        let ctx = ExecutionContext::with_cancel_token(token);

        let result = executor.execute(&job, &ctx).await.unwrap();

        assert!(result.is_skipped());
        assert!(result.output.unwrap().contains("cancelled"));
    }

    #[tokio::test]
    async fn test_main_session_executor_cancel() {
        let executor = MainSessionExecutor::new();

        let result = executor.cancel("test-job").await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_main_session_executor_name() {
        let executor = MainSessionExecutor::new();
        assert_eq!(executor.name(), "main_session");
    }

    // ------------------------------------------------------------------------
    // IsolatedSessionExecutor tests
    // ------------------------------------------------------------------------

    #[tokio::test]
    async fn test_isolated_session_executor_execute_enabled() {
        let executor = IsolatedSessionExecutor::new();
        let job = create_test_job("test-job", true, SessionTarget::Isolated);
        let ctx = ExecutionContext::new();

        let result = executor.execute(&job, &ctx).await.unwrap();

        assert!(result.is_success());
        assert!(result.session_id.starts_with("isolated-"));
        assert!(result.session_id.contains("test-job"));
        assert!(result.output.is_some());
    }

    #[tokio::test]
    async fn test_isolated_session_executor_execute_disabled() {
        let executor = IsolatedSessionExecutor::new();
        let job = create_test_job("disabled-job", false, SessionTarget::Isolated);
        let ctx = ExecutionContext::new();

        let result = executor.execute(&job, &ctx).await.unwrap();

        assert!(result.is_skipped());
        assert!(result.output.unwrap().contains("disabled"));
    }

    #[tokio::test]
    async fn test_isolated_session_executor_execute_cancelled() {
        let executor = IsolatedSessionExecutor::new();
        let job = create_test_job("cancelled-job", true, SessionTarget::Isolated);
        let token = CancellationToken::new();
        token.cancel();
        let ctx = ExecutionContext::with_cancel_token(token);

        let result = executor.execute(&job, &ctx).await.unwrap();

        assert!(result.is_skipped());
        assert!(result.output.unwrap().contains("cancelled"));
    }

    #[tokio::test]
    async fn test_isolated_session_executor_with_isolation_config() {
        let executor = IsolatedSessionExecutor::new();
        let mut job = create_test_job("isolated-job", true, SessionTarget::Isolated);
        job.isolation = Some(IsolationConfig {
            enabled: true,
            post_to_main_prefix: Some("[Scheduled]".to_string()),
            post_to_main_mode: PostToMainMode::Summary,
            post_to_main_max_chars: 100,
        });
        let ctx = ExecutionContext::new();

        let result = executor.execute(&job, &ctx).await.unwrap();

        assert!(result.is_success());
        // In Summary mode the output should be a short status message
        assert!(result.output.unwrap().contains("completed"));
    }

    #[tokio::test]
    async fn test_isolated_session_executor_cancel() {
        let executor = IsolatedSessionExecutor::new();

        let result = executor.cancel("test-job").await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_isolated_session_executor_name() {
        let executor = IsolatedSessionExecutor::new();
        assert_eq!(executor.name(), "isolated_session");
    }

    #[test]
    fn test_isolated_session_executor_generate_session_id() {
        let executor = IsolatedSessionExecutor::new();

        let id1 = executor.generate_session_id("job-1");
        let id2 = executor.generate_session_id("job-1");

        assert!(id1.starts_with("isolated-job-1-"));
        assert!(id2.starts_with("isolated-job-1-"));
        // The IDs differ when the timestamps differ.
        // Note: two back-to-back calls may produce the same timestamp, so
        // inequality is deliberately not asserted.
    }

    // ------------------------------------------------------------------------
    // ExecutorFactory tests
    // ------------------------------------------------------------------------

    #[test]
    fn test_executor_factory_create_main() {
        let executor = ExecutorFactory::create(&SessionTarget::Main);
        assert_eq!(executor.name(), "main_session");
    }

    #[test]
    fn test_executor_factory_create_isolated() {
        let executor = ExecutorFactory::create(&SessionTarget::Isolated);
        assert_eq!(executor.name(), "isolated_session");
    }

    #[test]
    fn test_executor_factory_create_for_job_main() {
        let job = create_test_job("test", true, SessionTarget::Main);
        let executor = ExecutorFactory::create_for_job(&job);
        assert_eq!(executor.name(), "main_session");
    }

    #[test]
    fn test_executor_factory_create_for_job_isolated() {
        let job = create_test_job("test", true, SessionTarget::Isolated);
        let executor = ExecutorFactory::create_for_job(&job);
        assert_eq!(executor.name(), "isolated_session");
    }
}
951
952// ============================================================================
953// 属性测试 (Property-Based Tests) - Task 7.4
954// ============================================================================
955
956#[cfg(test)]
957mod property_tests {
958    use super::*;
959    use crate::scheduler::types::{CronPayload, JobState, ScheduleType, WakeMode};
960    use proptest::prelude::*;
961
962    // ------------------------------------------------------------------------
963    // 生成器 (Generators)
964    // ------------------------------------------------------------------------
965
966    /// 生成有效的任务 ID
967    fn arb_job_id() -> impl Strategy<Value = String> {
968        "[a-z][a-z0-9-]{0,20}".prop_filter("非空 ID", |s| !s.is_empty())
969    }
970
971    /// 生成 SessionTarget
972    fn arb_session_target() -> impl Strategy<Value = SessionTarget> {
973        prop_oneof![Just(SessionTarget::Main), Just(SessionTarget::Isolated),]
974    }
975
976    /// 生成 IsolationConfig
977    fn arb_isolation_config() -> impl Strategy<Value = Option<IsolationConfig>> {
978        prop_oneof![
979            Just(None),
980            (
981                proptest::bool::ANY,
982                proptest::option::of("[A-Za-z\\[\\]]{1,10}"),
983                prop_oneof![Just(PostToMainMode::Summary), Just(PostToMainMode::Full),],
984                100usize..10000usize,
985            )
986                .prop_map(|(enabled, prefix, mode, max_chars)| {
987                    Some(IsolationConfig {
988                        enabled,
989                        post_to_main_prefix: prefix,
990                        post_to_main_mode: mode,
991                        post_to_main_max_chars: max_chars,
992                    })
993                }),
994        ]
995    }
996
997    /// 生成测试用 ScheduledJob
998    fn arb_test_job() -> impl Strategy<Value = NewScheduledJob> {
999        (
1000            arb_job_id(),
1001            proptest::bool::ANY,
1002            arb_session_target(),
1003            arb_isolation_config(),
1004        )
1005            .prop_map(|(id, enabled, target, isolation)| NewScheduledJob {
1006                id: id.clone(),
1007                agent_id: None,
1008                name: id,
1009                description: None,
1010                enabled,
1011                delete_after_run: false,
1012                created_at_ms: Utc::now().timestamp_millis(),
1013                updated_at_ms: Utc::now().timestamp_millis(),
1014                schedule: ScheduleType::Cron {
1015                    expr: "0 0 9 * * *".to_string(),
1016                    tz: None,
1017                },
1018                session_target: target,
1019                wake_mode: WakeMode::Now,
1020                payload: CronPayload::agent_turn("Test"),
1021                isolation,
1022                delivery: None,
1023                state: JobState::default(),
1024                source: None,
1025                cron: None,
1026            })
1027    }
1028
1029    // ------------------------------------------------------------------------
1030    // Property 6: 隔离会话创建
1031    // ------------------------------------------------------------------------
1032
1033    proptest! {
1034        #![proptest_config(ProptestConfig::with_cases(50))]
1035
1036        /// Property 6: 隔离会话创建
1037        ///
1038        /// **Validates: Requirements 4.4**
1039        ///
1040        /// *For any* 启用隔离的 ScheduledJob,执行时应创建新的隔离会话,
1041        /// 且该会话 ID 与主会话不同。
1042        #[test]
1043        fn prop_isolated_session_id_differs_from_main(job in arb_test_job()) {
1044            let rt = tokio::runtime::Runtime::new().unwrap();
1045            rt.block_on(async {
1046                let executor = IsolatedSessionExecutor::new();
1047                let ctx = ExecutionContext::new();
1048
1049                if job.enabled {
1050                    let result = executor.execute(&job, &ctx).await.unwrap();
1051
1052                    // 隔离会话 ID 应该以 "isolated-" 开头
1053                    prop_assert!(
1054                        result.session_id.starts_with("isolated-"),
1055                        "隔离会话 ID 应以 'isolated-' 开头"
1056                    );
1057
1058                    // 隔离会话 ID 应该包含任务 ID
1059                    prop_assert!(
1060                        result.session_id.contains(&job.id),
1061                        "隔离会话 ID 应包含任务 ID"
1062                    );
1063
1064                    // 隔离会话 ID 不应该是 "main"
1065                    prop_assert_ne!(
1066                        result.session_id,
1067                        "main",
1068                        "隔离会话 ID 不应为 'main'"
1069                    );
1070                }
1071
1072                Ok(())
1073            })?;
1074        }
1075
1076        /// Property 6.2: 主会话执行器使用固定会话 ID
1077        #[test]
1078        fn prop_main_session_uses_fixed_id(job in arb_test_job()) {
1079            let rt = tokio::runtime::Runtime::new().unwrap();
1080            rt.block_on(async {
1081                let executor = MainSessionExecutor::new();
1082                let ctx = ExecutionContext::new();
1083
1084                if job.enabled {
1085                    let result = executor.execute(&job, &ctx).await.unwrap();
1086
1087                    // 主会话 ID 应该是 "main"
1088                    prop_assert_eq!(
1089                        result.session_id,
1090                        "main",
1091                        "主会话 ID 应为 'main'"
1092                    );
1093                }
1094
1095                Ok(())
1096            })?;
1097        }
1098    }
1099
1100    // ------------------------------------------------------------------------
1101    // Property 7: 任务状态跟踪
1102    // ------------------------------------------------------------------------
1103
1104    proptest! {
1105        #![proptest_config(ProptestConfig::with_cases(50))]
1106
1107        /// Property 7.1: 执行结果包含正确的状态
1108        ///
1109        /// **Validates: Requirements 7.7, 7.8, 7.9**
1110        #[test]
1111        fn prop_execution_result_has_correct_status(job in arb_test_job()) {
1112            let rt = tokio::runtime::Runtime::new().unwrap();
1113            rt.block_on(async {
1114                let executor = ExecutorFactory::create_for_job(&job);
1115                let ctx = ExecutionContext::new();
1116
1117                let result = executor.execute(&job, &ctx).await.unwrap();
1118
1119                if job.enabled {
1120                    // 启用的任务应该成功执行
1121                    prop_assert!(
1122                        result.is_success(),
1123                        "启用的任务应成功执行"
1124                    );
1125                } else {
1126                    // 禁用的任务应该被跳过
1127                    prop_assert!(
1128                        result.is_skipped(),
1129                        "禁用的任务应被跳过"
1130                    );
1131                }
1132
1133                Ok(())
1134            })?;
1135        }
1136
1137        /// Property 7.2: 执行结果包含耗时信息
1138        ///
1139        /// **Validates: Requirements 7.8**
1140        #[test]
1141        fn prop_execution_result_has_duration(job in arb_test_job()) {
1142            let rt = tokio::runtime::Runtime::new().unwrap();
1143            rt.block_on(async {
1144                let executor = ExecutorFactory::create_for_job(&job);
1145                let ctx = ExecutionContext::new();
1146
1147                let result = executor.execute(&job, &ctx).await.unwrap();
1148
1149                // 耗时应该是非负数
1150                // 跳过的任务耗时为 0
1151                if job.enabled {
1152                    // 启用的任务耗时可能为 0 或更大
1153                    prop_assert!(
1154                        result.duration_ms < 10000,
1155                        "执行耗时应在合理范围内"
1156                    );
1157                } else {
1158                    prop_assert_eq!(
1159                        result.duration_ms,
1160                        0,
1161                        "跳过的任务耗时应为 0"
1162                    );
1163                }
1164
1165                Ok(())
1166            })?;
1167        }
1168
1169        /// Property 7.3: 取消的任务返回跳过状态
1170        ///
1171        /// **Validates: Requirements 7.9**
1172        #[test]
1173        fn prop_cancelled_task_returns_skipped(job in arb_test_job()) {
1174            let rt = tokio::runtime::Runtime::new().unwrap();
1175            rt.block_on(async {
1176                let executor = ExecutorFactory::create_for_job(&job);
1177                let token = CancellationToken::new();
1178                token.cancel();
1179                let ctx = ExecutionContext::with_cancel_token(token);
1180
1181                let result = executor.execute(&job, &ctx).await.unwrap();
1182
1183                // 取消的任务应该被跳过
1184                prop_assert!(
1185                    result.is_skipped(),
1186                    "取消的任务应被跳过"
1187                );
1188
1189                // 输出应该包含 "cancelled"
1190                if let Some(output) = &result.output {
1191                    prop_assert!(
1192                        output.contains("cancelled") || output.contains("disabled"),
1193                        "跳过原因应包含 'cancelled' 或 'disabled'"
1194                    );
1195                }
1196
1197                Ok(())
1198            })?;
1199        }
1200    }
1201
1202    // ------------------------------------------------------------------------
1203    // ExecutorFactory 属性测试
1204    // ------------------------------------------------------------------------
1205
1206    proptest! {
1207        #![proptest_config(ProptestConfig::with_cases(50))]
1208
1209        /// 工厂创建的执行器类型与会话目标匹配
1210        #[test]
1211        fn prop_factory_creates_correct_executor(target in arb_session_target()) {
1212            let executor = ExecutorFactory::create(&target);
1213
1214            match target {
1215                SessionTarget::Main => {
1216                    prop_assert_eq!(
1217                        executor.name(),
1218                        "main_session",
1219                        "Main 目标应创建 main_session 执行器"
1220                    );
1221                }
1222                SessionTarget::Isolated => {
1223                    prop_assert_eq!(
1224                        executor.name(),
1225                        "isolated_session",
1226                        "Isolated 目标应创建 isolated_session 执行器"
1227                    );
1228                }
1229            }
1230        }
1231
1232        /// 工厂为任务创建正确的执行器
1233        #[test]
1234        fn prop_factory_creates_correct_executor_for_job(job in arb_test_job()) {
1235            let executor = ExecutorFactory::create_for_job(&job);
1236
1237            match job.session_target {
1238                SessionTarget::Main => {
1239                    prop_assert_eq!(
1240                        executor.name(),
1241                        "main_session"
1242                    );
1243                }
1244                SessionTarget::Isolated => {
1245                    prop_assert_eq!(
1246                        executor.name(),
1247                        "isolated_session"
1248                    );
1249                }
1250            }
1251        }
1252    }
1253
1254    // ------------------------------------------------------------------------
1255    // ExecutionResult 属性测试
1256    // ------------------------------------------------------------------------
1257
1258    proptest! {
1259        #![proptest_config(ProptestConfig::with_cases(100))]
1260
1261        /// 成功结果的状态一致性
1262        #[test]
1263        fn prop_success_result_consistency(
1264            session_id in "[a-z]{5,15}",
1265            output in proptest::option::of("[a-zA-Z0-9 ]{0,100}"),
1266            duration in 0u64..100000u64,
1267        ) {
1268            let result = ExecutionResult::success(session_id.clone(), output.clone(), duration);
1269
1270            prop_assert_eq!(&result.session_id, &session_id);
1271            prop_assert_eq!(&result.output, &output);
1272            prop_assert_eq!(result.duration_ms, duration);
1273            prop_assert!(result.is_success());
1274            prop_assert!(!result.is_failure());
1275            prop_assert!(!result.is_skipped());
1276            prop_assert!(result.error.is_none());
1277        }
1278
1279        /// 失败结果的状态一致性
1280        #[test]
1281        fn prop_failure_result_consistency(
1282            session_id in "[a-z]{5,15}",
1283            error in "[a-zA-Z0-9 ]{1,50}",
1284            duration in 0u64..100000u64,
1285        ) {
1286            let result = ExecutionResult::failure(session_id.clone(), error.clone(), duration);
1287
1288            prop_assert_eq!(&result.session_id, &session_id);
1289            prop_assert!(result.output.is_none());
1290            prop_assert_eq!(result.duration_ms, duration);
1291            prop_assert!(!result.is_success());
1292            prop_assert!(result.is_failure());
1293            prop_assert!(!result.is_skipped());
1294            prop_assert_eq!(&result.error, &Some(error));
1295        }
1296
1297        /// 跳过结果的状态一致性
1298        #[test]
1299        fn prop_skipped_result_consistency(
1300            session_id in "[a-z]{5,15}",
1301            reason in "[a-zA-Z0-9 ]{1,50}",
1302        ) {
1303            let result = ExecutionResult::skipped(session_id.clone(), reason.clone());
1304
1305            prop_assert_eq!(&result.session_id, &session_id);
1306            prop_assert_eq!(&result.output, &Some(reason));
1307            prop_assert_eq!(result.duration_ms, 0);
1308            prop_assert!(!result.is_success());
1309            prop_assert!(!result.is_failure());
1310            prop_assert!(result.is_skipped());
1311            prop_assert!(result.error.is_none());
1312        }
1313    }
1314}