astro_run/
runner.rs

1use crate::{
2  stream::StreamReceiver, Context, HookBeforeRunStepResult, HookNoopResult, JobRunResult,
3  StepRunResult, WorkflowLog, WorkflowLogType, WorkflowRunResult, WorkflowStateEvent,
4};
5use serde::{Deserialize, Serialize};
6pub use tokio_stream::{Stream, StreamExt};
7
8#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
9pub enum RunResult {
10  Succeeded,
11  Failed { exit_code: i32 },
12  Cancelled,
13}
14
15#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
16pub struct Log {
17  pub log_type: WorkflowLogType,
18  pub message: String,
19}
20
21impl Log {
22  #[allow(clippy::self_named_constructors)]
23  pub fn log(message: impl Into<String>) -> Self {
24    Self {
25      log_type: WorkflowLogType::Log,
26      message: message.into(),
27    }
28  }
29
30  pub fn error(message: impl Into<String>) -> Self {
31    Self {
32      log_type: WorkflowLogType::Error,
33      message: message.into(),
34    }
35  }
36
37  pub fn is_error(&self) -> bool {
38    self.log_type == WorkflowLogType::Error
39  }
40}
41
42pub type RunResponse = crate::Result<StreamReceiver>;
43
44/// # Runner
45/// The `Runner` trait provides the most fundamental deconstruction of a runner. You can implement the `run` method to customize your own runner.
46///
47/// The `run` method is asynchronous. Before `run` is executed, the `Step` status is `WorkflowState::Pending`.
48///
49/// During the execution of `run`, the status is set to `WorkflowState::Queued`.
50/// At this point, you can handle scheduling logic related to the runner. **(Please avoid executing step's runtime logic within the asynchronous `run` method. It is highly recommended to use a separate thread to process individual steps.)**
51///
52/// After `run` has completed, the status becomes `WorkflowState::InProgress`. This is when the step is truly running. The `run` method returns a stream result that implements the `Stream` trait, allowing dynamic log updates.
53///
54/// ## Example
55///
56/// ```rust
57/// struct Runner;
58///
59/// #[astro_run::async_trait]
60/// impl astro_run::Runner for Runner {
61///   async fn run(&self, ctx: astro_run::Context) -> astro_run::RunResponse {
62///     let (tx, rx) = astro_run::stream();
63///
64///     tokio::task::spawn(async move {
65///       // Send running log
66///       tx.log(ctx.command.run);
67///
68///       // Send success log
69///       tx.end(astro_run::RunResult::Succeeded);
70///     });
71///
72///     Ok(rx)
73///   }
74/// }
75/// ```
76///
77#[async_trait::async_trait]
78pub trait Runner: Send + Sync {
79  async fn on_run_workflow(&self, _event: crate::RunWorkflowEvent) -> HookNoopResult {
80    Ok(())
81  }
82  async fn on_run_job(&self, _event: crate::RunJobEvent) -> HookNoopResult {
83    Ok(())
84  }
85  async fn on_before_run_step(&self, step: crate::Step) -> HookBeforeRunStepResult {
86    Ok(step)
87  }
88  async fn on_run_step(&self, _event: crate::RunStepEvent) -> HookNoopResult {
89    Ok(())
90  }
91  async fn on_state_change(&self, _event: WorkflowStateEvent) -> HookNoopResult {
92    Ok(())
93  }
94  async fn on_log(&self, _log: WorkflowLog) -> HookNoopResult {
95    Ok(())
96  }
97  async fn on_step_completed(&self, _result: StepRunResult) -> HookNoopResult {
98    Ok(())
99  }
100  async fn on_job_completed(&self, _result: JobRunResult) -> HookNoopResult {
101    Ok(())
102  }
103  async fn on_workflow_completed(&self, _result: WorkflowRunResult) -> HookNoopResult {
104    Ok(())
105  }
106  async fn run(&self, config: Context) -> RunResponse;
107}
108
109#[cfg(test)]
110mod tests {
111  use super::*;
112
113  #[test]
114  fn test_log() {
115    let log = Log::log("test");
116    assert_eq!(log.log_type, WorkflowLogType::Log);
117    assert_eq!(log.message, "test");
118    assert!(!log.is_error());
119
120    let log = Log::error("test");
121    assert_eq!(log.log_type, WorkflowLogType::Error);
122    assert_eq!(log.message, "test");
123    assert!(log.is_error());
124  }
125}