Skip to main content

greentic_deployer/
pack_introspect.rs

1use std::fs::{self, File};
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::Duration;
6
7use greentic_config_types::{NetworkConfig, TlsMode};
8use greentic_distributor_client::PackId;
9use greentic_distributor_client::source::DistributorSource;
10use greentic_types::ConnectionKind;
11use greentic_types::cbor::decode_pack_manifest;
12use greentic_types::component::ComponentManifest;
13use greentic_types::deployment::{
14    ChannelPlan, DeploymentPlan, MessagingPlan, RunnerPlan, TelemetryPlan,
15};
16use greentic_types::flow::FlowKind;
17use greentic_types::pack::PackRef;
18use greentic_types::pack_manifest::{PackFlowEntry, PackKind, PackManifest};
19use greentic_types::secrets::{SecretRequirement, SecretScope};
20use semver::Version;
21use serde_json::{Value as JsonValue, json};
22use tar::Archive;
23use zip::result::ZipError;
24
25use crate::config::DeployerConfig;
26use crate::error::{DeployerError, Result};
27use crate::path_safety::normalize_under_root;
28use crate::plan::{
29    ComponentRole, DeploymentHints, DeploymentProfile, InferenceNotes, InfraPlan, PlanContext,
30    PlannedComponent, Target, assemble_plan,
31};
32
33/// Load a pack manifest from raw .gtpack bytes.
34///
35/// CBOR must be decoded exclusively via `greentic_types::decode_pack_manifest`.
36pub fn load_pack_manifest_from_bytes(bytes: &[u8]) -> Result<PackManifest> {
37    decode_pack_manifest(bytes).map_err(DeployerError::ManifestDecode)
38}
39
40/// Build a plan context from the provided pack.
41pub fn build_plan(config: &DeployerConfig) -> Result<PlanContext> {
42    let cwd = std::env::current_dir()?;
43    let mut source = if let Some(pack_ref) = &config.pack_ref {
44        let source = resolve_distributor_source(config)?;
45        PackSource::from_registry(pack_ref.clone(), source)?
46    } else {
47        let safe_path = if config.pack_path.is_absolute() {
48            config.pack_path.canonicalize()?
49        } else {
50            normalize_under_root(&cwd, &config.pack_path)
51                .map_err(|err| DeployerError::Pack(err.to_string()))?
52        };
53        PackSource::open(&safe_path)?
54    };
55    build_plan_with_source(&mut source, config)
56}
57
58/// Build a plan using an explicitly provided pack source (e.g., registry).
59pub fn build_plan_with_source(
60    source: &mut PackSource,
61    config: &DeployerConfig,
62) -> Result<PlanContext> {
63    let manifest = source.read_manifest()?;
64    let deployment = build_deployment_hints(config);
65    let base = plan_from_pack_kind(&manifest, config);
66    let external_components: Vec<String> = external_facing_components(&manifest)
67        .into_iter()
68        .map(|c| c.id.to_string())
69        .collect();
70    let components = infer_component_profiles(&manifest, &deployment);
71    Ok(assemble_plan(
72        base,
73        config,
74        deployment,
75        external_components,
76        components,
77    ))
78}
79
80/// Preferred pack sources.
81#[allow(dead_code)]
82pub enum PackSource {
83    GtpackPath(PathBuf),
84    Dir(PathBuf),
85    Registry {
86        reference: PackRef,
87        source: Arc<dyn DistributorSource>,
88    },
89}
90
91impl PackSource {
92    fn open(path: &Path) -> Result<Self> {
93        if path.is_dir() {
94            Ok(Self::Dir(path.to_path_buf()))
95        } else {
96            Ok(Self::GtpackPath(path.to_path_buf()))
97        }
98    }
99
100    #[allow(dead_code)]
101    pub fn from_registry(reference: PackRef, source: Arc<dyn DistributorSource>) -> Result<Self> {
102        Ok(Self::Registry { reference, source })
103    }
104
105    fn read_manifest(&mut self) -> Result<PackManifest> {
106        match self {
107            PackSource::GtpackPath(path) => read_manifest_from_gtpack(path),
108            PackSource::Dir(path) => read_manifest_from_directory(path),
109            PackSource::Registry {
110                source, reference, ..
111            } => read_manifest_from_registry(source.as_ref(), reference),
112        }
113    }
114}
115
116fn read_manifest_from_tar(path: &Path) -> Result<PackManifest> {
117    let file = File::open(path)?;
118    let mut archive = Archive::new(file);
119    let mut manifest_bytes = None;
120
121    for entry in archive.entries()? {
122        let mut entry = entry?;
123        if entry.path()?.as_ref() == Path::new("manifest.cbor") {
124            let mut buf = Vec::new();
125            entry.read_to_end(&mut buf)?;
126            manifest_bytes = Some(buf);
127            break;
128        }
129    }
130
131    let bytes = manifest_bytes.ok_or_else(|| {
132        DeployerError::Pack(format!(
133            "manifest.cbor missing in pack archive {}",
134            path.display()
135        ))
136    })?;
137
138    load_pack_manifest_from_bytes(&bytes)
139}
140
141fn read_manifest_from_zip(path: &Path) -> Result<PackManifest> {
142    let file = File::open(path)?;
143    let mut archive = zip::ZipArchive::new(file).map_err(|err| {
144        DeployerError::Pack(format!("failed to open zip pack {}: {err}", path.display()))
145    })?;
146    let mut entry = archive.by_name("manifest.cbor").map_err(|err| match err {
147        ZipError::FileNotFound => DeployerError::Pack(format!(
148            "manifest.cbor missing in pack archive {}",
149            path.display()
150        )),
151        other => DeployerError::Pack(format!(
152            "failed to read manifest.cbor in {}: {other}",
153            path.display()
154        )),
155    })?;
156    let mut bytes = Vec::new();
157    entry.read_to_end(&mut bytes)?;
158    load_pack_manifest_from_bytes(&bytes)
159}
160
161/// Read a manifest directly from a `.gtpack` archive on disk.
162pub fn read_manifest_from_gtpack(path: &Path) -> Result<PackManifest> {
163    match read_manifest_from_tar(path) {
164        Ok(manifest) => Ok(manifest),
165        Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::InvalidData => {
166            read_manifest_from_zip(path)
167        }
168        Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::Other => {
169            read_manifest_from_zip(path)
170        }
171        Err(err) => Err(err),
172    }
173}
174
175/// Read an arbitrary entry from a `.gtpack` archive.
176pub fn read_entry_from_gtpack(path: &Path, entry_path: &Path) -> Result<Vec<u8>> {
177    match read_entry_from_tar_gtpack(path, entry_path) {
178        Ok(bytes) => Ok(bytes),
179        Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::InvalidData => {
180            read_entry_from_zip_gtpack(path, entry_path)
181        }
182        Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::Other => {
183            read_entry_from_zip_gtpack(path, entry_path)
184        }
185        Err(err) => Err(err),
186    }
187}
188
189fn read_entry_from_tar_gtpack(path: &Path, entry_path: &Path) -> Result<Vec<u8>> {
190    let file = File::open(path)?;
191    let mut archive = Archive::new(file);
192    for entry in archive.entries()? {
193        let mut entry = entry?;
194        if entry.path()?.as_ref() == entry_path {
195            let mut buf = Vec::new();
196            entry.read_to_end(&mut buf)?;
197            return Ok(buf);
198        }
199    }
200    Err(DeployerError::Pack(format!(
201        "entry {} not found in {}",
202        entry_path.display(),
203        path.display()
204    )))
205}
206
207fn read_entry_from_zip_gtpack(path: &Path, entry_path: &Path) -> Result<Vec<u8>> {
208    let file = File::open(path)?;
209    let mut archive = zip::ZipArchive::new(file).map_err(|err| {
210        DeployerError::Pack(format!("failed to open zip pack {}: {err}", path.display()))
211    })?;
212    let mut entry = archive
213        .by_name(&entry_path.to_string_lossy())
214        .map_err(|err| match err {
215            ZipError::FileNotFound => DeployerError::Pack(format!(
216                "entry {} not found in {}",
217                entry_path.display(),
218                path.display()
219            )),
220            other => DeployerError::Pack(format!(
221                "failed to read entry {} in {}: {other}",
222                entry_path.display(),
223                path.display()
224            )),
225        })?;
226    let mut buf = Vec::new();
227    entry.read_to_end(&mut buf)?;
228    Ok(buf)
229}
230
231fn resolve_distributor_source(config: &DeployerConfig) -> Result<Arc<dyn DistributorSource>> {
232    if let Some(source) = DISTRIBUTOR_SOURCE.get().cloned() {
233        return Ok(source);
234    }
235
236    build_http_distributor_source(config).map_err(|err| {
237        if matches!(err, DeployerError::Config(_) | DeployerError::OfflineDisallowed(_)) {
238            err
239        } else {
240            DeployerError::Config(format!(
241                "no distributor source registered; either register one programmatically or set distributor_url ({err})"
242            ))
243        }
244    })
245}
246
247static DISTRIBUTOR_SOURCE: once_cell::sync::OnceCell<Arc<dyn DistributorSource>> =
248    once_cell::sync::OnceCell::new();
249
250/// Register a distributor source for registry-based pack resolution.
251pub fn set_distributor_source(source: Arc<dyn DistributorSource>) {
252    let _ = DISTRIBUTOR_SOURCE.set(source);
253}
254
255fn build_http_distributor_source(config: &DeployerConfig) -> Result<Arc<dyn DistributorSource>> {
256    let base_url = config
257        .distributor_url
258        .as_deref()
259        .ok_or_else(|| DeployerError::Config("set distributor_url when using pack_id".into()))?;
260
261    if matches!(
262        config.greentic.environment.connection,
263        Some(ConnectionKind::Offline)
264    ) {
265        return Err(DeployerError::OfflineDisallowed(
266            "connection is Offline but distributor URL was requested".into(),
267        ));
268    }
269
270    let client = build_http_client(&config.greentic.network, base_url)?;
271    Ok(Arc::new(HttpPackSource::new(
272        client,
273        base_url.to_string(),
274        config.distributor_token.clone(),
275    )))
276}
277
278struct HttpPackSource {
279    client: reqwest::blocking::Client,
280    base_url: String,
281    token: Option<String>,
282    retries: usize,
283}
284
285impl HttpPackSource {
286    fn new(client: reqwest::blocking::Client, base_url: String, token: Option<String>) -> Self {
287        Self {
288            client,
289            base_url,
290            token,
291            retries: 3,
292        }
293    }
294}
295
296fn build_http_client(network: &NetworkConfig, base_url: &str) -> Result<reqwest::blocking::Client> {
297    let mut builder = reqwest::blocking::Client::builder();
298
299    if let Some(proxy_url) = &network.proxy_url {
300        let proxy = reqwest::Proxy::all(proxy_url).map_err(|err| {
301            DeployerError::Config(format!("invalid proxy URL {proxy_url}: {err}"))
302        })?;
303        builder = builder.proxy(proxy);
304    }
305
306    builder = match network.tls_mode {
307        TlsMode::Disabled => {
308            if base_url.starts_with("https://") {
309                return Err(DeployerError::Config(
310                    "network.tls_mode=disabled is not allowed for https distributor_url; use http or enable TLS"
311                        .into(),
312                ));
313            }
314            builder
315        }
316        TlsMode::System | TlsMode::Strict => builder,
317    };
318
319    if let Some(connect_ms) = network.connect_timeout_ms {
320        builder = builder.connect_timeout(Duration::from_millis(connect_ms));
321    }
322    if let Some(read_ms) = network.read_timeout_ms {
323        builder = builder.timeout(Duration::from_millis(read_ms));
324    }
325
326    builder.build().map_err(|err| {
327        DeployerError::Config(format!(
328            "failed to build HTTP client for distributor: {err}"
329        ))
330    })
331}
332
333impl DistributorSource for HttpPackSource {
334    fn fetch_pack(
335        &self,
336        pack_id: &PackId,
337        version: &Version,
338    ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError> {
339        let url = format!("{}/distributor-api/pack", self.base_url);
340        let payload = serde_json::json!({
341            "pack_id": pack_id.as_str(),
342            "version": version.to_string(),
343        });
344        let mut last_err = None;
345        for _ in 0..self.retries {
346            let mut request = self.client.post(url.clone()).json(&payload);
347            if let Some(token) = &self.token {
348                request = request.bearer_auth(token);
349            }
350            match request.send() {
351                Ok(response) if response.status().is_success() => {
352                    let bytes = response
353                        .bytes()
354                        .map_err(|err| {
355                            greentic_distributor_client::error::DistributorError::Other(
356                                err.to_string(),
357                            )
358                        })?
359                        .to_vec();
360                    return Ok(bytes);
361                }
362                Ok(response) if response.status() == reqwest::StatusCode::NOT_FOUND => {
363                    return Err(greentic_distributor_client::error::DistributorError::NotFound);
364                }
365                Ok(response)
366                    if response.status() == reqwest::StatusCode::UNAUTHORIZED
367                        || response.status() == reqwest::StatusCode::FORBIDDEN =>
368                {
369                    return Err(
370                        greentic_distributor_client::error::DistributorError::PermissionDenied,
371                    );
372                }
373                Ok(response) => {
374                    last_err = Some(format!("http status {}", response.status()));
375                }
376                Err(err) => {
377                    last_err = Some(err.to_string());
378                }
379            }
380        }
381
382        Err(greentic_distributor_client::error::DistributorError::Other(
383            last_err.unwrap_or_else(|| "failed to fetch pack".into()),
384        ))
385    }
386
387    fn fetch_component(
388        &self,
389        _component_id: &greentic_distributor_client::ComponentId,
390        _version: &Version,
391    ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError> {
392        Err(greentic_distributor_client::error::DistributorError::NotFound)
393    }
394}
395
396pub fn read_manifest_from_directory(root: &Path) -> Result<PackManifest> {
397    let cbor = normalize_under_root(root, Path::new("manifest.cbor"))
398        .map_err(|err| DeployerError::Pack(err.to_string()))?;
399    if !cbor.exists() {
400        return Err(DeployerError::Pack(format!(
401            "manifest.cbor missing in {}",
402            root.display()
403        )));
404    }
405    let bytes = fs::read(cbor)?;
406    load_pack_manifest_from_bytes(&bytes)
407}
408
409fn read_manifest_from_registry(
410    source: &dyn DistributorSource,
411    reference: &PackRef,
412) -> Result<PackManifest> {
413    let pack_id = reference.oci_url.parse::<PackId>().map_err(|err| {
414        DeployerError::Config(format!("invalid pack id '{}': {err}", reference.oci_url))
415    })?;
416    let bytes = source.fetch_pack(&pack_id, &reference.version)?;
417    load_pack_manifest_from_bytes(&bytes)
418}
419
420fn build_deployment_hints(config: &DeployerConfig) -> DeploymentHints {
421    let target: Target = config.provider.into();
422    DeploymentHints {
423        target,
424        provider: config.provider.as_str().to_string(),
425        strategy: config.strategy.clone(),
426    }
427}
428
429fn plan_from_pack_kind(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
430    match manifest.kind {
431        PackKind::Application => plan_application(manifest, config),
432        PackKind::Provider => plan_provider(manifest, config),
433        PackKind::Infrastructure => plan_infrastructure(manifest, config),
434        PackKind::Library => plan_library(manifest, config),
435    }
436}
437
438fn plan_application(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
439    infer_base_deployment_plan(manifest, config)
440}
441
442fn plan_provider(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
443    let mut plan = infer_base_deployment_plan(manifest, config);
444    // Providers shouldn't expose channels directly; keep runners/secrets but drop channels.
445    plan.channels.clear();
446    plan
447}
448
449fn plan_infrastructure(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
450    let mut plan = infer_base_deployment_plan(manifest, config);
451    // Infra packs generally lack messaging entrypoints.
452    plan.channels.clear();
453    plan.messaging = None;
454    plan
455}
456
457fn plan_library(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
458    // Libraries are not deployed directly; surface metadata without runners/channels.
459    DeploymentPlan {
460        pack_id: manifest.pack_id.to_string(),
461        pack_version: manifest.version.clone(),
462        tenant: config.tenant.clone(),
463        environment: config.environment.clone(),
464        runners: Vec::new(),
465        messaging: None,
466        channels: Vec::new(),
467        secrets: collect_secret_requirements(manifest, config),
468        oauth: Vec::new(),
469        telemetry: None,
470        extra: JsonValue::Null,
471    }
472}
473
474fn infer_base_deployment_plan(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
475    let runners = build_runner_plan(manifest);
476    let channels = build_channel_plan(manifest);
477    let secrets = collect_secret_requirements(manifest, config);
478    let messaging = messaging_plan_if_needed(manifest, &channels);
479    let telemetry = Some(TelemetryPlan {
480        required: true,
481        suggested_endpoint: None,
482        extra: JsonValue::Null,
483    });
484
485    DeploymentPlan {
486        pack_id: manifest.pack_id.to_string(),
487        pack_version: manifest.version.clone(),
488        tenant: config.tenant.clone(),
489        environment: config.environment.clone(),
490        runners,
491        messaging,
492        channels,
493        secrets,
494        oauth: Vec::new(),
495        telemetry,
496        extra: JsonValue::Null,
497    }
498}
499
500fn messaging_plan_if_needed(
501    manifest: &PackManifest,
502    channels: &[ChannelPlan],
503) -> Option<MessagingPlan> {
504    if messaging_flows(manifest).next().is_none() && channels.is_empty() {
505        return None;
506    }
507
508    Some(MessagingPlan {
509        logical_cluster: "nats-default".to_string(),
510        subjects: Vec::new(),
511        extra: JsonValue::Null,
512    })
513}
514
515fn build_runner_plan(manifest: &PackManifest) -> Vec<RunnerPlan> {
516    components_for_deployment(manifest)
517        .into_iter()
518        .map(|component| {
519            let resources = &component.resources;
520            let replicas = if resources.average_latency_ms.unwrap_or(0) < 50 {
521                2
522            } else {
523                1
524            };
525            RunnerPlan {
526                name: component.id.to_string(),
527                replicas,
528                capabilities: json!({
529                    "cpu_millis": resources.cpu_millis,
530                    "memory_mb": resources.memory_mb,
531                    "average_latency_ms": resources.average_latency_ms,
532                }),
533            }
534        })
535        .collect()
536}
537
538fn build_channel_plan(manifest: &PackManifest) -> Vec<ChannelPlan> {
539    let mut channels = Vec::new();
540
541    for entry in messaging_flows(manifest) {
542        let entrypoints: Vec<String> = if entry.flow.entrypoints.is_empty() {
543            vec!["default".to_string()]
544        } else {
545            entry.flow.entrypoints.keys().cloned().collect()
546        };
547
548        for name in entrypoints {
549            channels.push(ChannelPlan {
550                name: name.clone(),
551                flow_id: entry.id.to_string(),
552                kind: "messaging".to_string(),
553                config: JsonValue::Null,
554            });
555        }
556    }
557
558    for entry in http_flows(manifest) {
559        let entrypoints: Vec<String> = if entry.flow.entrypoints.is_empty() {
560            vec!["default".to_string()]
561        } else {
562            entry.flow.entrypoints.keys().cloned().collect()
563        };
564
565        for name in entrypoints {
566            channels.push(ChannelPlan {
567                name: name.clone(),
568                flow_id: entry.id.to_string(),
569                kind: "http".to_string(),
570                config: JsonValue::Null,
571            });
572        }
573    }
574
575    channels
576}
577
578fn collect_secret_requirements(
579    manifest: &PackManifest,
580    config: &DeployerConfig,
581) -> Vec<SecretRequirement> {
582    let mut secrets = Vec::new();
583    for component in components_for_deployment(manifest) {
584        if let Some(spec) = component.capabilities.host.secrets.as_ref() {
585            for requirement in &spec.required {
586                let mut requirement = requirement.clone();
587                if requirement.scope.is_none() {
588                    requirement.scope = Some(SecretScope {
589                        env: config.environment.clone(),
590                        tenant: config.tenant.clone(),
591                        team: None,
592                    });
593                }
594
595                if secrets.iter().any(|entry: &SecretRequirement| {
596                    entry.key == requirement.key && entry.scope == requirement.scope
597                }) {
598                    continue;
599                }
600                secrets.push(requirement);
601            }
602        }
603    }
604    secrets
605}
606
607/// Components that should be deployed (currently all declared components).
608pub fn components_for_deployment(manifest: &PackManifest) -> Vec<&ComponentManifest> {
609    manifest.components.iter().collect()
610}
611
612/// Components that are external-facing (messaging/http/event ingress).
613pub fn external_facing_components(manifest: &PackManifest) -> Vec<&ComponentManifest> {
614    manifest
615        .components
616        .iter()
617        .filter(|component| {
618            let host_caps = &component.capabilities.host;
619            host_caps
620                .messaging
621                .as_ref()
622                .map(|m| m.inbound)
623                .unwrap_or(false)
624                || host_caps
625                    .events
626                    .as_ref()
627                    .map(|e| e.inbound)
628                    .unwrap_or(false)
629                || host_caps
630                    .http
631                    .as_ref()
632                    .map(|http| http.server)
633                    .unwrap_or(false)
634        })
635        .collect()
636}
637
638/// Iterator over messaging flows embedded in the pack.
639pub fn messaging_flows<'a>(
640    manifest: &'a PackManifest,
641) -> impl Iterator<Item = &'a PackFlowEntry> + 'a {
642    manifest
643        .flows
644        .iter()
645        .filter(|entry| entry.kind == FlowKind::Messaging)
646}
647
648/// Iterator over HTTP flows embedded in the pack.
649pub fn http_flows<'a>(manifest: &'a PackManifest) -> impl Iterator<Item = &'a PackFlowEntry> + 'a {
650    manifest
651        .flows
652        .iter()
653        .filter(|entry| entry.kind == FlowKind::Http)
654}
655
656/// Iterator over component configuration flows.
657pub fn config_flows<'a>(
658    manifest: &'a PackManifest,
659) -> impl Iterator<Item = &'a PackFlowEntry> + 'a {
660    manifest
661        .flows
662        .iter()
663        .filter(|entry| entry.kind == FlowKind::ComponentConfig)
664}
665
666fn infer_component_profiles(
667    manifest: &PackManifest,
668    deployment: &DeploymentHints,
669) -> Vec<PlannedComponent> {
670    let mut planned = Vec::new();
671    for component in &manifest.components {
672        let role = infer_component_role(component);
673        let (profile, inference) = infer_profile(component, &role);
674        let infra = map_profile_to_infra(&deployment.target, &profile);
675        planned.push(PlannedComponent {
676            id: component.id.to_string(),
677            role,
678            profile,
679            target: deployment.target.clone(),
680            infra,
681            inference,
682        });
683    }
684
685    planned.sort_by(|a, b| a.id.cmp(&b.id));
686    planned
687}
688
689fn infer_component_role(component: &ComponentManifest) -> ComponentRole {
690    let host_caps = &component.capabilities.host;
691    if host_caps
692        .messaging
693        .as_ref()
694        .map(|caps| caps.inbound)
695        .unwrap_or(false)
696    {
697        return ComponentRole::MessagingAdapter;
698    }
699    if host_caps
700        .events
701        .as_ref()
702        .map(|caps| caps.inbound)
703        .unwrap_or(false)
704    {
705        return ComponentRole::EventProvider;
706    }
707    if host_caps
708        .events
709        .as_ref()
710        .map(|caps| caps.outbound)
711        .unwrap_or(false)
712    {
713        return ComponentRole::EventBridge;
714    }
715    ComponentRole::Worker
716}
717
718fn infer_profile(
719    component: &ComponentManifest,
720    role: &ComponentRole,
721) -> (DeploymentProfile, Option<InferenceNotes>) {
722    if let Some(default) = component.profiles.default.as_deref()
723        && let Some(profile) = parse_profile(default)
724    {
725        return (
726            profile,
727            Some(InferenceNotes {
728                source: "default profile from component manifest".to_string(),
729                warnings: Vec::new(),
730            }),
731        );
732    }
733
734    let host_caps = &component.capabilities.host;
735    if host_caps
736        .http
737        .as_ref()
738        .map(|caps| caps.server)
739        .unwrap_or(false)
740    {
741        return (
742            DeploymentProfile::HttpEndpoint,
743            Some(InferenceNotes {
744                source: "inferred from http.server capability".to_string(),
745                warnings: Vec::new(),
746            }),
747        );
748    }
749    if host_caps
750        .messaging
751        .as_ref()
752        .map(|caps| caps.inbound || caps.outbound)
753        .unwrap_or(false)
754    {
755        return (
756            DeploymentProfile::LongLivedService,
757            Some(InferenceNotes {
758                source: "inferred from messaging capability".to_string(),
759                warnings: Vec::new(),
760            }),
761        );
762    }
763    if host_caps
764        .events
765        .as_ref()
766        .map(|caps| caps.inbound || caps.outbound)
767        .unwrap_or(false)
768    {
769        return (
770            DeploymentProfile::ScheduledSource,
771            Some(InferenceNotes {
772                source: "inferred from events capability".to_string(),
773                warnings: Vec::new(),
774            }),
775        );
776    }
777
778    let (profile, warning) = default_profile(role);
779    let warnings = if warning {
780        vec![format!(
781            "component {} (role={}) defaulted to {:?}",
782            component.id,
783            role_label(role),
784            profile
785        )]
786    } else {
787        Vec::new()
788    };
789
790    (
791        profile,
792        Some(InferenceNotes {
793            source: "fallback profile inference".to_string(),
794            warnings,
795        }),
796    )
797}
798
799fn role_label(role: &ComponentRole) -> &'static str {
800    match role {
801        ComponentRole::EventProvider => "event_provider",
802        ComponentRole::EventBridge => "event_bridge",
803        ComponentRole::MessagingAdapter => "messaging_adapter",
804        ComponentRole::Worker => "worker",
805        ComponentRole::Other => "component",
806    }
807}
808
809fn default_profile(role: &ComponentRole) -> (DeploymentProfile, bool) {
810    match role {
811        ComponentRole::Worker => (DeploymentProfile::OneShotJob, false),
812        ComponentRole::EventProvider | ComponentRole::EventBridge => {
813            (DeploymentProfile::LongLivedService, true)
814        }
815        ComponentRole::MessagingAdapter => (DeploymentProfile::LongLivedService, true),
816        ComponentRole::Other => (DeploymentProfile::LongLivedService, true),
817    }
818}
819
820fn parse_profile(value: &str) -> Option<DeploymentProfile> {
821    let normalized = value.trim().to_ascii_lowercase().replace(['-', ' '], "_");
822    match normalized.as_str() {
823        "longlivedservice" | "long_lived_service" => Some(DeploymentProfile::LongLivedService),
824        "httpendpoint" | "http_endpoint" => Some(DeploymentProfile::HttpEndpoint),
825        "queueconsumer" | "queue_consumer" => Some(DeploymentProfile::QueueConsumer),
826        "scheduledsource" | "scheduled_source" => Some(DeploymentProfile::ScheduledSource),
827        "oneshotjob" | "one_shot_job" | "one_shot" => Some(DeploymentProfile::OneShotJob),
828        _ => None,
829    }
830}
831
832fn map_profile_to_infra(target: &Target, profile: &DeploymentProfile) -> InfraPlan {
833    let (summary, resources) = match (target, profile) {
834        (Target::Local, DeploymentProfile::HttpEndpoint) => (
835            "local gateway + handler".to_string(),
836            vec!["local-gateway".into(), "runner-handler".into()],
837        ),
838        (Target::Aws, DeploymentProfile::HttpEndpoint) => (
839            "api-gateway + lambda".to_string(),
840            vec!["api-gateway".into(), "lambda".into()],
841        ),
842        (Target::Azure, DeploymentProfile::HttpEndpoint) => (
843            "function app (http trigger)".to_string(),
844            vec!["function-app".into()],
845        ),
846        (Target::Gcp, DeploymentProfile::HttpEndpoint) => {
847            ("cloud run (http)".to_string(), vec!["cloud-run".into()])
848        }
849        (Target::K8s, DeploymentProfile::HttpEndpoint) => (
850            "ingress + service + deployment".to_string(),
851            vec!["ingress".into(), "service".into(), "deployment".into()],
852        ),
853        (Target::Local, DeploymentProfile::LongLivedService) => (
854            "runner-managed long-lived process".to_string(),
855            vec!["local-runner".into()],
856        ),
857        (Target::Aws, DeploymentProfile::LongLivedService) => (
858            "ecs/eks service".to_string(),
859            vec!["container-service".into()],
860        ),
861        (Target::Azure, DeploymentProfile::LongLivedService) => (
862            "container apps / app service".to_string(),
863            vec!["container-app".into()],
864        ),
865        (Target::Gcp, DeploymentProfile::LongLivedService) => (
866            "cloud run (always on)".to_string(),
867            vec!["cloud-run".into()],
868        ),
869        (Target::K8s, DeploymentProfile::LongLivedService) => (
870            "deployment + service".to_string(),
871            vec!["deployment".into(), "service".into()],
872        ),
873        (Target::Local, DeploymentProfile::QueueConsumer) => (
874            "local queue worker".to_string(),
875            vec!["local-queue-worker".into()],
876        ),
877        (Target::Aws, DeploymentProfile::QueueConsumer) => (
878            "sqs/event source + lambda".to_string(),
879            vec!["sqs".into(), "lambda".into()],
880        ),
881        (Target::Azure, DeploymentProfile::QueueConsumer) => (
882            "service bus queue trigger".to_string(),
883            vec!["service-bus".into(), "function".into()],
884        ),
885        (Target::Gcp, DeploymentProfile::QueueConsumer) => (
886            "pubsub subscriber".to_string(),
887            vec!["pubsub".into(), "subscriber".into()],
888        ),
889        (Target::K8s, DeploymentProfile::QueueConsumer) => (
890            "deployment + queue consumer".to_string(),
891            vec!["deployment".into()],
892        ),
893        (Target::Local, DeploymentProfile::ScheduledSource) => (
894            "local scheduler + runner invocation".to_string(),
895            vec!["scheduler".into(), "runner".into()],
896        ),
897        (Target::Aws, DeploymentProfile::ScheduledSource) => (
898            "eventbridge schedule + lambda".to_string(),
899            vec!["eventbridge".into(), "lambda".into()],
900        ),
901        (Target::Azure, DeploymentProfile::ScheduledSource) => (
902            "timer-triggered function".to_string(),
903            vec!["function-app".into()],
904        ),
905        (Target::Gcp, DeploymentProfile::ScheduledSource) => (
906            "cloud scheduler + run/function".to_string(),
907            vec!["cloud-scheduler".into(), "cloud-run".into()],
908        ),
909        (Target::K8s, DeploymentProfile::ScheduledSource) => {
910            ("cronjob".to_string(), vec!["cronjob".into()])
911        }
912        (Target::Local, DeploymentProfile::OneShotJob) => {
913            ("runner one-shot job".to_string(), vec!["runner".into()])
914        }
915        (Target::Aws, DeploymentProfile::OneShotJob) => {
916            ("lambda invocation".to_string(), vec!["lambda".into()])
917        }
918        (Target::Azure, DeploymentProfile::OneShotJob) => (
919            "container apps job / function".to_string(),
920            vec!["container-app-job".into()],
921        ),
922        (Target::Gcp, DeploymentProfile::OneShotJob) => {
923            ("cloud run job".to_string(), vec!["cloud-run-job".into()])
924        }
925        (Target::K8s, DeploymentProfile::OneShotJob) => ("job".to_string(), vec!["job".into()]),
926    };
927
928    InfraPlan {
929        target: target.clone(),
930        profile: profile.clone(),
931        summary,
932        resources,
933        notes: None,
934    }
935}
936
937#[cfg(test)]
938mod tests {
939    use super::*;
940    use crate::config::{DeployerConfig, OutputFormat, Provider};
941    use crate::contract::DeployerCapability;
942    use greentic_types::cbor::encode_pack_manifest;
943    use greentic_types::component::{ComponentCapabilities, ComponentProfiles, HostCapabilities};
944    use greentic_types::flow::{
945        ComponentRef, Flow, FlowMetadata, InputMapping, Node, OutputMapping,
946    };
947    use greentic_types::pack_manifest::PackDependency;
948    use greentic_types::{ComponentId, FlowId, NodeId, PackId, SemverReq};
949    use indexmap::IndexMap;
950    use semver::Version;
951    use std::env;
952    use std::io::Write;
953    use std::str::FromStr;
954    use tar::Builder;
955
956    fn sample_component(id: &str, inbound_messaging: bool) -> ComponentManifest {
957        let host_caps = HostCapabilities {
958            messaging: Some(greentic_types::component::MessagingCapabilities {
959                inbound: inbound_messaging,
960                outbound: true,
961            }),
962            ..Default::default()
963        };
964        ComponentManifest {
965            id: ComponentId::from_str(id).unwrap(),
966            version: Version::new(0, 1, 0),
967            supports: vec![FlowKind::Messaging, FlowKind::Http],
968            world: "greentic:test/world".to_string(),
969            profiles: ComponentProfiles {
970                default: Some("long_lived_service".to_string()),
971                supported: vec!["long_lived_service".to_string()],
972            },
973            capabilities: ComponentCapabilities {
974                host: host_caps,
975                ..Default::default()
976            },
977            configurators: None,
978            operations: Vec::new(),
979            config_schema: None,
980            resources: Default::default(),
981            dev_flows: Default::default(),
982        }
983    }
984
985    fn sample_flow(id: &str, kind: FlowKind, component: &ComponentManifest) -> PackFlowEntry {
986        let mut nodes: IndexMap<NodeId, Node, greentic_types::flow::FlowHasher> =
987            IndexMap::default();
988        nodes.insert(
989            NodeId::from_str("start").unwrap(),
990            Node {
991                id: NodeId::from_str("start").unwrap(),
992                component: ComponentRef {
993                    id: component.id.clone(),
994                    pack_alias: None,
995                    operation: None,
996                },
997                input: InputMapping {
998                    mapping: JsonValue::Null,
999                },
1000                output: OutputMapping {
1001                    mapping: JsonValue::Null,
1002                },
1003                routing: greentic_types::flow::Routing::End,
1004                telemetry: Default::default(),
1005                err_map: Default::default(),
1006            },
1007        );
1008
1009        let mut entrypoints = std::collections::BTreeMap::new();
1010        entrypoints.insert("default".to_string(), JsonValue::Null);
1011
1012        let flow = Flow {
1013            schema_version: "flowir-v1".to_string(),
1014            id: FlowId::from_str(id).unwrap(),
1015            kind,
1016            entrypoints,
1017            nodes,
1018            metadata: FlowMetadata::default(),
1019        };
1020
1021        PackFlowEntry {
1022            id: flow.id.clone(),
1023            kind,
1024            flow,
1025            tags: vec![format!("{kind:?}")],
1026            entrypoints: vec!["default".to_string()],
1027        }
1028    }
1029
1030    fn sample_manifest() -> PackManifest {
1031        let messaging_component = sample_component("dev.greentic.chat", true);
1032        let http_component = sample_component("dev.greentic.http", false);
1033
1034        let flows = vec![
1035            sample_flow("chat_flow", FlowKind::Messaging, &messaging_component),
1036            sample_flow("http_flow", FlowKind::Http, &http_component),
1037            sample_flow(
1038                "config_flow",
1039                FlowKind::ComponentConfig,
1040                &messaging_component,
1041            ),
1042        ];
1043
1044        PackManifest {
1045            schema_version: "pack-v1".to_string(),
1046            pack_id: PackId::from_str("dev.greentic.sample").unwrap(),
1047            name: None,
1048            version: Version::new(0, 1, 0),
1049            kind: PackKind::Application,
1050            publisher: "greentic".to_string(),
1051            secret_requirements: Vec::new(),
1052            components: vec![messaging_component, http_component],
1053            flows,
1054            dependencies: vec![PackDependency {
1055                alias: "common".to_string(),
1056                pack_id: PackId::from_str("dev.greentic.common").unwrap(),
1057                version_req: SemverReq::parse("*").unwrap(),
1058                required_capabilities: vec![],
1059            }],
1060            capabilities: Vec::new(),
1061            signatures: Default::default(),
1062            bootstrap: None,
1063            extensions: None,
1064        }
1065    }
1066
1067    #[test]
1068    fn manifest_round_trip_from_tar_and_dir() {
1069        let manifest = sample_manifest();
1070        let encoded = encode_pack_manifest(&manifest).expect("encode manifest");
1071
1072        let mut builder = Builder::new(Vec::new());
1073        let mut header = tar::Header::new_gnu();
1074        header.set_size(encoded.len() as u64);
1075        header.set_cksum();
1076        header.set_mode(0o644);
1077        builder
1078            .append_data(&mut header, "manifest.cbor", encoded.as_slice())
1079            .expect("append manifest");
1080        let dummy = b"wasm";
1081        let mut comp_header = tar::Header::new_gnu();
1082        comp_header.set_size(dummy.len() as u64);
1083        comp_header.set_cksum();
1084        comp_header.set_mode(0o644);
1085        builder
1086            .append_data(
1087                &mut comp_header,
1088                "components/dev.greentic.chat.wasm",
1089                dummy.as_slice(),
1090            )
1091            .expect("append component");
1092        let tar_bytes = builder.into_inner().expect("tar bytes");
1093
1094        let from_bytes = load_pack_manifest_from_bytes(&encode_pack_manifest(&manifest).unwrap())
1095            .expect("decode manifest");
1096        assert_eq!(from_bytes.pack_id, manifest.pack_id);
1097
1098        let base = env::current_dir().expect("cwd").join("target/tmp-tests");
1099        std::fs::create_dir_all(&base).expect("create tmp base");
1100        let dir = tempfile::tempdir_in(base).expect("temp dir");
1101        let manifest_path = dir.path().join("manifest.cbor");
1102        fs::write(&manifest_path, &encoded).expect("write manifest");
1103        fs::create_dir(dir.path().join("components")).expect("mkdir components");
1104        fs::write(
1105            dir.path().join("components/dev.greentic.chat.wasm"),
1106            b"wasm",
1107        )
1108        .expect("write component");
1109
1110        let mut tar_file = tempfile::NamedTempFile::new().expect("temp tar");
1111        tar_file.write_all(&tar_bytes).expect("write tar");
1112
1113        let decoded_tar = {
1114            let mut source = PackSource::open(tar_file.path()).expect("open tar source");
1115            source.read_manifest().expect("read tar manifest")
1116        };
1117        assert_eq!(decoded_tar.pack_id, manifest.pack_id);
1118        let decoded_dir = {
1119            let mut source = PackSource::open(dir.path()).expect("open dir source");
1120            source.read_manifest().expect("read dir manifest")
1121        };
1122        assert_eq!(decoded_dir.pack_id, manifest.pack_id);
1123    }
1124
1125    #[test]
1126    fn helpers_filter_flows_and_components() {
1127        let manifest = sample_manifest();
1128        let messaging: Vec<_> = messaging_flows(&manifest).collect();
1129        let http: Vec<_> = http_flows(&manifest).collect();
1130        let config: Vec<_> = config_flows(&manifest).collect();
1131
1132        assert_eq!(messaging.len(), 1);
1133        assert_eq!(http.len(), 1);
1134        assert_eq!(config.len(), 1);
1135
1136        let components = components_for_deployment(&manifest);
1137        assert_eq!(components.len(), 2);
1138        let external = external_facing_components(&manifest);
1139        assert_eq!(external.len(), 1);
1140        assert_eq!(
1141            external[0].id,
1142            ComponentId::from_str("dev.greentic.chat").unwrap()
1143        );
1144    }
1145
1146    #[test]
1147    fn runner_plan_respects_resource_hints() {
1148        let mut manifest = sample_manifest();
1149        // Set resource hints to drive replicas > 1.
1150        if let Some(component) = manifest
1151            .components
1152            .iter_mut()
1153            .find(|c| c.id == ComponentId::from_str("dev.greentic.chat").unwrap())
1154        {
1155            component.resources.cpu_millis = Some(256);
1156            component.resources.memory_mb = Some(512);
1157            component.resources.average_latency_ms = Some(10);
1158        }
1159        let runners = build_runner_plan(&manifest);
1160        let chat = runners
1161            .iter()
1162            .find(|r| r.name == "dev.greentic.chat")
1163            .expect("runner present");
1164        assert!(
1165            chat.replicas >= 2,
1166            "low-latency components scale up replicas"
1167        );
1168        assert_eq!(
1169            chat.capabilities.get("cpu_millis").and_then(|v| v.as_u64()),
1170            Some(256)
1171        );
1172        assert_eq!(
1173            chat.capabilities.get("memory_mb").and_then(|v| v.as_u64()),
1174            Some(512)
1175        );
1176    }
1177
1178    #[test]
1179    fn library_pack_skips_runners_and_channels() {
1180        let mut manifest = sample_manifest();
1181        manifest.kind = PackKind::Library;
1182        let config = default_config(PathBuf::from("."));
1183        let plan = plan_from_pack_kind(&manifest, &config);
1184        assert!(plan.runners.is_empty());
1185        assert!(plan.channels.is_empty());
1186        assert_eq!(plan.pack_id, manifest.pack_id.to_string());
1187    }
1188
1189    #[test]
1190    fn provider_plan_drops_channels_but_keeps_runners() {
1191        let mut manifest = sample_manifest();
1192        manifest.kind = PackKind::Provider;
1193        let config = default_config(PathBuf::from("."));
1194        let plan = plan_from_pack_kind(&manifest, &config);
1195        assert!(
1196            plan.channels.is_empty(),
1197            "provider packs should not expose channels"
1198        );
1199        assert!(!plan.runners.is_empty(), "provider packs keep runners");
1200    }
1201
1202    #[test]
1203    fn infrastructure_plan_has_no_messaging() {
1204        let mut manifest = sample_manifest();
1205        manifest.kind = PackKind::Infrastructure;
1206        let config = default_config(PathBuf::from("."));
1207        let plan = plan_from_pack_kind(&manifest, &config);
1208        assert!(plan.messaging.is_none(), "infra packs drop messaging plan");
1209    }
1210
1211    struct MemorySource {
1212        bytes: Vec<u8>,
1213    }
1214
1215    impl DistributorSource for MemorySource {
1216        fn fetch_pack(
1217            &self,
1218            _pack_id: &PackId,
1219            _version: &Version,
1220        ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError>
1221        {
1222            Ok(self.bytes.clone())
1223        }
1224
1225        fn fetch_component(
1226            &self,
1227            _component_id: &greentic_distributor_client::ComponentId,
1228            _version: &Version,
1229        ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError>
1230        {
1231            Err(greentic_distributor_client::error::DistributorError::NotFound)
1232        }
1233    }
1234
1235    #[test]
1236    fn registry_source_can_load_manifest() {
1237        let manifest = sample_manifest();
1238        let encoded = encode_pack_manifest(&manifest).expect("encode manifest");
1239        let source = MemorySource { bytes: encoded };
1240        let pack_id = PackId::try_from("dev.greentic.sample").unwrap();
1241        let reference = PackRef::new(
1242            pack_id.to_string(),
1243            Version::new(0, 1, 0),
1244            "sha256:deadbeef",
1245        );
1246        let decoded = read_manifest_from_registry(&source, &reference).expect("registry decode");
1247        assert_eq!(decoded.pack_id, manifest.pack_id);
1248    }
1249
1250    #[test]
1251    fn build_plan_uses_registry_when_pack_ref_set() {
1252        let manifest = sample_manifest();
1253        let encoded = encode_pack_manifest(&manifest).expect("encode manifest");
1254        let source = MemorySource { bytes: encoded };
1255        set_distributor_source(Arc::new(source));
1256
1257        let config = registry_config();
1258
1259        let plan = build_plan(&config).expect("plan builds via registry");
1260        assert_eq!(plan.plan.pack_id, manifest.pack_id.to_string());
1261    }
1262
1263    fn registry_config() -> DeployerConfig {
1264        DeployerConfig {
1265            capability: DeployerCapability::Plan,
1266            provider: Provider::Aws,
1267            strategy: "iac-only".into(),
1268            tenant: "acme".into(),
1269            environment: "staging".into(),
1270            pack_path: PathBuf::from("unused.gtpack"),
1271            bundle_root: None,
1272            providers_dir: PathBuf::from("providers/deployer"),
1273            packs_dir: PathBuf::from("packs"),
1274            provider_pack: None,
1275            pack_ref: Some(PackRef::new(
1276                "dev.greentic.sample",
1277                Version::new(0, 1, 0),
1278                "sha256:deadbeef",
1279            )),
1280            distributor_url: None,
1281            distributor_token: None,
1282            preview: false,
1283            dry_run: false,
1284            execute_local: false,
1285            output: OutputFormat::Text,
1286            greentic: greentic_config::ConfigResolver::new()
1287                .load()
1288                .expect("load default config")
1289                .config,
1290            provenance: greentic_config::ProvenanceMap::new(),
1291            config_warnings: Vec::new(),
1292            deploy_pack_id_override: None,
1293            deploy_flow_id_override: None,
1294            bundle_source: None,
1295            bundle_digest: None,
1296            repo_registry_base: None,
1297            store_registry_base: None,
1298        }
1299    }
1300
1301    fn default_config(pack_path: PathBuf) -> DeployerConfig {
1302        DeployerConfig {
1303            capability: DeployerCapability::Plan,
1304            provider: Provider::Aws,
1305            strategy: "iac-only".into(),
1306            tenant: "acme".into(),
1307            environment: "staging".into(),
1308            pack_path,
1309            bundle_root: None,
1310            providers_dir: PathBuf::from("providers/deployer"),
1311            packs_dir: PathBuf::from("packs"),
1312            provider_pack: None,
1313            pack_ref: None,
1314            distributor_url: None,
1315            distributor_token: None,
1316            preview: false,
1317            dry_run: false,
1318            execute_local: false,
1319            output: OutputFormat::Text,
1320            greentic: greentic_config::ConfigResolver::new()
1321                .load()
1322                .expect("load default config")
1323                .config,
1324            provenance: greentic_config::ProvenanceMap::new(),
1325            config_warnings: Vec::new(),
1326            deploy_pack_id_override: None,
1327            deploy_flow_id_override: None,
1328            bundle_source: None,
1329            bundle_digest: None,
1330            repo_registry_base: None,
1331            store_registry_base: None,
1332        }
1333    }
1334}