1use bnto_core::{BntoError, Dependency, NodeRegistry, PipelineDefinition};
6use std::collections::HashSet;
7
/// Result of probing the host for one external [`Dependency`].
#[derive(Debug, Clone)]
pub struct DependencyStatus {
    /// The dependency that was probed.
    pub dependency: Dependency,
    /// `true` when the lookup for the dependency's binary succeeded
    /// (populated by `check_dependencies` in this file).
    pub found: bool,
}
14
15pub fn collect_pipeline_dependencies(
21 definition: &PipelineDefinition,
22 registry: &NodeRegistry,
23) -> Vec<Dependency> {
24 let mut seen = HashSet::new();
25 let mut deps = Vec::new();
26 collect_from_nodes(&definition.nodes, registry, &mut seen, &mut deps);
27 deps
28}
29
30fn collect_from_nodes(
31 nodes: &[bnto_core::PipelineNode],
32 registry: &NodeRegistry,
33 seen: &mut HashSet<String>,
34 deps: &mut Vec<Dependency>,
35) {
36 let empty_params = serde_json::Map::new();
37 for node in nodes {
38 if let Some(processor) = registry.resolve(&node.node_type, &empty_params) {
39 for dep in &processor.metadata().requires {
40 if seen.insert(dep.binary.clone()) {
41 deps.push(dep.clone());
42 }
43 }
44 }
45 if let Some(children) = &node.children {
47 collect_from_nodes(children, registry, seen, deps);
48 }
49 }
50}
51
52pub fn collect_all_dependencies(registry: &NodeRegistry) -> Vec<Dependency> {
54 let mut seen = HashSet::new();
55 let mut deps = Vec::new();
56 for metadata in registry.catalog() {
57 for dep in metadata.requires {
58 if seen.insert(dep.binary.clone()) {
59 deps.push(dep);
60 }
61 }
62 }
63 deps
64}
65
66pub fn check_dependencies(
71 deps: &[Dependency],
72 ctx: &dyn bnto_core::ProcessContext,
73) -> Vec<DependencyStatus> {
74 deps.iter()
75 .map(|dep| {
76 let found = ctx.run_command("which", &[&dep.binary]).is_ok();
77 DependencyStatus {
78 dependency: dep.clone(),
79 found,
80 }
81 })
82 .collect()
83}
84
85pub fn check_pipeline_dependencies(
91 definition: &PipelineDefinition,
92 registry: &NodeRegistry,
93 ctx: &dyn bnto_core::ProcessContext,
94) -> Result<(), BntoError> {
95 let deps = collect_pipeline_dependencies(definition, registry);
96 if deps.is_empty() {
97 return Ok(());
98 }
99
100 let statuses = check_dependencies(&deps, ctx);
101 let missing: Vec<&DependencyStatus> = statuses.iter().filter(|s| !s.found).collect();
102
103 if missing.is_empty() {
104 return Ok(());
105 }
106
107 let messages: Vec<String> = missing
108 .iter()
109 .map(|s| {
110 let hint = &s.dependency.install_hint;
111 format!(" - {} (install: {})", s.dependency.binary, hint)
112 })
113 .collect();
114
115 Err(BntoError::InvalidInput(format!(
116 "Missing required dependencies:\n{}",
117 messages.join("\n")
118 )))
119}
120
/// Unit tests for dependency collection and the pre-flight check.
///
/// The tests use small in-file processors with fixed metadata and two
/// `ProcessContext` doubles whose `run_command` always fails or always
/// succeeds, so no real binaries are looked up.
#[cfg(test)]
mod tests {
    use super::*;
    use bnto_core::NodeProcessor;
    use bnto_core::NoopContext;
    use bnto_core::context::ProcessContext;
    use bnto_core::errors::BntoError;
    use bnto_core::metadata::{Dependency, InputCardinality, NodeCategory, NodeMetadata};
    use bnto_core::processor::{NodeInput, NodeOutput, OutputFile};
    use bnto_core::progress::ProgressReporter;
    use std::path::{Path, PathBuf};

    /// Pass-through processor whose metadata requires `ffmpeg`.
    struct FfmpegProcessor;

    impl NodeProcessor for FfmpegProcessor {
        fn name(&self) -> &str {
            "video-transcode"
        }

        fn process(
            &self,
            input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            Ok(NodeOutput {
                files: vec![OutputFile {
                    data: input.data,
                    filename: input.filename,
                    mime_type: "video/mp4".to_string(),
                }],
                metadata: serde_json::Map::new(),
            })
        }

        fn metadata(&self) -> NodeMetadata {
            NodeMetadata {
                node_type: "video-transcode".to_string(),
                name: "Transcode Video".to_string(),
                description: "Transcode video using ffmpeg.".to_string(),
                category: NodeCategory::Data,
                accepts: vec!["video/*".to_string()],
                platforms: vec!["cli".to_string()],
                parameters: vec![],
                input_cardinality: InputCardinality::PerFile,
                requires: vec![Dependency {
                    binary: "ffmpeg".to_string(),
                    version: ">=6.0".to_string(),
                    install_hint: "brew install ffmpeg".to_string(),
                    homepage: "https://ffmpeg.org".to_string(),
                }],
            }
        }
    }

    /// Pass-through processor whose metadata requires both `yt-dlp` and
    /// `ffmpeg`, so it shares a dependency with [`FfmpegProcessor`].
    struct YtDlpProcessor;

    impl NodeProcessor for YtDlpProcessor {
        fn name(&self) -> &str {
            "video-download"
        }

        fn process(
            &self,
            input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            Ok(NodeOutput {
                files: vec![OutputFile {
                    data: input.data,
                    filename: input.filename,
                    mime_type: "video/mp4".to_string(),
                }],
                metadata: serde_json::Map::new(),
            })
        }

        fn metadata(&self) -> NodeMetadata {
            NodeMetadata {
                node_type: "video-download".to_string(),
                name: "Download Video".to_string(),
                description: "Download video using yt-dlp.".to_string(),
                category: NodeCategory::Data,
                accepts: vec![],
                platforms: vec!["cli".to_string()],
                parameters: vec![],
                input_cardinality: InputCardinality::PerFile,
                requires: vec![
                    Dependency {
                        binary: "yt-dlp".to_string(),
                        version: String::new(),
                        install_hint: "brew install yt-dlp".to_string(),
                        homepage: "https://github.com/yt-dlp/yt-dlp".to_string(),
                    },
                    Dependency {
                        binary: "ffmpeg".to_string(),
                        version: ">=6.0".to_string(),
                        install_hint: "brew install ffmpeg".to_string(),
                        homepage: "https://ffmpeg.org".to_string(),
                    },
                ],
            }
        }
    }

    /// Pass-through processor that does not override `metadata()`; the tests
    /// below expect the trait's default metadata to declare no dependencies.
    struct NoDepsProcessor;

    impl NodeProcessor for NoDepsProcessor {
        fn name(&self) -> &str {
            "no-deps"
        }

        fn process(
            &self,
            input: NodeInput,
            _progress: &ProgressReporter,
            _ctx: &dyn ProcessContext,
        ) -> Result<NodeOutput, BntoError> {
            Ok(NodeOutput {
                files: vec![OutputFile {
                    data: input.data,
                    filename: input.filename,
                    mime_type: "application/octet-stream".to_string(),
                }],
                metadata: serde_json::Map::new(),
            })
        }
    }

    /// Context whose `run_command` always fails, so every dependency probe
    /// reports "not found".
    struct AllMissingContext;

    impl ProcessContext for AllMissingContext {
        fn run_command(&self, _cmd: &str, _args: &[&str]) -> Result<Vec<u8>, BntoError> {
            Err(BntoError::ProcessingFailed("not found".to_string()))
        }
        fn temp_file(&self, _suffix: &str) -> Result<PathBuf, BntoError> {
            Err(BntoError::ProcessingFailed("not available".to_string()))
        }
        fn env_var(&self, _key: &str) -> Option<String> {
            None
        }
        fn work_dir(&self) -> Result<&Path, BntoError> {
            Err(BntoError::ProcessingFailed("not available".to_string()))
        }
    }

    /// Context whose `run_command` always succeeds, so every dependency probe
    /// reports "found".
    struct AllFoundContext;

    impl ProcessContext for AllFoundContext {
        fn run_command(&self, _cmd: &str, _args: &[&str]) -> Result<Vec<u8>, BntoError> {
            Ok(b"/usr/local/bin/found".to_vec())
        }
        fn temp_file(&self, _suffix: &str) -> Result<PathBuf, BntoError> {
            Err(BntoError::ProcessingFailed("not available".to_string()))
        }
        fn env_var(&self, _key: &str) -> Option<String> {
            None
        }
        fn work_dir(&self) -> Result<&Path, BntoError> {
            Err(BntoError::ProcessingFailed("not available".to_string()))
        }
    }

    /// Builds a flat pipeline definition with one node per entry in
    /// `node_types`, using ids `n0`, `n1`, ... Panics if the generated JSON
    /// does not deserialize into a `PipelineDefinition`.
    fn make_definition(node_types: &[&str]) -> PipelineDefinition {
        let json = serde_json::json!({
            "nodes": node_types.iter().enumerate().map(|(i, t)| {
                serde_json::json!({ "id": format!("n{i}"), "type": t })
            }).collect::<Vec<_>>()
        });
        serde_json::from_value(json).unwrap()
    }

    // --- collect_pipeline_dependencies ---

    #[test]
    fn test_collect_empty_pipeline_returns_no_deps() {
        let def = make_definition(&["input", "output"]);
        let registry = NodeRegistry::new();
        let deps = collect_pipeline_dependencies(&def, &registry);
        assert!(deps.is_empty());
    }

    #[test]
    fn test_collect_pipeline_with_no_dep_processor() {
        let mut registry = NodeRegistry::new();
        registry.register("no-deps", Box::new(NoDepsProcessor));
        let def = make_definition(&["input", "no-deps", "output"]);
        let deps = collect_pipeline_dependencies(&def, &registry);
        assert!(deps.is_empty());
    }

    #[test]
    fn test_collect_pipeline_with_ffmpeg_dep() {
        let mut registry = NodeRegistry::new();
        registry.register("video-transcode", Box::new(FfmpegProcessor));
        let def = make_definition(&["input", "video-transcode", "output"]);
        let deps = collect_pipeline_dependencies(&def, &registry);
        assert_eq!(deps.len(), 1);
        assert_eq!(deps[0].binary, "ffmpeg");
    }

    #[test]
    fn test_collect_deduplicates_shared_deps() {
        let mut registry = NodeRegistry::new();
        registry.register("video-transcode", Box::new(FfmpegProcessor));
        registry.register("video-download", Box::new(YtDlpProcessor));
        let def = make_definition(&["input", "video-transcode", "video-download", "output"]);
        let deps = collect_pipeline_dependencies(&def, &registry);
        // ffmpeg is required by both processors but must be reported once.
        assert_eq!(deps.len(), 2);
        let binaries: Vec<&str> = deps.iter().map(|d| d.binary.as_str()).collect();
        assert!(binaries.contains(&"ffmpeg"));
        assert!(binaries.contains(&"yt-dlp"));
    }

    // --- collect_all_dependencies ---

    #[test]
    fn test_collect_all_from_empty_registry() {
        let registry = NodeRegistry::new();
        let deps = collect_all_dependencies(&registry);
        assert!(deps.is_empty());
    }

    #[test]
    fn test_collect_all_deduplicates() {
        let mut registry = NodeRegistry::new();
        registry.register("video-transcode", Box::new(FfmpegProcessor));
        registry.register("video-download", Box::new(YtDlpProcessor));
        registry.register("no-deps", Box::new(NoDepsProcessor));
        let deps = collect_all_dependencies(&registry);
        // Unique binaries across the registry: ffmpeg and yt-dlp.
        assert_eq!(deps.len(), 2);
    }

    // --- check_dependencies ---

    #[test]
    fn test_check_all_missing() {
        let deps = vec![Dependency {
            binary: "ffmpeg".to_string(),
            version: String::new(),
            install_hint: "brew install ffmpeg".to_string(),
            homepage: String::new(),
        }];
        let statuses = check_dependencies(&deps, &AllMissingContext);
        assert_eq!(statuses.len(), 1);
        assert!(!statuses[0].found);
    }

    #[test]
    fn test_check_all_found() {
        let deps = vec![Dependency {
            binary: "ffmpeg".to_string(),
            version: String::new(),
            install_hint: "brew install ffmpeg".to_string(),
            homepage: String::new(),
        }];
        let statuses = check_dependencies(&deps, &AllFoundContext);
        assert_eq!(statuses.len(), 1);
        assert!(statuses[0].found);
    }

    #[test]
    fn test_check_empty_deps_returns_empty() {
        let statuses = check_dependencies(&[], &NoopContext);
        assert!(statuses.is_empty());
    }

    // --- check_pipeline_dependencies ---

    #[test]
    fn test_preflight_no_deps_ok() {
        let mut registry = NodeRegistry::new();
        registry.register("no-deps", Box::new(NoDepsProcessor));
        let def = make_definition(&["input", "no-deps", "output"]);
        let result = check_pipeline_dependencies(&def, &registry, &NoopContext);
        assert!(result.is_ok());
    }

    #[test]
    fn test_preflight_missing_dep_returns_error() {
        let mut registry = NodeRegistry::new();
        registry.register("video-transcode", Box::new(FfmpegProcessor));
        let def = make_definition(&["input", "video-transcode", "output"]);
        let result = check_pipeline_dependencies(&def, &registry, &AllMissingContext);
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("ffmpeg"));
        assert!(err_msg.contains("brew install ffmpeg"));
    }

    #[test]
    fn test_preflight_all_found_ok() {
        let mut registry = NodeRegistry::new();
        registry.register("video-transcode", Box::new(FfmpegProcessor));
        let def = make_definition(&["input", "video-transcode", "output"]);
        let result = check_pipeline_dependencies(&def, &registry, &AllFoundContext);
        assert!(result.is_ok());
    }

    #[test]
    fn test_preflight_error_lists_all_missing() {
        let mut registry = NodeRegistry::new();
        registry.register("video-download", Box::new(YtDlpProcessor));
        let def = make_definition(&["input", "video-download", "output"]);
        let result = check_pipeline_dependencies(&def, &registry, &AllMissingContext);
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("yt-dlp"));
        assert!(err_msg.contains("ffmpeg"));
    }
}
442}