1#![allow(dead_code)]
2
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use std::collections::{BTreeMap, BTreeSet};
9
10use crate::domains::Domain;
11use crate::http_ingress::{HttpIngressConfig, HttpIngressServer};
12use crate::operator_log;
13use crate::runner_host::DemoRunnerHost;
14use crate::runtime_state::{
15 RuntimePaths, persist_service_manifest, read_service_manifest, remove_service_manifest,
16 write_json,
17};
18use crate::secrets_gate;
19use crate::services;
20use crate::startup_contract::{
21 BundleStaticRoutesInspection, RuntimeConfig, RuntimePublicBaseUrl, RuntimePublicBaseUrlSource,
22 StartupContract, StartupContractInput,
23};
24use crate::supervisor;
25use anyhow::Context;
26
27use crate::cloudflared::{self, CloudflaredConfig};
28use crate::config::{DemoConfig, DemoSubscriptionsMode};
29use crate::ngrok::{self, NgrokConfig};
30
31use crate::subscriptions_universal::{
32 build_runner, ensure_desired_subscriptions, scheduler::Scheduler, service::SubscriptionService,
33 state_root, store::SubscriptionStore,
34};
35
36#[derive(Default)]
37pub struct ForegroundRuntimeHandles {
38 pub ingress_server: Option<HttpIngressServer>,
39}
40
41impl ForegroundRuntimeHandles {
42 pub fn stop(mut self) -> anyhow::Result<()> {
43 if let Some(server) = self.ingress_server.take() {
44 server.stop()?;
45 }
46 Ok(())
47 }
48}
49
50struct StartupInfo {
52 bundle_name: String,
53 http_url: Option<String>,
54 static_route_urls: Vec<String>,
55 public_url: Option<String>,
56 channels: Vec<String>,
57 mode: String,
58 webhook_results: Vec<(String, String)>,
59 subscription_results: Vec<(String, String)>,
60}
61
62impl StartupInfo {
63 fn print(&self) {
64 println!();
65 println!("{}", self.bundle_name);
66 if let Some(ref url) = self.http_url {
67 println!(" HTTP: {url}");
68 }
69 if !self.static_route_urls.is_empty() {
70 println!(" Routes: {}", self.static_route_urls.join(", "));
71 }
72 if let Some(ref url) = self.public_url {
73 println!(" Public: {url}");
74 }
75 if !self.channels.is_empty() {
76 println!(" Channels: {}", self.channels.join(", "));
77 }
78 println!(" Mode: {}", self.mode);
79
80 if !self.webhook_results.is_empty() {
81 println!();
82 println!("Webhooks:");
83 for (provider, desc) in &self.webhook_results {
84 println!(" [{provider}] {desc}");
85 }
86 }
87 if !self.subscription_results.is_empty() {
88 println!();
89 println!("Subscriptions:");
90 for (provider, desc) in &self.subscription_results {
91 println!(" [{provider}] {desc}");
92 }
93 }
94 }
95}
96
97struct ServiceSummary {
98 id: String,
99 pid: Option<u32>,
100 details: Vec<String>,
101}
102
103impl ServiceSummary {
104 fn new(id: impl Into<String>, pid: Option<u32>) -> Self {
105 Self {
106 id: id.into(),
107 pid,
108 details: Vec::new(),
109 }
110 }
111
112 fn with_details(id: impl Into<String>, pid: Option<u32>, details: Vec<String>) -> Self {
113 Self {
114 id: id.into(),
115 pid,
116 details,
117 }
118 }
119
120 fn add_detail(&mut self, detail: impl Into<String>) {
121 self.details.push(detail.into());
122 }
123
124 fn describe(&self) -> String {
125 let pid_str = self
126 .pid
127 .map(|pid| pid.to_string())
128 .unwrap_or_else(|| "-".to_string());
129 if self.details.is_empty() {
130 format!("{} (pid={})", self.id, pid_str)
131 } else {
132 format!(
133 "{} (pid={}) [{}]",
134 self.id,
135 pid_str,
136 self.details.join(" | ")
137 )
138 }
139 }
140}
141
142struct ServiceTracker<'a> {
143 paths: &'a RuntimePaths,
144 manifest: crate::runtime_state::ServiceManifest,
145}
146
147impl<'a> ServiceTracker<'a> {
148 fn new(paths: &'a RuntimePaths, log_dir: Option<&Path>) -> anyhow::Result<Self> {
149 remove_service_manifest(paths)?;
150 let mut manifest = crate::runtime_state::ServiceManifest::default();
151 if let Some(dir) = log_dir {
152 manifest.log_dir = Some(dir.display().to_string());
153 }
154 persist_service_manifest(paths, &manifest)?;
155 Ok(Self { paths, manifest })
156 }
157
158 fn record(&mut self, entry: crate::runtime_state::ServiceEntry) -> anyhow::Result<()> {
159 self.manifest.services.push(entry);
160 persist_service_manifest(self.paths, &self.manifest)
161 }
162
163 fn record_with_log(
164 &mut self,
165 id: impl Into<String>,
166 kind: impl Into<String>,
167 log_path: Option<&Path>,
168 ) -> anyhow::Result<()> {
169 let entry = crate::runtime_state::ServiceEntry::new(id, kind, log_path);
170 self.record(entry)
171 }
172}
173
174fn log_service_spec_debug(
175 service_id: &str,
176 kind: &str,
177 spec: &supervisor::ServiceSpec,
178 tenant: &str,
179 team: &str,
180 debug_enabled: bool,
181) {
182 if !debug_enabled {
183 return;
184 }
185 let cwd = spec
186 .cwd
187 .as_ref()
188 .map(|path| path.display().to_string())
189 .unwrap_or_else(|| "<unset>".to_string());
190 let argv = spec.argv.join(" ");
191 let env_pairs = spec
192 .env
193 .iter()
194 .map(|(key, value)| format!("{}={}", key, value))
195 .collect::<Vec<_>>()
196 .join(" ");
197 let env_display = if env_pairs.is_empty() {
198 "<empty>".to_string()
199 } else {
200 env_pairs
201 };
202 operator_log::debug(
203 module_path!(),
204 format!(
205 "[demo dev] service {} kind={} tenant={} team={} cwd={} argv=\"{}\" env={}",
206 service_id, kind, tenant, team, cwd, argv, env_display
207 ),
208 );
209}
210
211#[allow(clippy::too_many_arguments)]
212fn spawn_supervised_service(
213 service_id: &str,
214 kind: &str,
215 spec: &supervisor::ServiceSpec,
216 log_dir: &Path,
217 paths: &RuntimePaths,
218 restart: &BTreeSet<String>,
219 tracker: &mut ServiceTracker,
220 tenant: &str,
221 team: &str,
222 debug_enabled: bool,
223) -> anyhow::Result<ServiceSummary> {
224 let log_path = operator_log::reserve_service_log(log_dir, service_id)?;
225 log_service_spec_debug(service_id, kind, spec, tenant, team, debug_enabled);
226 let handle = spawn_if_needed(paths, spec, restart, Some(log_path.clone()))?;
227 let pid = if let Some(handle) = &handle {
228 Some(handle.pid)
229 } else {
230 read_pid(&paths.pid_path(service_id))?
231 };
232 let actual_log = handle
233 .as_ref()
234 .map(|handle| handle.log_path.clone())
235 .unwrap_or(log_path.clone());
236 tracker.record_with_log(service_id, kind, Some(&actual_log))?;
237 operator_log::info(
238 module_path!(),
239 format!(
240 "service {} ready pid={:?} log={}",
241 service_id,
242 pid,
243 actual_log.display()
244 ),
245 );
246 let mut summary = ServiceSummary::new(service_id, pid);
247 summary.add_detail(format!("log={}", actual_log.display()));
248 Ok(summary)
249}
250
251fn print_service_summary(summaries: &[ServiceSummary]) {
252 if summaries.is_empty() {
253 return;
254 }
255 println!(
256 "\n{}",
257 crate::operator_i18n::tr("demo.runtime.started_services", "Started services:")
258 );
259 for summary in summaries {
260 println!("{}", summary.describe());
261 }
262}
263
264#[allow(clippy::too_many_arguments)]
265fn spawn_embedded_messaging(
266 bundle_root: &Path,
267 tenant: &str,
268 team: &str,
269 env: BTreeMap<String, String>,
270 log_dir: &Path,
271 restart: &BTreeSet<String>,
272 tracker: &mut ServiceTracker,
273 debug_enabled: bool,
274) -> anyhow::Result<ServiceSummary> {
275 let exe = std::env::current_exe()?;
276 let mut args = vec![
277 "dev".to_string(),
278 "embedded".to_string(),
279 "--project-root".to_string(),
280 bundle_root.display().to_string(),
281 "--no-nats".to_string(),
282 ];
283 let mut argv = vec![exe.to_string_lossy().to_string()];
284 argv.append(&mut args);
285
286 let spec = supervisor::ServiceSpec {
287 id: supervisor::ServiceId::new("messaging")?,
288 argv,
289 cwd: None,
290 env,
291 };
292
293 let mut summary = spawn_supervised_service(
294 "messaging",
295 "messaging",
296 &spec,
297 log_dir,
298 tracker.paths,
299 restart,
300 tracker,
301 tenant,
302 team,
303 debug_enabled,
304 )?;
305 summary.add_detail(format!("tenant={tenant} team={team}"));
306 summary.add_detail(format!(
307 "cmd=dev embedded --project-root {}",
308 bundle_root.display()
309 ));
310 Ok(summary)
311}
312
313#[allow(clippy::too_many_arguments)]
314fn spawn_universal_subscriptions_service(
315 bundle_root: &Path,
316 config: &DemoConfig,
317 tenant: &str,
318 team: &str,
319 runner_binary: Option<PathBuf>,
320 tracker: &mut ServiceTracker,
321 log_dir: &Path,
322 debug_enabled: bool,
323) -> anyhow::Result<ServiceSummary> {
324 let team_override = if team.trim().is_empty() {
325 None
326 } else {
327 Some(team.to_string())
328 };
329 let log_path = operator_log::reserve_service_log(log_dir, "subscriptions")
330 .with_context(|| "unable to open subscriptions log file")?;
331 tracker.record_with_log("subscriptions-universal", "subscriptions", Some(&log_path))?;
332
333 let desired = &config.services.subscriptions.universal.desired;
334 let (runner_host, context) =
335 build_runner(bundle_root, tenant, team_override.clone(), runner_binary)?;
336 let store = SubscriptionStore::new(state_root(bundle_root));
337 let scheduler = Scheduler::new(SubscriptionService::new(runner_host, context), store);
338
339 ensure_desired_subscriptions(
340 bundle_root,
341 tenant,
342 team_override.clone(),
343 desired,
344 &scheduler,
345 )?;
346
347 let renew_interval_secs = config
348 .services
349 .subscriptions
350 .universal
351 .renew_interval_seconds
352 .max(1);
353 let renew_skew_secs = config
354 .services
355 .subscriptions
356 .universal
357 .renew_skew_minutes
358 .max(1)
359 .saturating_mul(60);
360 let interval = Duration::from_secs(renew_interval_secs);
361 let skew = Duration::from_secs(renew_skew_secs);
362
363 let scheduler_handle = scheduler;
364 thread::Builder::new()
365 .name("subscriptions-universal".to_string())
366 .spawn(move || {
367 operator_log::info(
368 module_path!(),
369 format!(
370 "subscriptions-universal scheduler running interval={}s skew={}s",
371 renew_interval_secs, renew_skew_secs
372 ),
373 );
374 loop {
375 std::thread::sleep(interval);
376 if let Err(err) = scheduler_handle.renew_due(skew) {
377 operator_log::error(
378 module_path!(),
379 format!("subscriptions-universal renew failed err={}", err),
380 );
381 }
382 }
383 })?;
384
385 let mut summary = ServiceSummary::new("subscriptions-universal", None);
386 summary.add_detail(format!("log={}", log_path.display()));
387 summary.add_detail(format!("renew_interval={}s", renew_interval_secs));
388 summary.add_detail("mode=universal".to_string());
389 if debug_enabled {
390 operator_log::debug(
391 module_path!(),
392 format!(
393 "[demo dev] tenant={} team={} universal subscriptions running",
394 tenant, team
395 ),
396 );
397 }
398 Ok(summary)
399}
400
401fn spawn_if_needed(
402 paths: &RuntimePaths,
403 spec: &supervisor::ServiceSpec,
404 restart: &BTreeSet<String>,
405 log_path_override: Option<PathBuf>,
406) -> anyhow::Result<Option<supervisor::ServiceHandle>> {
407 if should_restart(restart, spec.id.as_str()) {
408 let _ = supervisor::stop_service(paths, &spec.id, 2_000);
409 }
410
411 let pid_path = paths.pid_path(spec.id.as_str());
412 if let Some(pid) = read_pid(&pid_path)?
413 && supervisor::is_running(pid)
414 {
415 operator_log::info(
416 module_path!(),
417 format!("{}: already running (pid={})", spec.id.as_str(), pid),
418 );
419 return Ok(None);
420 }
421 let handle = supervisor::spawn_service(paths, spec.clone(), log_path_override.clone())?;
422 operator_log::info(
423 module_path!(),
424 format!("{}: started (pid={})", spec.id.as_str(), handle.pid),
425 );
426 if spec.id.as_str() == "nats" {
427 operator_log::info(
428 module_path!(),
429 format!(
430 "spawned nats pid={} log={}",
431 handle.pid,
432 handle.log_path.display()
433 ),
434 );
435 }
436 Ok(Some(handle))
437}
438
439fn read_pid(path: &Path) -> anyhow::Result<Option<u32>> {
440 if !path.exists() {
441 return Ok(None);
442 }
443 let contents = std::fs::read_to_string(path)?;
444 let trimmed = contents.trim();
445 if trimmed.is_empty() {
446 return Ok(None);
447 }
448 Ok(Some(trimmed.parse()?))
449}
450
451fn looks_like_path(value: &str) -> bool {
452 value.contains('/') || value.contains('\\') || Path::new(value).is_absolute()
453}
454
455fn should_restart(restart: &BTreeSet<String>, service: &str) -> bool {
456 restart.contains("all") || restart.contains(service)
457}
458
459#[allow(clippy::too_many_arguments)]
460#[derive(Clone, Copy, Debug, PartialEq, Eq)]
461pub enum NatsMode {
462 Off,
463 On,
464 External,
465}
466
467#[allow(clippy::too_many_arguments)]
468pub fn demo_up(
469 bundle_root: &Path,
470 tenant: &str,
471 team: Option<&str>,
472 nats_url: Option<&str>,
473 nats_mode: NatsMode,
474 messaging_enabled: bool,
475 cloudflared: Option<CloudflaredConfig>,
476 ngrok: Option<NgrokConfig>,
477 log_dir: &Path,
478 debug_enabled: bool,
479) -> anyhow::Result<()> {
480 let team_id = team.unwrap_or("default");
481 let state_dir = bundle_root.join("state");
482 std::fs::create_dir_all(&state_dir)?;
483 let paths = RuntimePaths::new(&state_dir, tenant, team_id);
484 let mut service_tracker = ServiceTracker::new(&paths, Some(log_dir))?;
485 let mut service_summaries = Vec::new();
486 let restart_targets = BTreeSet::new();
487 let mut public_base_url: Option<String> = None;
488 if debug_enabled {
489 operator_log::debug(
490 module_path!(),
491 format!(
492 "[demo dev] demo_up tenant={} team={} nats_mode={:?} messaging_enabled={}",
493 tenant, team_id, nats_mode, messaging_enabled
494 ),
495 );
496 }
497 if let Some(config) = cloudflared {
498 let cloudflared_log = operator_log::reserve_service_log(log_dir, "cloudflared")
499 .with_context(|| "unable to open cloudflared.log")?;
500 operator_log::info(
501 module_path!(),
502 format!(
503 "starting cloudflared log={} bundle={}",
504 cloudflared_log.display(),
505 bundle_root.display()
506 ),
507 );
508 let handle = cloudflared::start_quick_tunnel(&paths, &config, &cloudflared_log)?;
509 operator_log::info(
510 module_path!(),
511 format!(
512 "cloudflared url discovered url={} log={}",
513 handle.url,
514 handle.log_path.display()
515 ),
516 );
517 match cloudflared::wait_tunnel_ready(&handle.url, std::time::Duration::from_secs(30)) {
519 Ok(()) => {
520 operator_log::info(module_path!(), "cloudflared tunnel verified reachable");
521 }
522 Err(err) => {
523 operator_log::warn(
524 module_path!(),
525 format!("cloudflared tunnel not yet reachable, continuing anyway: {err}"),
526 );
527 }
528 }
529 if debug_enabled {
530 operator_log::debug(
531 module_path!(),
532 format!(
533 "[demo dev] tenant={} team={} cloudflared url={} log={}",
534 tenant,
535 team_id,
536 handle.url,
537 handle.log_path.display()
538 ),
539 );
540 }
541 let url = handle.url.clone();
542 let log_path = handle.log_path.clone();
543 service_tracker.record_with_log("cloudflared", "cloudflared", Some(&log_path))?;
544 let summary = ServiceSummary::with_details(
545 "cloudflared",
546 Some(handle.pid),
547 vec![
548 format!("url={}", url),
549 format!("log={}", log_path.display()),
550 ],
551 );
552 service_summaries.push(summary);
553 public_base_url = Some(url.clone());
554 } else if let Some(config) = ngrok {
555 let ngrok_log = operator_log::reserve_service_log(log_dir, "ngrok")
556 .with_context(|| "unable to open ngrok.log")?;
557 operator_log::info(
558 module_path!(),
559 format!(
560 "starting ngrok log={} bundle={}",
561 ngrok_log.display(),
562 bundle_root.display()
563 ),
564 );
565 let handle = ngrok::start_tunnel(&paths, &config, &ngrok_log)?;
566 operator_log::info(
567 module_path!(),
568 format!(
569 "ngrok ready url={} log={}",
570 handle.url,
571 handle.log_path.display()
572 ),
573 );
574 if debug_enabled {
575 operator_log::debug(
576 module_path!(),
577 format!(
578 "[demo dev] tenant={} team={} ngrok url={} log={}",
579 tenant,
580 team_id,
581 handle.url,
582 handle.log_path.display()
583 ),
584 );
585 }
586 let url = handle.url.clone();
587 let log_path = handle.log_path.clone();
588 service_tracker.record_with_log("ngrok", "ngrok", Some(&log_path))?;
589 let summary = ServiceSummary::with_details(
590 "ngrok",
591 Some(handle.pid),
592 vec![
593 format!("url={}", url),
594 format!("log={}", log_path.display()),
595 ],
596 );
597 service_summaries.push(summary);
598 public_base_url = Some(url.clone());
599 }
600
601 let mut resolved_nats_url = nats_url.map(|value| value.to_string());
602 if matches!(nats_mode, NatsMode::On) && resolved_nats_url.is_none() {
603 match operator_log::reserve_service_log(log_dir, "nats") {
604 Ok(nats_log) => {
605 operator_log::info(
606 module_path!(),
607 format!("starting nats log={}", nats_log.display()),
608 );
609 match services::start_nats_with_log(bundle_root, Some(nats_log.clone())) {
610 Ok(state) => {
611 operator_log::info(
612 module_path!(),
613 format!("nats started state={:?} log={}", state, nats_log.display()),
614 );
615 if debug_enabled {
616 operator_log::debug(
617 module_path!(),
618 format!(
619 "[demo dev] tenant={} team={} nats state={:?} log={}",
620 tenant,
621 team_id,
622 state,
623 nats_log.display()
624 ),
625 );
626 }
627 service_tracker
628 .record_with_log("nats", "nats", Some(&nats_log))
629 .with_context(|| "failed to record nats service state")?;
630 resolved_nats_url = Some(services::nats_url(bundle_root));
631 let pid = read_pid(&paths.pid_path("nats"))?;
632 let mut summary = ServiceSummary::new("nats", pid);
633 summary.add_detail(format!("state={:?}", state));
634 summary.add_detail(format!("url={}", services::nats_url(bundle_root)));
635 summary.add_detail(format!("log={}", nats_log.display()));
636 service_summaries.push(summary);
637 mark_nats_started(&paths)?;
638 }
639 Err(err) => {
640 eprintln!(
641 "{}",
642 crate::operator_i18n::trf(
643 "demo.runtime.warn_failed_start_nats",
644 "Warning: failed to start NATS: {}",
645 &[&err.to_string()]
646 )
647 );
648 operator_log::error(
649 module_path!(),
650 format!("failed to start nats (log={}): {err}", nats_log.display()),
651 );
652 }
653 }
654 }
655 Err(err) => {
656 eprintln!(
657 "{}",
658 crate::operator_i18n::trf(
659 "demo.runtime.warn_failed_prepare_nats_log",
660 "Warning: failed to prepare NATS log: {}",
661 &[&err.to_string()]
662 )
663 );
664 operator_log::error(module_path!(), format!("failed to open nats.log: {err}"));
665 }
666 }
667 }
668
669 let run_gsm_services = matches!(nats_mode, NatsMode::On);
670 if messaging_enabled && run_gsm_services {
671 let mut env_map = build_env(tenant, team_id, resolved_nats_url.as_deref(), None);
672 if let Some(url) = public_base_url.as_deref() {
673 env_map.insert("PUBLIC_BASE_URL".to_string(), url.to_string());
674 }
675 if debug_enabled {
676 operator_log::debug(
677 module_path!(),
678 format!(
679 "[demo dev] launching GSM gateway/egress/subscriptions tenant={} team={} envs={:?}",
680 tenant, team_id, env_map
681 ),
682 );
683 }
684 let mut messaging_summary = spawn_embedded_messaging(
685 bundle_root,
686 tenant,
687 team_id,
688 env_map,
689 log_dir,
690 &restart_targets,
691 &mut service_tracker,
692 debug_enabled,
693 )?;
694 messaging_summary.add_detail("embedded messaging stack".to_string());
695 service_summaries.push(messaging_summary);
696 } else {
697 operator_log::info(
698 module_path!(),
699 "messaging: running embedded runner (no gsm gateway/egress)",
700 );
701 }
702
703 operator_log::info(
704 module_path!(),
705 "events: handled in-process (HTTP ingress + timer scheduler)",
706 );
707
708 if !run_gsm_services {
709 operator_log::info(
710 module_path!(),
711 "demo running in embedded runner mode; gateway/egress disabled",
712 );
713 if debug_enabled {
714 operator_log::debug(
715 module_path!(),
716 format!(
717 "[demo dev] embedded runner mode only tenant={} team={} (gateway/egress/subscriptions skipped)",
718 tenant, team_id
719 ),
720 );
721 }
722 }
723
724 let bundle_name = bundle_root
726 .file_name()
727 .map(|n| n.to_string_lossy().into_owned())
728 .unwrap_or_else(|| tenant.to_string());
729 let mode = if run_gsm_services {
730 "gsm gateway + egress".to_string()
731 } else {
732 "embedded runner".to_string()
733 };
734 let info = StartupInfo {
735 bundle_name,
736 http_url: None,
737 static_route_urls: Vec::new(),
738 public_url: public_base_url,
739 channels: Vec::new(),
740 mode,
741 webhook_results: Vec::new(),
742 subscription_results: Vec::new(),
743 };
744 info.print();
745
746 Ok(())
747}
748
749#[allow(clippy::too_many_arguments)]
750pub fn demo_up_services(
751 config_path: &Path,
752 config: &DemoConfig,
753 static_routes: &BundleStaticRoutesInspection,
754 configured_public_base_url: Option<String>,
755 env_store_public_base_url: Option<String>,
756 cloudflared: Option<CloudflaredConfig>,
757 ngrok: Option<NgrokConfig>,
758 restart: &BTreeSet<String>,
759 runner_binary: Option<PathBuf>,
760 log_dir: &Path,
761 debug_enabled: bool,
762 no_browser: bool,
763) -> anyhow::Result<ForegroundRuntimeHandles> {
764 let config_dir = config_path
765 .parent()
766 .ok_or_else(|| anyhow::anyhow!("config path has no parent directory"))?;
767 let state_dir = config_dir.join("state");
768 let tenant = config.tenant.as_str();
769 let team = config.team.as_str();
770 let paths = RuntimePaths::new(&state_dir, tenant, team);
771 let mut service_tracker = ServiceTracker::new(&paths, Some(log_dir))?;
772 match crate::dependency_resolver::check_all(config_dir) {
774 Ok(report) => {
775 for dep in &report.missing {
776 crate::operator_log::warn(
777 module_path!(),
778 format!(
779 "missing pack dependency: {} (required by {}, capabilities: {:?})",
780 dep.pack_id, dep.required_by, dep.required_capabilities
781 ),
782 );
783 }
784 }
785 Err(err) => {
786 crate::operator_log::debug(
787 module_path!(),
788 format!("dependency check skipped: {err:#}"),
789 );
790 }
791 }
792
793 let discovery = crate::discovery::discover(config_dir)?;
794 crate::discovery::persist(config_dir, tenant, &discovery)?;
795 let secrets_handle = secrets_gate::resolve_secrets_manager(config_dir, tenant, Some(team))?;
796 let runner_host = Arc::new(DemoRunnerHost::new(
797 config_dir.to_path_buf(),
798 &discovery,
799 runner_binary.clone(),
800 secrets_handle.clone(),
801 debug_enabled,
802 )?);
803 let operator_config = crate::config::load_operator_config(config_dir)
804 .context("load operator config")?
805 .unwrap_or_default();
806 let notifier_config = {
807 let resolver = crate::notifier::config::SecretsManagerResolver {
808 manager: secrets_handle.manager(),
809 };
810 let fut = crate::notifier::config::resolve_notifier_config(
811 config_dir,
812 &operator_config,
813 &resolver,
814 );
815 match tokio::runtime::Handle::try_current() {
816 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fut)),
817 Err(_) => tokio::runtime::Builder::new_current_thread()
818 .enable_all()
819 .build()
820 .context("failed to build temporary tokio runtime for notifier config")?
821 .block_on(fut),
822 }
823 .context("failed to resolve notifier config")?
824 };
825 let ingress_domains = detect_http_ingress_domains(&discovery, runner_host.as_ref());
826 let enable_static_routes = static_routes.bundle_has_static_routes();
828 if enable_static_routes || ingress_domains.contains(&Domain::Messaging) {
829 validate_messaging_app_route(config_dir, tenant, Some(team))
830 .with_context(|| "messaging app route validation failed")?;
831 }
832 let ingress_server = start_http_ingress_server(
833 config,
834 &ingress_domains,
835 runner_host.clone(),
836 enable_static_routes,
837 None, notifier_config,
839 )
840 .with_context(|| "failed to start local HTTP ingress server")?;
841 let run_gsm_services = config.services.nats.enabled;
842 operator_log::info(
843 module_path!(),
844 format!(
845 "demo start services start bundle={} tenant={} team={} log_dir={}",
846 config_path.display(),
847 tenant,
848 team,
849 log_dir.display()
850 ),
851 );
852 if debug_enabled {
853 operator_log::debug(
854 module_path!(),
855 format!(
856 "[demo verbose] bundle={} tenant={} team={} logging=debug",
857 config_path.display(),
858 tenant,
859 team
860 ),
861 );
862 }
863
864 if should_restart(restart, "cloudflared") {
865 let _ = supervisor::stop_pidfile(&paths.pid_path("cloudflared"), 2_000);
866 }
867 if should_restart(restart, "ngrok") {
868 let _ = supervisor::stop_pidfile(&paths.pid_path("ngrok"), 2_000);
869 }
870
871 let tunnel_public_base_url = if let Some(mut cfg) = cloudflared {
872 if ingress_server.is_none() {
873 operator_log::warn(
874 module_path!(),
875 "cloudflared requested but no local HTTP ingress listener is enabled; skipping tunnel startup",
876 );
877 None
878 } else {
879 if let Some(ref server) = ingress_server {
882 cfg.local_port = server.actual_port;
883 }
884 let cloudflared_log = operator_log::reserve_service_log(log_dir, "cloudflared")
885 .with_context(|| "unable to open cloudflared.log")?;
886 operator_log::info(
887 module_path!(),
888 format!("starting cloudflared log={}", cloudflared_log.display()),
889 );
890 let handle = cloudflared::start_quick_tunnel(&paths, &cfg, &cloudflared_log)?;
891 match cloudflared::wait_tunnel_ready(&handle.url, std::time::Duration::from_secs(30)) {
893 Ok(()) => {
894 operator_log::info(module_path!(), "cloudflared tunnel verified reachable");
895 }
896 Err(err) => {
897 operator_log::warn(
898 module_path!(),
899 format!("cloudflared tunnel not yet reachable, continuing anyway: {err}"),
900 );
901 }
902 }
903 let mut domain_labels = Vec::new();
904 if discovery.domains.messaging {
905 domain_labels.push("messaging");
906 }
907 if discovery.domains.events {
908 domain_labels.push("events");
909 }
910 if discovery.domains.oauth {
911 domain_labels.push("oauth");
912 }
913 let domain_list = if domain_labels.is_empty() {
914 "none".to_string()
915 } else {
916 domain_labels.join(",")
917 };
918 operator_log::info(
919 module_path!(),
920 format!(
921 "cloudflared ready domains={} url={} log={}",
922 domain_list,
923 handle.url,
924 handle.log_path.display()
925 ),
926 );
927 if debug_enabled {
928 operator_log::debug(
929 module_path!(),
930 format!(
931 "[demo dev] tenant={} team={} cloudflared domains={} url={} log={}",
932 tenant,
933 team,
934 domain_list,
935 handle.url,
936 handle.log_path.display()
937 ),
938 );
939 }
940 service_tracker.record_with_log(
941 "cloudflared",
942 "cloudflared",
943 Some(&handle.log_path),
944 )?;
945 Some(handle.url)
946 }
947 } else if let Some(cfg) = ngrok {
948 if ingress_server.is_none() {
949 operator_log::warn(
950 module_path!(),
951 "ngrok requested but no local HTTP ingress listener is enabled; skipping tunnel startup",
952 );
953 None
954 } else {
955 let ngrok_log = operator_log::reserve_service_log(log_dir, "ngrok")
956 .with_context(|| "unable to open ngrok.log")?;
957 operator_log::info(
958 module_path!(),
959 format!("starting ngrok log={}", ngrok_log.display()),
960 );
961 let handle = ngrok::start_tunnel(&paths, &cfg, &ngrok_log)?;
962 let mut domain_labels = Vec::new();
963 if discovery.domains.messaging {
964 domain_labels.push("messaging");
965 }
966 if discovery.domains.events {
967 domain_labels.push("events");
968 }
969 let domain_list = if domain_labels.is_empty() {
970 "none".to_string()
971 } else {
972 domain_labels.join(",")
973 };
974 operator_log::info(
975 module_path!(),
976 format!(
977 "ngrok ready domains={} url={} log={}",
978 domain_list,
979 handle.url,
980 handle.log_path.display()
981 ),
982 );
983 if debug_enabled {
984 operator_log::debug(
985 module_path!(),
986 format!(
987 "[demo dev] tenant={} team={} ngrok domains={} url={} log={}",
988 tenant,
989 team,
990 domain_list,
991 handle.url,
992 handle.log_path.display()
993 ),
994 );
995 }
996 service_tracker.record_with_log("ngrok", "ngrok", Some(&handle.log_path))?;
997 Some(handle.url)
998 }
999 } else {
1000 None
1001 };
1002
1003 let previous_public_url =
1005 crate::webhook_updater::read_previous_public_url(&paths.runtime_root());
1006
1007 let derived_local_url = if ingress_server.is_some() && enable_static_routes {
1016 let host = &config.services.gateway.listen_addr;
1017 let port = ingress_server
1018 .as_ref()
1019 .map(|server| server.actual_port)
1020 .unwrap_or(config.services.gateway.port);
1021 Some(format!("http://{}:{}", host, port))
1022 } else {
1023 None
1024 };
1025 let url_with_source: Option<(String, RuntimePublicBaseUrlSource)> = tunnel_public_base_url
1026 .clone()
1027 .map(|u| (u, RuntimePublicBaseUrlSource::Tunnel))
1028 .or_else(|| {
1029 env_store_public_base_url
1030 .clone()
1031 .map(|u| (u, RuntimePublicBaseUrlSource::EnvStore))
1032 })
1033 .or_else(|| {
1034 configured_public_base_url
1035 .clone()
1036 .map(|u| (u, RuntimePublicBaseUrlSource::Configured))
1037 })
1038 .or_else(|| derived_local_url.map(|u| (u, RuntimePublicBaseUrlSource::Derived)));
1039 let public_base_url = url_with_source.as_ref().map(|(u, _)| u.clone());
1040
1041 let webhook_summary = if let Some(ref new_url) = public_base_url {
1043 match crate::webhook_updater::update_webhooks_if_url_changed(
1044 config_dir,
1045 &discovery,
1046 &secrets_handle,
1047 Some(runner_host.as_ref()),
1048 tenant,
1049 team,
1050 previous_public_url.as_deref(),
1051 new_url,
1052 ) {
1053 Ok(summary) => summary,
1054 Err(err) => {
1055 operator_log::warn(
1056 module_path!(),
1057 format!("[webhook-updater] failed to update webhooks: {}", err),
1058 );
1059 crate::webhook_updater::WebhookUpdateSummary::default()
1060 }
1061 }
1062 } else {
1063 crate::webhook_updater::WebhookUpdateSummary::default()
1064 };
1065 let subscription_summary =
1066 match crate::subscription_updater::sync_subscriptions_if_public_url_available(
1067 config_dir,
1068 &discovery,
1069 &secrets_handle,
1070 Some(runner_host.as_ref()),
1071 tenant,
1072 team,
1073 public_base_url.as_deref().unwrap_or(""),
1074 ) {
1075 Ok(summary) => summary,
1076 Err(err) => {
1077 operator_log::warn(
1078 module_path!(),
1079 format!(
1080 "[subscription-updater] failed to sync subscriptions: {}",
1081 err
1082 ),
1083 );
1084 crate::subscription_updater::SubscriptionUpdateSummary::default()
1085 }
1086 };
1087
1088 let http_listener_enabled = ingress_server.is_some();
1091 let asset_serving_enabled = enable_static_routes;
1092 let runtime_config = url_with_source.map(|(value, source)| RuntimeConfig {
1093 public_base_url: Some(RuntimePublicBaseUrl { value, source }),
1094 });
1095
1096 let startup_contract = resolve_startup_contract(
1097 static_routes,
1098 http_listener_enabled,
1099 asset_serving_enabled,
1100 public_base_url.clone(),
1101 runtime_config,
1102 )?;
1103 write_json(
1104 &paths.runtime_root().join("startup_contract.json"),
1105 &startup_contract,
1106 )?;
1107
1108 if should_restart(restart, "nats") {
1109 let _ = supervisor::stop_pidfile(&paths.pid_path("nats"), 2_000);
1110 }
1111
1112 let nats_url = if config.services.nats.enabled {
1113 if config.services.nats.spawn.enabled {
1114 let spec = build_service_spec(
1115 config_dir,
1116 "nats",
1117 &config.services.nats.spawn.binary,
1118 &config.services.nats.spawn.args,
1119 &build_env(tenant, team, None, Some(&startup_contract)),
1120 )?;
1121 log_service_spec_debug("nats", "nats", &spec, tenant, team, debug_enabled);
1122 let nats_log = operator_log::reserve_service_log(log_dir, "nats")
1123 .with_context(|| "unable to open nats.log")?;
1124 if let Some(handle) = spawn_if_needed(&paths, &spec, restart, Some(nats_log.clone()))? {
1125 service_tracker
1126 .record_with_log("nats", "nats", Some(&handle.log_path))
1127 .with_context(|| "failed to record nats service")?;
1128 }
1129 }
1130 Some(config.services.nats.url.clone())
1131 } else {
1132 None
1133 };
1134
1135 operator_log::info(
1136 module_path!(),
1137 "events provider packs run in-process; external events components are disabled",
1138 );
1139
1140 if run_gsm_services {
1141 if should_restart(restart, "gateway") {
1142 let _ = supervisor::stop_pidfile(&paths.pid_path("gateway"), 2_000);
1143 }
1144 let gateway_spec = build_service_spec(
1145 config_dir,
1146 "gateway",
1147 &config.services.gateway.binary,
1148 &config.services.gateway.args,
1149 &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
1150 )?;
1151 if let Some(handle) = spawn_if_needed(&paths, &gateway_spec, restart, None)? {
1152 service_tracker.record_with_log("gateway", "gateway", Some(&handle.log_path))?;
1153 }
1154
1155 if should_restart(restart, "egress") {
1156 let _ = supervisor::stop_pidfile(&paths.pid_path("egress"), 2_000);
1157 }
1158 let egress_spec = build_service_spec(
1159 config_dir,
1160 "egress",
1161 &config.services.egress.binary,
1162 &config.services.egress.args,
1163 &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
1164 )?;
1165 if let Some(handle) = spawn_if_needed(&paths, &egress_spec, restart, None)? {
1166 service_tracker.record_with_log("egress", "egress", Some(&handle.log_path))?;
1167 }
1168
1169 match config.services.subscriptions.mode {
1170 DemoSubscriptionsMode::LegacyGsm => {
1171 if config.services.subscriptions.msgraph.enabled {
1172 if should_restart(restart, "subscriptions")
1173 || should_restart(restart, "msgraph")
1174 {
1175 let _ = supervisor::stop_pidfile(&paths.pid_path("subscriptions"), 2_000);
1176 }
1177 let mut args = config.services.subscriptions.msgraph.args.clone();
1178 if !config.services.subscriptions.msgraph.mode.is_empty() {
1179 args.insert(0, config.services.subscriptions.msgraph.mode.clone());
1180 }
1181 let spec = build_service_spec(
1182 config_dir,
1183 "subscriptions",
1184 &config.services.subscriptions.msgraph.binary,
1185 &args,
1186 &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
1187 )?;
1188 if let Some(handle) = spawn_if_needed(&paths, &spec, restart, None)? {
1189 service_tracker.record_with_log(
1190 "subscriptions",
1191 "subscriptions",
1192 Some(&handle.log_path),
1193 )?;
1194 }
1195 }
1196 }
1197 DemoSubscriptionsMode::UniversalOps => {
1198 spawn_universal_subscriptions_service(
1199 config_dir,
1200 config,
1201 tenant,
1202 team,
1203 runner_binary.clone(),
1204 &mut service_tracker,
1205 log_dir,
1206 debug_enabled,
1207 )?;
1208 }
1209 }
1210 } else {
1211 operator_log::info(
1212 module_path!(),
1213 "demo running in embedded runner mode; gateway/egress disabled",
1214 );
1215 if debug_enabled {
1216 operator_log::debug(
1217 module_path!(),
1218 format!(
1219 "[demo dev] embedded runner mode only tenant={} team={} (gateway/egress/subscriptions skipped)",
1220 tenant, team
1221 ),
1222 );
1223 }
1224 }
1225
1226 let endpoints = DemoEndpoints {
1227 tenant: tenant.to_string(),
1228 team: team.to_string(),
1229 public_base_url: startup_contract.public_base_url.clone(),
1230 nats_url,
1231 gateway_listen_addr: config.services.gateway.listen_addr.clone(),
1232 gateway_port: ingress_server
1233 .as_ref()
1234 .map(|server| server.actual_port)
1235 .unwrap_or(config.services.gateway.port),
1236 };
1237 write_json(&paths.runtime_root().join("endpoints.json"), &endpoints)?;
1238
1239 let bundle_name = config_dir
1241 .file_name()
1242 .map(|n| n.to_string_lossy().into_owned())
1243 .unwrap_or_else(|| tenant.to_string());
1244 let http_url = ingress_server.as_ref().map(|s| {
1245 format!(
1246 "http://{}:{}",
1247 config.services.gateway.listen_addr, s.actual_port
1248 )
1249 });
1250 let static_route_urls = match (&ingress_server, &public_base_url) {
1252 (Some(server), Some(base_url)) => server
1253 .ui_urls
1254 .iter()
1255 .map(|local_url| {
1256 if let Some(path_start) = local_url.find("/v1/") {
1259 format!(
1260 "{}/{}",
1261 base_url.trim_end_matches('/'),
1262 local_url[path_start..].trim_start_matches('/')
1263 )
1264 } else {
1265 local_url.clone()
1266 }
1267 })
1268 .collect(),
1269 (Some(server), None) => server.ui_urls.clone(),
1270 _ => Vec::new(),
1271 };
1272 let channels: Vec<String> = discovery
1273 .providers
1274 .iter()
1275 .filter(|p| p.domain == "messaging")
1276 .map(|p| p.provider_id.clone())
1277 .collect();
1278 let mode = if run_gsm_services {
1279 "gsm gateway + egress".to_string()
1280 } else {
1281 "embedded runner".to_string()
1282 };
1283
1284 let info = StartupInfo {
1285 bundle_name,
1286 http_url,
1287 static_route_urls: static_route_urls.clone(),
1288 public_url: public_base_url,
1289 channels,
1290 mode,
1291 webhook_results: webhook_summary.results,
1292 subscription_results: subscription_summary.results,
1293 };
1294 info.print();
1295
1296 if !no_browser
1298 && let Some(url) = static_route_urls.first()
1299 && let Err(err) = open::that(url)
1300 {
1301 operator_log::warn(module_path!(), format!("failed to open browser: {err}"));
1302 }
1303
1304 Ok(ForegroundRuntimeHandles { ingress_server })
1305}
1306
1307fn validate_messaging_app_route(
1308 bundle_root: &Path,
1309 tenant: &str,
1310 team: Option<&str>,
1311) -> anyhow::Result<()> {
1312 let app_pack_path = match crate::messaging_app::resolve_app_pack_path(
1313 bundle_root,
1314 tenant,
1315 team,
1316 None,
1317 ) {
1318 Ok(path) => path,
1319 Err(err) => {
1320 operator_log::error(
1321 module_path!(),
1322 format!(
1323 "APP_PACK_NOT_RESOLVED: greentic-start cannot choose which app pack to invoke for tenant={} team={}. \
1324Expected bundle.yaml app_packs, packs/{tenant}/{}/default.gtpack, packs/{tenant}/default.gtpack, or packs/default.gtpack. \
1325Details: {err:#}",
1326 tenant,
1327 team.unwrap_or("_"),
1328 team.unwrap_or("_")
1329 ),
1330 );
1331 return Err(err).context("unable to resolve messaging app pack");
1332 }
1333 };
1334 let pack_info = match crate::messaging_app::load_app_pack_info(&app_pack_path) {
1335 Ok(info) => info,
1336 Err(err) => {
1337 operator_log::error(
1338 module_path!(),
1339 format!(
1340 "APP_PACK_MANIFEST_INVALID: greentic-start found app pack {} but could not read its manifest. Details: {err:#}",
1341 app_pack_path.display()
1342 ),
1343 );
1344 return Err(err).context("unable to read messaging app pack manifest");
1345 }
1346 };
1347 match crate::messaging_app::select_app_flow(&pack_info) {
1348 Ok(flow) => {
1349 operator_log::info(
1350 module_path!(),
1351 format!(
1352 "messaging app route resolved: pack={} pack_id={} flow={} kind={}",
1353 app_pack_path.display(),
1354 pack_info.pack_id,
1355 flow.id,
1356 flow.kind
1357 ),
1358 );
1359 Ok(())
1360 }
1361 Err(err) => {
1362 operator_log::error(
1363 module_path!(),
1364 format!(
1365 "{} pack={}",
1366 crate::messaging_app::app_flow_resolution_error(&pack_info),
1367 app_pack_path.display()
1368 ),
1369 );
1370 Err(err).context("unable to resolve messaging app flow")
1371 }
1372 }
1373}
1374
1375fn detect_http_ingress_domains(
1376 discovery: &crate::discovery::DiscoveryResult,
1377 runner_host: &DemoRunnerHost,
1378) -> Vec<Domain> {
1379 let mut domains = Vec::new();
1380 for domain in [Domain::Messaging, Domain::Events, Domain::OAuth] {
1381 let supported = discovery.providers.iter().any(|provider| {
1382 let domain_match = parse_domain_name(&provider.domain) == Some(domain);
1383 let op_support = runner_host.supports_op(domain, &provider.provider_id, "ingest_http");
1384 operator_log::debug(
1385 module_path!(),
1386 format!(
1387 "[domain-detect] domain={:?} provider={} domain_match={} op_support={}",
1388 domain, provider.provider_id, domain_match, op_support
1389 ),
1390 );
1391 domain_match && op_support
1392 });
1393 let fallback_supported = matches!(domain, Domain::Events) && discovery.domains.events;
1394 operator_log::debug(
1395 module_path!(),
1396 format!(
1397 "[domain-detect] domain={:?} supported={} fallback={} => enabled={}",
1398 domain,
1399 supported,
1400 fallback_supported,
1401 supported || fallback_supported
1402 ),
1403 );
1404 if supported || fallback_supported {
1405 domains.push(domain);
1406 }
1407 }
1408 domains
1409}
1410
1411fn parse_domain_name(value: &str) -> Option<Domain> {
1412 match value {
1413 "messaging" => Some(Domain::Messaging),
1414 "events" => Some(Domain::Events),
1415 "oauth" => Some(Domain::OAuth),
1416 "secrets" => Some(Domain::Secrets),
1417 _ => None,
1418 }
1419}
1420
1421fn start_http_ingress_server(
1422 config: &DemoConfig,
1423 domains: &[Domain],
1424 runner_host: Arc<DemoRunnerHost>,
1425 enable_static_routes: bool,
1426 public_base_url: Option<String>,
1427 notifier_config: crate::notifier::NotifierConfig,
1428) -> anyhow::Result<Option<HttpIngressServer>> {
1429 let health_probe_listener_required = std::env::var("GREENTIC_HEALTH_LIVENESS_PATH")
1431 .ok()
1432 .is_some_and(|value| !value.trim().is_empty())
1433 || std::env::var("GREENTIC_HEALTH_READINESS_PATH")
1434 .ok()
1435 .is_some_and(|value| !value.trim().is_empty());
1436 if domains.is_empty() && !enable_static_routes && !health_probe_listener_required {
1438 return Ok(None);
1439 }
1440 let addr = format!(
1441 "{}:{}",
1442 config.services.gateway.listen_addr, config.services.gateway.port
1443 );
1444 let bind_addr = addr
1445 .parse()
1446 .with_context(|| format!("invalid gateway listen address {addr}"))?;
1447 let server = HttpIngressServer::start(HttpIngressConfig {
1448 bind_addr,
1449 domains: domains.to_vec(),
1450 runner_host,
1451 enable_static_routes,
1452 tenant: config.tenant.clone(),
1453 public_base_url,
1454 notifier_config,
1455 revision_routing: None,
1458 })?;
1459 operator_log::info(
1460 module_path!(),
1461 format!(
1462 "HTTP ingress ready at http://{}:{}",
1463 config.services.gateway.listen_addr, server.actual_port
1464 ),
1465 );
1466 Ok(Some(server))
1467}
1468
1469pub fn demo_status_runtime(
1470 state_dir: &Path,
1471 tenant: &str,
1472 team: &str,
1473 verbose: bool,
1474) -> anyhow::Result<()> {
1475 let paths = RuntimePaths::new(state_dir, tenant, team);
1476 let statuses = supervisor::read_status(&paths)?;
1477 if statuses.is_empty() {
1478 println!(
1479 "{}",
1480 crate::operator_i18n::tr("demo.runtime.none_running", "none running")
1481 );
1482 return Ok(());
1483 }
1484 for status in statuses {
1485 let state = if status.running {
1486 crate::operator_i18n::tr("demo.runtime.status_running", "running")
1487 } else {
1488 crate::operator_i18n::tr("demo.runtime.status_stopped", "stopped")
1489 };
1490 let pid = status
1491 .pid
1492 .map(|value| value.to_string())
1493 .unwrap_or_else(|| "-".to_string());
1494 if verbose {
1495 println!(
1496 "{}: {} (pid={}, log={})",
1497 status.id.as_str(),
1498 &state,
1499 pid,
1500 status.log_path.display()
1501 );
1502 } else {
1503 println!("{}: {} (pid={})", status.id.as_str(), &state, pid);
1504 }
1505 }
1506 Ok(())
1507}
1508
1509pub fn demo_logs_runtime(
1510 state_dir: &Path,
1511 log_dir: &Path,
1512 tenant: &str,
1513 team: &str,
1514 service: &str,
1515 tail: bool,
1516) -> anyhow::Result<()> {
1517 let log_dir = resolve_manifest_log_dir(state_dir, tenant, team, log_dir)?;
1518 let log_path = if service == "operator" {
1519 log_dir.join("operator.log")
1520 } else {
1521 let tenant_log_path = tenant_log_path(&log_dir, service, tenant, team)?;
1522 select_log_path(&log_dir, service, tenant, &tenant_log_path)
1523 };
1524 if tail {
1525 return services::tail_log(&log_path);
1526 }
1527 let lines = read_last_lines(&log_path, 200)?;
1528 if !lines.is_empty() {
1529 println!("{lines}");
1530 }
1531 Ok(())
1532}
1533
1534pub fn demo_down_runtime(
1535 state_dir: &Path,
1536 tenant: &str,
1537 team: &str,
1538 all: bool,
1539) -> anyhow::Result<()> {
1540 let timeout_ms = 2_000;
1541 let paths = RuntimePaths::new(state_dir, tenant, team);
1542 stop_started_nats(&paths, state_dir)?;
1543 ngrok::stop_ngrok();
1545 cloudflared::stop_cloudflared();
1546 cloudflared::cleanup_url_file(&paths);
1548 if all {
1549 let pids_root = state_dir.join("pids");
1550 if !pids_root.exists() {
1551 println!(
1552 "{}",
1553 crate::operator_i18n::tr(
1554 "demo.runtime.no_services_to_stop",
1555 "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
1556 )
1557 );
1558 return Ok(());
1559 }
1560 for entry in std::fs::read_dir(&pids_root)? {
1561 let entry = entry?;
1562 if !entry.file_type()?.is_dir() {
1563 continue;
1564 }
1565 for pidfile in std::fs::read_dir(entry.path())? {
1566 let pidfile = pidfile?;
1567 if pidfile.path().extension().and_then(|ext| ext.to_str()) != Some("pid") {
1568 continue;
1569 }
1570 let _ = supervisor::stop_pidfile(&pidfile.path(), timeout_ms);
1571 }
1572 }
1573 remove_service_manifest(&paths)?;
1574 println!(
1575 "{}",
1576 crate::operator_i18n::trf(
1577 "demo.runtime.stopped_all_under",
1578 "Stopped all services under {}",
1579 &[&pids_root.display().to_string()]
1580 )
1581 );
1582 return Ok(());
1583 }
1584
1585 if let Some(manifest) = read_service_manifest(&paths)? {
1586 if manifest.services.is_empty() {
1587 println!(
1588 "{}",
1589 crate::operator_i18n::tr(
1590 "demo.runtime.no_services_to_stop",
1591 "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
1592 )
1593 );
1594 return Ok(());
1595 }
1596 for entry in manifest.services.iter().rev() {
1597 let id = supervisor::ServiceId::new(entry.id.clone())?;
1598 if let Err(err) = supervisor::stop_service(&paths, &id, timeout_ms) {
1599 eprintln!(
1600 "{}",
1601 crate::operator_i18n::trf(
1602 "demo.runtime.warn_failed_stop_service",
1603 "Warning: failed to stop {}: {}",
1604 &[&entry.id, &err.to_string()]
1605 )
1606 );
1607 }
1608 }
1609 remove_service_manifest(&paths)?;
1610 return Ok(());
1611 }
1612
1613 let pids_dir = paths.pids_dir();
1614 if !pids_dir.exists() {
1615 println!(
1616 "{}",
1617 crate::operator_i18n::tr(
1618 "demo.runtime.no_services_to_stop",
1619 "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
1620 )
1621 );
1622 return Ok(());
1623 }
1624 for entry in std::fs::read_dir(&pids_dir)? {
1625 let entry = entry?;
1626 let path = entry.path();
1627 if path.extension().and_then(|ext| ext.to_str()) != Some("pid") {
1628 continue;
1629 }
1630 supervisor::stop_pidfile(&path, timeout_ms)?;
1631 }
1632 Ok(())
1633}
1634
1635fn select_log_path(log_dir: &Path, service: &str, tenant: &str, tenant_log: &Path) -> PathBuf {
1636 let candidates = [
1637 log_dir.join(format!("{service}.log")),
1638 log_dir.join(format!("{service}-{tenant}.log")),
1639 log_dir.join(format!("{service}.{tenant}.log")),
1640 ];
1641 for candidate in &candidates {
1642 if candidate.exists() {
1643 return candidate.clone();
1644 }
1645 }
1646 if tenant_log.exists() {
1647 return tenant_log.to_path_buf();
1648 }
1649 let _ = ensure_log_file(tenant_log);
1650 tenant_log.to_path_buf()
1651}
1652
1653fn tenant_log_path(
1654 log_dir: &Path,
1655 service: &str,
1656 tenant: &str,
1657 team: &str,
1658) -> anyhow::Result<PathBuf> {
1659 let tenant_dir = log_dir.join(format!("{tenant}.{team}"));
1660 let path = tenant_dir.join(format!("{service}.log"));
1661 ensure_log_file(&path)?;
1662 Ok(path)
1663}
1664
1665fn ensure_log_file(path: &Path) -> anyhow::Result<()> {
1666 if let Some(parent) = path.parent() {
1667 std::fs::create_dir_all(parent)?;
1668 }
1669 if !path.exists() {
1670 std::fs::File::create(path)?;
1671 }
1672 Ok(())
1673}
1674
1675fn resolve_manifest_log_dir(
1676 state_dir: &Path,
1677 tenant: &str,
1678 team: &str,
1679 default: &Path,
1680) -> anyhow::Result<PathBuf> {
1681 let paths = RuntimePaths::new(state_dir, tenant, team);
1682 if let Some(manifest) = read_service_manifest(&paths)?
1683 && let Some(dir) = manifest.log_dir
1684 {
1685 return Ok(PathBuf::from(dir));
1686 }
1687 Ok(default.to_path_buf())
1688}
1689
1690fn build_env(
1691 tenant: &str,
1692 team: &str,
1693 nats_url: Option<&str>,
1694 startup_contract: Option<&StartupContract>,
1695) -> BTreeMap<String, String> {
1696 let mut env = BTreeMap::new();
1697 env.insert("GREENTIC_TENANT".to_string(), tenant.to_string());
1698 env.insert("GREENTIC_TEAM".to_string(), team.to_string());
1699 if let Some(url) = nats_url {
1700 env.insert("NATS_URL".to_string(), url.to_string());
1701 }
1702 if let Some(contract) = startup_contract {
1703 contract.apply_env(&mut env);
1704 }
1705 env
1706}
1707
1708fn resolve_startup_contract(
1709 static_routes: &BundleStaticRoutesInspection,
1710 http_listener_enabled: bool,
1711 asset_serving_enabled: bool,
1712 public_base_url: Option<String>,
1713 runtime_config: Option<RuntimeConfig>,
1714) -> anyhow::Result<StartupContract> {
1715 crate::startup_contract::resolve(StartupContractInput {
1716 bundle_has_static_routes: static_routes.bundle_has_static_routes(),
1717 http_listener_enabled,
1718 asset_serving_enabled,
1719 public_base_url,
1720 runtime_config,
1721 })
1722}
1723
1724fn mark_nats_started(paths: &RuntimePaths) -> anyhow::Result<()> {
1725 let marker = nats_started_marker(paths);
1726 if let Some(parent) = marker.parent() {
1727 std::fs::create_dir_all(parent)?;
1728 }
1729 std::fs::write(marker, "started")?;
1730 Ok(())
1731}
1732
1733fn stop_started_nats(paths: &RuntimePaths, state_dir: &Path) -> anyhow::Result<()> {
1734 let marker = nats_started_marker(paths);
1735 if !marker.exists() {
1736 return Ok(());
1737 }
1738 let bundle_root = state_dir.parent().unwrap_or(state_dir);
1739 match services::stop_nats(bundle_root) {
1740 Ok(_) => {
1741 let _ = std::fs::remove_file(&marker);
1742 }
1743 Err(err) => {
1744 eprintln!(
1745 "{}",
1746 crate::operator_i18n::trf(
1747 "demo.runtime.warn_failed_stop_nats",
1748 "Warning: failed to stop nats: {}",
1749 &[&err.to_string()]
1750 )
1751 );
1752 }
1753 }
1754 Ok(())
1755}
1756
1757fn nats_started_marker(paths: &RuntimePaths) -> PathBuf {
1758 paths.runtime_root().join("nats.started")
1759}
1760
1761fn preflight_interpreter(binary: &Path) -> anyhow::Result<()> {
1768 use std::io::Read;
1769 let Ok(mut file) = std::fs::File::open(binary) else {
1770 return Ok(());
1771 };
1772 let mut magic = [0u8; 2];
1773 if file.read(&mut magic).unwrap_or(0) < 2 || &magic != b"#!" {
1774 return Ok(());
1775 }
1776 let mut rest = Vec::new();
1777 let mut byte = [0u8; 1];
1778 while rest.len() < 256 {
1779 match file.read(&mut byte) {
1780 Ok(0) => break,
1781 Ok(_) if byte[0] == b'\n' => break,
1782 Ok(_) => rest.push(byte[0]),
1783 Err(_) => return Ok(()),
1784 }
1785 }
1786 let line = String::from_utf8_lossy(&rest);
1787 let interpreter = line.split_whitespace().next().unwrap_or("");
1788 if interpreter.is_empty() || Path::new(interpreter).exists() {
1789 return Ok(());
1790 }
1791 anyhow::bail!(
1792 "service helper {} needs interpreter `{interpreter}`, which is not present. \
1793 The hardened distroless runtime image ships no shell or interpreters — supply a \
1794 statically-linked ELF helper, or run a runtime variant that includes `{interpreter}`.",
1795 binary.display()
1796 )
1797}
1798
1799fn build_service_spec(
1800 config_dir: &Path,
1801 service_id: &str,
1802 binary: &str,
1803 args: &[String],
1804 env: &BTreeMap<String, String>,
1805) -> anyhow::Result<supervisor::ServiceSpec> {
1806 let explicit = if looks_like_path(binary) {
1807 let path = Path::new(binary);
1808 Some(if path.is_absolute() {
1809 path.to_path_buf()
1810 } else {
1811 config_dir.join(path)
1812 })
1813 } else {
1814 None
1815 };
1816 let path = crate::bin_resolver::resolve_binary(
1817 binary,
1818 &crate::bin_resolver::ResolveCtx {
1819 config_dir: config_dir.to_path_buf(),
1820 explicit_path: explicit,
1821 },
1822 )?;
1823 preflight_interpreter(&path)?;
1824 let mut argv = vec![path.to_string_lossy().to_string()];
1825 argv.extend(args.iter().cloned());
1826 Ok(supervisor::ServiceSpec {
1827 id: supervisor::ServiceId::new(service_id)?,
1828 argv,
1829 cwd: None,
1830 env: env.clone(),
1831 })
1832}
1833
1834#[derive(serde::Serialize)]
1835struct DemoEndpoints {
1836 tenant: String,
1837 team: String,
1838 public_base_url: Option<String>,
1839 nats_url: Option<String>,
1840 gateway_listen_addr: String,
1841 gateway_port: u16,
1842}
1843
1844fn read_last_lines(path: &Path, count: usize) -> anyhow::Result<String> {
1845 if !path.exists() {
1846 return Err(anyhow::anyhow!(
1847 "Log file does not exist: {}",
1848 path.display()
1849 ));
1850 }
1851 let contents = std::fs::read_to_string(path)?;
1852 let mut lines: Vec<&str> = contents.lines().collect();
1853 if lines.len() > count {
1854 lines = lines.split_off(lines.len() - count);
1855 }
1856 Ok(lines.join("\n"))
1857}
1858
1859#[cfg(test)]
1860mod tests {
1861 use super::*;
1862 use crate::discovery::{DetectedDomains, DetectedProvider, DiscoveryResult, ProviderIdSource};
1863 use crate::domains::Domain;
1864 use crate::secrets_gate;
1865 use greentic_types::{
1866 ExtensionInline, ExtensionRef, Flow, FlowId, FlowKind, PackFlowEntry, PackId, PackKind,
1867 PackManifest, PackSignatures,
1868 };
1869 use semver::Version;
1870 use serde_json::json;
1871 use std::fs;
1872 use std::fs::File;
1873 use std::io::Write;
1874 use std::net::{Ipv4Addr, TcpListener};
1875 use tempfile::tempdir;
1876 use zip::ZipWriter;
1877 use zip::write::FileOptions;
1878
1879 #[test]
1880 fn helper_types_cover_stop_describe_and_service_tracker_manifest_updates() -> anyhow::Result<()>
1881 {
1882 let handles = ForegroundRuntimeHandles::default();
1883 handles.stop()?;
1884
1885 let info = StartupInfo {
1886 bundle_name: "demo-bundle".to_string(),
1887 http_url: Some("http://127.0.0.1:8080".to_string()),
1888 static_route_urls: Vec::new(),
1889 public_url: Some("https://demo.example".to_string()),
1890 channels: vec!["webchat".to_string()],
1891 mode: "embedded runner".to_string(),
1892 webhook_results: vec![("slack".to_string(), "ok".to_string())],
1893 subscription_results: vec![("teams".to_string(), "synced".to_string())],
1894 };
1895 info.print();
1896
1897 let empty = ServiceSummary::new("gateway", None);
1898 assert_eq!(empty.describe(), "gateway (pid=-)");
1899
1900 let mut detailed =
1901 ServiceSummary::with_details("egress", Some(42), vec!["log=/tmp/x".to_string()]);
1902 detailed.add_detail("mode=embedded");
1903 assert!(detailed.describe().contains("pid=42"));
1904 assert!(detailed.describe().contains("mode=embedded"));
1905
1906 let dir = tempdir()?;
1907 let paths = RuntimePaths::new(dir.path().join("state"), "demo", "default");
1908 let mut tracker = ServiceTracker::new(&paths, Some(dir.path()))?;
1909 tracker.record_with_log("gateway", "gateway", Some(&dir.path().join("gateway.log")))?;
1910 tracker.record(crate::runtime_state::ServiceEntry::new(
1911 "egress",
1912 "egress",
1913 Some(&dir.path().join("egress.log")),
1914 ))?;
1915
1916 let manifest = read_service_manifest(&paths)?.expect("manifest");
1917 assert_eq!(manifest.services.len(), 2);
1918 let expected_log_dir = dir.path().display().to_string();
1919 assert_eq!(manifest.log_dir.as_deref(), Some(expected_log_dir.as_str()));
1920 Ok(())
1921 }
1922
1923 #[test]
1924 fn runtime_helpers_cover_pid_paths_logs_env_and_tail_reads() -> anyhow::Result<()> {
1925 let dir = tempdir()?;
1926 let state_dir = dir.path().join("state");
1927 let log_dir = dir.path().join("logs");
1928 let paths = RuntimePaths::new(&state_dir, "demo", "default");
1929
1930 assert!(!looks_like_path("runner"));
1931 assert!(looks_like_path("bin/runner"));
1932 assert!(looks_like_path("/bin/runner"));
1933
1934 let mut restart = BTreeSet::new();
1935 restart.insert("gateway".to_string());
1936 assert!(should_restart(&restart, "gateway"));
1937 restart.insert("all".to_string());
1938 assert!(should_restart(&restart, "egress"));
1939
1940 let pid_path = paths.pid_path("gateway");
1941 assert_eq!(read_pid(&pid_path)?, None);
1942 fs::create_dir_all(pid_path.parent().expect("parent"))?;
1943 fs::write(&pid_path, "")?;
1944 assert_eq!(read_pid(&pid_path)?, None);
1945 fs::write(&pid_path, "1234\n")?;
1946 assert_eq!(read_pid(&pid_path)?, Some(1234));
1947
1948 let tenant_log = tenant_log_path(&log_dir, "gateway", "demo", "default")?;
1949 assert!(tenant_log.exists());
1950 fs::write(log_dir.join("gateway-demo.log"), "service log")?;
1951 assert_eq!(
1952 select_log_path(&log_dir, "gateway", "demo", &tenant_log),
1953 log_dir.join("gateway-demo.log")
1954 );
1955
1956 let manifest_log_dir = dir.path().join("custom-logs");
1957 let manifest = crate::runtime_state::ServiceManifest {
1958 log_dir: Some(manifest_log_dir.display().to_string()),
1959 services: Vec::new(),
1960 };
1961 persist_service_manifest(&paths, &manifest)?;
1962 assert_eq!(
1963 resolve_manifest_log_dir(&state_dir, "demo", "default", &log_dir)?,
1964 manifest_log_dir
1965 );
1966
1967 let env = build_env("demo", "default", Some("nats://127.0.0.1:4222"), None);
1968 assert_eq!(env.get("GREENTIC_TENANT").map(String::as_str), Some("demo"));
1969 assert_eq!(
1970 env.get("GREENTIC_TEAM").map(String::as_str),
1971 Some("default")
1972 );
1973 assert_eq!(
1974 env.get("NATS_URL").map(String::as_str),
1975 Some("nats://127.0.0.1:4222")
1976 );
1977
1978 let marker = nats_started_marker(&paths);
1979 mark_nats_started(&paths)?;
1980 assert!(marker.exists());
1981 stop_started_nats(&paths, &state_dir)?;
1982 assert!(!marker.exists());
1983
1984 let missing = read_last_lines(&dir.path().join("missing.log"), 10).unwrap_err();
1985 assert!(missing.to_string().contains("Log file does not exist"));
1986
1987 let tail_path = dir.path().join("tail.log");
1988 fs::write(&tail_path, "one\ntwo\nthree\nfour\n")?;
1989 assert_eq!(read_last_lines(&tail_path, 2)?, "three\nfour");
1990 Ok(())
1991 }
1992
1993 #[test]
1994 fn build_service_spec_resolves_paths_and_sets_arguments() -> anyhow::Result<()> {
1995 let dir = tempdir()?;
1996 let binary = dir.path().join("runner.sh");
1997 fs::write(&binary, "#!/bin/sh\n")?;
1998
1999 let env = BTreeMap::from([("GREENTIC_TENANT".to_string(), "demo".to_string())]);
2000 let relative = build_service_spec(
2001 dir.path(),
2002 "runner",
2003 "./runner.sh",
2004 &["--flag".to_string()],
2005 &env,
2006 )?;
2007 assert_eq!(relative.id.as_str(), "runner");
2008 assert!(relative.argv[0].ends_with("runner.sh"));
2009 assert_eq!(relative.argv[1], "--flag");
2010 assert_eq!(
2011 relative.env.get("GREENTIC_TENANT").map(String::as_str),
2012 Some("demo")
2013 );
2014
2015 let absolute = build_service_spec(
2016 dir.path(),
2017 "runner_abs",
2018 &binary.display().to_string(),
2019 &Vec::new(),
2020 &env,
2021 )?;
2022 assert_eq!(absolute.argv[0], binary.display().to_string());
2023 Ok(())
2024 }
2025
2026 #[cfg(unix)]
2027 #[test]
2028 fn build_service_spec_rejects_script_with_missing_interpreter() {
2029 use std::os::unix::fs::PermissionsExt;
2030 let dir = tempdir().expect("tempdir");
2031 let helper = dir.path().join("runner.sh");
2032 fs::write(&helper, "#!/nonexistent/xyz-interp\necho hi\n").expect("write helper");
2033 fs::set_permissions(&helper, fs::Permissions::from_mode(0o755)).expect("chmod");
2034 let err = build_service_spec(dir.path(), "runner", "./runner.sh", &[], &BTreeMap::new())
2035 .expect_err("missing interpreter must error");
2036 let msg = format!("{err:#}");
2037 assert!(msg.contains("interpreter"), "got: {msg}");
2038 assert!(msg.contains("distroless"), "got: {msg}");
2039 }
2040
2041 #[test]
2042 fn tenant_log_path_creates_file() -> anyhow::Result<()> {
2043 let dir = tempdir()?;
2044 let path = tenant_log_path(dir.path(), "messaging", "demo", "default")?;
2045 assert!(path.exists());
2046 Ok(())
2047 }
2048
2049 #[test]
2050 fn select_log_path_prefers_service_log_when_present() -> anyhow::Result<()> {
2051 let dir = tempdir()?;
2052 let tenant_path = tenant_log_path(dir.path(), "messaging", "demo", "default")?;
2053 let service_path = dir.path().join("messaging.log");
2054 fs::write(&service_path, "other")?;
2055 let selected = select_log_path(dir.path(), "messaging", "demo", &tenant_path);
2056 assert_eq!(selected, service_path);
2057 Ok(())
2058 }
2059
2060 #[test]
2061 fn demo_logs_runtime_reads_operator_log() -> anyhow::Result<()> {
2062 let dir = tempdir()?;
2063 let log = dir.path().join("operator.log");
2064 fs::write(&log, "operator ready")?;
2065 demo_logs_runtime(dir.path(), dir.path(), "demo", "default", "operator", false)?;
2066 Ok(())
2067 }
2068
2069 #[test]
2070 fn ingress_detection_and_runtime_noop_paths_cover_remaining_helpers() -> anyhow::Result<()> {
2071 let dir = tempdir()?;
2072 let discovery = DiscoveryResult {
2073 domains: DetectedDomains {
2074 messaging: true,
2075 events: true,
2076 oauth: false,
2077 },
2078 providers: vec![
2079 DetectedProvider {
2080 provider_id: "messaging-missing".to_string(),
2081 domain: "messaging".to_string(),
2082 pack_path: dir
2083 .path()
2084 .join("providers/messaging/messaging-missing.gtpack"),
2085 id_source: ProviderIdSource::Filename,
2086 },
2087 DetectedProvider {
2088 provider_id: "events-fallback".to_string(),
2089 domain: "events".to_string(),
2090 pack_path: dir.path().join("providers/events/events-fallback.gtpack"),
2091 id_source: ProviderIdSource::Filename,
2092 },
2093 ],
2094 };
2095 let secrets_handle =
2096 secrets_gate::resolve_secrets_manager(dir.path(), "demo", Some("default"))?;
2097 let runner_host = DemoRunnerHost::new(
2098 dir.path().to_path_buf(),
2099 &discovery,
2100 None,
2101 secrets_handle,
2102 false,
2103 )?;
2104
2105 assert_eq!(parse_domain_name("messaging"), Some(Domain::Messaging));
2106 assert_eq!(parse_domain_name("events"), Some(Domain::Events));
2107 assert_eq!(parse_domain_name("oauth"), Some(Domain::OAuth));
2108 assert_eq!(parse_domain_name("secrets"), Some(Domain::Secrets));
2109 assert_eq!(parse_domain_name("unknown"), None);
2110
2111 let detected = detect_http_ingress_domains(&discovery, &runner_host);
2112 assert_eq!(detected, vec![Domain::Events]);
2113
2114 let config = DemoConfig::default();
2115 assert!(
2116 start_http_ingress_server(
2117 &config,
2118 &[],
2119 Arc::new(runner_host.clone()),
2120 false,
2121 None,
2122 crate::notifier::NotifierConfig::default(),
2123 )?
2124 .is_none()
2125 );
2126
2127 let invalid_config = DemoConfig {
2128 services: crate::config::DemoServicesConfig {
2129 gateway: crate::config::DemoGatewayConfig {
2130 listen_addr: "not an addr".to_string(),
2131 ..Default::default()
2132 },
2133 ..Default::default()
2134 },
2135 ..Default::default()
2136 };
2137 let bind_err = match start_http_ingress_server(
2138 &invalid_config,
2139 &[Domain::Events],
2140 Arc::new(runner_host),
2141 false,
2142 None,
2143 crate::notifier::NotifierConfig::default(),
2144 ) {
2145 Ok(_) => panic!("expected invalid bind address to fail"),
2146 Err(err) => err,
2147 };
2148 assert!(
2149 bind_err
2150 .to_string()
2151 .contains("invalid gateway listen address")
2152 );
2153
2154 demo_status_runtime(dir.path(), "demo", "default", false)?;
2155 demo_down_runtime(dir.path(), "demo", "default", false)?;
2156
2157 let paths = RuntimePaths::new(dir.path(), "demo", "default");
2158 persist_service_manifest(
2159 &paths,
2160 &crate::runtime_state::ServiceManifest {
2161 log_dir: None,
2162 services: Vec::new(),
2163 },
2164 )?;
2165 demo_down_runtime(dir.path(), "demo", "default", false)?;
2166 demo_down_runtime(dir.path(), "demo", "default", true)?;
2167 Ok(())
2168 }
2169
2170 #[test]
2171 fn demo_up_runs_in_embedded_mode_without_supervised_services() -> anyhow::Result<()> {
2172 let dir = tempdir()?;
2173 let bundle_root = dir.path().join("bundle");
2174 let log_dir = dir.path().join("logs");
2175 fs::create_dir_all(&bundle_root)?;
2176 fs::create_dir_all(&log_dir)?;
2177
2178 demo_up(
2179 &bundle_root,
2180 "demo",
2181 Some("default"),
2182 None,
2183 NatsMode::Off,
2184 false,
2185 None,
2186 None,
2187 &log_dir,
2188 true,
2189 )?;
2190
2191 let paths = RuntimePaths::new(bundle_root.join("state"), "demo", "default");
2192 let manifest = read_service_manifest(&paths)?.expect("service manifest");
2193 assert!(manifest.services.is_empty());
2194 assert_eq!(
2195 manifest.log_dir.as_deref(),
2196 Some(log_dir.display().to_string().as_str())
2197 );
2198 Ok(())
2199 }
2200
2201 #[test]
2202 fn demo_up_services_in_embedded_mode_writes_runtime_artifacts() -> anyhow::Result<()> {
2203 let dir = tempdir()?;
2204 let bundle_root = dir.path().join("bundle");
2205 let config_path = bundle_root.join("greentic-demo.yaml");
2206 let log_dir = bundle_root.join("logs");
2207 fs::create_dir_all(&bundle_root)?;
2208 fs::create_dir_all(&log_dir)?;
2209 fs::write(&config_path, "tenant: demo\nteam: default\n")?;
2210
2211 let config = DemoConfig {
2212 tenant: "demo".to_string(),
2213 team: "default".to_string(),
2214 services: crate::config::DemoServicesConfig {
2215 nats: crate::config::DemoNatsConfig {
2216 enabled: false,
2217 ..Default::default()
2218 },
2219 ..Default::default()
2220 },
2221 providers: None,
2222 };
2223 let static_routes = BundleStaticRoutesInspection::default();
2224 let restart = BTreeSet::new();
2225
2226 let handles = demo_up_services(
2227 &config_path,
2228 &config,
2229 &static_routes,
2230 None,
2231 None,
2232 None,
2233 None,
2234 &restart,
2235 None,
2236 &log_dir,
2237 true,
2238 false,
2239 )?;
2240 assert!(handles.ingress_server.is_none());
2241
2242 let paths = RuntimePaths::new(bundle_root.join("state"), "demo", "default");
2243 let runtime_root = paths.runtime_root();
2244 assert!(runtime_root.join("startup_contract.json").exists());
2245 assert!(runtime_root.join("endpoints.json").exists());
2246
2247 let startup_contract: StartupContract =
2248 serde_json::from_slice(&fs::read(runtime_root.join("startup_contract.json"))?)?;
2249 assert!(!startup_contract.public_http_enabled);
2250 assert!(!startup_contract.static_routes_enabled);
2251
2252 let endpoints: serde_json::Value =
2253 serde_json::from_slice(&fs::read(runtime_root.join("endpoints.json"))?)?;
2254 assert_eq!(endpoints["tenant"], "demo");
2255 assert_eq!(endpoints["team"], "default");
2256 assert!(endpoints["public_base_url"].is_null());
2257 assert!(endpoints["nats_url"].is_null());
2258 Ok(())
2259 }
2260
2261 #[test]
2262 fn demo_up_services_derives_local_public_base_url_from_actual_ingress_port()
2263 -> anyhow::Result<()> {
2264 let dir = tempdir()?;
2265 let bundle_root = dir.path().join("bundle");
2266 let config_path = bundle_root.join("greentic-demo.yaml");
2267 let log_dir = bundle_root.join("logs");
2268 fs::create_dir_all(&bundle_root)?;
2269 fs::create_dir_all(&log_dir)?;
2270 fs::write(&config_path, "tenant: demo\nteam: default\n")?;
2271 write_default_app_pack(&bundle_root.join("packs/default.gtpack"))?;
2272 let static_pack_path = bundle_root.join("providers/messaging/messaging-webchat-gui.gtpack");
2273 write_static_route_provider_pack(&static_pack_path)?;
2274
2275 let port_holder = bind_available_nonterminal_port()?;
2276 let requested_port = port_holder.local_addr()?.port();
2277 let static_routes = BundleStaticRoutesInspection {
2278 pack_paths: vec![static_pack_path],
2279 };
2280 let mut config = DemoConfig {
2281 tenant: "demo".to_string(),
2282 team: "default".to_string(),
2283 services: crate::config::DemoServicesConfig {
2284 nats: crate::config::DemoNatsConfig {
2285 enabled: false,
2286 ..Default::default()
2287 },
2288 ..Default::default()
2289 },
2290 providers: None,
2291 };
2292 config.services.gateway.listen_addr = "127.0.0.1".to_string();
2293 config.services.gateway.port = requested_port;
2294 let restart = BTreeSet::new();
2295
2296 let handles = demo_up_services(
2297 &config_path,
2298 &config,
2299 &static_routes,
2300 None,
2301 None,
2302 None,
2303 None,
2304 &restart,
2305 None,
2306 &log_dir,
2307 false,
2308 true,
2309 )?;
2310 let actual_port = handles
2311 .ingress_server
2312 .as_ref()
2313 .expect("ingress server")
2314 .actual_port;
2315 assert_ne!(actual_port, requested_port);
2316
2317 let runtime_root =
2318 RuntimePaths::new(bundle_root.join("state"), "demo", "default").runtime_root();
2319 let expected_public_base_url = format!("http://127.0.0.1:{actual_port}");
2320 let startup_contract: StartupContract =
2321 serde_json::from_slice(&fs::read(runtime_root.join("startup_contract.json"))?)?;
2322 assert_eq!(
2323 startup_contract.public_base_url.as_deref(),
2324 Some(expected_public_base_url.as_str())
2325 );
2326
2327 let endpoints: serde_json::Value =
2328 serde_json::from_slice(&fs::read(runtime_root.join("endpoints.json"))?)?;
2329 assert_eq!(endpoints["public_base_url"], expected_public_base_url);
2330 assert_eq!(endpoints["gateway_port"], actual_port);
2331 assert_ne!(endpoints["gateway_port"], requested_port);
2332
2333 drop(port_holder);
2334 handles.stop()?;
2335 Ok(())
2336 }
2337
2338 #[test]
2339 fn supervised_spawn_helpers_cover_running_and_summary_paths() -> anyhow::Result<()> {
2340 let dir = tempdir()?;
2341 let state_dir = dir.path().join("state");
2342 let log_dir = dir.path().join("logs");
2343 fs::create_dir_all(&log_dir)?;
2344 let paths = RuntimePaths::new(&state_dir, "demo", "default");
2345 let mut tracker = ServiceTracker::new(&paths, Some(&log_dir))?;
2346 let restart = BTreeSet::new();
2347
2348 let spec = supervisor::ServiceSpec {
2349 id: supervisor::ServiceId::new("svc")?,
2350 argv: vec![
2351 "/bin/sh".to_string(),
2352 "-c".to_string(),
2353 "sleep 5".to_string(),
2354 ],
2355 cwd: None,
2356 env: BTreeMap::new(),
2357 };
2358
2359 log_service_spec_debug("svc", "worker", &spec, "demo", "default", true);
2360 let handle = spawn_if_needed(&paths, &spec, &restart, None)?.expect("spawned");
2361 assert!(supervisor::is_running(handle.pid));
2362 assert!(spawn_if_needed(&paths, &spec, &restart, None)?.is_none());
2363 demo_status_runtime(&state_dir, "demo", "default", true)?;
2364
2365 let spec2 = supervisor::ServiceSpec {
2366 id: supervisor::ServiceId::new("svc2")?,
2367 argv: vec![
2368 "/bin/sh".to_string(),
2369 "-c".to_string(),
2370 "sleep 5".to_string(),
2371 ],
2372 cwd: None,
2373 env: BTreeMap::from([("GREENTIC_TENANT".to_string(), "demo".to_string())]),
2374 };
2375 let summary = spawn_supervised_service(
2376 "svc2",
2377 "worker",
2378 &spec2,
2379 &log_dir,
2380 &paths,
2381 &restart,
2382 &mut tracker,
2383 "demo",
2384 "default",
2385 true,
2386 )?;
2387 assert!(summary.describe().contains("log="));
2388 print_service_summary(&[summary]);
2389
2390 supervisor::stop_service(&paths, &spec.id, 100)?;
2391 supervisor::stop_service(&paths, &spec2.id, 100)?;
2392 Ok(())
2393 }
2394
2395 fn bind_available_nonterminal_port() -> anyhow::Result<TcpListener> {
2396 for _ in 0..32 {
2397 let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
2398 if listener.local_addr()?.port() <= u16::MAX - 10 {
2399 return Ok(listener);
2400 }
2401 }
2402 anyhow::bail!("failed to allocate test port with fallback range")
2403 }
2404
2405 fn write_static_route_provider_pack(path: &Path) -> anyhow::Result<()> {
2406 if let Some(parent) = path.parent() {
2407 fs::create_dir_all(parent)?;
2408 }
2409 let mut extensions = BTreeMap::new();
2410 extensions.insert(
2411 crate::static_routes::EXT_STATIC_ROUTES_V1.to_string(),
2412 ExtensionRef {
2413 kind: crate::static_routes::EXT_STATIC_ROUTES_V1.to_string(),
2414 version: "1.0.0".to_string(),
2415 digest: None,
2416 location: None,
2417 inline: Some(ExtensionInline::Other(json!({
2418 "schema_version": 1,
2419 "routes": [{
2420 "id": "webchat-gui",
2421 "public_path": "/v1/web/webchat/{tenant}",
2422 "source_root": "assets/webchat",
2423 "index_file": "index.html",
2424 "spa_fallback": "index.html",
2425 "tenant": true
2426 }]
2427 }))),
2428 },
2429 );
2430 let manifest = PackManifest {
2431 schema_version: "pack-v1".to_string(),
2432 pack_id: PackId::new("messaging-webchat-gui")?,
2433 name: Some("messaging-webchat-gui".to_string()),
2434 version: Version::parse("0.1.0")?,
2435 kind: PackKind::Provider,
2436 publisher: "demo".to_string(),
2437 components: Vec::new(),
2438 flows: Vec::new(),
2439 dependencies: Vec::new(),
2440 capabilities: Vec::new(),
2441 secret_requirements: Vec::new(),
2442 signatures: PackSignatures::default(),
2443 bootstrap: None,
2444 extensions: Some(extensions),
2445 };
2446 write_pack_with_manifest(path, manifest, &[("assets/webchat/index.html", b"webchat")])
2447 }
2448
2449 fn write_default_app_pack(path: &Path) -> anyhow::Result<()> {
2450 if let Some(parent) = path.parent() {
2451 fs::create_dir_all(parent)?;
2452 }
2453 let flow = Flow {
2454 schema_version: "flow-v1".to_string(),
2455 id: FlowId::new("default")?,
2456 kind: FlowKind::Messaging,
2457 entrypoints: BTreeMap::from([("default".to_string(), serde_json::Value::Null)]),
2458 nodes: Default::default(),
2459 metadata: Default::default(),
2460 };
2461 let manifest = PackManifest {
2462 schema_version: "pack-v1".to_string(),
2463 pack_id: PackId::new("demo-app")?,
2464 name: Some("demo-app".to_string()),
2465 version: Version::parse("0.1.0")?,
2466 kind: PackKind::Application,
2467 publisher: "demo".to_string(),
2468 components: Vec::new(),
2469 flows: vec![PackFlowEntry {
2470 id: FlowId::new("default")?,
2471 kind: FlowKind::Messaging,
2472 flow,
2473 tags: vec!["default".to_string()],
2474 entrypoints: vec!["default".to_string()],
2475 }],
2476 dependencies: Vec::new(),
2477 capabilities: Vec::new(),
2478 secret_requirements: Vec::new(),
2479 signatures: PackSignatures::default(),
2480 bootstrap: None,
2481 extensions: None,
2482 };
2483 write_pack_with_manifest(path, manifest, &[])
2484 }
2485
2486 fn write_pack_with_manifest(
2487 path: &Path,
2488 manifest: PackManifest,
2489 files: &[(&str, &[u8])],
2490 ) -> anyhow::Result<()> {
2491 let bytes = greentic_types::encode_pack_manifest(&manifest)?;
2492 let file = File::create(path)?;
2493 let mut zip = ZipWriter::new(file);
2494 zip.start_file("manifest.cbor", FileOptions::<()>::default())?;
2495 zip.write_all(&bytes)?;
2496 for (name, contents) in files {
2497 zip.start_file(*name, FileOptions::<()>::default())?;
2498 zip.write_all(contents)?;
2499 }
2500 zip.finish()?;
2501 Ok(())
2502 }
2503}