Skip to main content

stormchaser_engine/handler/step/events/
running.rs

1use crate::handler::fetch_step_instance;
2use anyhow::Result;
3use sqlx::PgPool;
4use tracing::info;
5
6#[tracing::instrument(skip(event, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
7/// Handle step running.
8pub async fn handle_step_running(
9    event: stormchaser_model::events::StepRunningEvent,
10    pool: PgPool,
11) -> Result<()> {
12    let run_id = event.run_id;
13    let step_id = event.step_id;
14
15    let span = tracing::Span::current();
16    span.record("run_id", tracing::field::display(run_id));
17    span.record("step_id", tracing::field::display(step_id));
18    let runner_id = event.runner_id.as_deref().unwrap_or("unknown");
19
20    info!(
21        "Step {} (Run {}) is now running on runner {}",
22        step_id, run_id, runner_id
23    );
24
25    // 1. Fetch current instance
26    let instance = fetch_step_instance(step_id, &pool).await?;
27
28    // 2. Use state machine to transition
29    let machine =
30        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
31            instance.clone(),
32        );
33    let _ = machine
34        .start(runner_id.to_string(), &mut *pool.acquire().await?)
35        .await?;
36
37    crate::STEPS_STARTED.add(
38        1,
39        &[
40            opentelemetry::KeyValue::new("step_name", instance.step_name),
41            opentelemetry::KeyValue::new("step_type", instance.step_type),
42            opentelemetry::KeyValue::new("runner_id", runner_id.to_string()),
43        ],
44    );
45
46    Ok(())
47}