1use std::path::{Path, PathBuf};
2
3use anyhow::{Context, Result};
4use clap::{Parser, Subcommand};
5
6use crate::config::Config;
7use crate::{Courier, Registry};
8
/// Config file path used when neither an explicit path nor the
/// [`CONFIG_ENV_VAR`] environment variable supplies one.
pub const DEFAULT_CONFIG_PATH: &str = "config.toml";

/// Name of the environment variable consulted for the config file path when
/// no explicit path is given.
pub const CONFIG_ENV_VAR: &str = "COURIER_CONFIG";
11
12#[derive(Debug, Parser)]
13#[command(
14 version,
15 name = "courier",
16 about = "Run and inspect Courier pipeline runtimes",
17 subcommand_required = true,
18 arg_required_else_help = true
19)]
20pub struct Cli {
21 #[command(subcommand)]
22 pub command: CliCommand,
23}
24
25#[derive(Debug, Clone, Eq, PartialEq, Subcommand)]
26pub enum CliCommand {
27 Run {
29 #[arg(short, long)]
31 config: Option<PathBuf>,
32 },
33 Validate {
35 #[arg(short, long)]
37 config: Option<PathBuf>,
38 },
39 ListComponents,
41}
42
43pub fn resolve_config_path(config: Option<PathBuf>) -> PathBuf {
44 resolve_config_path_with_env(config, |key| std::env::var(key).ok())
45}
46
47fn resolve_config_path_with_env(
48 config: Option<PathBuf>,
49 env: impl FnOnce(&str) -> Option<String>,
50) -> PathBuf {
51 config.unwrap_or_else(|| {
52 env(CONFIG_ENV_VAR)
53 .map(PathBuf::from)
54 .unwrap_or_else(|| PathBuf::from(DEFAULT_CONFIG_PATH))
55 })
56}
57
58pub fn build_runtime(path: &Path) -> Result<Courier> {
59 let config = Config::load(path)?;
60 build_runtime_from_config(config, path)
61}
62
63pub fn build_runtime_from_config(config: Config, source: &Path) -> Result<Courier> {
68 let registry = Registry::with_builtins()?;
69 registry
70 .build_courier(config)
71 .with_context(|| format!("failed to build runtime from config {}", source.display()))
72}
73
74pub fn validate_config(path: &Path) -> Result<()> {
75 let config = Config::load(path)?;
76 let registry = Registry::with_builtins()?;
77 registry
78 .dry_run_build(config)
79 .with_context(|| format!("failed to build runtime from config {}", path.display()))
80}
81
82pub fn list_components(registry: &Registry) -> String {
83 fn sorted(items: impl Iterator<Item = impl AsRef<str>>) -> Vec<String> {
84 let mut values: Vec<_> = items.map(|item| item.as_ref().to_string()).collect();
85 values.sort();
86 values
87 }
88
89 let sources = sorted(registry.source_kinds());
90 let transforms = sorted(registry.transform_kinds());
91 let sinks = sorted(registry.sink_kinds());
92
93 let mut out = String::new();
94 out.push_str("sources:\n");
95 for kind in sources {
96 out.push_str(" ");
97 out.push_str(&kind);
98 out.push('\n');
99 }
100 out.push_str("transforms:\n");
101 for kind in transforms {
102 out.push_str(" ");
103 out.push_str(&kind);
104 out.push('\n');
105 }
106 out.push_str("sinks:\n");
107 for kind in sinks {
108 out.push_str(" ");
109 out.push_str(&kind);
110 out.push('\n');
111 }
112 out
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    // Path resolution: explicit argument, then environment, then default.
    #[test]
    fn resolve_path_prefers_arg_then_env_then_default() {
        assert_eq!(
            resolve_config_path_with_env(Some(PathBuf::from("arg.toml")), |_| {
                Some("env.toml".to_string())
            }),
            PathBuf::from("arg.toml")
        );
        assert_eq!(
            resolve_config_path_with_env(None, |_| Some("env.toml".to_string())),
            PathBuf::from("env.toml")
        );
        assert_eq!(
            resolve_config_path_with_env(None, |_| None),
            PathBuf::from(DEFAULT_CONFIG_PATH)
        );
    }

    // The listing contains every section header and the expected builtin kinds.
    #[test]
    fn list_components_includes_registered_builtins() {
        let registry = Registry::with_builtins().unwrap();
        let output = list_components(&registry);

        assert!(output.contains("sources:\n"));
        assert!(output.contains(" api_poll\n"));
        assert!(output.contains(" http_webhook\n"));
        assert!(output.contains(" sql_query_poll\n"));
        assert!(output.contains("transforms:\n"));
        assert!(output.contains(" script\n"));
        assert!(output.contains("sinks:\n"));
        assert!(output.contains(" sql\n"));
    }

    // A well-formed pipeline passes validation.
    #[test]
    fn validate_accepts_valid_config() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("valid.toml");
        std::fs::write(
            &path,
            r#"
[[pipelines]]
name = "valid"

[pipelines.source]
type = "api_poll"
url = "http://localhost/data"
interval_secs = 60

[[pipelines.sinks]]
type = "api"
url = "http://localhost/ingest"
"#,
        )
        .unwrap();

        validate_config(&path).unwrap();
    }

    // Validation must not touch the filesystem on behalf of a file sink:
    // neither the output file nor its parent directory may appear.
    #[test]
    fn validate_does_not_create_file_sink_output() {
        let dir = tempfile::tempdir().unwrap();
        let config_path = dir.path().join("valid.toml");
        let output_path = dir.path().join("nested/out.jsonl");
        // NOTE(review): the path is interpolated into a TOML basic string, so a
        // path containing backslashes (e.g. on Windows) would need escaping or a
        // literal string — confirm whether that platform matters here.
        std::fs::write(
            &config_path,
            format!(
                r#"
[[pipelines]]
name = "valid-file"

[pipelines.source]
type = "api_poll"
url = "http://localhost/data"
interval_secs = 60

[[pipelines.sinks]]
type = "file"
path = "{}"
"#,
                output_path.display()
            ),
        )
        .unwrap();

        validate_config(&config_path).unwrap();
        assert!(!output_path.exists());
        assert!(!output_path.parent().unwrap().exists());
    }

    // An unknown sink type is rejected, and the error chain names both the
    // config file and the offending type.
    #[test]
    fn validate_rejects_invalid_config_with_path_context() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("invalid.toml");
        std::fs::write(
            &path,
            r#"
[[pipelines]]
name = "invalid"

[pipelines.source]
type = "api_poll"
url = "http://localhost/data"
interval_secs = 60

[[pipelines.sinks]]
type = "missing"
"#,
        )
        .unwrap();

        let err = validate_config(&path).unwrap_err();
        // `{:#}` renders the whole anyhow context chain on one line.
        let msg = format!("{err:#}");
        assert!(msg.contains("failed to build runtime from config"), "{msg}");
        assert!(msg.contains("invalid.toml"), "{msg}");
        assert!(msg.contains("unknown sink type 'missing'"), "{msg}");
    }

    // Setting both `script` and `script_file` is rejected, and the error
    // identifies the pipeline and the transform index.
    #[test]
    fn validate_rejects_invalid_script_config_with_component_context() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("invalid-script.toml");
        std::fs::write(
            &path,
            r#"
[[pipelines]]
name = "invalid-script"

[pipelines.source]
type = "api_poll"
url = "http://localhost/data"
interval_secs = 60

[[pipelines.transforms]]
type = "script"
runtime = "rhai"
script = "payload"
script_file = "/tmp/also-set.rhai"

[[pipelines.sinks]]
type = "api"
url = "http://localhost/ingest"
"#,
        )
        .unwrap();

        let err = validate_config(&path).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("pipeline 'invalid-script' transform[0]"),
            "{msg}"
        );
        assert!(
            msg.contains("set either 'script' or 'script_file', not both"),
            "{msg}"
        );
    }
}