stormchaser_engine/handler/step/intrinsic/
webhook.rs1use anyhow::Result;
2use serde_json::Value;
3use sqlx::PgPool;
4use std::sync::Arc;
5use stormchaser_model::RunId;
6use stormchaser_model::StepInstanceId;
7use stormchaser_tls::TlsReloader;
8
9use crate::handler::fetch_step_instance;
10use crate::handler::handle_webhook_invoke;
11
12pub async fn try_dispatch(
14 run_id: RunId,
15 step_instance_id: StepInstanceId,
16 step_type: &str,
17 resolved_spec: &Value,
18 pool: PgPool,
19 nats_client: async_nats::Client,
20 _tls_reloader: Arc<TlsReloader>,
21) -> Result<bool> {
22 if step_type == "WebhookInvoke" {
23 let pool = pool.clone();
24 let nats_client = nats_client.clone();
25 let spec = resolved_spec.clone();
26
27 tokio::spawn(async move {
28 if let Err(e) = handle_webhook_invoke(
29 run_id,
30 step_instance_id,
31 spec,
32 pool.clone(),
33 nats_client.clone(),
34 )
35 .await
36 {
37 if let Ok(instance) = fetch_step_instance(step_instance_id, &pool).await {
38 let machine = crate::step_machine::StepMachine::<
39 crate::step_machine::state::Pending,
40 >::from_instance(instance);
41 if let Ok(mut conn) = pool.acquire().await {
42 if let Ok(_machine) = machine
43 .start("error-recovery".to_string(), &mut *conn)
44 .await
45 {
46 if let Ok(instance) = fetch_step_instance(step_instance_id, &pool).await
47 {
48 let machine =
49 crate::step_machine::StepMachine::<
50 crate::step_machine::state::Running,
51 >::from_instance(instance);
52 let _ = machine
53 .fail(
54 format!("Webhook invoke failed: {:?}", e),
55 None,
56 &mut *conn,
57 )
58 .await;
59 }
60 }
61 }
62 }
63 }
64 });
65 return Ok(true);
66 }
67
68 Ok(false)
69}