// stormchaser_engine/handler/integrations/jinja.rs

1use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
2use anyhow::Result;
3use chrono::Utc;
4use serde_json::Value;
5use sqlx::PgPool;
6use stormchaser_model::dsl::JinjaRenderSpec;
7use stormchaser_model::events::StepCompletedEvent;
8use stormchaser_model::RunId;
9use stormchaser_model::StepInstanceId;
10use tracing::info;
11
12/// Handle jinja render.
13pub async fn handle_jinja_render(
14    run_id: RunId,
15    step_id: StepInstanceId,
16    spec: Value,
17    pool: PgPool,
18    nats_client: async_nats::Client,
19) -> Result<()> {
20    let spec: JinjaRenderSpec = serde_json::from_value(spec)?;
21
22    info!("Rendering Jinja template for run {}", run_id);
23
24    // 1. Mark as Running
25    let instance = fetch_step_instance(step_id, &pool).await?;
26    let machine =
27        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
28            instance,
29        );
30    let _ = machine
31        .start("jinja".to_string(), &mut *pool.acquire().await?)
32        .await?;
33
34    // 2. Prepare Context for Template Rendering
35    let run_context = fetch_run_context(run_id, &pool).await?;
36    let outputs = fetch_outputs(run_id, &pool).await?;
37
38    let template_ctx = prepare_template_context(&spec, run_context, outputs, run_id);
39
40    // 3. Render Template
41    let rendered = render_template(&spec.template, &template_ctx)?;
42
43    // 4. Save Output and Complete Step
44    save_output_and_complete(run_id, step_id, &spec, rendered, pool, nats_client).await
45}
46
47use stormchaser_model::dsl;
48
49fn prepare_template_context(
50    spec: &dsl::JinjaRenderSpec,
51    run_context: crate::handler::RunContext,
52    outputs: Value,
53    run_id: RunId,
54) -> Value {
55    let mut template_ctx = serde_json::json!({
56        "inputs": run_context.inputs,
57        "steps": outputs,
58        "run": {
59            "id": run_id.to_string(),
60        }
61    });
62
63    // Merge custom context if provided
64    if let Some(custom_ctx) = &spec.context {
65        if let Some(obj) = template_ctx.as_object_mut() {
66            if let Some(custom_obj) = custom_ctx.as_object() {
67                for (k, v) in custom_obj {
68                    obj.insert(k.clone(), v.clone());
69                }
70            }
71        }
72    }
73    template_ctx
74}
75
76fn render_template(template: &str, context: &Value) -> Result<String> {
77    use minijinja::Environment;
78    let env = Environment::new();
79    env.render_str(template, context)
80        .map_err(|e| anyhow::anyhow!("Failed to render Jinja template: {:?}", e))
81}
82
83async fn save_output_and_complete(
84    run_id: RunId,
85    step_id: StepInstanceId,
86    spec: &dsl::JinjaRenderSpec,
87    rendered: String,
88    pool: PgPool,
89    nats_client: async_nats::Client,
90) -> Result<()> {
91    let mut tx = pool.begin().await?;
92    let instance = fetch_step_instance(step_id, &mut *tx).await?;
93    let machine =
94        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
95            instance,
96        );
97    let _ = machine.succeed(&mut *tx).await?;
98
99    let output_key = spec
100        .output_key
101        .clone()
102        .unwrap_or_else(|| "result".to_string());
103    crate::db::upsert_step_output(
104        &mut *tx,
105        step_id,
106        &output_key,
107        &Value::String(rendered.clone()),
108    )
109    .await?;
110
111    tx.commit().await?;
112
113    let mut outputs_map = std::collections::HashMap::new();
114    outputs_map.insert(output_key.clone(), serde_json::json!(rendered));
115
116    let event = StepCompletedEvent {
117        run_id,
118        step_id,
119        event_type: "stormchaser.v1.step.completed".to_string(),
120        outputs: Some(outputs_map),
121        exit_code: Some(0),
122        runner_id: None,
123        storage_hashes: None,
124        artifacts: None,
125        test_reports: None,
126        timestamp: Utc::now(),
127    };
128    let js = async_nats::jetstream::new(nats_client);
129    stormchaser_model::nats::publish_cloudevent(
130        &js,
131        "stormchaser.v1.step.completed",
132        "stormchaser.v1.step.completed",
133        "/stormchaser",
134        serde_json::to_value(event).unwrap(),
135        Some("1.0"),
136        None,
137    )
138    .await?;
139
140    Ok(())
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use stormchaser_model::workflow::RunContext;

    /// The assembled context exposes inputs, step outputs, the run id, and
    /// any custom keys from the spec at the top level.
    #[test]
    fn test_prepare_template_context() {
        let run_id = RunId::new_v4();
        let run_context = RunContext {
            run_id,
            dsl_version: "1.0".to_string(),
            workflow_definition: json!({}),
            source_code: String::new(),
            inputs: json!({"name": "User"}),
            secrets: json!({}),
            sensitive_values: vec![],
        };
        let spec = JinjaRenderSpec {
            template: "Hello".to_string(),
            context: Some(json!({"extra": "value"})),
            output_key: None,
        };
        let step_outputs = json!({"prev_step": {"outputs": {"res": "done"}}});

        let built = prepare_template_context(&spec, run_context, step_outputs, run_id);

        assert_eq!(built["inputs"]["name"], "User");
        assert_eq!(built["steps"]["prev_step"]["outputs"]["res"], "done");
        assert_eq!(built["run"]["id"], run_id.to_string());
        assert_eq!(built["extra"], "value");
    }

    /// A well-formed template has its variable substituted from the context.
    #[test]
    fn test_render_template_happy() {
        let output = render_template("Hello {{ name }}!", &json!({"name": "World"})).unwrap();
        assert_eq!(output, "Hello World!");
    }

    /// An unterminated expression is reported as an error.
    #[test]
    fn test_render_template_error() {
        let outcome = render_template("Hello {{ name", &json!({}));
        assert!(outcome.is_err());
    }
}