Skip to main content

packc/
flow_resolve.rs

1#![forbid(unsafe_code)]
2
3use std::collections::BTreeMap;
4use std::fs;
5use std::path::{Path, PathBuf};
6
7use anyhow::{Context, Result, anyhow};
8use greentic_flow::resolve_summary::write_flow_resolve_summary_for_flow;
9use greentic_types::ComponentId;
10use greentic_types::Flow;
11use greentic_types::error::ErrorCode;
12use greentic_types::flow_resolve::{
13    ComponentSourceRefV1, FlowResolveV1, read_flow_resolve, sidecar_path_for_flow,
14    write_flow_resolve,
15};
16use greentic_types::flow_resolve_summary::{
17    FLOW_RESOLVE_SUMMARY_SCHEMA_VERSION, FlowResolveSummaryManifestV1,
18    FlowResolveSummarySourceRefV1, FlowResolveSummaryV1, NodeResolveSummaryV1,
19    read_flow_resolve_summary, resolve_summary_path_for_flow, write_flow_resolve_summary,
20};
21use semver::Version;
22use sha2::{Digest, Sha256};
23
24use crate::config::FlowConfig;
25
26#[derive(Clone, Debug)]
27pub struct FlowResolveSidecar {
28    pub flow_id: String,
29    pub flow_path: PathBuf,
30    pub sidecar_path: PathBuf,
31    pub document: Option<FlowResolveV1>,
32    pub warning: Option<String>,
33}
34
35/// Discover flow resolve sidecars for the configured flows.
36///
37/// Missing or unreadable sidecars produce warnings but do not fail.
38pub fn discover_flow_resolves(pack_dir: &Path, flows: &[FlowConfig]) -> Vec<FlowResolveSidecar> {
39    flows
40        .iter()
41        .map(|flow| {
42            let flow_path = if flow.file.is_absolute() {
43                flow.file.clone()
44            } else {
45                pack_dir.join(&flow.file)
46            };
47            let sidecar_path = sidecar_path_for_flow(&flow_path);
48
49            let (document, warning) = match read_flow_resolve(&sidecar_path) {
50                Ok(doc) => (Some(doc), None),
51                Err(err) if err.code == ErrorCode::NotFound => (
52                    None,
53                    Some(format!(
54                        "flow resolve sidecar missing for {} ({})",
55                        flow.id,
56                        sidecar_path.display()
57                    )),
58                ),
59                Err(err) => (
60                    None,
61                    Some(format!(
62                        "failed to read flow resolve sidecar for {}: {}",
63                        flow.id, err
64                    )),
65                ),
66            };
67
68            FlowResolveSidecar {
69                flow_id: flow.id.clone(),
70                flow_path,
71                sidecar_path,
72                document,
73                warning,
74            }
75        })
76        .collect()
77}
78
79/// Ensure a flow resolve summary exists, generating it from the resolve sidecar if missing.
80pub fn load_flow_resolve_summary(
81    pack_dir: &Path,
82    flow: &FlowConfig,
83    compiled: &Flow,
84) -> Result<FlowResolveSummaryV1> {
85    let flow_path = resolve_flow_path(pack_dir, flow);
86    let summary = read_or_write_flow_resolve_summary(&flow_path, flow)?;
87    enforce_summary_mappings(flow, compiled, &summary, &flow_path)?;
88    Ok(summary)
89}
90
91/// Read or generate a flow resolve summary for a flow (no node enforcement).
92pub fn read_flow_resolve_summary_for_flow(
93    pack_dir: &Path,
94    flow: &FlowConfig,
95) -> Result<FlowResolveSummaryV1> {
96    let flow_path = resolve_flow_path(pack_dir, flow);
97    read_or_write_flow_resolve_summary(&flow_path, flow)
98}
99
100/// Ensure a resolve sidecar exists for a flow and optionally enforce node mappings.
101///
102/// When the sidecar is missing, an empty document is created. Missing node mappings
103/// emit warnings, or become errors when `strict` is set.
104pub fn ensure_sidecar_exists(
105    pack_dir: &Path,
106    flow: &FlowConfig,
107    compiled: &Flow,
108    strict: bool,
109) -> Result<()> {
110    let flow_path = if flow.file.is_absolute() {
111        flow.file.clone()
112    } else {
113        pack_dir.join(&flow.file)
114    };
115    let sidecar_path = sidecar_path_for_flow(&flow_path);
116
117    let doc = match read_flow_resolve(&sidecar_path) {
118        Ok(doc) => doc,
119        Err(err) if err.code == ErrorCode::NotFound => {
120            let doc = FlowResolveV1 {
121                schema_version: 1,
122                flow: flow.file.to_string_lossy().into_owned(),
123                nodes: BTreeMap::new(),
124            };
125            if let Some(parent) = sidecar_path.parent() {
126                fs::create_dir_all(parent)
127                    .with_context(|| format!("failed to create {}", parent.display()))?;
128            }
129            write_flow_resolve(&sidecar_path, &doc)
130                .with_context(|| format!("failed to write {}", sidecar_path.display()))?;
131            doc
132        }
133        Err(err) => {
134            return Err(anyhow!(
135                "failed to read flow resolve sidecar for {}: {}",
136                flow.id,
137                err
138            ));
139        }
140    };
141
142    let missing = missing_node_mappings(compiled, &doc);
143    if !missing.is_empty() {
144        if strict {
145            anyhow::bail!(
146                "flow {} is missing resolve entries for nodes {} (sidecar {}). Add mappings to the sidecar, then rerun `greentic-pack resolve` followed by `greentic-pack build`.",
147                flow.id,
148                missing.join(", "),
149                sidecar_path.display()
150            );
151        } else {
152            eprintln!(
153                "warning: flow {} has no resolve entries for nodes {} ({}); add mappings to the sidecar and rerun `greentic-pack resolve`",
154                flow.id,
155                missing.join(", "),
156                sidecar_path.display()
157            );
158        }
159    }
160
161    Ok(())
162}
163
164/// Require that a resolve sidecar exists and covers every node in the compiled flow.
165pub fn enforce_sidecar_mappings(pack_dir: &Path, flow: &FlowConfig, compiled: &Flow) -> Result<()> {
166    let flow_path = resolve_flow_path(pack_dir, flow);
167    let sidecar_path = sidecar_path_for_flow(&flow_path);
168    let doc = read_flow_resolve(&sidecar_path).map_err(|err| {
169        anyhow!(
170            "flow {} requires a resolve sidecar; expected {}: {}",
171            flow.id,
172            sidecar_path.display(),
173            err
174        )
175    })?;
176
177    let missing = missing_node_mappings(compiled, &doc);
178    if !missing.is_empty() {
179        anyhow::bail!(
180            "flow {} is missing resolve entries for nodes {} (sidecar {}). Add mappings to the sidecar, then rerun `greentic-pack resolve` followed by `greentic-pack build`.",
181            flow.id,
182            missing.join(", "),
183            sidecar_path.display()
184        );
185    }
186
187    Ok(())
188}
189
190/// Compute which nodes in a flow lack resolve entries.
191pub fn missing_node_mappings(flow: &Flow, doc: &FlowResolveV1) -> Vec<String> {
192    flow.nodes
193        .iter()
194        .filter_map(|(node, flow_node)| {
195            if is_runtime_builtin_component(flow_node.component.id.as_str())
196                || runtime_builtin_from_operation(
197                    flow_node.component.id.as_str(),
198                    flow_node.component.operation.as_deref(),
199                )
200                .is_some()
201            {
202                return None;
203            }
204            let id = node.to_string();
205            if doc.nodes.contains_key(id.as_str()) {
206                None
207            } else {
208                Some(id)
209            }
210        })
211        .collect()
212}
213
214fn resolve_flow_path(pack_dir: &Path, flow: &FlowConfig) -> PathBuf {
215    if flow.file.is_absolute() {
216        flow.file.clone()
217    } else {
218        pack_dir.join(&flow.file)
219    }
220}
221
222fn read_or_write_flow_resolve_summary(
223    flow_path: &Path,
224    flow: &FlowConfig,
225) -> Result<FlowResolveSummaryV1> {
226    let summary_path = resolve_summary_path_for_flow(flow_path);
227    if !summary_path.exists() {
228        let sidecar_path = sidecar_path_for_flow(flow_path);
229        let sidecar = read_flow_resolve(&sidecar_path).map_err(|err| {
230            anyhow!(
231                "flow {} requires a resolve sidecar to generate summary; expected {}: {}",
232                flow.id,
233                sidecar_path.display(),
234                err
235            )
236        })?;
237        write_flow_resolve_summary_safe(flow_path, &sidecar).with_context(|| {
238            format!(
239                "failed to generate flow resolve summary for {}",
240                flow_path.display()
241            )
242        })?;
243    }
244
245    read_flow_resolve_summary(&summary_path).map_err(|err| {
246        anyhow!(
247            "failed to read flow resolve summary for {}: {}",
248            flow.id,
249            err
250        )
251    })
252}
253
254fn write_flow_resolve_summary_safe(flow_path: &Path, sidecar: &FlowResolveV1) -> Result<PathBuf> {
255    let result = if tokio::runtime::Handle::try_current().is_ok() {
256        let flow_path = flow_path.to_path_buf();
257        let sidecar = sidecar.clone();
258        let join =
259            std::thread::spawn(move || write_flow_resolve_summary_for_flow(&flow_path, &sidecar));
260        join.join()
261            .map_err(|_| anyhow!("flow resolve summary generation panicked"))?
262    } else {
263        write_flow_resolve_summary_for_flow(flow_path, sidecar)
264    };
265
266    match result {
267        Ok(path) => Ok(path),
268        Err(err) => {
269            if sidecar
270                .nodes
271                .values()
272                .all(|node| matches!(node.source, ComponentSourceRefV1::Local { .. }))
273            {
274                let summary = build_flow_resolve_summary_fallback(flow_path, sidecar)?;
275                let summary_path = resolve_summary_path_for_flow(flow_path);
276                write_flow_resolve_summary(&summary_path, &summary)
277                    .map_err(|e| anyhow!(e.to_string()))?;
278                return Ok(summary_path);
279            }
280            Err(err)
281        }
282    }
283}
284
285fn enforce_summary_mappings(
286    flow: &FlowConfig,
287    compiled: &Flow,
288    summary: &FlowResolveSummaryV1,
289    flow_path: &Path,
290) -> Result<()> {
291    let missing = missing_summary_node_mappings(compiled, summary);
292    if !missing.is_empty() {
293        let summary_path = resolve_summary_path_for_flow(flow_path);
294        anyhow::bail!(
295            "flow {} is missing resolve summary entries for nodes {} (summary {}). Regenerate the summary and rerun build.",
296            flow.id,
297            missing.join(", "),
298            summary_path.display()
299        );
300    }
301    Ok(())
302}
303
304fn missing_summary_node_mappings(flow: &Flow, doc: &FlowResolveSummaryV1) -> Vec<String> {
305    flow.nodes
306        .iter()
307        .filter_map(|(node, flow_node)| {
308            if is_runtime_builtin_component(flow_node.component.id.as_str())
309                || runtime_builtin_from_operation(
310                    flow_node.component.id.as_str(),
311                    flow_node.component.operation.as_deref(),
312                )
313                .is_some()
314            {
315                return None;
316            }
317            let id = node.to_string();
318            if doc.nodes.contains_key(id.as_str()) {
319                None
320            } else {
321                Some(id)
322            }
323        })
324        .collect()
325}
326
327pub(crate) fn is_runtime_builtin_component(component_id: &str) -> bool {
328    matches!(component_id, "dw.agent" | "dw.agent_graph")
329        || runtime_builtin_from_component_id(component_id).is_some()
330}
331
332pub(crate) fn runtime_builtin_from_component_id(
333    component_id: &str,
334) -> Option<(&'static str, &str)> {
335    if let Some(agent_id) = component_id.strip_prefix("dw.agent.") {
336        return Some(("dw.agent", agent_id));
337    }
338    if let Some(agent_id) = component_id.strip_prefix("dw.agent_graph.") {
339        return Some(("dw.agent_graph", agent_id));
340    }
341    None
342}
343
344pub(crate) fn runtime_builtin_from_operation<'a>(
345    component_id: &str,
346    operation: Option<&'a str>,
347) -> Option<(&'static str, &'a str)> {
348    if component_id != "component.exec" {
349        return None;
350    }
351    let operation = operation?;
352    runtime_builtin_from_component_id(operation)
353}
354
355fn build_flow_resolve_summary_fallback(
356    flow_path: &Path,
357    sidecar: &FlowResolveV1,
358) -> Result<FlowResolveSummaryV1> {
359    let mut nodes = BTreeMap::new();
360    for (node_id, entry) in &sidecar.nodes {
361        let summary = summarize_node_fallback(flow_path, node_id, &entry.source)?;
362        nodes.insert(node_id.clone(), summary);
363    }
364    Ok(FlowResolveSummaryV1 {
365        schema_version: FLOW_RESOLVE_SUMMARY_SCHEMA_VERSION,
366        flow: flow_name_from_path(flow_path),
367        nodes,
368    })
369}
370
371fn summarize_node_fallback(
372    flow_path: &Path,
373    node_id: &str,
374    source: &ComponentSourceRefV1,
375) -> Result<NodeResolveSummaryV1> {
376    let ComponentSourceRefV1::Local { path, .. } = source else {
377        anyhow::bail!(
378            "flow resolve fallback only supports local sources (node {})",
379            node_id
380        );
381    };
382    let source_ref = FlowResolveSummarySourceRefV1::Local {
383        path: strip_file_uri_prefix(path).to_string(),
384    };
385    let wasm_path = local_path_from_sidecar(path, flow_path);
386    let digest = compute_sha256(&wasm_path)?;
387    let manifest_path = find_manifest_for_wasm_loose(&wasm_path).with_context(|| {
388        format!(
389            "component.manifest.json not found for node '{}' ({})",
390            node_id,
391            wasm_path.display()
392        )
393    })?;
394    let (component_id, manifest) = read_manifest_metadata(&manifest_path).with_context(|| {
395        format!(
396            "failed to read component.manifest.json for node '{}' ({})",
397            node_id,
398            manifest_path.display()
399        )
400    })?;
401
402    Ok(NodeResolveSummaryV1 {
403        component_id,
404        source: source_ref,
405        digest,
406        manifest,
407    })
408}
409
410fn find_manifest_for_wasm_loose(wasm_path: &Path) -> Result<PathBuf> {
411    let wasm_abs = fs::canonicalize(wasm_path)
412        .with_context(|| format!("resolve wasm path {}", wasm_path.display()))?;
413    let mut current = wasm_abs.parent();
414    let mut fallback = None;
415    while let Some(dir) = current {
416        let candidate = dir.join("component.manifest.json");
417        if candidate.exists() {
418            if manifest_matches_wasm_loose(&candidate, &wasm_abs)? {
419                return Ok(candidate);
420            }
421            if fallback.is_none() {
422                fallback = Some(candidate);
423            }
424        }
425        current = dir.parent();
426    }
427
428    if let Some(candidate) = fallback {
429        return Ok(candidate);
430    }
431
432    anyhow::bail!(
433        "component.manifest.json not found for wasm {}",
434        wasm_abs.display()
435    );
436}
437
438fn manifest_matches_wasm_loose(manifest_path: &Path, wasm_abs: &Path) -> Result<bool> {
439    let raw = fs::read_to_string(manifest_path)
440        .with_context(|| format!("read {}", manifest_path.display()))?;
441    let json: serde_json::Value =
442        serde_json::from_str(&raw).context("parse component.manifest.json")?;
443    let Some(rel) = json
444        .get("artifacts")
445        .and_then(|v| v.get("component_wasm"))
446        .and_then(|v| v.as_str())
447    else {
448        return Ok(false);
449    };
450    let manifest_dir = manifest_path
451        .parent()
452        .ok_or_else(|| anyhow!("manifest path {} has no parent", manifest_path.display()))?;
453    let sanitized = strip_file_uri_prefix(rel);
454    let Ok(abs) = fs::canonicalize(manifest_dir.join(sanitized)) else {
455        return Ok(false);
456    };
457    Ok(abs == *wasm_abs)
458}
459
460fn read_manifest_metadata(
461    manifest_path: &Path,
462) -> Result<(ComponentId, Option<FlowResolveSummaryManifestV1>)> {
463    let raw = fs::read_to_string(manifest_path)
464        .with_context(|| format!("read {}", manifest_path.display()))?;
465    let json: serde_json::Value =
466        serde_json::from_str(&raw).context("parse component.manifest.json")?;
467    let id = json
468        .get("id")
469        .and_then(|v| v.as_str())
470        .ok_or_else(|| anyhow!("manifest missing id"))?;
471    let component_id =
472        ComponentId::new(id).with_context(|| format!("invalid component id {}", id))?;
473    let world = json.get("world").and_then(|v| v.as_str());
474    let version = json.get("version").and_then(|v| v.as_str());
475    let manifest = match (world, version) {
476        (Some(world), Some(version)) => {
477            let parsed = Version::parse(version)
478                .with_context(|| format!("invalid semver version {}", version))?;
479            Some(FlowResolveSummaryManifestV1 {
480                world: world.to_string(),
481                version: parsed,
482            })
483        }
484        _ => None,
485    };
486    Ok((component_id, manifest))
487}
488
489fn flow_name_from_path(flow_path: &Path) -> String {
490    flow_path
491        .file_name()
492        .map(|name| name.to_string_lossy().to_string())
493        .unwrap_or_else(|| "flow.ygtc".to_string())
494}
495
496pub(crate) fn strip_file_uri_prefix(path: &str) -> &str {
497    path.strip_prefix("file://")
498        .or_else(|| path.strip_prefix("file:/"))
499        .or_else(|| path.strip_prefix("file:"))
500        .unwrap_or(path)
501}
502
503fn local_path_from_sidecar(path: &str, flow_path: &Path) -> PathBuf {
504    let trimmed = strip_file_uri_prefix(path);
505    let raw = PathBuf::from(trimmed);
506    if raw.is_absolute() {
507        raw
508    } else {
509        flow_path
510            .parent()
511            .unwrap_or_else(|| Path::new("."))
512            .join(raw)
513    }
514}
515
516fn compute_sha256(path: &Path) -> Result<String> {
517    let bytes = fs::read(path).with_context(|| format!("read wasm at {}", path.display()))?;
518    let mut sha = Sha256::new();
519    sha.update(bytes);
520    Ok(format!("sha256:{}", hex::encode(sha.finalize())))
521}
522
523#[cfg(test)]
524mod tests {
525    use super::*;
526    use serde_json::json;
527    use std::fs;
528    use tempfile::tempdir;
529
530    #[test]
531    fn strip_file_uri_prefix_removes_scheme_variants() {
532        assert_eq!(strip_file_uri_prefix("file:///tmp/foo"), "/tmp/foo");
533        assert_eq!(strip_file_uri_prefix("file:/tmp/foo"), "tmp/foo");
534        assert_eq!(strip_file_uri_prefix("file://bar/baz"), "bar/baz");
535        assert_eq!(strip_file_uri_prefix("file:relative/path"), "relative/path");
536        assert_eq!(
537            strip_file_uri_prefix("../components/foo"),
538            "../components/foo"
539        );
540    }
541
542    #[test]
543    fn manifest_matches_wasm_loose_handles_relative_file_uri_paths() {
544        let temp = tempdir().expect("alloc temp dir");
545        let components = temp.path().join("components");
546        fs::create_dir_all(&components).expect("create components dir");
547        let wasm_path = components.join("component.wasm");
548        fs::write(&wasm_path, b"wasm-bytes").expect("write wasm");
549        let manifest_path = components.join("component.manifest.json");
550        let manifest = json!({
551            "artifacts": {
552                "component_wasm": "file://component.wasm"
553            }
554        });
555        fs::write(
556            &manifest_path,
557            serde_json::to_vec_pretty(&manifest).expect("encode manifest"),
558        )
559        .expect("write manifest");
560        let wasm_abs = fs::canonicalize(&wasm_path).expect("canonicalize wasm");
561        assert!(manifest_matches_wasm_loose(&manifest_path, &wasm_abs).expect("manifest lookup"));
562
563        let parent_manifest = json!({
564            "artifacts": {
565                "component_wasm": "file://../component.wasm"
566            }
567        });
568        let parent_dir = components.join("child");
569        fs::create_dir_all(&parent_dir).expect("create child dir");
570        let child_manifest_path = parent_dir.join("component.manifest.json");
571        fs::write(
572            &child_manifest_path,
573            serde_json::to_vec_pretty(&parent_manifest).unwrap(),
574        )
575        .expect("write child manifest");
576        assert!(
577            manifest_matches_wasm_loose(&child_manifest_path, &wasm_abs)
578                .expect("manifest matches child")
579        );
580    }
581
582    #[test]
583    fn manifest_matches_wasm_loose_handles_absolute_file_uri_paths() {
584        let temp = tempdir().expect("alloc temp dir");
585        let components = temp.path().join("components");
586        fs::create_dir_all(&components).expect("create components dir");
587        let wasm_path = components.join("component.wasm");
588        fs::write(&wasm_path, b"bytes").expect("write wasm");
589        let wasm_abs = fs::canonicalize(&wasm_path).expect("canonicalize wasm");
590        let manifest_path = components.join("component.manifest.json");
591        let manifest = json!({
592            "artifacts": {
593                "component_wasm": format!("file://{}", wasm_abs.display())
594            }
595        });
596        fs::write(
597            &manifest_path,
598            serde_json::to_vec_pretty(&manifest).expect("encode manifest"),
599        )
600        .expect("write manifest");
601        assert!(manifest_matches_wasm_loose(&manifest_path, &wasm_abs).expect("manifest lookup"));
602    }
603}