Skip to main content

courier/transforms/script/python.rs

1use std::io::{BufRead, BufReader, BufWriter, Write};
2use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
3use std::sync::{Arc, Mutex};
4
5use anyhow::{Context, Result, anyhow, bail};
6use async_trait::async_trait;
7use serde::Deserialize;
8use serde_json::Value;
9use tokio::task;
10
11use crate::config::redact_secret;
12use crate::envelope::Envelope;
13
14use super::{ScriptEngine, ScriptTransformConfig};
15
/// Python program launched as `<bin> -u -c PYTHON_BOOTSTRAP <entrypoint>`.
///
/// It speaks a line-delimited JSON protocol over the interpreter's stdin/stdout:
///
/// 1. The first stdin line is the user script encoded as a JSON string. The bootstrap `exec`s it
///    and replies `{"ok": true, "ready": true}` if `<entrypoint>` is callable. Otherwise it
///    replies `{"ok": false, "error": <traceback>}` and exits with status 1.
/// 2. Every later non-blank stdin line is a JSON envelope. The reply is one of:
///    - `{"ok": true, "filtered": true}` when the entrypoint returns `None`;
///    - `{"ok": true, "filtered": false, "env": <result>}` otherwise;
///    - `{"ok": false, "error": <traceback>}` if the call raised or its result could not be
///      encoded as strict JSON.
///
/// Replies are written to a private duplicate of the original stdout. `sys.stdout` and file
/// descriptor 1 are then redirected to stderr, so `print()` in user code, or child processes it
/// spawns, cannot inject stray lines into the reply stream. Requests are read as raw bytes so
/// decoding does not depend on the interpreter's locale encoding.
const PYTHON_BOOTSTRAP: &str = r#"
import json
import os
import sys
import traceback

entrypoint_name = sys.argv[1]
namespace = {}

# Keep a private handle on the real stdout for protocol replies, then point both sys.stdout
# and fd 1 at stderr so user output and child processes cannot corrupt the protocol.
protocol = os.fdopen(os.dup(sys.stdout.fileno()), "w", encoding="utf-8", newline="\n")
os.dup2(sys.stderr.fileno(), sys.stdout.fileno())
sys.stdout = sys.stderr


def reply(response):
    # Encode inside the try so that a value json cannot represent (a set, NaN, a cycle, ...)
    # becomes an error reply instead of an uncaught exception that kills the worker.
    try:
        payload = json.dumps(response, allow_nan=False)
    except Exception:
        payload = json.dumps({"ok": False, "error": traceback.format_exc()})
    protocol.write(payload + "\n")
    protocol.flush()


try:
    # Raw bytes: json.loads decodes UTF-8 itself, independent of the locale encoding.
    script = json.loads(sys.stdin.buffer.readline())
    exec(script, namespace)
    entrypoint = namespace.get(entrypoint_name)
    if not callable(entrypoint):
        raise RuntimeError(f"missing Python entrypoint '{entrypoint_name}'")
    reply({"ok": True, "ready": True})
except Exception:
    reply({"ok": False, "error": traceback.format_exc()})
    raise SystemExit(1)

for line in sys.stdin.buffer:
    line = line.strip()
    if not line:
        continue
    try:
        env = json.loads(line)
        result = entrypoint(env)
        if result is None:
            response = {"ok": True, "filtered": True}
        else:
            response = {"ok": True, "filtered": False, "env": result}
    except Exception:
        response = {"ok": False, "error": traceback.format_exc()}

    reply(response)
"#;
54
55pub struct PythonEngine {
56    entrypoint: String,
57    worker: Arc<Mutex<PythonWorker>>,
58}
59
/// A running Python bootstrap process, together with the buffered ends of its request
/// (stdin) and response (stdout) pipes.
struct PythonWorker {
    /// Process handle; the engine kills and reaps it on drop.
    child: Child,
    /// Requests are written here, one JSON document per line.
    stdin: BufWriter<ChildStdin>,
    /// Responses are read from here, one JSON document per line.
    stdout: BufReader<ChildStdout>,
}
65
66#[derive(Deserialize)]
67struct PythonInitResponse {
68    ok: bool,
69    error: Option<String>,
70}
71
72#[derive(Deserialize)]
73struct PythonRunResponse {
74    ok: bool,
75    filtered: Option<bool>,
76    env: Option<Value>,
77    error: Option<String>,
78}
79
80#[async_trait]
81impl ScriptEngine for PythonEngine {
82    async fn run(&self, env: Envelope) -> Result<Option<Envelope>> {
83        let worker = Arc::clone(&self.worker);
84        let entrypoint = self.entrypoint.clone();
85
86        task::spawn_blocking(move || run_python_worker(&worker, &entrypoint, env))
87            .await
88            .context("Python runtime task failed")?
89    }
90}
91
92impl PythonEngine {
93    pub(super) fn new(config: &ScriptTransformConfig) -> Result<Self> {
94        let python = config
95            .python
96            .as_ref()
97            .expect("Python config missing for Python runtime");
98
99        let mut child = Command::new(&python.bin)
100            .arg("-u")
101            .arg("-c")
102            .arg(PYTHON_BOOTSTRAP)
103            .arg(&config.entrypoint)
104            .stdin(Stdio::piped())
105            .stdout(Stdio::piped())
106            .stderr(Stdio::inherit())
107            .spawn()
108            .with_context(|| {
109                format!(
110                    "failed to spawn Python interpreter '{}'",
111                    redact_secret(&python.bin)
112                )
113            })?;
114
115        let mut stdin = child
116            .stdin
117            .take()
118            .context("failed to capture Python stdin")?;
119        let stdout = child
120            .stdout
121            .take()
122            .context("failed to capture Python stdout")?;
123        serde_json::to_writer(&mut stdin, &config.script)
124            .context("failed to encode Python script for bootstrap")?;
125        stdin
126            .write_all(b"\n")
127            .context("failed to write Python bootstrap script delimiter")?;
128        stdin
129            .flush()
130            .context("failed to flush Python bootstrap script")?;
131        let mut stdout = BufReader::new(stdout);
132
133        let mut line = String::new();
134        let bytes = stdout
135            .read_line(&mut line)
136            .context("failed to read Python bootstrap response")?;
137        if bytes == 0 {
138            bail!("Python bootstrap exited before initialization completed");
139        }
140
141        let init: PythonInitResponse = serde_json::from_str(line.trim_end())
142            .context("failed to parse Python bootstrap response")?;
143        if !init.ok {
144            bail!(
145                "failed to initialize Python runtime: {}",
146                init.error.unwrap_or_else(|| "unknown error".into())
147            );
148        }
149
150        Ok(Self {
151            entrypoint: config.entrypoint.clone(),
152            worker: Arc::new(Mutex::new(PythonWorker {
153                child,
154                stdin: BufWriter::new(stdin),
155                stdout,
156            })),
157        })
158    }
159
160    #[cfg(test)]
161    fn run(&self, env: Envelope) -> Result<Option<Envelope>> {
162        run_python_worker(&self.worker, &self.entrypoint, env)
163    }
164}
165
166fn run_python_worker(
167    worker: &Mutex<PythonWorker>,
168    entrypoint: &str,
169    env: Envelope,
170) -> Result<Option<Envelope>> {
171    let mut worker = worker
172        .lock()
173        .map_err(|_| anyhow!("Python worker lock poisoned"))?;
174
175    serde_json::to_writer(&mut worker.stdin, &env)
176        .context("failed to encode envelope for Python runtime")?;
177    worker
178        .stdin
179        .write_all(b"\n")
180        .context("failed to write Python request delimiter")?;
181    worker
182        .stdin
183        .flush()
184        .context("failed to flush Python request")?;
185
186    let mut line = String::new();
187    let bytes = worker
188        .stdout
189        .read_line(&mut line)
190        .context("failed to read Python runtime response")?;
191    if bytes == 0 {
192        bail!(
193            "Python entrypoint '{}' exited before returning a response",
194            entrypoint
195        );
196    }
197
198    let response: PythonRunResponse =
199        serde_json::from_str(line.trim_end()).context("failed to parse Python runtime response")?;
200    if !response.ok {
201        bail!(
202            "Python entrypoint '{}' failed: {}",
203            entrypoint,
204            response.error.unwrap_or_else(|| "unknown error".into())
205        );
206    }
207
208    if response.filtered.unwrap_or(false) {
209        return Ok(None);
210    }
211
212    let env = response
213        .env
214        .context("Python runtime did not return an envelope")?;
215    serde_json::from_value(env)
216        .map(Some)
217        .map_err(|err| anyhow!(err).context("failed to convert Python return value into envelope"))
218}
219
220impl Drop for PythonEngine {
221    fn drop(&mut self) {
222        if let Ok(mut worker) = self.worker.lock() {
223            let _ = worker.child.kill();
224            let _ = worker.child.wait();
225        }
226    }
227}
228
#[cfg(test)]
mod tests {
    use serde_json::{Value, json};

    use super::PythonEngine;
    use crate::envelope::Envelope;
    use crate::transforms::script::{PythonConfig, ScriptRuntime, ScriptTransformConfig};

    /// Python runtime config that runs `script` through `entrypoint` with `python3` from `PATH`.
    fn config_with_entrypoint(script: &str, entrypoint: &str) -> ScriptTransformConfig {
        let python = Some(PythonConfig {
            bin: "python3".into(),
        });
        ScriptTransformConfig {
            runtime: ScriptRuntime::Python,
            script: script.into(),
            entrypoint: entrypoint.into(),
            python,
            rhai: None,
        }
    }

    /// Same as `config_with_entrypoint`, using the default `transform` entrypoint.
    fn config(script: &str) -> ScriptTransformConfig {
        config_with_entrypoint(script, "transform")
    }

    /// Builds an engine for `config`, panicking if the interpreter fails to start.
    fn start(config: &ScriptTransformConfig) -> PythonEngine {
        PythonEngine::new(config).unwrap()
    }

    /// Runs `engine` on a `"src"` envelope carrying `payload`, expecting it to be kept.
    fn kept(engine: &PythonEngine, payload: Value) -> Envelope {
        engine.run(Envelope::new("src", payload)).unwrap().unwrap()
    }

    /// Builds an engine for `config`, expecting failure, and renders the full error chain.
    fn build_failure(config: &ScriptTransformConfig, expectation: &str) -> String {
        let err = PythonEngine::new(config).err().expect(expectation);
        format!("{err:#}")
    }

    /// Runs `engine` on an empty-payload `"src"` envelope, expecting failure, and renders the
    /// full error chain.
    fn run_failure(engine: &PythonEngine) -> String {
        let err = engine.run(Envelope::new("src", json!({}))).unwrap_err();
        format!("{err:#}")
    }

    #[test]
    fn mutates_payload() {
        let engine = start(&config(
            r#"

def transform(env):
    env["payload"]["processed"] = True
    return env
"#,
        ));

        let out = kept(&engine, json!({ "value": 1 }));
        assert_eq!(out.payload, json!({ "value": 1, "processed": true }));
    }

    #[test]
    fn mutates_metadata() {
        let engine = start(&config(
            r#"

def transform(env):
    env["meta"]["headers"]["script_runtime"] = "python"
    return env
"#,
        ));

        let out = kept(&engine, json!({}));
        let runtime = out.meta.headers.get("script_runtime").map(String::as_str);
        assert_eq!(runtime, Some("python"));
    }

    #[test]
    fn none_return_filters_envelope() {
        let engine = start(&config(
            r#"

def transform(env):
    return None
"#,
        ));

        let filtered = engine
            .run(Envelope::new("src", json!({ "skip": true })))
            .unwrap();
        assert!(filtered.is_none());
    }

    #[test]
    fn script_is_not_exposed_to_python_child_processes_as_env() {
        let engine = start(&config(
            r#"
import subprocess
import sys

def transform(env):
    out = subprocess.check_output([
        sys.executable,
        "-c",
        "import os; print(os.environ.get('COURIER_PYTHON_SCRIPT', ''))",
    ], text=True)
    env["payload"]["inherited_script"] = out.strip()
    return env
"#,
        ));

        let out = kept(&engine, json!({}));
        assert_eq!(out.payload, json!({ "inherited_script": "" }));
    }

    #[test]
    fn compile_error_fails_build() {
        let msg = build_failure(
            &config("def transform(env):\n    if\n"),
            "expected compile error",
        );
        assert!(msg.contains("failed to initialize Python runtime"), "{msg}");
    }

    #[test]
    fn supports_custom_entrypoint() {
        let engine = start(&config_with_entrypoint(
            r#"

def process(env):
    env["payload"]["processed"] = True
    return env
"#,
            "process",
        ));

        let out = kept(&engine, json!({ "value": 1 }));
        assert_eq!(out.payload, json!({ "value": 1, "processed": true }));
    }

    #[test]
    fn missing_entrypoint_fails_build() {
        let msg = build_failure(
            &config("def other(env):\n    return env\n"),
            "expected missing entrypoint error",
        );
        assert!(
            msg.contains("missing Python entrypoint 'transform'"),
            "{msg}"
        );
    }

    #[test]
    fn invalid_return_shape_fails_run() {
        let engine = start(&config(
            r#"

def transform(env):
    return 42
"#,
        ));

        let msg = run_failure(&engine);
        assert!(
            msg.contains("failed to convert Python return value into envelope"),
            "{msg}"
        );
    }

    #[test]
    fn runtime_exception_fails_run() {
        let engine = start(&config(
            r#"

def transform(env):
    raise RuntimeError("boom")
"#,
        ));

        let msg = run_failure(&engine);
        assert!(
            msg.contains("Python entrypoint 'transform' failed"),
            "{msg}"
        );
    }

    #[test]
    fn missing_interpreter_fails_build() {
        let mut cfg = config("def transform(env):\n    return env\n");
        cfg.python = Some(PythonConfig {
            bin: "__courier_missing_python__".into(),
        });

        let msg = build_failure(&cfg, "expected missing interpreter error");
        assert!(msg.contains("failed to spawn Python interpreter"), "{msg}");
    }
}