1#![forbid(unsafe_code)]
2
3use std::collections::{BTreeMap, btree_map::Entry};
4use std::fs;
5use std::future::Future;
6use std::path::{Path, PathBuf};
7
8use crate::config::load_pack_config;
9use crate::flow_resolve::{read_flow_resolve_summary_for_flow, strip_file_uri_prefix};
10use crate::runtime::RuntimeContext;
11use anyhow::{Context, Result, anyhow, bail};
12use clap::Args;
13use greentic_distributor_client::{DistClient, DistOptions};
14use greentic_pack::pack_lock::{LockedComponent, PackLockV1, write_pack_lock};
15use greentic_pack::resolver::{ComponentResolver, ResolveReq, ResolvedComponent};
16use greentic_types::cbor::canonical;
17use greentic_types::flow_resolve_summary::{FlowResolveSummarySourceRefV1, FlowResolveSummaryV1};
18use greentic_types::schemas::component::v0_6_0::{ComponentDescribe, schema_hash};
19use hex;
20use sha2::{Digest, Sha256};
21use tokio::runtime::Handle;
22use wasmtime::Engine;
23use wasmtime::component::{Component as WasmtimeComponent, Linker};
24
25use crate::component_host_stubs::{
26 DescribeHostState, add_describe_host_imports, stub_remaining_imports,
27};
28
29#[derive(Debug, Args)]
30pub struct ResolveArgs {
31 #[arg(long = "in", value_name = "DIR", default_value = ".")]
33 pub input: PathBuf,
34
35 #[arg(long = "lock", value_name = "FILE")]
37 pub lock: Option<PathBuf>,
38}
39
40pub async fn handle(args: ResolveArgs, runtime: &RuntimeContext, emit_path: bool) -> Result<()> {
41 let pack_dir = args
42 .input
43 .canonicalize()
44 .with_context(|| format!("failed to resolve pack dir {}", args.input.display()))?;
45 let lock_path = resolve_lock_path(&pack_dir, args.lock.as_deref());
46
47 let config = load_pack_config(&pack_dir)?;
48 let mut entries: BTreeMap<String, LockedComponent> = BTreeMap::new();
49 for flow in &config.flows {
50 let summary = read_flow_resolve_summary_for_flow(&pack_dir, flow)?;
51 collect_from_summary(&pack_dir, flow, &summary, &mut entries)?;
52 }
53
54 if !entries.is_empty() {
55 let resolver = PackResolver::new(runtime)?;
56 let engine = Engine::default();
57 for component in entries.values_mut() {
58 populate_component_contract(&engine, &resolver, component).await?;
59 }
60 }
61
62 let lock = PackLockV1::new(entries);
63 write_pack_lock(&lock_path, &lock)?;
64 if emit_path {
65 eprintln!(
66 "{}",
67 crate::cli_i18n::tf("cli.common.wrote_path", &[&lock_path.display().to_string()])
68 );
69 }
70
71 Ok(())
72}
73
74fn resolve_lock_path(pack_dir: &Path, override_path: Option<&Path>) -> PathBuf {
75 match override_path {
76 Some(path) if path.is_absolute() => path.to_path_buf(),
77 Some(path) => pack_dir.join(path),
78 None => pack_dir.join("pack.lock.cbor"),
79 }
80}
81
82fn collect_from_summary(
83 pack_dir: &Path,
84 flow: &crate::config::FlowConfig,
85 doc: &FlowResolveSummaryV1,
86 out: &mut BTreeMap<String, LockedComponent>,
87) -> Result<()> {
88 let mut seen: BTreeMap<String, LockedComponent> = BTreeMap::new();
89
90 for resolve in doc.nodes.values() {
91 let source_ref = &resolve.source;
92 let (reference, digest) = match source_ref {
93 FlowResolveSummarySourceRefV1::Local { path } => {
94 let abs = normalize_local(pack_dir, flow, path)?;
95 (
96 format!("file://{}", abs.to_string_lossy()),
97 resolve.digest.clone(),
98 )
99 }
100 FlowResolveSummarySourceRefV1::Oci { .. }
101 | FlowResolveSummarySourceRefV1::Repo { .. }
102 | FlowResolveSummarySourceRefV1::Store { .. } => {
103 (format_reference(source_ref), resolve.digest.clone())
104 }
105 };
106 let component_id = resolve.component_id.clone();
107 let key = component_id.as_str().to_string();
108 let reference_for_insert = reference.clone();
109 let digest_for_insert = digest.clone();
110 let world = resolve.manifest.as_ref().map(|meta| meta.world.clone());
111 let component_version = resolve
112 .manifest
113 .as_ref()
114 .map(|meta| meta.version.to_string());
115 match seen.entry(key) {
116 Entry::Vacant(entry) => {
117 entry.insert(LockedComponent {
118 component_id: component_id.as_str().to_string(),
119 r#ref: Some(reference_for_insert),
120 abi_version: "0.6.0".to_string(),
121 resolved_digest: digest_for_insert,
122 describe_hash: String::new(),
123 operations: Vec::new(),
124 world,
125 component_version,
126 role: None,
127 });
128 }
129 Entry::Occupied(entry) => {
130 let existing = entry.get();
131 if existing.r#ref.as_deref() != Some(reference.as_str())
132 || existing.resolved_digest != digest
133 {
134 bail!(
135 "component {} resolved by nodes points to different artifacts ({}@{} vs {}@{})",
136 component_id.as_str(),
137 existing.r#ref.as_deref().unwrap_or("unknown-ref"),
138 existing.resolved_digest,
139 reference,
140 digest
141 );
142 }
143 }
144 }
145 }
146
147 out.extend(seen);
148
149 Ok(())
150}
151
152async fn populate_component_contract(
153 engine: &Engine,
154 resolver: &dyn ComponentResolver,
155 component: &mut LockedComponent,
156) -> Result<()> {
157 if is_builtin_component(component.component_id.as_str()) {
158 component.describe_hash = "0".repeat(64);
159 component.operations.clear();
160 component.role = Some("builtin".to_string());
161 if component.component_version.is_none() {
162 component.component_version = Some("0.0.0".to_string());
163 }
164 return Ok(());
165 }
166
167 let reference = component
168 .r#ref
169 .as_ref()
170 .ok_or_else(|| anyhow!("component {} missing ref", component.component_id))?;
171 let resolved = resolver.resolve(ResolveReq {
172 component_id: component.component_id.clone(),
173 reference: reference.clone(),
174 expected_digest: component.resolved_digest.clone(),
175 abi_version: component.abi_version.clone(),
176 world: component.world.clone(),
177 component_version: component.component_version.clone(),
178 })?;
179 let bytes = resolved.bytes;
180 component.resolved_digest = format!("sha256:{:x}", Sha256::digest(&bytes));
181 let use_describe_cache =
182 std::env::var("GREENTIC_PACK_USE_DESCRIBE_CACHE").is_ok() || cfg!(test);
183 let describe = match describe_component(engine, &bytes) {
184 Ok(describe) => describe,
185 Err(err) => {
186 if let Some(describe) = load_describe_from_cache_path(resolved.source_path.as_deref())?
187 {
188 describe
189 } else if is_state_store_tenant_ctx_abi_mismatch(&err)
190 || is_known_host_linker_gap(&err)
191 || is_missing_descriptor_instance(&err)
192 {
193 component.describe_hash = component
196 .resolved_digest
197 .strip_prefix("sha256:")
198 .unwrap_or(component.resolved_digest.as_str())
199 .to_string();
200 component.operations.clear();
201 component.role = Some("unknown".to_string());
202 if component.component_version.is_none() {
203 component.component_version = Some("0.0.0".to_string());
204 }
205 return Ok(());
206 } else if use_describe_cache {
207 return Err(err).context("describe failed and no describe cache present");
208 } else {
209 return Err(err);
210 }
211 }
212 };
213
214 if describe.info.id != component.component_id {
215 bail!(
216 "component {} describe id mismatch: {}",
217 component.component_id,
218 describe.info.id
219 );
220 }
221
222 let describe_hash = compute_describe_hash(&describe)?;
223 let mut operations: Vec<_> = describe
224 .operations
225 .iter()
226 .map(|op| {
227 let hash = schema_hash(&op.input.schema, &op.output.schema, &describe.config_schema)
228 .map_err(|err| anyhow!("schema_hash for {}: {}", op.id, err))?;
229 Ok((op.id.clone(), hash))
230 })
231 .collect::<Result<Vec<_>>>()?
232 .into_iter()
233 .map(
234 |(operation_id, schema_hash)| greentic_pack::pack_lock::LockedOperation {
235 operation_id,
236 schema_hash,
237 },
238 )
239 .collect();
240 operations.sort_by(|a, b| a.operation_id.cmp(&b.operation_id));
241
242 component.describe_hash = describe_hash;
243 component.operations = operations;
244 component.role = Some(describe.info.role);
245 component.component_version = Some(describe.info.version);
246 Ok(())
247}
248
249fn is_builtin_component(component_id: &str) -> bool {
250 matches!(
251 component_id,
252 "session.wait" | "flow.call" | "provider.invoke"
253 ) || component_id.starts_with("emit.")
254}
255
256struct PackResolver {
257 runtime: RuntimeContext,
258 dist: DistClient,
259}
260
261impl PackResolver {
262 fn new(runtime: &RuntimeContext) -> Result<Self> {
263 let dist = DistClient::new(DistOptions {
264 cache_dir: runtime.cache_dir(),
265 allow_tags: true,
266 offline: runtime.network_policy() == crate::runtime::NetworkPolicy::Offline,
267 allow_insecure_local_http: false,
268 ..DistOptions::default()
269 });
270 Ok(Self {
271 runtime: runtime.clone(),
272 dist,
273 })
274 }
275}
276
277impl ComponentResolver for PackResolver {
278 fn resolve(&self, req: ResolveReq) -> Result<ResolvedComponent> {
279 if req.reference.starts_with("file://") {
280 let path = strip_file_uri_prefix(&req.reference);
281 let bytes = fs::read(path).with_context(|| format!("read {}", path))?;
282 return Ok(ResolvedComponent {
283 bytes,
284 resolved_digest: req.expected_digest,
285 component_id: req.component_id,
286 abi_version: req.abi_version,
287 world: req.world,
288 component_version: req.component_version,
289 source_path: Some(PathBuf::from(path)),
290 });
291 }
292
293 let handle =
294 Handle::try_current().context("component resolution requires a Tokio runtime")?;
295 let offline = self.runtime.network_policy() == crate::runtime::NetworkPolicy::Offline;
296 let resolved = if offline {
297 self.dist
298 .open_cached(&req.expected_digest)
299 .map_err(|err| anyhow!("offline cache miss for {}: {}", req.reference, err))?
300 } else {
301 let source = self
302 .dist
303 .parse_source(&req.reference)
304 .map_err(|err| anyhow!("resolve {}: {}", req.reference, err))?;
305 let descriptor = block_on(
306 &handle,
307 self.dist
308 .resolve(source, greentic_distributor_client::ResolvePolicy),
309 )
310 .map_err(|err| anyhow!("resolve {}: {}", req.reference, err))?;
311 block_on(
312 &handle,
313 self.dist
314 .fetch(&descriptor, greentic_distributor_client::CachePolicy),
315 )
316 .map_err(|err| anyhow!("resolve {}: {}", req.reference, err))?
317 };
318 let path = resolved
319 .cache_path
320 .ok_or_else(|| anyhow!("resolved component missing path for {}", req.reference))?;
321 let bytes = fs::read(&path).with_context(|| format!("read {}", path.display()))?;
322 Ok(ResolvedComponent {
323 bytes,
324 resolved_digest: req.expected_digest,
325 component_id: req.component_id,
326 abi_version: req.abi_version,
327 world: req.world,
328 component_version: req.component_version,
329 source_path: Some(path),
330 })
331 }
332}
333
334fn block_on<F, T, E>(handle: &Handle, fut: F) -> std::result::Result<T, E>
335where
336 F: Future<Output = std::result::Result<T, E>>,
337{
338 tokio::task::block_in_place(|| handle.block_on(fut))
339}
340
341fn describe_component(engine: &Engine, bytes: &[u8]) -> Result<ComponentDescribe> {
342 describe_component_untyped(engine, bytes)
343}
344
345fn describe_component_untyped(engine: &Engine, bytes: &[u8]) -> Result<ComponentDescribe> {
346 let component = WasmtimeComponent::from_binary(engine, bytes)
347 .map_err(|err| anyhow!("decode component bytes: {err}"))?;
348 let mut store = wasmtime::Store::new(engine, DescribeHostState::default());
349 let mut linker = Linker::new(engine);
350 add_describe_host_imports(&mut linker)?;
351 stub_remaining_imports(&mut linker, &component)?;
354 let instance = linker
355 .instantiate(&mut store, &component)
356 .map_err(|err| anyhow!("instantiate component root world: {err}"))?;
357
358 let descriptor = [
359 "component-descriptor",
360 "greentic:component/component-descriptor",
361 "greentic:component/component-descriptor@0.6.0",
362 ]
363 .iter()
364 .find_map(|name| instance.get_export_index(&mut store, None, name))
365 .ok_or_else(|| anyhow!("missing exported descriptor instance"))?;
366 let describe_export = [
367 "describe",
368 "greentic:component/component-descriptor@0.6.0#describe",
369 ]
370 .iter()
371 .find_map(|name| instance.get_export_index(&mut store, Some(&descriptor), name))
372 .ok_or_else(|| anyhow!("missing exported describe function"))?;
373 let describe_func = instance
374 .get_typed_func::<(), (Vec<u8>,)>(&mut store, &describe_export)
375 .map_err(|err| anyhow!("lookup component-descriptor.describe: {err}"))?;
376 let (describe_bytes,) = describe_func
377 .call(&mut store, ())
378 .map_err(|err| anyhow!("call component-descriptor.describe: {err}"))?;
379 canonical::from_cbor(&describe_bytes).context("decode ComponentDescribe")
380}
381
382fn load_describe_from_cache_path(path: Option<&Path>) -> Result<Option<ComponentDescribe>> {
383 let Some(path) = path else {
384 return Ok(None);
385 };
386 let describe_path = PathBuf::from(format!("{}.describe.cbor", path.display()));
387 if !describe_path.exists() {
388 return Ok(None);
389 }
390 let bytes =
391 fs::read(&describe_path).with_context(|| format!("read {}", describe_path.display()))?;
392 canonical::ensure_canonical(&bytes).context("describe cache must be canonical")?;
393 let describe = canonical::from_cbor(&bytes).context("decode ComponentDescribe from cache")?;
394 Ok(Some(describe))
395}
396
397fn compute_describe_hash(describe: &ComponentDescribe) -> Result<String> {
398 let bytes =
399 canonical::to_canonical_cbor_allow_floats(describe).context("canonicalize describe")?;
400 let digest = Sha256::digest(bytes.as_slice());
401 Ok(hex::encode(digest))
402}
403
404fn is_state_store_tenant_ctx_abi_mismatch(err: &anyhow::Error) -> bool {
405 let text = format!("{:#}", err);
406 text.contains("greentic:state/state-store@1.0.0")
407 && text.contains("expected record of 19 fields, found 18 fields")
408}
409
410fn is_known_host_linker_gap(err: &anyhow::Error) -> bool {
411 let text = format!("{:#}", err);
412 let missing_impl = text.contains("matching implementation was not found in the linker");
413 missing_impl
414 && (text.contains("greentic:http/http-client@1.1.0")
415 || text.contains("greentic:http/http-client@1.0.0"))
416}
417
418fn is_missing_descriptor_instance(err: &anyhow::Error) -> bool {
419 format!("{:#}", err).contains("missing exported descriptor instance")
420}
421
422fn normalize_local(
423 pack_dir: &Path,
424 flow: &crate::config::FlowConfig,
425 rel: &str,
426) -> Result<PathBuf> {
427 let flow_path = if flow.file.is_absolute() {
428 flow.file.clone()
429 } else {
430 pack_dir.join(&flow.file)
431 };
432 let parent = flow_path
433 .parent()
434 .ok_or_else(|| anyhow!("flow path {} has no parent", flow_path.display()))?;
435 let rel = strip_file_uri_prefix(rel);
436 Ok(parent.join(rel))
437}
438
439fn format_reference(source: &FlowResolveSummarySourceRefV1) -> String {
440 match source {
441 FlowResolveSummarySourceRefV1::Local { path } => path.clone(),
442 FlowResolveSummarySourceRefV1::Oci { r#ref } => {
443 if r#ref.contains("://") {
444 r#ref.clone()
445 } else {
446 format!("oci://{}", r#ref)
447 }
448 }
449 FlowResolveSummarySourceRefV1::Repo { r#ref } => {
450 if r#ref.contains("://") {
451 r#ref.clone()
452 } else {
453 format!("repo://{}", r#ref)
454 }
455 }
456 FlowResolveSummarySourceRefV1::Store { r#ref } => {
457 if r#ref.contains("://") {
458 r#ref.clone()
459 } else {
460 format!("store://{}", r#ref)
461 }
462 }
463 }
464}
465
466#[cfg(test)]
467mod tests {
468 use super::*;
469 use greentic_types::ComponentId;
470 use greentic_types::flow_resolve_summary::{
471 FlowResolveSummarySourceRefV1, FlowResolveSummaryV1, NodeResolveSummaryV1,
472 };
473 use std::collections::BTreeMap;
474 use std::path::PathBuf;
475
476 fn sample_flow() -> crate::config::FlowConfig {
477 crate::config::FlowConfig {
478 id: "meetingPrep".to_string(),
479 file: PathBuf::from("flows/main.ygtc"),
480 tags: Vec::new(),
481 entrypoints: Vec::new(),
482 }
483 }
484
485 #[test]
486 fn collect_from_summary_dedups_duplicate_component_ids() {
487 let flow = sample_flow();
488 let pack_dir = PathBuf::from("/tmp");
489 let component_id =
490 ComponentId::new("ai.greentic.component-adaptive-card").expect("valid component id");
491
492 let mut nodes = BTreeMap::new();
493 for name in ["node_one", "node_two"] {
494 nodes.insert(
495 name.to_string(),
496 NodeResolveSummaryV1 {
497 component_id: component_id.clone(),
498 source: FlowResolveSummarySourceRefV1::Oci {
499 r#ref: "oci://ghcr.io/greenticai/components/component-adaptive-card:latest"
500 .to_string(),
501 },
502 digest: format!("sha256:{}", "a".repeat(64)),
503 manifest: None,
504 },
505 );
506 }
507
508 let summary = FlowResolveSummaryV1 {
509 schema_version: 1,
510 flow: "main.ygtc".to_string(),
511 nodes,
512 };
513
514 let mut entries = BTreeMap::new();
515 collect_from_summary(&pack_dir, &flow, &summary, &mut entries).expect("collect entries");
516
517 assert_eq!(entries.len(), 1);
518 let entry = entries.get(component_id.as_str()).expect("component entry");
519 assert_eq!(entry.component_id, component_id.as_str());
520 }
521
522 #[test]
523 fn collect_from_summary_rejects_conflicting_lock_data() {
524 let flow = sample_flow();
525 let pack_dir = PathBuf::from("/tmp");
526 let component_id =
527 ComponentId::new("ai.greentic.component-adaptive-card").expect("valid component id");
528
529 let mut nodes = BTreeMap::new();
530 nodes.insert(
531 "alpha".to_string(),
532 NodeResolveSummaryV1 {
533 component_id: component_id.clone(),
534 source: FlowResolveSummarySourceRefV1::Oci {
535 r#ref: "oci://ghcr.io/greenticai/components/component-adaptive-card:latest"
536 .to_string(),
537 },
538 digest: format!("sha256:{}", "b".repeat(64)),
539 manifest: None,
540 },
541 );
542 nodes.insert(
543 "beta".to_string(),
544 NodeResolveSummaryV1 {
545 component_id: component_id.clone(),
546 source: FlowResolveSummarySourceRefV1::Oci {
547 r#ref: "oci://ghcr.io/greenticai/components/component-adaptive-card:latest"
548 .to_string(),
549 },
550 digest: format!("sha256:{}", "c".repeat(64)),
551 manifest: None,
552 },
553 );
554
555 let summary = FlowResolveSummaryV1 {
556 schema_version: 1,
557 flow: "main.ygtc".to_string(),
558 nodes,
559 };
560
561 let mut entries = BTreeMap::new();
562 let err = collect_from_summary(&pack_dir, &flow, &summary, &mut entries).unwrap_err();
563 assert!(err.to_string().contains("points to different artifacts"));
564 }
565}