stormchaser_engine/handler/step/events/
running.rs1use crate::handler::fetch_step_instance;
2use anyhow::Result;
3use sqlx::PgPool;
4use tracing::info;
5
6#[tracing::instrument(skip(event, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
7pub async fn handle_step_running(
9 event: stormchaser_model::events::StepRunningEvent,
10 pool: PgPool,
11) -> Result<()> {
12 let run_id = event.run_id;
13 let step_id = event.step_id;
14
15 let span = tracing::Span::current();
16 span.record("run_id", tracing::field::display(run_id));
17 span.record("step_id", tracing::field::display(step_id));
18 let runner_id = event.runner_id.as_deref().unwrap_or("unknown");
19
20 info!(
21 "Step {} (Run {}) is now running on runner {}",
22 step_id, run_id, runner_id
23 );
24
25 let instance = fetch_step_instance(step_id, &pool).await?;
27
28 let machine =
30 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
31 instance.clone(),
32 );
33 let _ = machine
34 .start(runner_id.to_string(), &mut *pool.acquire().await?)
35 .await?;
36
37 crate::STEPS_STARTED.add(
38 1,
39 &[
40 opentelemetry::KeyValue::new("step_name", instance.step_name),
41 opentelemetry::KeyValue::new("step_type", instance.step_type),
42 opentelemetry::KeyValue::new("runner_id", runner_id.to_string()),
43 ],
44 );
45
46 Ok(())
47}