1use std::fs::{self, File};
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::Duration;
6
7use greentic_config_types::{NetworkConfig, TlsMode};
8use greentic_distributor_client::PackId;
9use greentic_distributor_client::source::DistributorSource;
10use greentic_types::ConnectionKind;
11use greentic_types::cbor::decode_pack_manifest;
12use greentic_types::component::ComponentManifest;
13use greentic_types::deployment::{
14 ChannelPlan, DeploymentPlan, MessagingPlan, RunnerPlan, TelemetryPlan,
15};
16use greentic_types::flow::FlowKind;
17use greentic_types::pack::PackRef;
18use greentic_types::pack_manifest::{PackFlowEntry, PackKind, PackManifest};
19use greentic_types::secrets::{SecretRequirement, SecretScope};
20use semver::Version;
21use serde_json::{Value as JsonValue, json};
22use tar::Archive;
23use zip::result::ZipError;
24
25use crate::config::DeployerConfig;
26use crate::error::{DeployerError, Result};
27use crate::path_safety::normalize_under_root;
28use crate::plan::{
29 ComponentRole, DeploymentHints, DeploymentProfile, InferenceNotes, InfraPlan, PlanContext,
30 PlannedComponent, Target, assemble_plan,
31};
32
33pub fn load_pack_manifest_from_bytes(bytes: &[u8]) -> Result<PackManifest> {
37 decode_pack_manifest(bytes).map_err(DeployerError::ManifestDecode)
38}
39
40pub fn build_plan(config: &DeployerConfig) -> Result<PlanContext> {
42 let cwd = std::env::current_dir()?;
43 let mut source = if let Some(pack_ref) = &config.pack_ref {
44 let source = resolve_distributor_source(config)?;
45 PackSource::from_registry(pack_ref.clone(), source)?
46 } else {
47 let safe_path = if config.pack_path.is_absolute() {
48 config.pack_path.canonicalize()?
49 } else {
50 normalize_under_root(&cwd, &config.pack_path)
51 .map_err(|err| DeployerError::Pack(err.to_string()))?
52 };
53 PackSource::open(&safe_path)?
54 };
55 build_plan_with_source(&mut source, config)
56}
57
58pub fn build_plan_with_source(
60 source: &mut PackSource,
61 config: &DeployerConfig,
62) -> Result<PlanContext> {
63 let manifest = source.read_manifest()?;
64 let deployment = build_deployment_hints(config);
65 let base = plan_from_pack_kind(&manifest, config);
66 let external_components: Vec<String> = external_facing_components(&manifest)
67 .into_iter()
68 .map(|c| c.id.to_string())
69 .collect();
70 let components = infer_component_profiles(&manifest, &deployment);
71 Ok(assemble_plan(
72 base,
73 config,
74 deployment,
75 external_components,
76 components,
77 ))
78}
79
80#[allow(dead_code)]
82pub enum PackSource {
83 GtpackPath(PathBuf),
84 Dir(PathBuf),
85 Registry {
86 reference: PackRef,
87 source: Arc<dyn DistributorSource>,
88 },
89}
90
91impl PackSource {
92 fn open(path: &Path) -> Result<Self> {
93 if path.is_dir() {
94 Ok(Self::Dir(path.to_path_buf()))
95 } else {
96 Ok(Self::GtpackPath(path.to_path_buf()))
97 }
98 }
99
100 #[allow(dead_code)]
101 pub fn from_registry(reference: PackRef, source: Arc<dyn DistributorSource>) -> Result<Self> {
102 Ok(Self::Registry { reference, source })
103 }
104
105 fn read_manifest(&mut self) -> Result<PackManifest> {
106 match self {
107 PackSource::GtpackPath(path) => read_manifest_from_gtpack(path),
108 PackSource::Dir(path) => read_manifest_from_directory(path),
109 PackSource::Registry {
110 source, reference, ..
111 } => read_manifest_from_registry(source.as_ref(), reference),
112 }
113 }
114}
115
116fn read_manifest_from_tar(path: &Path) -> Result<PackManifest> {
117 let file = File::open(path)?;
118 let mut archive = Archive::new(file);
119 let mut manifest_bytes = None;
120
121 for entry in archive.entries()? {
122 let mut entry = entry?;
123 if entry.path()?.as_ref() == Path::new("manifest.cbor") {
124 let mut buf = Vec::new();
125 entry.read_to_end(&mut buf)?;
126 manifest_bytes = Some(buf);
127 break;
128 }
129 }
130
131 let bytes = manifest_bytes.ok_or_else(|| {
132 DeployerError::Pack(format!(
133 "manifest.cbor missing in pack archive {}",
134 path.display()
135 ))
136 })?;
137
138 load_pack_manifest_from_bytes(&bytes)
139}
140
141fn read_manifest_from_zip(path: &Path) -> Result<PackManifest> {
142 let file = File::open(path)?;
143 let mut archive = zip::ZipArchive::new(file).map_err(|err| {
144 DeployerError::Pack(format!("failed to open zip pack {}: {err}", path.display()))
145 })?;
146 let mut entry = archive.by_name("manifest.cbor").map_err(|err| match err {
147 ZipError::FileNotFound => DeployerError::Pack(format!(
148 "manifest.cbor missing in pack archive {}",
149 path.display()
150 )),
151 other => DeployerError::Pack(format!(
152 "failed to read manifest.cbor in {}: {other}",
153 path.display()
154 )),
155 })?;
156 let mut bytes = Vec::new();
157 entry.read_to_end(&mut bytes)?;
158 load_pack_manifest_from_bytes(&bytes)
159}
160
161pub fn read_manifest_from_gtpack(path: &Path) -> Result<PackManifest> {
163 match read_manifest_from_tar(path) {
164 Ok(manifest) => Ok(manifest),
165 Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::InvalidData => {
166 read_manifest_from_zip(path)
167 }
168 Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::Other => {
169 read_manifest_from_zip(path)
170 }
171 Err(err) => Err(err),
172 }
173}
174
175pub fn read_entry_from_gtpack(path: &Path, entry_path: &Path) -> Result<Vec<u8>> {
177 match read_entry_from_tar_gtpack(path, entry_path) {
178 Ok(bytes) => Ok(bytes),
179 Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::InvalidData => {
180 read_entry_from_zip_gtpack(path, entry_path)
181 }
182 Err(DeployerError::Io(err)) if err.kind() == std::io::ErrorKind::Other => {
183 read_entry_from_zip_gtpack(path, entry_path)
184 }
185 Err(err) => Err(err),
186 }
187}
188
189fn read_entry_from_tar_gtpack(path: &Path, entry_path: &Path) -> Result<Vec<u8>> {
190 let file = File::open(path)?;
191 let mut archive = Archive::new(file);
192 for entry in archive.entries()? {
193 let mut entry = entry?;
194 if entry.path()?.as_ref() == entry_path {
195 let mut buf = Vec::new();
196 entry.read_to_end(&mut buf)?;
197 return Ok(buf);
198 }
199 }
200 Err(DeployerError::Pack(format!(
201 "entry {} not found in {}",
202 entry_path.display(),
203 path.display()
204 )))
205}
206
207fn read_entry_from_zip_gtpack(path: &Path, entry_path: &Path) -> Result<Vec<u8>> {
208 let file = File::open(path)?;
209 let mut archive = zip::ZipArchive::new(file).map_err(|err| {
210 DeployerError::Pack(format!("failed to open zip pack {}: {err}", path.display()))
211 })?;
212 let mut entry = archive
213 .by_name(&entry_path.to_string_lossy())
214 .map_err(|err| match err {
215 ZipError::FileNotFound => DeployerError::Pack(format!(
216 "entry {} not found in {}",
217 entry_path.display(),
218 path.display()
219 )),
220 other => DeployerError::Pack(format!(
221 "failed to read entry {} in {}: {other}",
222 entry_path.display(),
223 path.display()
224 )),
225 })?;
226 let mut buf = Vec::new();
227 entry.read_to_end(&mut buf)?;
228 Ok(buf)
229}
230
231fn resolve_distributor_source(config: &DeployerConfig) -> Result<Arc<dyn DistributorSource>> {
232 if let Some(source) = DISTRIBUTOR_SOURCE.get().cloned() {
233 return Ok(source);
234 }
235
236 build_http_distributor_source(config).map_err(|err| {
237 if matches!(err, DeployerError::Config(_) | DeployerError::OfflineDisallowed(_)) {
238 err
239 } else {
240 DeployerError::Config(format!(
241 "no distributor source registered; either register one programmatically or set distributor_url ({err})"
242 ))
243 }
244 })
245}
246
247static DISTRIBUTOR_SOURCE: once_cell::sync::OnceCell<Arc<dyn DistributorSource>> =
248 once_cell::sync::OnceCell::new();
249
250pub fn set_distributor_source(source: Arc<dyn DistributorSource>) {
252 let _ = DISTRIBUTOR_SOURCE.set(source);
253}
254
255fn build_http_distributor_source(config: &DeployerConfig) -> Result<Arc<dyn DistributorSource>> {
256 let base_url = config
257 .distributor_url
258 .as_deref()
259 .ok_or_else(|| DeployerError::Config("set distributor_url when using pack_id".into()))?;
260
261 if matches!(
262 config.greentic.environment.connection,
263 Some(ConnectionKind::Offline)
264 ) {
265 return Err(DeployerError::OfflineDisallowed(
266 "connection is Offline but distributor URL was requested".into(),
267 ));
268 }
269
270 let client = build_http_client(&config.greentic.network, base_url)?;
271 Ok(Arc::new(HttpPackSource::new(
272 client,
273 base_url.to_string(),
274 config.distributor_token.clone(),
275 )))
276}
277
278struct HttpPackSource {
279 client: reqwest::blocking::Client,
280 base_url: String,
281 token: Option<String>,
282 retries: usize,
283}
284
285impl HttpPackSource {
286 fn new(client: reqwest::blocking::Client, base_url: String, token: Option<String>) -> Self {
287 Self {
288 client,
289 base_url,
290 token,
291 retries: 3,
292 }
293 }
294}
295
296fn build_http_client(network: &NetworkConfig, base_url: &str) -> Result<reqwest::blocking::Client> {
297 let mut builder = reqwest::blocking::Client::builder();
298
299 if let Some(proxy_url) = &network.proxy_url {
300 let proxy = reqwest::Proxy::all(proxy_url).map_err(|err| {
301 DeployerError::Config(format!("invalid proxy URL {proxy_url}: {err}"))
302 })?;
303 builder = builder.proxy(proxy);
304 }
305
306 builder = match network.tls_mode {
307 TlsMode::Disabled => {
308 if base_url.starts_with("https://") {
309 return Err(DeployerError::Config(
310 "network.tls_mode=disabled is not allowed for https distributor_url; use http or enable TLS"
311 .into(),
312 ));
313 }
314 builder
315 }
316 TlsMode::System | TlsMode::Strict => builder,
317 };
318
319 if let Some(connect_ms) = network.connect_timeout_ms {
320 builder = builder.connect_timeout(Duration::from_millis(connect_ms));
321 }
322 if let Some(read_ms) = network.read_timeout_ms {
323 builder = builder.timeout(Duration::from_millis(read_ms));
324 }
325
326 builder.build().map_err(|err| {
327 DeployerError::Config(format!(
328 "failed to build HTTP client for distributor: {err}"
329 ))
330 })
331}
332
333impl DistributorSource for HttpPackSource {
334 fn fetch_pack(
335 &self,
336 pack_id: &PackId,
337 version: &Version,
338 ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError> {
339 let url = format!("{}/distributor-api/pack", self.base_url);
340 let payload = serde_json::json!({
341 "pack_id": pack_id.as_str(),
342 "version": version.to_string(),
343 });
344 let mut last_err = None;
345 for _ in 0..self.retries {
346 let mut request = self.client.post(url.clone()).json(&payload);
347 if let Some(token) = &self.token {
348 request = request.bearer_auth(token);
349 }
350 match request.send() {
351 Ok(response) if response.status().is_success() => {
352 let bytes = response
353 .bytes()
354 .map_err(|err| {
355 greentic_distributor_client::error::DistributorError::Other(
356 err.to_string(),
357 )
358 })?
359 .to_vec();
360 return Ok(bytes);
361 }
362 Ok(response) if response.status() == reqwest::StatusCode::NOT_FOUND => {
363 return Err(greentic_distributor_client::error::DistributorError::NotFound);
364 }
365 Ok(response)
366 if response.status() == reqwest::StatusCode::UNAUTHORIZED
367 || response.status() == reqwest::StatusCode::FORBIDDEN =>
368 {
369 return Err(
370 greentic_distributor_client::error::DistributorError::PermissionDenied,
371 );
372 }
373 Ok(response) => {
374 last_err = Some(format!("http status {}", response.status()));
375 }
376 Err(err) => {
377 last_err = Some(err.to_string());
378 }
379 }
380 }
381
382 Err(greentic_distributor_client::error::DistributorError::Other(
383 last_err.unwrap_or_else(|| "failed to fetch pack".into()),
384 ))
385 }
386
387 fn fetch_component(
388 &self,
389 _component_id: &greentic_distributor_client::ComponentId,
390 _version: &Version,
391 ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError> {
392 Err(greentic_distributor_client::error::DistributorError::NotFound)
393 }
394}
395
396pub fn read_manifest_from_directory(root: &Path) -> Result<PackManifest> {
397 let cbor = normalize_under_root(root, Path::new("manifest.cbor"))
398 .map_err(|err| DeployerError::Pack(err.to_string()))?;
399 if !cbor.exists() {
400 return Err(DeployerError::Pack(format!(
401 "manifest.cbor missing in {}",
402 root.display()
403 )));
404 }
405 let bytes = fs::read(cbor)?;
406 load_pack_manifest_from_bytes(&bytes)
407}
408
409fn read_manifest_from_registry(
410 source: &dyn DistributorSource,
411 reference: &PackRef,
412) -> Result<PackManifest> {
413 let pack_id = reference.oci_url.parse::<PackId>().map_err(|err| {
414 DeployerError::Config(format!("invalid pack id '{}': {err}", reference.oci_url))
415 })?;
416 let bytes = source.fetch_pack(&pack_id, &reference.version)?;
417 load_pack_manifest_from_bytes(&bytes)
418}
419
420fn build_deployment_hints(config: &DeployerConfig) -> DeploymentHints {
421 let target: Target = config.provider.into();
422 DeploymentHints {
423 target,
424 provider: config.provider.as_str().to_string(),
425 strategy: config.strategy.clone(),
426 }
427}
428
429fn plan_from_pack_kind(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
430 match manifest.kind {
431 PackKind::Application => plan_application(manifest, config),
432 PackKind::Provider => plan_provider(manifest, config),
433 PackKind::Infrastructure => plan_infrastructure(manifest, config),
434 PackKind::Library => plan_library(manifest, config),
435 }
436}
437
438fn plan_application(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
439 infer_base_deployment_plan(manifest, config)
440}
441
442fn plan_provider(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
443 let mut plan = infer_base_deployment_plan(manifest, config);
444 plan.channels.clear();
446 plan
447}
448
449fn plan_infrastructure(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
450 let mut plan = infer_base_deployment_plan(manifest, config);
451 plan.channels.clear();
453 plan.messaging = None;
454 plan
455}
456
457fn plan_library(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
458 DeploymentPlan {
460 pack_id: manifest.pack_id.to_string(),
461 pack_version: manifest.version.clone(),
462 tenant: config.tenant.clone(),
463 environment: config.environment.clone(),
464 runners: Vec::new(),
465 messaging: None,
466 channels: Vec::new(),
467 secrets: collect_secret_requirements(manifest, config),
468 oauth: Vec::new(),
469 telemetry: None,
470 extra: JsonValue::Null,
471 }
472}
473
474fn infer_base_deployment_plan(manifest: &PackManifest, config: &DeployerConfig) -> DeploymentPlan {
475 let runners = build_runner_plan(manifest);
476 let channels = build_channel_plan(manifest);
477 let secrets = collect_secret_requirements(manifest, config);
478 let messaging = messaging_plan_if_needed(manifest, &channels);
479 let telemetry = Some(TelemetryPlan {
480 required: true,
481 suggested_endpoint: None,
482 extra: JsonValue::Null,
483 });
484
485 DeploymentPlan {
486 pack_id: manifest.pack_id.to_string(),
487 pack_version: manifest.version.clone(),
488 tenant: config.tenant.clone(),
489 environment: config.environment.clone(),
490 runners,
491 messaging,
492 channels,
493 secrets,
494 oauth: Vec::new(),
495 telemetry,
496 extra: JsonValue::Null,
497 }
498}
499
500fn messaging_plan_if_needed(
501 manifest: &PackManifest,
502 channels: &[ChannelPlan],
503) -> Option<MessagingPlan> {
504 if messaging_flows(manifest).next().is_none() && channels.is_empty() {
505 return None;
506 }
507
508 Some(MessagingPlan {
509 logical_cluster: "nats-default".to_string(),
510 subjects: Vec::new(),
511 extra: JsonValue::Null,
512 })
513}
514
515fn build_runner_plan(manifest: &PackManifest) -> Vec<RunnerPlan> {
516 components_for_deployment(manifest)
517 .into_iter()
518 .map(|component| {
519 let resources = &component.resources;
520 let replicas = if resources.average_latency_ms.unwrap_or(0) < 50 {
521 2
522 } else {
523 1
524 };
525 RunnerPlan {
526 name: component.id.to_string(),
527 replicas,
528 capabilities: json!({
529 "cpu_millis": resources.cpu_millis,
530 "memory_mb": resources.memory_mb,
531 "average_latency_ms": resources.average_latency_ms,
532 }),
533 }
534 })
535 .collect()
536}
537
538fn build_channel_plan(manifest: &PackManifest) -> Vec<ChannelPlan> {
539 let mut channels = Vec::new();
540
541 for entry in messaging_flows(manifest) {
542 let entrypoints: Vec<String> = if entry.flow.entrypoints.is_empty() {
543 vec!["default".to_string()]
544 } else {
545 entry.flow.entrypoints.keys().cloned().collect()
546 };
547
548 for name in entrypoints {
549 channels.push(ChannelPlan {
550 name: name.clone(),
551 flow_id: entry.id.to_string(),
552 kind: "messaging".to_string(),
553 config: JsonValue::Null,
554 });
555 }
556 }
557
558 for entry in http_flows(manifest) {
559 let entrypoints: Vec<String> = if entry.flow.entrypoints.is_empty() {
560 vec!["default".to_string()]
561 } else {
562 entry.flow.entrypoints.keys().cloned().collect()
563 };
564
565 for name in entrypoints {
566 channels.push(ChannelPlan {
567 name: name.clone(),
568 flow_id: entry.id.to_string(),
569 kind: "http".to_string(),
570 config: JsonValue::Null,
571 });
572 }
573 }
574
575 channels
576}
577
578fn collect_secret_requirements(
579 manifest: &PackManifest,
580 config: &DeployerConfig,
581) -> Vec<SecretRequirement> {
582 let mut secrets = Vec::new();
583 for component in components_for_deployment(manifest) {
584 if let Some(spec) = component.capabilities.host.secrets.as_ref() {
585 for requirement in &spec.required {
586 let mut requirement = requirement.clone();
587 if requirement.scope.is_none() {
588 requirement.scope = Some(SecretScope {
589 env: config.environment.clone(),
590 tenant: config.tenant.clone(),
591 team: None,
592 });
593 }
594
595 if secrets.iter().any(|entry: &SecretRequirement| {
596 entry.key == requirement.key && entry.scope == requirement.scope
597 }) {
598 continue;
599 }
600 secrets.push(requirement);
601 }
602 }
603 }
604 secrets
605}
606
607pub fn components_for_deployment(manifest: &PackManifest) -> Vec<&ComponentManifest> {
609 manifest.components.iter().collect()
610}
611
612pub fn external_facing_components(manifest: &PackManifest) -> Vec<&ComponentManifest> {
614 manifest
615 .components
616 .iter()
617 .filter(|component| {
618 let host_caps = &component.capabilities.host;
619 host_caps
620 .messaging
621 .as_ref()
622 .map(|m| m.inbound)
623 .unwrap_or(false)
624 || host_caps
625 .events
626 .as_ref()
627 .map(|e| e.inbound)
628 .unwrap_or(false)
629 || host_caps
630 .http
631 .as_ref()
632 .map(|http| http.server)
633 .unwrap_or(false)
634 })
635 .collect()
636}
637
638pub fn messaging_flows<'a>(
640 manifest: &'a PackManifest,
641) -> impl Iterator<Item = &'a PackFlowEntry> + 'a {
642 manifest
643 .flows
644 .iter()
645 .filter(|entry| entry.kind == FlowKind::Messaging)
646}
647
648pub fn http_flows<'a>(manifest: &'a PackManifest) -> impl Iterator<Item = &'a PackFlowEntry> + 'a {
650 manifest
651 .flows
652 .iter()
653 .filter(|entry| entry.kind == FlowKind::Http)
654}
655
656pub fn config_flows<'a>(
658 manifest: &'a PackManifest,
659) -> impl Iterator<Item = &'a PackFlowEntry> + 'a {
660 manifest
661 .flows
662 .iter()
663 .filter(|entry| entry.kind == FlowKind::ComponentConfig)
664}
665
666fn infer_component_profiles(
667 manifest: &PackManifest,
668 deployment: &DeploymentHints,
669) -> Vec<PlannedComponent> {
670 let mut planned = Vec::new();
671 for component in &manifest.components {
672 let role = infer_component_role(component);
673 let (profile, inference) = infer_profile(component, &role);
674 let infra = map_profile_to_infra(&deployment.target, &profile);
675 planned.push(PlannedComponent {
676 id: component.id.to_string(),
677 role,
678 profile,
679 target: deployment.target.clone(),
680 infra,
681 inference,
682 });
683 }
684
685 planned.sort_by(|a, b| a.id.cmp(&b.id));
686 planned
687}
688
689fn infer_component_role(component: &ComponentManifest) -> ComponentRole {
690 let host_caps = &component.capabilities.host;
691 if host_caps
692 .messaging
693 .as_ref()
694 .map(|caps| caps.inbound)
695 .unwrap_or(false)
696 {
697 return ComponentRole::MessagingAdapter;
698 }
699 if host_caps
700 .events
701 .as_ref()
702 .map(|caps| caps.inbound)
703 .unwrap_or(false)
704 {
705 return ComponentRole::EventProvider;
706 }
707 if host_caps
708 .events
709 .as_ref()
710 .map(|caps| caps.outbound)
711 .unwrap_or(false)
712 {
713 return ComponentRole::EventBridge;
714 }
715 ComponentRole::Worker
716}
717
718fn infer_profile(
719 component: &ComponentManifest,
720 role: &ComponentRole,
721) -> (DeploymentProfile, Option<InferenceNotes>) {
722 if let Some(default) = component.profiles.default.as_deref()
723 && let Some(profile) = parse_profile(default)
724 {
725 return (
726 profile,
727 Some(InferenceNotes {
728 source: "default profile from component manifest".to_string(),
729 warnings: Vec::new(),
730 }),
731 );
732 }
733
734 let host_caps = &component.capabilities.host;
735 if host_caps
736 .http
737 .as_ref()
738 .map(|caps| caps.server)
739 .unwrap_or(false)
740 {
741 return (
742 DeploymentProfile::HttpEndpoint,
743 Some(InferenceNotes {
744 source: "inferred from http.server capability".to_string(),
745 warnings: Vec::new(),
746 }),
747 );
748 }
749 if host_caps
750 .messaging
751 .as_ref()
752 .map(|caps| caps.inbound || caps.outbound)
753 .unwrap_or(false)
754 {
755 return (
756 DeploymentProfile::LongLivedService,
757 Some(InferenceNotes {
758 source: "inferred from messaging capability".to_string(),
759 warnings: Vec::new(),
760 }),
761 );
762 }
763 if host_caps
764 .events
765 .as_ref()
766 .map(|caps| caps.inbound || caps.outbound)
767 .unwrap_or(false)
768 {
769 return (
770 DeploymentProfile::ScheduledSource,
771 Some(InferenceNotes {
772 source: "inferred from events capability".to_string(),
773 warnings: Vec::new(),
774 }),
775 );
776 }
777
778 let (profile, warning) = default_profile(role);
779 let warnings = if warning {
780 vec![format!(
781 "component {} (role={}) defaulted to {:?}",
782 component.id,
783 role_label(role),
784 profile
785 )]
786 } else {
787 Vec::new()
788 };
789
790 (
791 profile,
792 Some(InferenceNotes {
793 source: "fallback profile inference".to_string(),
794 warnings,
795 }),
796 )
797}
798
799fn role_label(role: &ComponentRole) -> &'static str {
800 match role {
801 ComponentRole::EventProvider => "event_provider",
802 ComponentRole::EventBridge => "event_bridge",
803 ComponentRole::MessagingAdapter => "messaging_adapter",
804 ComponentRole::Worker => "worker",
805 ComponentRole::Other => "component",
806 }
807}
808
809fn default_profile(role: &ComponentRole) -> (DeploymentProfile, bool) {
810 match role {
811 ComponentRole::Worker => (DeploymentProfile::OneShotJob, false),
812 ComponentRole::EventProvider | ComponentRole::EventBridge => {
813 (DeploymentProfile::LongLivedService, true)
814 }
815 ComponentRole::MessagingAdapter => (DeploymentProfile::LongLivedService, true),
816 ComponentRole::Other => (DeploymentProfile::LongLivedService, true),
817 }
818}
819
820fn parse_profile(value: &str) -> Option<DeploymentProfile> {
821 let normalized = value.trim().to_ascii_lowercase().replace(['-', ' '], "_");
822 match normalized.as_str() {
823 "longlivedservice" | "long_lived_service" => Some(DeploymentProfile::LongLivedService),
824 "httpendpoint" | "http_endpoint" => Some(DeploymentProfile::HttpEndpoint),
825 "queueconsumer" | "queue_consumer" => Some(DeploymentProfile::QueueConsumer),
826 "scheduledsource" | "scheduled_source" => Some(DeploymentProfile::ScheduledSource),
827 "oneshotjob" | "one_shot_job" | "one_shot" => Some(DeploymentProfile::OneShotJob),
828 _ => None,
829 }
830}
831
832fn map_profile_to_infra(target: &Target, profile: &DeploymentProfile) -> InfraPlan {
833 let (summary, resources) = match (target, profile) {
834 (Target::Local, DeploymentProfile::HttpEndpoint) => (
835 "local gateway + handler".to_string(),
836 vec!["local-gateway".into(), "runner-handler".into()],
837 ),
838 (Target::Aws, DeploymentProfile::HttpEndpoint) => (
839 "api-gateway + lambda".to_string(),
840 vec!["api-gateway".into(), "lambda".into()],
841 ),
842 (Target::Azure, DeploymentProfile::HttpEndpoint) => (
843 "function app (http trigger)".to_string(),
844 vec!["function-app".into()],
845 ),
846 (Target::Gcp, DeploymentProfile::HttpEndpoint) => {
847 ("cloud run (http)".to_string(), vec!["cloud-run".into()])
848 }
849 (Target::K8s, DeploymentProfile::HttpEndpoint) => (
850 "ingress + service + deployment".to_string(),
851 vec!["ingress".into(), "service".into(), "deployment".into()],
852 ),
853 (Target::Local, DeploymentProfile::LongLivedService) => (
854 "runner-managed long-lived process".to_string(),
855 vec!["local-runner".into()],
856 ),
857 (Target::Aws, DeploymentProfile::LongLivedService) => (
858 "ecs/eks service".to_string(),
859 vec!["container-service".into()],
860 ),
861 (Target::Azure, DeploymentProfile::LongLivedService) => (
862 "container apps / app service".to_string(),
863 vec!["container-app".into()],
864 ),
865 (Target::Gcp, DeploymentProfile::LongLivedService) => (
866 "cloud run (always on)".to_string(),
867 vec!["cloud-run".into()],
868 ),
869 (Target::K8s, DeploymentProfile::LongLivedService) => (
870 "deployment + service".to_string(),
871 vec!["deployment".into(), "service".into()],
872 ),
873 (Target::Local, DeploymentProfile::QueueConsumer) => (
874 "local queue worker".to_string(),
875 vec!["local-queue-worker".into()],
876 ),
877 (Target::Aws, DeploymentProfile::QueueConsumer) => (
878 "sqs/event source + lambda".to_string(),
879 vec!["sqs".into(), "lambda".into()],
880 ),
881 (Target::Azure, DeploymentProfile::QueueConsumer) => (
882 "service bus queue trigger".to_string(),
883 vec!["service-bus".into(), "function".into()],
884 ),
885 (Target::Gcp, DeploymentProfile::QueueConsumer) => (
886 "pubsub subscriber".to_string(),
887 vec!["pubsub".into(), "subscriber".into()],
888 ),
889 (Target::K8s, DeploymentProfile::QueueConsumer) => (
890 "deployment + queue consumer".to_string(),
891 vec!["deployment".into()],
892 ),
893 (Target::Local, DeploymentProfile::ScheduledSource) => (
894 "local scheduler + runner invocation".to_string(),
895 vec!["scheduler".into(), "runner".into()],
896 ),
897 (Target::Aws, DeploymentProfile::ScheduledSource) => (
898 "eventbridge schedule + lambda".to_string(),
899 vec!["eventbridge".into(), "lambda".into()],
900 ),
901 (Target::Azure, DeploymentProfile::ScheduledSource) => (
902 "timer-triggered function".to_string(),
903 vec!["function-app".into()],
904 ),
905 (Target::Gcp, DeploymentProfile::ScheduledSource) => (
906 "cloud scheduler + run/function".to_string(),
907 vec!["cloud-scheduler".into(), "cloud-run".into()],
908 ),
909 (Target::K8s, DeploymentProfile::ScheduledSource) => {
910 ("cronjob".to_string(), vec!["cronjob".into()])
911 }
912 (Target::Local, DeploymentProfile::OneShotJob) => {
913 ("runner one-shot job".to_string(), vec!["runner".into()])
914 }
915 (Target::Aws, DeploymentProfile::OneShotJob) => {
916 ("lambda invocation".to_string(), vec!["lambda".into()])
917 }
918 (Target::Azure, DeploymentProfile::OneShotJob) => (
919 "container apps job / function".to_string(),
920 vec!["container-app-job".into()],
921 ),
922 (Target::Gcp, DeploymentProfile::OneShotJob) => {
923 ("cloud run job".to_string(), vec!["cloud-run-job".into()])
924 }
925 (Target::K8s, DeploymentProfile::OneShotJob) => ("job".to_string(), vec!["job".into()]),
926 };
927
928 InfraPlan {
929 target: target.clone(),
930 profile: profile.clone(),
931 summary,
932 resources,
933 notes: None,
934 }
935}
936
937#[cfg(test)]
938mod tests {
939 use super::*;
940 use crate::config::{DeployerConfig, OutputFormat, Provider};
941 use crate::contract::DeployerCapability;
942 use greentic_types::cbor::encode_pack_manifest;
943 use greentic_types::component::{ComponentCapabilities, ComponentProfiles, HostCapabilities};
944 use greentic_types::flow::{
945 ComponentRef, Flow, FlowMetadata, InputMapping, Node, OutputMapping,
946 };
947 use greentic_types::pack_manifest::PackDependency;
948 use greentic_types::{ComponentId, FlowId, NodeId, PackId, SemverReq};
949 use indexmap::IndexMap;
950 use semver::Version;
951 use std::env;
952 use std::io::Write;
953 use std::str::FromStr;
954 use tar::Builder;
955
956 fn sample_component(id: &str, inbound_messaging: bool) -> ComponentManifest {
957 let host_caps = HostCapabilities {
958 messaging: Some(greentic_types::component::MessagingCapabilities {
959 inbound: inbound_messaging,
960 outbound: true,
961 }),
962 ..Default::default()
963 };
964 ComponentManifest {
965 id: ComponentId::from_str(id).unwrap(),
966 version: Version::new(0, 1, 0),
967 supports: vec![FlowKind::Messaging, FlowKind::Http],
968 world: "greentic:test/world".to_string(),
969 profiles: ComponentProfiles {
970 default: Some("long_lived_service".to_string()),
971 supported: vec!["long_lived_service".to_string()],
972 },
973 capabilities: ComponentCapabilities {
974 host: host_caps,
975 ..Default::default()
976 },
977 configurators: None,
978 operations: Vec::new(),
979 config_schema: None,
980 resources: Default::default(),
981 dev_flows: Default::default(),
982 }
983 }
984
985 fn sample_flow(id: &str, kind: FlowKind, component: &ComponentManifest) -> PackFlowEntry {
986 let mut nodes: IndexMap<NodeId, Node, greentic_types::flow::FlowHasher> =
987 IndexMap::default();
988 nodes.insert(
989 NodeId::from_str("start").unwrap(),
990 Node {
991 id: NodeId::from_str("start").unwrap(),
992 component: ComponentRef {
993 id: component.id.clone(),
994 pack_alias: None,
995 operation: None,
996 },
997 input: InputMapping {
998 mapping: JsonValue::Null,
999 },
1000 output: OutputMapping {
1001 mapping: JsonValue::Null,
1002 },
1003 routing: greentic_types::flow::Routing::End,
1004 telemetry: Default::default(),
1005 err_map: Default::default(),
1006 },
1007 );
1008
1009 let mut entrypoints = std::collections::BTreeMap::new();
1010 entrypoints.insert("default".to_string(), JsonValue::Null);
1011
1012 let flow = Flow {
1013 schema_version: "flowir-v1".to_string(),
1014 id: FlowId::from_str(id).unwrap(),
1015 kind,
1016 entrypoints,
1017 nodes,
1018 metadata: FlowMetadata::default(),
1019 };
1020
1021 PackFlowEntry {
1022 id: flow.id.clone(),
1023 kind,
1024 flow,
1025 tags: vec![format!("{kind:?}")],
1026 entrypoints: vec!["default".to_string()],
1027 }
1028 }
1029
1030 fn sample_manifest() -> PackManifest {
1031 let messaging_component = sample_component("dev.greentic.chat", true);
1032 let http_component = sample_component("dev.greentic.http", false);
1033
1034 let flows = vec![
1035 sample_flow("chat_flow", FlowKind::Messaging, &messaging_component),
1036 sample_flow("http_flow", FlowKind::Http, &http_component),
1037 sample_flow(
1038 "config_flow",
1039 FlowKind::ComponentConfig,
1040 &messaging_component,
1041 ),
1042 ];
1043
1044 PackManifest {
1045 schema_version: "pack-v1".to_string(),
1046 pack_id: PackId::from_str("dev.greentic.sample").unwrap(),
1047 name: None,
1048 version: Version::new(0, 1, 0),
1049 kind: PackKind::Application,
1050 publisher: "greentic".to_string(),
1051 secret_requirements: Vec::new(),
1052 components: vec![messaging_component, http_component],
1053 flows,
1054 dependencies: vec![PackDependency {
1055 alias: "common".to_string(),
1056 pack_id: PackId::from_str("dev.greentic.common").unwrap(),
1057 version_req: SemverReq::parse("*").unwrap(),
1058 required_capabilities: vec![],
1059 }],
1060 capabilities: Vec::new(),
1061 signatures: Default::default(),
1062 bootstrap: None,
1063 extensions: None,
1064 }
1065 }
1066
1067 #[test]
1068 fn manifest_round_trip_from_tar_and_dir() {
1069 let manifest = sample_manifest();
1070 let encoded = encode_pack_manifest(&manifest).expect("encode manifest");
1071
1072 let mut builder = Builder::new(Vec::new());
1073 let mut header = tar::Header::new_gnu();
1074 header.set_size(encoded.len() as u64);
1075 header.set_cksum();
1076 header.set_mode(0o644);
1077 builder
1078 .append_data(&mut header, "manifest.cbor", encoded.as_slice())
1079 .expect("append manifest");
1080 let dummy = b"wasm";
1081 let mut comp_header = tar::Header::new_gnu();
1082 comp_header.set_size(dummy.len() as u64);
1083 comp_header.set_cksum();
1084 comp_header.set_mode(0o644);
1085 builder
1086 .append_data(
1087 &mut comp_header,
1088 "components/dev.greentic.chat.wasm",
1089 dummy.as_slice(),
1090 )
1091 .expect("append component");
1092 let tar_bytes = builder.into_inner().expect("tar bytes");
1093
1094 let from_bytes = load_pack_manifest_from_bytes(&encode_pack_manifest(&manifest).unwrap())
1095 .expect("decode manifest");
1096 assert_eq!(from_bytes.pack_id, manifest.pack_id);
1097
1098 let base = env::current_dir().expect("cwd").join("target/tmp-tests");
1099 std::fs::create_dir_all(&base).expect("create tmp base");
1100 let dir = tempfile::tempdir_in(base).expect("temp dir");
1101 let manifest_path = dir.path().join("manifest.cbor");
1102 fs::write(&manifest_path, &encoded).expect("write manifest");
1103 fs::create_dir(dir.path().join("components")).expect("mkdir components");
1104 fs::write(
1105 dir.path().join("components/dev.greentic.chat.wasm"),
1106 b"wasm",
1107 )
1108 .expect("write component");
1109
1110 let mut tar_file = tempfile::NamedTempFile::new().expect("temp tar");
1111 tar_file.write_all(&tar_bytes).expect("write tar");
1112
1113 let decoded_tar = {
1114 let mut source = PackSource::open(tar_file.path()).expect("open tar source");
1115 source.read_manifest().expect("read tar manifest")
1116 };
1117 assert_eq!(decoded_tar.pack_id, manifest.pack_id);
1118 let decoded_dir = {
1119 let mut source = PackSource::open(dir.path()).expect("open dir source");
1120 source.read_manifest().expect("read dir manifest")
1121 };
1122 assert_eq!(decoded_dir.pack_id, manifest.pack_id);
1123 }
1124
1125 #[test]
1126 fn helpers_filter_flows_and_components() {
1127 let manifest = sample_manifest();
1128 let messaging: Vec<_> = messaging_flows(&manifest).collect();
1129 let http: Vec<_> = http_flows(&manifest).collect();
1130 let config: Vec<_> = config_flows(&manifest).collect();
1131
1132 assert_eq!(messaging.len(), 1);
1133 assert_eq!(http.len(), 1);
1134 assert_eq!(config.len(), 1);
1135
1136 let components = components_for_deployment(&manifest);
1137 assert_eq!(components.len(), 2);
1138 let external = external_facing_components(&manifest);
1139 assert_eq!(external.len(), 1);
1140 assert_eq!(
1141 external[0].id,
1142 ComponentId::from_str("dev.greentic.chat").unwrap()
1143 );
1144 }
1145
1146 #[test]
1147 fn runner_plan_respects_resource_hints() {
1148 let mut manifest = sample_manifest();
1149 if let Some(component) = manifest
1151 .components
1152 .iter_mut()
1153 .find(|c| c.id == ComponentId::from_str("dev.greentic.chat").unwrap())
1154 {
1155 component.resources.cpu_millis = Some(256);
1156 component.resources.memory_mb = Some(512);
1157 component.resources.average_latency_ms = Some(10);
1158 }
1159 let runners = build_runner_plan(&manifest);
1160 let chat = runners
1161 .iter()
1162 .find(|r| r.name == "dev.greentic.chat")
1163 .expect("runner present");
1164 assert!(
1165 chat.replicas >= 2,
1166 "low-latency components scale up replicas"
1167 );
1168 assert_eq!(
1169 chat.capabilities.get("cpu_millis").and_then(|v| v.as_u64()),
1170 Some(256)
1171 );
1172 assert_eq!(
1173 chat.capabilities.get("memory_mb").and_then(|v| v.as_u64()),
1174 Some(512)
1175 );
1176 }
1177
1178 #[test]
1179 fn library_pack_skips_runners_and_channels() {
1180 let mut manifest = sample_manifest();
1181 manifest.kind = PackKind::Library;
1182 let config = default_config(PathBuf::from("."));
1183 let plan = plan_from_pack_kind(&manifest, &config);
1184 assert!(plan.runners.is_empty());
1185 assert!(plan.channels.is_empty());
1186 assert_eq!(plan.pack_id, manifest.pack_id.to_string());
1187 }
1188
1189 #[test]
1190 fn provider_plan_drops_channels_but_keeps_runners() {
1191 let mut manifest = sample_manifest();
1192 manifest.kind = PackKind::Provider;
1193 let config = default_config(PathBuf::from("."));
1194 let plan = plan_from_pack_kind(&manifest, &config);
1195 assert!(
1196 plan.channels.is_empty(),
1197 "provider packs should not expose channels"
1198 );
1199 assert!(!plan.runners.is_empty(), "provider packs keep runners");
1200 }
1201
1202 #[test]
1203 fn infrastructure_plan_has_no_messaging() {
1204 let mut manifest = sample_manifest();
1205 manifest.kind = PackKind::Infrastructure;
1206 let config = default_config(PathBuf::from("."));
1207 let plan = plan_from_pack_kind(&manifest, &config);
1208 assert!(plan.messaging.is_none(), "infra packs drop messaging plan");
1209 }
1210
    /// In-memory [`DistributorSource`] stub used by the registry tests.
    struct MemorySource {
        /// Raw bytes handed back for every pack fetch.
        bytes: Vec<u8>,
    }
1214
1215 impl DistributorSource for MemorySource {
1216 fn fetch_pack(
1217 &self,
1218 _pack_id: &PackId,
1219 _version: &Version,
1220 ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError>
1221 {
1222 Ok(self.bytes.clone())
1223 }
1224
1225 fn fetch_component(
1226 &self,
1227 _component_id: &greentic_distributor_client::ComponentId,
1228 _version: &Version,
1229 ) -> std::result::Result<Vec<u8>, greentic_distributor_client::error::DistributorError>
1230 {
1231 Err(greentic_distributor_client::error::DistributorError::NotFound)
1232 }
1233 }
1234
1235 #[test]
1236 fn registry_source_can_load_manifest() {
1237 let manifest = sample_manifest();
1238 let encoded = encode_pack_manifest(&manifest).expect("encode manifest");
1239 let source = MemorySource { bytes: encoded };
1240 let pack_id = PackId::try_from("dev.greentic.sample").unwrap();
1241 let reference = PackRef::new(
1242 pack_id.to_string(),
1243 Version::new(0, 1, 0),
1244 "sha256:deadbeef",
1245 );
1246 let decoded = read_manifest_from_registry(&source, &reference).expect("registry decode");
1247 assert_eq!(decoded.pack_id, manifest.pack_id);
1248 }
1249
1250 #[test]
1251 fn build_plan_uses_registry_when_pack_ref_set() {
1252 let manifest = sample_manifest();
1253 let encoded = encode_pack_manifest(&manifest).expect("encode manifest");
1254 let source = MemorySource { bytes: encoded };
1255 set_distributor_source(Arc::new(source));
1256
1257 let config = registry_config();
1258
1259 let plan = build_plan(&config).expect("plan builds via registry");
1260 assert_eq!(plan.plan.pack_id, manifest.pack_id.to_string());
1261 }
1262
1263 fn registry_config() -> DeployerConfig {
1264 DeployerConfig {
1265 capability: DeployerCapability::Plan,
1266 provider: Provider::Aws,
1267 strategy: "iac-only".into(),
1268 tenant: "acme".into(),
1269 environment: "staging".into(),
1270 pack_path: PathBuf::from("unused.gtpack"),
1271 bundle_root: None,
1272 providers_dir: PathBuf::from("providers/deployer"),
1273 packs_dir: PathBuf::from("packs"),
1274 provider_pack: None,
1275 pack_ref: Some(PackRef::new(
1276 "dev.greentic.sample",
1277 Version::new(0, 1, 0),
1278 "sha256:deadbeef",
1279 )),
1280 distributor_url: None,
1281 distributor_token: None,
1282 preview: false,
1283 dry_run: false,
1284 execute_local: false,
1285 output: OutputFormat::Text,
1286 greentic: greentic_config::ConfigResolver::new()
1287 .load()
1288 .expect("load default config")
1289 .config,
1290 provenance: greentic_config::ProvenanceMap::new(),
1291 config_warnings: Vec::new(),
1292 deploy_pack_id_override: None,
1293 deploy_flow_id_override: None,
1294 bundle_source: None,
1295 bundle_digest: None,
1296 repo_registry_base: None,
1297 store_registry_base: None,
1298 }
1299 }
1300
1301 fn default_config(pack_path: PathBuf) -> DeployerConfig {
1302 DeployerConfig {
1303 capability: DeployerCapability::Plan,
1304 provider: Provider::Aws,
1305 strategy: "iac-only".into(),
1306 tenant: "acme".into(),
1307 environment: "staging".into(),
1308 pack_path,
1309 bundle_root: None,
1310 providers_dir: PathBuf::from("providers/deployer"),
1311 packs_dir: PathBuf::from("packs"),
1312 provider_pack: None,
1313 pack_ref: None,
1314 distributor_url: None,
1315 distributor_token: None,
1316 preview: false,
1317 dry_run: false,
1318 execute_local: false,
1319 output: OutputFormat::Text,
1320 greentic: greentic_config::ConfigResolver::new()
1321 .load()
1322 .expect("load default config")
1323 .config,
1324 provenance: greentic_config::ProvenanceMap::new(),
1325 config_warnings: Vec::new(),
1326 deploy_pack_id_override: None,
1327 deploy_flow_id_override: None,
1328 bundle_source: None,
1329 bundle_digest: None,
1330 repo_registry_base: None,
1331 store_registry_base: None,
1332 }
1333 }
1334}