Skip to main content

hm_dsl_engine/
python_engine.rs

1use std::path::Path;
2use std::process::Stdio;
3
4use anyhow::{Context, Result, bail};
5use async_trait::async_trait;
6use tracing::debug;
7
8use crate::bundled_sources;
9use crate::{DslEngine, PipelineMeta};
10
/// Python program handed to `python3 -c` to enumerate the project's pipelines.
///
/// The script imports `harmont` (printing the import error to stderr and
/// exiting with status 1 when that fails), executes every `.hm/*.py` file in
/// sorted order relative to the interpreter's working directory, and finally
/// prints a JSON array of `{"slug": ..., "name": ...}` objects built from the
/// `pipelines` list of `hm.dump_registry_json()`.
///
/// Written as a raw string literal so the Python text appears verbatim.
const LIST_PIPELINES_SCRIPT: &str = r"import sys, json, pathlib, importlib.util
try:
    import harmont as hm
except ImportError as e:
    print(f'error: {e}', file=sys.stderr)
    sys.exit(1)
for p in sorted(pathlib.Path('.hm').glob('*.py')):
    spec = importlib.util.spec_from_file_location(f'_harmont_{p.stem}', p)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
envelope = json.loads(hm.dump_registry_json())
print(json.dumps([{'slug': p['slug'], 'name': p['name']} for p in envelope['pipelines']]))
";
25
/// Python program handed to `python3 -c` to dump the whole pipeline registry.
///
/// The script imports `harmont` (printing the import error to stderr and
/// exiting with status 1 when that fails), executes every `.hm/*.py` file in
/// sorted order relative to the interpreter's working directory, and writes
/// the string returned by `hm.dump_registry_json()` to stdout without adding
/// a trailing newline.
///
/// Written as a raw string literal so the Python text appears verbatim.
const REGISTRY_JSON_SCRIPT: &str = r"import sys, pathlib, importlib.util
try:
    import harmont as hm
except ImportError as e:
    print(f'error: {e}', file=sys.stderr)
    sys.exit(1)
for p in sorted(pathlib.Path('.hm').glob('*.py')):
    spec = importlib.util.spec_from_file_location(f'_harmont_{p.stem}', p)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
sys.stdout.write(hm.dump_registry_json())
";
39
/// Python program handed to `python3 -c` to render a single pipeline.
///
/// The pipeline slug is read from `sys.argv[1]`. The script imports `harmont`
/// (printing the import error to stderr and exiting with status 1 when that
/// fails), executes every `.hm/*.py` file in sorted order relative to the
/// interpreter's working directory, and looks the slug up in the `pipelines`
/// list of `hm.dump_registry_json()`.
///
/// * Found: the pipeline's `definition` is printed to stdout as JSON.
/// * Not found: an error listing the available slugs is printed to stderr and
///   the script exits with status 2.
///
/// Written as a raw string literal so the Python `\n` escape inside the
/// f-string does not need to be double-escaped.
const RENDER_PIPELINE_SCRIPT: &str = r"import sys, json, pathlib, importlib.util
try:
    import harmont as hm
except ImportError as e:
    print(f'error: {e}', file=sys.stderr)
    sys.exit(1)
slug = sys.argv[1]
for p in sorted(pathlib.Path('.hm').glob('*.py')):
    spec = importlib.util.spec_from_file_location(f'_harmont_{p.stem}', p)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
envelope = json.loads(hm.dump_registry_json())
match = next((p for p in envelope['pipelines'] if p['slug'] == slug), None)
if match is None:
    avail = ', '.join(p['slug'] for p in envelope['pipelines']) or '(none)'
    print(f'error: pipeline {slug!r} not found\n  -> available: {avail}', file=sys.stderr)
    sys.exit(2)
print(json.dumps(match['definition']))
";
60
/// DSL engine that evaluates pipeline definitions by spawning a `python3`
/// subprocess for every request.
#[derive(Debug)]
pub struct SubprocessPythonEngine {
    /// Path of the Python interpreter executable used to run the scripts.
    python_bin: std::path::PathBuf,
}
65
66impl SubprocessPythonEngine {
67    /// Create engine, verifying `python3` is available on PATH.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if `python3` is not found on `PATH`.
72    pub fn new() -> Result<Self> {
73        let python_bin =
74            which::which("python3").context("python3 not found on PATH — install Python 3.11+")?;
75        Ok(Self { python_bin })
76    }
77
78    async fn run_script(
79        &self,
80        project_dir: &Path,
81        script: &str,
82        extra_args: &[&str],
83    ) -> Result<String> {
84        let tmp = tempfile::tempdir().context("creating temp dir for harmont-py")?;
85        let harmont_pkg = tmp.path().join("harmont");
86        bundled_sources::extract_to(&bundled_sources::HARMONT_PY, &harmont_pkg)?;
87
88        let mut cmd = tokio::process::Command::new(&self.python_bin);
89        cmd.arg("-c")
90            .arg(script)
91            .args(extra_args)
92            .current_dir(project_dir)
93            .env("PYTHONPATH", tmp.path())
94            .env("PYTHONDONTWRITEBYTECODE", "1")
95            .stdin(Stdio::null())
96            .stdout(Stdio::piped())
97            .stderr(Stdio::piped());
98
99        debug!(?cmd, "running python3 subprocess");
100
101        let output = cmd.output().await.context("spawning python3")?;
102
103        if !output.status.success() {
104            let stderr = String::from_utf8_lossy(&output.stderr);
105            let code = output.status.code().unwrap_or(-1);
106            bail!("python3 exited with code {code}:\n{stderr}");
107        }
108
109        String::from_utf8(output.stdout).context("python3 stdout is not valid UTF-8")
110    }
111}
112
113#[async_trait]
114impl DslEngine for SubprocessPythonEngine {
115    async fn list_pipelines(&self, project_dir: &Path) -> Result<Vec<PipelineMeta>> {
116        let stdout = self
117            .run_script(project_dir, LIST_PIPELINES_SCRIPT, &[])
118            .await
119            .context("listing pipelines via python3")?;
120
121        debug!(raw_len = stdout.len(), "list_pipelines stdout");
122
123        serde_json::from_str(&stdout).context("decoding pipeline metadata from python3 stdout")
124    }
125
126    async fn render_pipeline_json(&self, project_dir: &Path, slug: &str) -> Result<String> {
127        self.run_script(project_dir, RENDER_PIPELINE_SCRIPT, &[slug])
128            .await
129            .context("rendering pipeline via python3")
130    }
131
132    async fn registry_json(&self, project_dir: &Path) -> Result<String> {
133        self.run_script(project_dir, REGISTRY_JSON_SCRIPT, &[])
134            .await
135            .context("dumping pipeline registry via python3")
136    }
137}