stormchaser_engine/handler/integrations/
jinja.rs1use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
2use anyhow::Result;
3use chrono::Utc;
4use serde_json::Value;
5use sqlx::PgPool;
6use stormchaser_model::dsl::JinjaRenderSpec;
7use stormchaser_model::events::StepCompletedEvent;
8use stormchaser_model::RunId;
9use stormchaser_model::StepInstanceId;
10use tracing::info;
11
12pub async fn handle_jinja_render(
14 run_id: RunId,
15 step_id: StepInstanceId,
16 spec: Value,
17 pool: PgPool,
18 nats_client: async_nats::Client,
19) -> Result<()> {
20 let spec: JinjaRenderSpec = serde_json::from_value(spec)?;
21
22 info!("Rendering Jinja template for run {}", run_id);
23
24 let instance = fetch_step_instance(step_id, &pool).await?;
26 let machine =
27 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
28 instance,
29 );
30 let _ = machine
31 .start("jinja".to_string(), &mut *pool.acquire().await?)
32 .await?;
33
34 let run_context = fetch_run_context(run_id, &pool).await?;
36 let outputs = fetch_outputs(run_id, &pool).await?;
37
38 let template_ctx = prepare_template_context(&spec, run_context, outputs, run_id);
39
40 let rendered = render_template(&spec.template, &template_ctx)?;
42
43 save_output_and_complete(run_id, step_id, &spec, rendered, pool, nats_client).await
45}
46
47use stormchaser_model::dsl;
48
49fn prepare_template_context(
50 spec: &dsl::JinjaRenderSpec,
51 run_context: crate::handler::RunContext,
52 outputs: Value,
53 run_id: RunId,
54) -> Value {
55 let mut template_ctx = serde_json::json!({
56 "inputs": run_context.inputs,
57 "steps": outputs,
58 "run": {
59 "id": run_id.to_string(),
60 }
61 });
62
63 if let Some(custom_ctx) = &spec.context {
65 if let Some(obj) = template_ctx.as_object_mut() {
66 if let Some(custom_obj) = custom_ctx.as_object() {
67 for (k, v) in custom_obj {
68 obj.insert(k.clone(), v.clone());
69 }
70 }
71 }
72 }
73 template_ctx
74}
75
76fn render_template(template: &str, context: &Value) -> Result<String> {
77 use minijinja::Environment;
78 let env = Environment::new();
79 env.render_str(template, context)
80 .map_err(|e| anyhow::anyhow!("Failed to render Jinja template: {:?}", e))
81}
82
83async fn save_output_and_complete(
84 run_id: RunId,
85 step_id: StepInstanceId,
86 spec: &dsl::JinjaRenderSpec,
87 rendered: String,
88 pool: PgPool,
89 nats_client: async_nats::Client,
90) -> Result<()> {
91 let mut tx = pool.begin().await?;
92 let instance = fetch_step_instance(step_id, &mut *tx).await?;
93 let machine =
94 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
95 instance,
96 );
97 let _ = machine.succeed(&mut *tx).await?;
98
99 let output_key = spec
100 .output_key
101 .clone()
102 .unwrap_or_else(|| "result".to_string());
103 crate::db::upsert_step_output(
104 &mut *tx,
105 step_id,
106 &output_key,
107 &Value::String(rendered.clone()),
108 )
109 .await?;
110
111 tx.commit().await?;
112
113 let mut outputs_map = std::collections::HashMap::new();
114 outputs_map.insert(output_key.clone(), serde_json::json!(rendered));
115
116 let event = StepCompletedEvent {
117 run_id,
118 step_id,
119 event_type: "stormchaser.v1.step.completed".to_string(),
120 outputs: Some(outputs_map),
121 exit_code: Some(0),
122 runner_id: None,
123 storage_hashes: None,
124 artifacts: None,
125 test_reports: None,
126 timestamp: Utc::now(),
127 };
128 let js = async_nats::jetstream::new(nats_client);
129 stormchaser_model::nats::publish_cloudevent(
130 &js,
131 "stormchaser.v1.step.completed",
132 "stormchaser.v1.step.completed",
133 "/stormchaser",
134 serde_json::to_value(event).unwrap(),
135 Some("1.0"),
136 None,
137 )
138 .await?;
139
140 Ok(())
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146 use serde_json::json;
147 use stormchaser_model::workflow::RunContext;
148
149 #[test]
150 fn test_prepare_template_context() {
151 let run_id = RunId::new_v4();
152 let spec = JinjaRenderSpec {
153 template: "Hello".to_string(),
154 context: Some(json!({"extra": "value"})),
155 output_key: None,
156 };
157 let run_context = RunContext {
158 run_id,
159 dsl_version: "1.0".to_string(),
160 workflow_definition: json!({}),
161 source_code: "".to_string(),
162 inputs: json!({"name": "User"}),
163 secrets: json!({}),
164 sensitive_values: vec![],
165 };
166 let outputs = json!({"prev_step": {"outputs": {"res": "done"}}});
167
168 let ctx = prepare_template_context(&spec, run_context, outputs, run_id);
169
170 assert_eq!(ctx["inputs"]["name"], "User");
171 assert_eq!(ctx["steps"]["prev_step"]["outputs"]["res"], "done");
172 assert_eq!(ctx["run"]["id"], run_id.to_string());
173 assert_eq!(ctx["extra"], "value");
174 }
175
176 #[test]
177 fn test_render_template_happy() {
178 let ctx = json!({"name": "World"});
179 let rendered = render_template("Hello {{ name }}!", &ctx).unwrap();
180 assert_eq!(rendered, "Hello World!");
181 }
182
183 #[test]
184 fn test_render_template_error() {
185 let ctx = json!({});
186 let result = render_template("Hello {{ name", &ctx);
187 assert!(result.is_err());
188 }
189}