Skip to main content

courier/
cli.rs

1use std::path::{Path, PathBuf};
2
3use anyhow::{Context, Result};
4use clap::{Parser, Subcommand};
5
6use crate::config::Config;
7use crate::{Courier, Registry};
8
9pub const DEFAULT_CONFIG_PATH: &str = "config.toml";
10pub const CONFIG_ENV_VAR: &str = "COURIER_CONFIG";
11
12#[derive(Debug, Parser)]
13#[command(
14    version,
15    name = "courier",
16    about = "Run and inspect Courier pipeline runtimes",
17    subcommand_required = true,
18    arg_required_else_help = true
19)]
20pub struct Cli {
21    #[command(subcommand)]
22    pub command: CliCommand,
23}
24
25#[derive(Debug, Clone, Eq, PartialEq, Subcommand)]
26pub enum CliCommand {
27    /// Load config and start pipelines.
28    Run {
29        /// Config file or directory. Overrides COURIER_CONFIG.
30        #[arg(short, long)]
31        config: Option<PathBuf>,
32    },
33    /// Load config and build the runtime, then exit.
34    Validate {
35        /// Config file or directory. Overrides COURIER_CONFIG.
36        #[arg(short, long)]
37        config: Option<PathBuf>,
38    },
39    /// Print registered component kinds.
40    ListComponents,
41}
42
43pub fn resolve_config_path(config: Option<PathBuf>) -> PathBuf {
44    resolve_config_path_with_env(config, |key| std::env::var(key).ok())
45}
46
47fn resolve_config_path_with_env(
48    config: Option<PathBuf>,
49    env: impl FnOnce(&str) -> Option<String>,
50) -> PathBuf {
51    config.unwrap_or_else(|| {
52        env(CONFIG_ENV_VAR)
53            .map(PathBuf::from)
54            .unwrap_or_else(|| PathBuf::from(DEFAULT_CONFIG_PATH))
55    })
56}
57
58pub fn build_runtime(path: &Path) -> Result<Courier> {
59    let config = Config::load(path)?;
60    build_runtime_from_config(config, path)
61}
62
63/// Build a runtime from a pre-loaded `Config`. Used by `main` so that the
64/// observability config can be read off the parsed `Config` to drive the
65/// logging subscriber before the runtime is built — without re-reading
66/// the file from disk.
67pub fn build_runtime_from_config(config: Config, source: &Path) -> Result<Courier> {
68    let registry = Registry::with_builtins()?;
69    registry
70        .build_courier(config)
71        .with_context(|| format!("failed to build runtime from config {}", source.display()))
72}
73
74pub fn validate_config(path: &Path) -> Result<()> {
75    let config = Config::load(path)?;
76    let registry = Registry::with_builtins()?;
77    registry
78        .dry_run_build(config)
79        .with_context(|| format!("failed to build runtime from config {}", path.display()))
80}
81
82pub fn list_components(registry: &Registry) -> String {
83    fn sorted(items: impl Iterator<Item = impl AsRef<str>>) -> Vec<String> {
84        let mut values: Vec<_> = items.map(|item| item.as_ref().to_string()).collect();
85        values.sort();
86        values
87    }
88
89    let sources = sorted(registry.source_kinds());
90    let transforms = sorted(registry.transform_kinds());
91    let sinks = sorted(registry.sink_kinds());
92
93    let mut out = String::new();
94    out.push_str("sources:\n");
95    for kind in sources {
96        out.push_str("  ");
97        out.push_str(&kind);
98        out.push('\n');
99    }
100    out.push_str("transforms:\n");
101    for kind in transforms {
102        out.push_str("  ");
103        out.push_str(&kind);
104        out.push('\n');
105    }
106    out.push_str("sinks:\n");
107    for kind in sinks {
108        out.push_str("  ");
109        out.push_str(&kind);
110        out.push('\n');
111    }
112    out
113}
114
115#[cfg(test)]
116mod tests {
117    use super::*;
118
119    #[test]
120    fn resolve_path_prefers_arg_then_env_then_default() {
121        assert_eq!(
122            resolve_config_path_with_env(Some(PathBuf::from("arg.toml")), |_| {
123                Some("env.toml".to_string())
124            }),
125            PathBuf::from("arg.toml")
126        );
127        assert_eq!(
128            resolve_config_path_with_env(None, |_| Some("env.toml".to_string())),
129            PathBuf::from("env.toml")
130        );
131        assert_eq!(
132            resolve_config_path_with_env(None, |_| None),
133            PathBuf::from(DEFAULT_CONFIG_PATH)
134        );
135    }
136
137    #[test]
138    fn list_components_includes_registered_builtins() {
139        let registry = Registry::with_builtins().unwrap();
140        let output = list_components(&registry);
141
142        assert!(output.contains("sources:\n"));
143        assert!(output.contains("  api_poll\n"));
144        assert!(output.contains("  http_webhook\n"));
145        assert!(output.contains("  sql_query_poll\n"));
146        assert!(output.contains("transforms:\n"));
147        assert!(output.contains("  script\n"));
148        assert!(output.contains("sinks:\n"));
149        assert!(output.contains("  sql\n"));
150    }
151
152    #[test]
153    fn validate_accepts_valid_config() {
154        let dir = tempfile::tempdir().unwrap();
155        let path = dir.path().join("valid.toml");
156        std::fs::write(
157            &path,
158            r#"
159[[pipelines]]
160name = "valid"
161
162[pipelines.source]
163type = "api_poll"
164url = "http://localhost/data"
165interval_secs = 60
166
167[[pipelines.sinks]]
168type = "api"
169url = "http://localhost/ingest"
170"#,
171        )
172        .unwrap();
173
174        validate_config(&path).unwrap();
175    }
176
177    #[test]
178    fn validate_does_not_create_file_sink_output() {
179        let dir = tempfile::tempdir().unwrap();
180        let config_path = dir.path().join("valid.toml");
181        let output_path = dir.path().join("nested/out.jsonl");
182        std::fs::write(
183            &config_path,
184            format!(
185                r#"
186[[pipelines]]
187name = "valid-file"
188
189[pipelines.source]
190type = "api_poll"
191url = "http://localhost/data"
192interval_secs = 60
193
194[[pipelines.sinks]]
195type = "file"
196path = "{}"
197"#,
198                output_path.display()
199            ),
200        )
201        .unwrap();
202
203        validate_config(&config_path).unwrap();
204        assert!(!output_path.exists());
205        assert!(!output_path.parent().unwrap().exists());
206    }
207
208    #[test]
209    fn validate_rejects_invalid_config_with_path_context() {
210        let dir = tempfile::tempdir().unwrap();
211        let path = dir.path().join("invalid.toml");
212        std::fs::write(
213            &path,
214            r#"
215[[pipelines]]
216name = "invalid"
217
218[pipelines.source]
219type = "api_poll"
220url = "http://localhost/data"
221interval_secs = 60
222
223[[pipelines.sinks]]
224type = "missing"
225"#,
226        )
227        .unwrap();
228
229        let err = validate_config(&path).unwrap_err();
230        let msg = format!("{err:#}");
231        assert!(msg.contains("failed to build runtime from config"), "{msg}");
232        assert!(msg.contains("invalid.toml"), "{msg}");
233        assert!(msg.contains("unknown sink type 'missing'"), "{msg}");
234    }
235
236    #[test]
237    fn validate_rejects_invalid_script_config_with_component_context() {
238        let dir = tempfile::tempdir().unwrap();
239        let path = dir.path().join("invalid-script.toml");
240        std::fs::write(
241            &path,
242            r#"
243[[pipelines]]
244name = "invalid-script"
245
246[pipelines.source]
247type = "api_poll"
248url = "http://localhost/data"
249interval_secs = 60
250
251[[pipelines.transforms]]
252type = "script"
253runtime = "rhai"
254script = "payload"
255script_file = "/tmp/also-set.rhai"
256
257[[pipelines.sinks]]
258type = "api"
259url = "http://localhost/ingest"
260"#,
261        )
262        .unwrap();
263
264        let err = validate_config(&path).unwrap_err();
265        let msg = format!("{err:#}");
266        assert!(
267            msg.contains("pipeline 'invalid-script' transform[0]"),
268            "{msg}"
269        );
270        assert!(
271            msg.contains("set either 'script' or 'script_file', not both"),
272            "{msg}"
273        );
274    }
275}