courier/transforms/script/
python.rs1use std::io::{BufRead, BufReader, BufWriter, Write};
2use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
3use std::sync::{Arc, Mutex};
4
5use anyhow::{Context, Result, anyhow, bail};
6use async_trait::async_trait;
7use serde::Deserialize;
8use serde_json::Value;
9use tokio::task;
10
11use crate::config::redact_secret;
12use crate::envelope::Envelope;
13
14use super::{ScriptEngine, ScriptTransformConfig};
15
/// Python program (passed to the interpreter with `-c`) that hosts the user's script.
///
/// Protocol: newline-delimited JSON over the interpreter's original stdin/stdout.
/// 1. Reads one line holding the script source as a JSON string, `exec`s it and
///    checks that the entrypoint named by `sys.argv[1]` is callable. Replies
///    `{"ok": true, "ready": true}`, or `{"ok": false, "error": "<traceback>"}` and
///    exits with status 1.
/// 2. For every further non-empty line, decodes it as JSON, calls the entrypoint
///    and replies with exactly one line:
///    - `{"ok": true, "filtered": true}` when it returned `None`;
///    - `{"ok": true, "filtered": false, "env": ...}` otherwise;
///    - `{"ok": false, "error": "<traceback>"}` on any exception.
///
/// The protocol streams are private duplicates of fds 0/1. The real fds 0/1 are
/// re-pointed at the null device and at stderr, so `print()`, `input()` and child
/// processes started by user code cannot desynchronise the channel.
///
/// Replies are serialised inside the `try` (with `allow_nan=False`). An
/// unserialisable or NaN-bearing return value therefore becomes an error reply
/// instead of crashing the worker.
const PYTHON_BOOTSTRAP: &str = r#"
import json
import os
import sys
import traceback

entrypoint_name = sys.argv[1]
namespace = {}

# Keep private duplicates of the original stdin/stdout pipes for the protocol,
# then point fd 0 at the null device and fd 1 at stderr. print(), input() and
# any child processes started by the user's script therefore cannot consume
# requests or interleave output with responses.
_requests = os.fdopen(os.dup(0), "r", encoding="utf-8")
_responses = os.fdopen(os.dup(1), "w", encoding="utf-8", newline="\n")
_null_fd = os.open(os.devnull, os.O_RDONLY)
os.dup2(_null_fd, 0)
os.close(_null_fd)
os.dup2(2, 1)


def _send(message):
    _responses.write(message + "\n")
    _responses.flush()


def _error_response():
    # Only called from inside an except block, so format_exc() sees the
    # exception currently being handled.
    return json.dumps({"ok": False, "error": traceback.format_exc()})


try:
    script = json.loads(_requests.readline())
    exec(script, namespace)
    entrypoint = namespace.get(entrypoint_name)
    if not callable(entrypoint):
        raise RuntimeError(f"missing Python entrypoint '{entrypoint_name}'")
except Exception:
    _send(_error_response())
    raise SystemExit(1)

_send(json.dumps({"ok": True, "ready": True}))

for line in _requests:
    line = line.strip()
    if not line:
        continue
    try:
        env = json.loads(line)
        result = entrypoint(env)
        # Serialise inside the try: a return value json cannot encode (or one
        # containing NaN/inf, which is not valid JSON) becomes an error reply
        # rather than an uncaught exception that kills the worker.
        if result is None:
            response = json.dumps({"ok": True, "filtered": True})
        else:
            response = json.dumps(
                {"ok": True, "filtered": False, "env": result}, allow_nan=False
            )
    except Exception:
        response = _error_response()
    _send(response)
"#;
54
/// Script engine that runs a user-supplied Python function in a long-lived
/// interpreter subprocess, sending one JSON line per envelope and reading one
/// JSON line back.
pub struct PythonEngine {
    /// Name of the Python callable invoked for each envelope.
    entrypoint: String,
    /// Interpreter process and its pipes, shared with the blocking tasks that
    /// run requests. The mutex serialises requests so that each request line is
    /// paired with the next response line.
    worker: Arc<Mutex<PythonWorker>>,
}
59
/// A spawned Python interpreter plus buffered handles on its stdin/stdout pipes.
struct PythonWorker {
    /// The interpreter process running the bootstrap program.
    child: Child,
    /// Requests (JSON-encoded envelopes, one per line) are written here.
    stdin: BufWriter<ChildStdin>,
    /// Responses (one JSON object per line) are read from here.
    stdout: BufReader<ChildStdout>,
}
65
/// The first line the bootstrap writes after trying to load the user script.
#[derive(Deserialize)]
struct PythonInitResponse {
    /// `true` once the script executed and the entrypoint was found callable.
    ok: bool,
    /// Formatted Python traceback when `ok` is `false`.
    error: Option<String>,
}
71
/// One reply line from the bootstrap's per-envelope request loop.
#[derive(Deserialize)]
struct PythonRunResponse {
    /// `false` when decoding the request or calling the entrypoint raised.
    ok: bool,
    /// `true` when the entrypoint returned `None`, meaning the envelope is dropped.
    filtered: Option<bool>,
    /// The entrypoint's return value, to be converted back into an `Envelope`.
    env: Option<Value>,
    /// Formatted Python traceback when `ok` is `false`.
    error: Option<String>,
}
79
80#[async_trait]
81impl ScriptEngine for PythonEngine {
82 async fn run(&self, env: Envelope) -> Result<Option<Envelope>> {
83 let worker = Arc::clone(&self.worker);
84 let entrypoint = self.entrypoint.clone();
85
86 task::spawn_blocking(move || run_python_worker(&worker, &entrypoint, env))
87 .await
88 .context("Python runtime task failed")?
89 }
90}
91
92impl PythonEngine {
93 pub(super) fn new(config: &ScriptTransformConfig) -> Result<Self> {
94 let python = config
95 .python
96 .as_ref()
97 .expect("Python config missing for Python runtime");
98
99 let mut child = Command::new(&python.bin)
100 .arg("-u")
101 .arg("-c")
102 .arg(PYTHON_BOOTSTRAP)
103 .arg(&config.entrypoint)
104 .stdin(Stdio::piped())
105 .stdout(Stdio::piped())
106 .stderr(Stdio::inherit())
107 .spawn()
108 .with_context(|| {
109 format!(
110 "failed to spawn Python interpreter '{}'",
111 redact_secret(&python.bin)
112 )
113 })?;
114
115 let mut stdin = child
116 .stdin
117 .take()
118 .context("failed to capture Python stdin")?;
119 let stdout = child
120 .stdout
121 .take()
122 .context("failed to capture Python stdout")?;
123 serde_json::to_writer(&mut stdin, &config.script)
124 .context("failed to encode Python script for bootstrap")?;
125 stdin
126 .write_all(b"\n")
127 .context("failed to write Python bootstrap script delimiter")?;
128 stdin
129 .flush()
130 .context("failed to flush Python bootstrap script")?;
131 let mut stdout = BufReader::new(stdout);
132
133 let mut line = String::new();
134 let bytes = stdout
135 .read_line(&mut line)
136 .context("failed to read Python bootstrap response")?;
137 if bytes == 0 {
138 bail!("Python bootstrap exited before initialization completed");
139 }
140
141 let init: PythonInitResponse = serde_json::from_str(line.trim_end())
142 .context("failed to parse Python bootstrap response")?;
143 if !init.ok {
144 bail!(
145 "failed to initialize Python runtime: {}",
146 init.error.unwrap_or_else(|| "unknown error".into())
147 );
148 }
149
150 Ok(Self {
151 entrypoint: config.entrypoint.clone(),
152 worker: Arc::new(Mutex::new(PythonWorker {
153 child,
154 stdin: BufWriter::new(stdin),
155 stdout,
156 })),
157 })
158 }
159
160 #[cfg(test)]
161 fn run(&self, env: Envelope) -> Result<Option<Envelope>> {
162 run_python_worker(&self.worker, &self.entrypoint, env)
163 }
164}
165
166fn run_python_worker(
167 worker: &Mutex<PythonWorker>,
168 entrypoint: &str,
169 env: Envelope,
170) -> Result<Option<Envelope>> {
171 let mut worker = worker
172 .lock()
173 .map_err(|_| anyhow!("Python worker lock poisoned"))?;
174
175 serde_json::to_writer(&mut worker.stdin, &env)
176 .context("failed to encode envelope for Python runtime")?;
177 worker
178 .stdin
179 .write_all(b"\n")
180 .context("failed to write Python request delimiter")?;
181 worker
182 .stdin
183 .flush()
184 .context("failed to flush Python request")?;
185
186 let mut line = String::new();
187 let bytes = worker
188 .stdout
189 .read_line(&mut line)
190 .context("failed to read Python runtime response")?;
191 if bytes == 0 {
192 bail!(
193 "Python entrypoint '{}' exited before returning a response",
194 entrypoint
195 );
196 }
197
198 let response: PythonRunResponse =
199 serde_json::from_str(line.trim_end()).context("failed to parse Python runtime response")?;
200 if !response.ok {
201 bail!(
202 "Python entrypoint '{}' failed: {}",
203 entrypoint,
204 response.error.unwrap_or_else(|| "unknown error".into())
205 );
206 }
207
208 if response.filtered.unwrap_or(false) {
209 return Ok(None);
210 }
211
212 let env = response
213 .env
214 .context("Python runtime did not return an envelope")?;
215 serde_json::from_value(env)
216 .map(Some)
217 .map_err(|err| anyhow!(err).context("failed to convert Python return value into envelope"))
218}
219
220impl Drop for PythonEngine {
221 fn drop(&mut self) {
222 if let Ok(mut worker) = self.worker.lock() {
223 let _ = worker.child.kill();
224 let _ = worker.child.wait();
225 }
226 }
227}
228
#[cfg(test)]
mod tests {
    // These tests spawn a real interpreter: `config_with_entrypoint` points the
    // engine at the `python3` binary, which must be on PATH.
    use serde_json::json;

    use super::PythonEngine;
    use crate::envelope::Envelope;
    use crate::transforms::script::{PythonConfig, ScriptRuntime, ScriptTransformConfig};

    /// Builds a Python script config that runs `script` with the given entrypoint.
    fn config_with_entrypoint(script: &str, entrypoint: &str) -> ScriptTransformConfig {
        ScriptTransformConfig {
            runtime: ScriptRuntime::Python,
            script: script.into(),
            entrypoint: entrypoint.into(),
            python: Some(PythonConfig {
                bin: "python3".into(),
            }),
            rhai: None,
        }
    }

    /// Builds a Python script config using the default `transform` entrypoint.
    fn config(script: &str) -> ScriptTransformConfig {
        config_with_entrypoint(script, "transform")
    }

    // Changes made to `env["payload"]` must round-trip back into the envelope.
    #[test]
    fn mutates_payload() {
        let engine = PythonEngine::new(&config(
            r#"

def transform(env):
    env["payload"]["processed"] = True
    return env
"#,
        ))
        .unwrap();

        let out = engine
            .run(Envelope::new("src", json!({ "value": 1 })))
            .unwrap()
            .unwrap();
        assert_eq!(out.payload, json!({ "value": 1, "processed": true }));
    }

    // Header changes under `env["meta"]["headers"]` must round-trip as well.
    #[test]
    fn mutates_metadata() {
        let engine = PythonEngine::new(&config(
            r#"

def transform(env):
    env["meta"]["headers"]["script_runtime"] = "python"
    return env
"#,
        ))
        .unwrap();

        let out = engine
            .run(Envelope::new("src", json!({})))
            .unwrap()
            .unwrap();
        assert_eq!(
            out.meta.headers.get("script_runtime").map(String::as_str),
            Some("python")
        );
    }

    // Returning `None` from the entrypoint drops the envelope.
    #[test]
    fn none_return_filters_envelope() {
        let engine = PythonEngine::new(&config(
            r#"

def transform(env):
    return None
"#,
        ))
        .unwrap();

        let out = engine
            .run(Envelope::new("src", json!({ "skip": true })))
            .unwrap();
        assert!(out.is_none());
    }

    // The script reaches the interpreter over stdin, not via an environment
    // variable, so processes started by the script must not see it.
    #[test]
    fn script_is_not_exposed_to_python_child_processes_as_env() {
        let engine = PythonEngine::new(&config(
            r#"
import subprocess
import sys

def transform(env):
    out = subprocess.check_output([
        sys.executable,
        "-c",
        "import os; print(os.environ.get('COURIER_PYTHON_SCRIPT', ''))",
    ], text=True)
    env["payload"]["inherited_script"] = out.strip()
    return env
"#,
        ))
        .unwrap();

        let out = engine
            .run(Envelope::new("src", json!({})))
            .unwrap()
            .unwrap();
        assert_eq!(out.payload, json!({ "inherited_script": "" }));
    }

    // A syntax error in the script surfaces from `new`, not from the first `run`.
    #[test]
    fn compile_error_fails_build() {
        let err = PythonEngine::new(&config("def transform(env):\n if\n"))
            .err()
            .expect("expected compile error");
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to initialize Python runtime"), "{msg}");
    }

    // The entrypoint name is configurable rather than fixed to `transform`.
    #[test]
    fn supports_custom_entrypoint() {
        let engine = PythonEngine::new(&config_with_entrypoint(
            r#"

def process(env):
    env["payload"]["processed"] = True
    return env
"#,
            "process",
        ))
        .unwrap();

        let out = engine
            .run(Envelope::new("src", json!({ "value": 1 })))
            .unwrap()
            .unwrap();
        assert_eq!(out.payload, json!({ "value": 1, "processed": true }));
    }

    // A script that does not define the configured entrypoint is rejected at build time.
    #[test]
    fn missing_entrypoint_fails_build() {
        let err = PythonEngine::new(&config("def other(env):\n return env\n"))
            .err()
            .expect("expected missing entrypoint error");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("missing Python entrypoint 'transform'"),
            "{msg}"
        );
    }

    // A return value that does not deserialize into an `Envelope` is a run error.
    #[test]
    fn invalid_return_shape_fails_run() {
        let engine = PythonEngine::new(&config(
            r#"

def transform(env):
    return 42
"#,
        ))
        .unwrap();

        let err = engine.run(Envelope::new("src", json!({}))).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("failed to convert Python return value into envelope"),
            "{msg}"
        );
    }

    // An exception raised by the entrypoint is reported with the entrypoint's name.
    #[test]
    fn runtime_exception_fails_run() {
        let engine = PythonEngine::new(&config(
            r#"

def transform(env):
    raise RuntimeError("boom")
"#,
        ))
        .unwrap();

        let err = engine.run(Envelope::new("src", json!({}))).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("Python entrypoint 'transform' failed"),
            "{msg}"
        );
    }

    // A non-existent interpreter binary fails at spawn time inside `new`.
    #[test]
    fn missing_interpreter_fails_build() {
        let mut config = config("def transform(env):\n return env\n");
        config.python = Some(PythonConfig {
            bin: "__courier_missing_python__".into(),
        });

        let err = PythonEngine::new(&config)
            .err()
            .expect("expected missing interpreter error");
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to spawn Python interpreter"), "{msg}");
    }
}