hm_dsl_engine/
python_engine.rs1use std::path::Path;
2use std::process::Stdio;
3
4use anyhow::{Context, Result, bail};
5use async_trait::async_trait;
6use tracing::debug;
7
8use crate::bundled_sources;
9use crate::{DslEngine, PipelineMeta};
10
/// Python program (run with `python3 -c`) that prints pipeline metadata as JSON.
///
/// Steps performed by the script:
/// 1. import `harmont`; on `ImportError` print the error to stderr and exit with status 1;
/// 2. execute every `.hm/*.py` file (sorted, relative to the working directory);
/// 3. parse `hm.dump_registry_json()` and print a JSON array with one
///    `{"slug": ..., "name": ...}` object per entry of its `pipelines` list.
///
/// stdout is the data channel back to the caller, so it is redirected to stderr while
/// the user modules execute: a stray `print()` in a pipeline file would otherwise land
/// in front of the JSON payload and make it undecodable.
const LIST_PIPELINES_SCRIPT: &str = "\
import sys, json, pathlib, importlib.util, contextlib
try:
    import harmont as hm
except ImportError as e:
    print(f'error: {e}', file=sys.stderr)
    sys.exit(1)
with contextlib.redirect_stdout(sys.stderr):
    for p in sorted(pathlib.Path('.hm').glob('*.py')):
        spec = importlib.util.spec_from_file_location(f'_harmont_{p.stem}', p)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
envelope = json.loads(hm.dump_registry_json())
print(json.dumps([{'slug': p['slug'], 'name': p['name']} for p in envelope['pipelines']]))
";
25
/// Python program (run with `python3 -c`) that writes the whole pipeline registry to stdout.
///
/// Steps performed by the script:
/// 1. import `harmont`; on `ImportError` print the error to stderr and exit with status 1;
/// 2. execute every `.hm/*.py` file (sorted, relative to the working directory);
/// 3. write the string returned by `hm.dump_registry_json()` to stdout verbatim
///    (no trailing newline is added).
///
/// stdout is the data channel back to the caller, so it is redirected to stderr while
/// the user modules execute: a stray `print()` in a pipeline file would otherwise be
/// mixed into the registry JSON.
const REGISTRY_JSON_SCRIPT: &str = "\
import sys, pathlib, importlib.util, contextlib
try:
    import harmont as hm
except ImportError as e:
    print(f'error: {e}', file=sys.stderr)
    sys.exit(1)
with contextlib.redirect_stdout(sys.stderr):
    for p in sorted(pathlib.Path('.hm').glob('*.py')):
        spec = importlib.util.spec_from_file_location(f'_harmont_{p.stem}', p)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
sys.stdout.write(hm.dump_registry_json())
";
39
/// Python program (run with `python3 -c <script> <slug>`) that prints one pipeline's
/// definition as JSON.
///
/// Steps performed by the script:
/// 1. import `harmont`; on `ImportError` print the error to stderr and exit with status 1;
/// 2. read the requested slug from `sys.argv[1]`;
/// 3. execute every `.hm/*.py` file (sorted, relative to the working directory);
/// 4. look the slug up in the `pipelines` list of `hm.dump_registry_json()`;
///    if it is missing, print the available slugs to stderr and exit with status 2;
/// 5. print the matching entry's `definition` as JSON.
///
/// stdout is the data channel back to the caller, so it is redirected to stderr while
/// the user modules execute: a stray `print()` in a pipeline file would otherwise land
/// in front of the JSON payload.
const RENDER_PIPELINE_SCRIPT: &str = "\
import sys, json, pathlib, importlib.util, contextlib
try:
    import harmont as hm
except ImportError as e:
    print(f'error: {e}', file=sys.stderr)
    sys.exit(1)
slug = sys.argv[1]
with contextlib.redirect_stdout(sys.stderr):
    for p in sorted(pathlib.Path('.hm').glob('*.py')):
        spec = importlib.util.spec_from_file_location(f'_harmont_{p.stem}', p)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
envelope = json.loads(hm.dump_registry_json())
match = next((p for p in envelope['pipelines'] if p['slug'] == slug), None)
if match is None:
    avail = ', '.join(p['slug'] for p in envelope['pipelines']) or '(none)'
    print(f'error: pipeline {slug!r} not found\\n -> available: {avail}', file=sys.stderr)
    sys.exit(2)
print(json.dumps(match['definition']))
";
60
61#[derive(Debug)]
62pub struct SubprocessPythonEngine {
63 python_bin: std::path::PathBuf,
64}
65
66impl SubprocessPythonEngine {
67 pub fn new() -> Result<Self> {
73 let python_bin =
74 which::which("python3").context("python3 not found on PATH — install Python 3.11+")?;
75 Ok(Self { python_bin })
76 }
77
78 async fn run_script(
79 &self,
80 project_dir: &Path,
81 script: &str,
82 extra_args: &[&str],
83 ) -> Result<String> {
84 let tmp = tempfile::tempdir().context("creating temp dir for harmont-py")?;
85 let harmont_pkg = tmp.path().join("harmont");
86 bundled_sources::extract_to(&bundled_sources::HARMONT_PY, &harmont_pkg)?;
87
88 let mut cmd = tokio::process::Command::new(&self.python_bin);
89 cmd.arg("-c")
90 .arg(script)
91 .args(extra_args)
92 .current_dir(project_dir)
93 .env("PYTHONPATH", tmp.path())
94 .env("PYTHONDONTWRITEBYTECODE", "1")
95 .stdin(Stdio::null())
96 .stdout(Stdio::piped())
97 .stderr(Stdio::piped());
98
99 debug!(?cmd, "running python3 subprocess");
100
101 let output = cmd.output().await.context("spawning python3")?;
102
103 if !output.status.success() {
104 let stderr = String::from_utf8_lossy(&output.stderr);
105 let code = output.status.code().unwrap_or(-1);
106 bail!("python3 exited with code {code}:\n{stderr}");
107 }
108
109 String::from_utf8(output.stdout).context("python3 stdout is not valid UTF-8")
110 }
111}
112
113#[async_trait]
114impl DslEngine for SubprocessPythonEngine {
115 async fn list_pipelines(&self, project_dir: &Path) -> Result<Vec<PipelineMeta>> {
116 let stdout = self
117 .run_script(project_dir, LIST_PIPELINES_SCRIPT, &[])
118 .await
119 .context("listing pipelines via python3")?;
120
121 debug!(raw_len = stdout.len(), "list_pipelines stdout");
122
123 serde_json::from_str(&stdout).context("decoding pipeline metadata from python3 stdout")
124 }
125
126 async fn render_pipeline_json(&self, project_dir: &Path, slug: &str) -> Result<String> {
127 self.run_script(project_dir, RENDER_PIPELINE_SCRIPT, &[slug])
128 .await
129 .context("rendering pipeline via python3")
130 }
131
132 async fn registry_json(&self, project_dir: &Path) -> Result<String> {
133 self.run_script(project_dir, REGISTRY_JSON_SCRIPT, &[])
134 .await
135 .context("dumping pipeline registry via python3")
136 }
137}