Skip to main content

greentic_start/
runtime.rs

1#![allow(dead_code)]
2
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use std::collections::{BTreeMap, BTreeSet};
9
10use crate::domains::Domain;
11use crate::http_ingress::{HttpIngressConfig, HttpIngressServer};
12use crate::operator_log;
13use crate::runner_host::DemoRunnerHost;
14use crate::runtime_state::{
15    RuntimePaths, persist_service_manifest, read_service_manifest, remove_service_manifest,
16    write_json,
17};
18use crate::secrets_gate;
19use crate::services;
20use crate::startup_contract::{
21    BundleStaticRoutesInspection, RuntimeConfig, RuntimePublicBaseUrl, RuntimePublicBaseUrlSource,
22    StartupContract, StartupContractInput,
23};
24use crate::supervisor;
25use anyhow::Context;
26
27use crate::cloudflared::{self, CloudflaredConfig};
28use crate::config::{DemoConfig, DemoSubscriptionsMode};
29use crate::ngrok::{self, NgrokConfig};
30
31use crate::subscriptions_universal::{
32    build_runner, ensure_desired_subscriptions, scheduler::Scheduler, service::SubscriptionService,
33    state_root, store::SubscriptionStore,
34};
35
36#[derive(Default)]
37pub struct ForegroundRuntimeHandles {
38    pub ingress_server: Option<HttpIngressServer>,
39}
40
41impl ForegroundRuntimeHandles {
42    pub fn stop(mut self) -> anyhow::Result<()> {
43        if let Some(server) = self.ingress_server.take() {
44            server.stop()?;
45        }
46        Ok(())
47    }
48}
49
50struct ServiceSummary {
51    id: String,
52    pid: Option<u32>,
53    details: Vec<String>,
54}
55
56impl ServiceSummary {
57    fn new(id: impl Into<String>, pid: Option<u32>) -> Self {
58        Self {
59            id: id.into(),
60            pid,
61            details: Vec::new(),
62        }
63    }
64
65    fn with_details(id: impl Into<String>, pid: Option<u32>, details: Vec<String>) -> Self {
66        Self {
67            id: id.into(),
68            pid,
69            details,
70        }
71    }
72
73    fn add_detail(&mut self, detail: impl Into<String>) {
74        self.details.push(detail.into());
75    }
76
77    fn describe(&self) -> String {
78        let pid_str = self
79            .pid
80            .map(|pid| pid.to_string())
81            .unwrap_or_else(|| "-".to_string());
82        if self.details.is_empty() {
83            format!("{} (pid={})", self.id, pid_str)
84        } else {
85            format!(
86                "{} (pid={}) [{}]",
87                self.id,
88                pid_str,
89                self.details.join(" | ")
90            )
91        }
92    }
93}
94
95struct ServiceTracker<'a> {
96    paths: &'a RuntimePaths,
97    manifest: crate::runtime_state::ServiceManifest,
98}
99
100impl<'a> ServiceTracker<'a> {
101    fn new(paths: &'a RuntimePaths, log_dir: Option<&Path>) -> anyhow::Result<Self> {
102        remove_service_manifest(paths)?;
103        let mut manifest = crate::runtime_state::ServiceManifest::default();
104        if let Some(dir) = log_dir {
105            manifest.log_dir = Some(dir.display().to_string());
106        }
107        persist_service_manifest(paths, &manifest)?;
108        Ok(Self { paths, manifest })
109    }
110
111    fn record(&mut self, entry: crate::runtime_state::ServiceEntry) -> anyhow::Result<()> {
112        self.manifest.services.push(entry);
113        persist_service_manifest(self.paths, &self.manifest)
114    }
115
116    fn record_with_log(
117        &mut self,
118        id: impl Into<String>,
119        kind: impl Into<String>,
120        log_path: Option<&Path>,
121    ) -> anyhow::Result<()> {
122        let entry = crate::runtime_state::ServiceEntry::new(id, kind, log_path);
123        self.record(entry)
124    }
125}
126
127fn log_service_spec_debug(
128    service_id: &str,
129    kind: &str,
130    spec: &supervisor::ServiceSpec,
131    tenant: &str,
132    team: &str,
133    debug_enabled: bool,
134) {
135    if !debug_enabled {
136        return;
137    }
138    let cwd = spec
139        .cwd
140        .as_ref()
141        .map(|path| path.display().to_string())
142        .unwrap_or_else(|| "<unset>".to_string());
143    let argv = spec.argv.join(" ");
144    let env_pairs = spec
145        .env
146        .iter()
147        .map(|(key, value)| format!("{}={}", key, value))
148        .collect::<Vec<_>>()
149        .join(" ");
150    let env_display = if env_pairs.is_empty() {
151        "<empty>".to_string()
152    } else {
153        env_pairs
154    };
155    operator_log::debug(
156        module_path!(),
157        format!(
158            "[demo dev] service {} kind={} tenant={} team={} cwd={} argv=\"{}\" env={}",
159            service_id, kind, tenant, team, cwd, argv, env_display
160        ),
161    );
162}
163
164#[allow(clippy::too_many_arguments)]
165fn spawn_supervised_service(
166    service_id: &str,
167    kind: &str,
168    spec: &supervisor::ServiceSpec,
169    log_dir: &Path,
170    paths: &RuntimePaths,
171    restart: &BTreeSet<String>,
172    tracker: &mut ServiceTracker,
173    tenant: &str,
174    team: &str,
175    debug_enabled: bool,
176) -> anyhow::Result<ServiceSummary> {
177    let log_path = operator_log::reserve_service_log(log_dir, service_id)?;
178    log_service_spec_debug(service_id, kind, spec, tenant, team, debug_enabled);
179    let handle = spawn_if_needed(paths, spec, restart, Some(log_path.clone()))?;
180    let pid = if let Some(handle) = &handle {
181        Some(handle.pid)
182    } else {
183        read_pid(&paths.pid_path(service_id))?
184    };
185    let actual_log = handle
186        .as_ref()
187        .map(|handle| handle.log_path.clone())
188        .unwrap_or(log_path.clone());
189    tracker.record_with_log(service_id, kind, Some(&actual_log))?;
190    operator_log::info(
191        module_path!(),
192        format!(
193            "service {} ready pid={:?} log={}",
194            service_id,
195            pid,
196            actual_log.display()
197        ),
198    );
199    let mut summary = ServiceSummary::new(service_id, pid);
200    summary.add_detail(format!("log={}", actual_log.display()));
201    Ok(summary)
202}
203
204fn print_service_summary(summaries: &[ServiceSummary]) {
205    if summaries.is_empty() {
206        return;
207    }
208    println!(
209        "\n{}",
210        crate::operator_i18n::tr("demo.runtime.started_services", "Started services:")
211    );
212    for summary in summaries {
213        println!("{}", summary.describe());
214    }
215}
216
217#[allow(clippy::too_many_arguments)]
218fn spawn_embedded_messaging(
219    bundle_root: &Path,
220    tenant: &str,
221    team: &str,
222    env: BTreeMap<String, String>,
223    log_dir: &Path,
224    restart: &BTreeSet<String>,
225    tracker: &mut ServiceTracker,
226    debug_enabled: bool,
227) -> anyhow::Result<ServiceSummary> {
228    let exe = std::env::current_exe()?;
229    let mut args = vec![
230        "dev".to_string(),
231        "embedded".to_string(),
232        "--project-root".to_string(),
233        bundle_root.display().to_string(),
234        "--no-nats".to_string(),
235    ];
236    let mut argv = vec![exe.to_string_lossy().to_string()];
237    argv.append(&mut args);
238
239    let spec = supervisor::ServiceSpec {
240        id: supervisor::ServiceId::new("messaging")?,
241        argv,
242        cwd: None,
243        env,
244    };
245
246    let mut summary = spawn_supervised_service(
247        "messaging",
248        "messaging",
249        &spec,
250        log_dir,
251        tracker.paths,
252        restart,
253        tracker,
254        tenant,
255        team,
256        debug_enabled,
257    )?;
258    summary.add_detail(format!("tenant={tenant} team={team}"));
259    summary.add_detail(format!(
260        "cmd=dev embedded --project-root {}",
261        bundle_root.display()
262    ));
263    Ok(summary)
264}
265
266#[allow(clippy::too_many_arguments)]
267fn spawn_universal_subscriptions_service(
268    bundle_root: &Path,
269    config: &DemoConfig,
270    tenant: &str,
271    team: &str,
272    runner_binary: Option<PathBuf>,
273    tracker: &mut ServiceTracker,
274    log_dir: &Path,
275    debug_enabled: bool,
276) -> anyhow::Result<ServiceSummary> {
277    let team_override = if team.trim().is_empty() {
278        None
279    } else {
280        Some(team.to_string())
281    };
282    let log_path = operator_log::reserve_service_log(log_dir, "subscriptions")
283        .with_context(|| "unable to open subscriptions log file")?;
284    tracker.record_with_log("subscriptions-universal", "subscriptions", Some(&log_path))?;
285
286    let desired = &config.services.subscriptions.universal.desired;
287    let (runner_host, context) =
288        build_runner(bundle_root, tenant, team_override.clone(), runner_binary)?;
289    let store = SubscriptionStore::new(state_root(bundle_root));
290    let scheduler = Scheduler::new(SubscriptionService::new(runner_host, context), store);
291
292    ensure_desired_subscriptions(
293        bundle_root,
294        tenant,
295        team_override.clone(),
296        desired,
297        &scheduler,
298    )?;
299
300    let renew_interval_secs = config
301        .services
302        .subscriptions
303        .universal
304        .renew_interval_seconds
305        .max(1);
306    let renew_skew_secs = config
307        .services
308        .subscriptions
309        .universal
310        .renew_skew_minutes
311        .max(1)
312        .saturating_mul(60);
313    let interval = Duration::from_secs(renew_interval_secs);
314    let skew = Duration::from_secs(renew_skew_secs);
315
316    let scheduler_handle = scheduler;
317    thread::Builder::new()
318        .name("subscriptions-universal".to_string())
319        .spawn(move || {
320            operator_log::info(
321                module_path!(),
322                format!(
323                    "subscriptions-universal scheduler running interval={}s skew={}s",
324                    renew_interval_secs, renew_skew_secs
325                ),
326            );
327            loop {
328                std::thread::sleep(interval);
329                if let Err(err) = scheduler_handle.renew_due(skew) {
330                    operator_log::error(
331                        module_path!(),
332                        format!("subscriptions-universal renew failed err={}", err),
333                    );
334                }
335            }
336        })?;
337
338    let mut summary = ServiceSummary::new("subscriptions-universal", None);
339    summary.add_detail(format!("log={}", log_path.display()));
340    summary.add_detail(format!("renew_interval={}s", renew_interval_secs));
341    summary.add_detail("mode=universal".to_string());
342    if debug_enabled {
343        operator_log::debug(
344            module_path!(),
345            format!(
346                "[demo dev] tenant={} team={} universal subscriptions running",
347                tenant, team
348            ),
349        );
350    }
351    Ok(summary)
352}
353
354fn spawn_if_needed(
355    paths: &RuntimePaths,
356    spec: &supervisor::ServiceSpec,
357    restart: &BTreeSet<String>,
358    log_path_override: Option<PathBuf>,
359) -> anyhow::Result<Option<supervisor::ServiceHandle>> {
360    if should_restart(restart, spec.id.as_str()) {
361        let _ = supervisor::stop_service(paths, &spec.id, 2_000);
362    }
363
364    let pid_path = paths.pid_path(spec.id.as_str());
365    if let Some(pid) = read_pid(&pid_path)?
366        && supervisor::is_running(pid)
367    {
368        println!(
369            "{}",
370            crate::operator_i18n::trf(
371                "demo.runtime.service_already_running",
372                "{}: already running (pid={})",
373                &[spec.id.as_str(), &pid.to_string()]
374            )
375        );
376        return Ok(None);
377    }
378    let handle = supervisor::spawn_service(paths, spec.clone(), log_path_override.clone())?;
379    println!(
380        "{}",
381        crate::operator_i18n::trf(
382            "demo.runtime.service_started",
383            "{}: started (pid={})",
384            &[spec.id.as_str(), &handle.pid.to_string()]
385        )
386    );
387    if spec.id.as_str() == "nats" {
388        operator_log::info(
389            module_path!(),
390            format!(
391                "spawned nats pid={} log={}",
392                handle.pid,
393                handle.log_path.display()
394            ),
395        );
396    }
397    Ok(Some(handle))
398}
399
400fn read_pid(path: &Path) -> anyhow::Result<Option<u32>> {
401    if !path.exists() {
402        return Ok(None);
403    }
404    let contents = std::fs::read_to_string(path)?;
405    let trimmed = contents.trim();
406    if trimmed.is_empty() {
407        return Ok(None);
408    }
409    Ok(Some(trimmed.parse()?))
410}
411
412fn looks_like_path(value: &str) -> bool {
413    value.contains('/') || value.contains('\\') || Path::new(value).is_absolute()
414}
415
416fn should_restart(restart: &BTreeSet<String>, service: &str) -> bool {
417    restart.contains("all") || restart.contains(service)
418}
419
420#[allow(clippy::too_many_arguments)]
421#[derive(Clone, Copy, Debug, PartialEq, Eq)]
422pub enum NatsMode {
423    Off,
424    On,
425    External,
426}
427
428#[allow(clippy::too_many_arguments)]
429pub fn demo_up(
430    bundle_root: &Path,
431    tenant: &str,
432    team: Option<&str>,
433    nats_url: Option<&str>,
434    nats_mode: NatsMode,
435    messaging_enabled: bool,
436    cloudflared: Option<CloudflaredConfig>,
437    ngrok: Option<NgrokConfig>,
438    log_dir: &Path,
439    debug_enabled: bool,
440) -> anyhow::Result<()> {
441    let team_id = team.unwrap_or("default");
442    let state_dir = bundle_root.join("state");
443    std::fs::create_dir_all(&state_dir)?;
444    let paths = RuntimePaths::new(&state_dir, tenant, team_id);
445    let mut service_tracker = ServiceTracker::new(&paths, Some(log_dir))?;
446    let mut service_summaries = Vec::new();
447    let restart_targets = BTreeSet::new();
448    let mut public_base_url: Option<String> = None;
449    if debug_enabled {
450        operator_log::debug(
451            module_path!(),
452            format!(
453                "[demo dev] demo_up tenant={} team={} nats_mode={:?} messaging_enabled={}",
454                tenant, team_id, nats_mode, messaging_enabled
455            ),
456        );
457    }
458    if let Some(config) = cloudflared {
459        let cloudflared_log = operator_log::reserve_service_log(log_dir, "cloudflared")
460            .with_context(|| "unable to open cloudflared.log")?;
461        operator_log::info(
462            module_path!(),
463            format!(
464                "starting cloudflared log={} bundle={}",
465                cloudflared_log.display(),
466                bundle_root.display()
467            ),
468        );
469        let handle = cloudflared::start_quick_tunnel(&paths, &config, &cloudflared_log)?;
470        operator_log::info(
471            module_path!(),
472            format!(
473                "cloudflared ready url={} log={}",
474                handle.url,
475                handle.log_path.display()
476            ),
477        );
478        if debug_enabled {
479            operator_log::debug(
480                module_path!(),
481                format!(
482                    "[demo dev] tenant={} team={} cloudflared url={} log={}",
483                    tenant,
484                    team_id,
485                    handle.url,
486                    handle.log_path.display()
487                ),
488            );
489        }
490        let url = handle.url.clone();
491        let log_path = handle.log_path.clone();
492        service_tracker.record_with_log("cloudflared", "cloudflared", Some(&log_path))?;
493        let summary = ServiceSummary::with_details(
494            "cloudflared",
495            Some(handle.pid),
496            vec![
497                format!("url={}", url),
498                format!("log={}", log_path.display()),
499            ],
500        );
501        service_summaries.push(summary);
502        public_base_url = Some(url.clone());
503        println!(
504            "{}",
505            crate::operator_i18n::trf(
506                "demo.runtime.public_url_cloudflared",
507                "Public URL (service=cloudflared): {}",
508                &[&url]
509            )
510        );
511    } else if let Some(config) = ngrok {
512        let ngrok_log = operator_log::reserve_service_log(log_dir, "ngrok")
513            .with_context(|| "unable to open ngrok.log")?;
514        operator_log::info(
515            module_path!(),
516            format!(
517                "starting ngrok log={} bundle={}",
518                ngrok_log.display(),
519                bundle_root.display()
520            ),
521        );
522        let handle = ngrok::start_tunnel(&paths, &config, &ngrok_log)?;
523        operator_log::info(
524            module_path!(),
525            format!(
526                "ngrok ready url={} log={}",
527                handle.url,
528                handle.log_path.display()
529            ),
530        );
531        if debug_enabled {
532            operator_log::debug(
533                module_path!(),
534                format!(
535                    "[demo dev] tenant={} team={} ngrok url={} log={}",
536                    tenant,
537                    team_id,
538                    handle.url,
539                    handle.log_path.display()
540                ),
541            );
542        }
543        let url = handle.url.clone();
544        let log_path = handle.log_path.clone();
545        service_tracker.record_with_log("ngrok", "ngrok", Some(&log_path))?;
546        let summary = ServiceSummary::with_details(
547            "ngrok",
548            Some(handle.pid),
549            vec![
550                format!("url={}", url),
551                format!("log={}", log_path.display()),
552            ],
553        );
554        service_summaries.push(summary);
555        public_base_url = Some(url.clone());
556        println!(
557            "{}",
558            crate::operator_i18n::trf(
559                "demo.runtime.public_url_ngrok",
560                "Public URL (service=ngrok): {}",
561                &[&url]
562            )
563        );
564    }
565
566    let mut resolved_nats_url = nats_url.map(|value| value.to_string());
567    if matches!(nats_mode, NatsMode::On) && resolved_nats_url.is_none() {
568        match operator_log::reserve_service_log(log_dir, "nats") {
569            Ok(nats_log) => {
570                operator_log::info(
571                    module_path!(),
572                    format!("starting nats log={}", nats_log.display()),
573                );
574                match services::start_nats_with_log(bundle_root, Some(nats_log.clone())) {
575                    Ok(state) => {
576                        operator_log::info(
577                            module_path!(),
578                            format!("nats started state={:?} log={}", state, nats_log.display()),
579                        );
580                        if debug_enabled {
581                            operator_log::debug(
582                                module_path!(),
583                                format!(
584                                    "[demo dev] tenant={} team={} nats state={:?} log={}",
585                                    tenant,
586                                    team_id,
587                                    state,
588                                    nats_log.display()
589                                ),
590                            );
591                        }
592                        service_tracker
593                            .record_with_log("nats", "nats", Some(&nats_log))
594                            .with_context(|| "failed to record nats service state")?;
595                        resolved_nats_url = Some(services::nats_url(bundle_root));
596                        let pid = read_pid(&paths.pid_path("nats"))?;
597                        let mut summary = ServiceSummary::new("nats", pid);
598                        summary.add_detail(format!("state={:?}", state));
599                        summary.add_detail(format!("url={}", services::nats_url(bundle_root)));
600                        summary.add_detail(format!("log={}", nats_log.display()));
601                        service_summaries.push(summary);
602                        mark_nats_started(&paths)?;
603                    }
604                    Err(err) => {
605                        eprintln!(
606                            "{}",
607                            crate::operator_i18n::trf(
608                                "demo.runtime.warn_failed_start_nats",
609                                "Warning: failed to start NATS: {}",
610                                &[&err.to_string()]
611                            )
612                        );
613                        operator_log::error(
614                            module_path!(),
615                            format!("failed to start nats (log={}): {err}", nats_log.display()),
616                        );
617                    }
618                }
619            }
620            Err(err) => {
621                eprintln!(
622                    "{}",
623                    crate::operator_i18n::trf(
624                        "demo.runtime.warn_failed_prepare_nats_log",
625                        "Warning: failed to prepare NATS log: {}",
626                        &[&err.to_string()]
627                    )
628                );
629                operator_log::error(module_path!(), format!("failed to open nats.log: {err}"));
630            }
631        }
632    }
633
634    let run_gsm_services = matches!(nats_mode, NatsMode::On);
635    if messaging_enabled && run_gsm_services {
636        let mut env_map = build_env(tenant, team_id, resolved_nats_url.as_deref(), None);
637        if let Some(url) = public_base_url.as_deref() {
638            env_map.insert("PUBLIC_BASE_URL".to_string(), url.to_string());
639        }
640        if debug_enabled {
641            operator_log::debug(
642                module_path!(),
643                format!(
644                    "[demo dev] launching GSM gateway/egress/subscriptions tenant={} team={} envs={:?}",
645                    tenant, team_id, env_map
646                ),
647            );
648        }
649        let mut messaging_summary = spawn_embedded_messaging(
650            bundle_root,
651            tenant,
652            team_id,
653            env_map,
654            log_dir,
655            &restart_targets,
656            &mut service_tracker,
657            debug_enabled,
658        )?;
659        messaging_summary.add_detail("embedded messaging stack".to_string());
660        service_summaries.push(messaging_summary);
661    } else {
662        println!(
663            "{}",
664            crate::operator_i18n::tr(
665                "demo.runtime.messaging_embedded",
666                "messaging: running embedded runner (no gsm gateway/egress)"
667            )
668        );
669    }
670
671    println!(
672        "{}",
673        crate::operator_i18n::tr(
674            "demo.runtime.events_in_process",
675            "events: handled in-process (HTTP ingress + timer scheduler)"
676        )
677    );
678    print_service_summary(&service_summaries);
679
680    if !run_gsm_services {
681        operator_log::info(
682            module_path!(),
683            "demo running in embedded runner mode; gateway/egress disabled",
684        );
685        if debug_enabled {
686            operator_log::debug(
687                module_path!(),
688                format!(
689                    "[demo dev] embedded runner mode only tenant={} team={} (gateway/egress/subscriptions skipped)",
690                    tenant, team_id
691                ),
692            );
693        }
694    }
695
696    Ok(())
697}
698
699#[allow(clippy::too_many_arguments)]
700pub fn demo_up_services(
701    config_path: &Path,
702    config: &DemoConfig,
703    static_routes: &BundleStaticRoutesInspection,
704    configured_public_base_url: Option<String>,
705    cloudflared: Option<CloudflaredConfig>,
706    ngrok: Option<NgrokConfig>,
707    restart: &BTreeSet<String>,
708    runner_binary: Option<PathBuf>,
709    log_dir: &Path,
710    debug_enabled: bool,
711) -> anyhow::Result<ForegroundRuntimeHandles> {
712    let config_dir = config_path
713        .parent()
714        .ok_or_else(|| anyhow::anyhow!("config path has no parent directory"))?;
715    let state_dir = config_dir.join("state");
716    let tenant = config.tenant.as_str();
717    let team = config.team.as_str();
718    let paths = RuntimePaths::new(&state_dir, tenant, team);
719    let mut service_tracker = ServiceTracker::new(&paths, Some(log_dir))?;
720    let discovery = crate::discovery::discover(config_dir)?;
721    crate::discovery::persist(config_dir, tenant, &discovery)?;
722    let secrets_handle = secrets_gate::resolve_secrets_manager(config_dir, tenant, Some(team))?;
723    let runner_host = Arc::new(DemoRunnerHost::new(
724        config_dir.to_path_buf(),
725        &discovery,
726        runner_binary.clone(),
727        secrets_handle.clone(),
728        debug_enabled,
729    )?);
730    let ingress_domains = detect_http_ingress_domains(&discovery, runner_host.as_ref());
731    // Enable static routes if bundle declares them - no longer requires NATS mode
732    let enable_static_routes = static_routes.bundle_has_static_routes();
733    let ingress_server = start_http_ingress_server(
734        config,
735        &ingress_domains,
736        runner_host.clone(),
737        enable_static_routes,
738    )
739    .with_context(|| "failed to start local HTTP ingress server")?;
740    let run_gsm_services = config.services.nats.enabled;
741    operator_log::info(
742        module_path!(),
743        format!(
744            "demo start services start bundle={} tenant={} team={} log_dir={}",
745            config_path.display(),
746            tenant,
747            team,
748            log_dir.display()
749        ),
750    );
751    if debug_enabled {
752        operator_log::debug(
753            module_path!(),
754            format!(
755                "[demo verbose] bundle={} tenant={} team={} logging=debug",
756                config_path.display(),
757                tenant,
758                team
759            ),
760        );
761    }
762
763    if should_restart(restart, "cloudflared") {
764        let _ = supervisor::stop_pidfile(&paths.pid_path("cloudflared"), 2_000);
765    }
766    if should_restart(restart, "ngrok") {
767        let _ = supervisor::stop_pidfile(&paths.pid_path("ngrok"), 2_000);
768    }
769
770    let tunnel_public_base_url = if let Some(cfg) = cloudflared {
771        if ingress_server.is_none() {
772            operator_log::warn(
773                module_path!(),
774                "cloudflared requested but no local HTTP ingress listener is enabled; skipping tunnel startup",
775            );
776            None
777        } else {
778            let cloudflared_log = operator_log::reserve_service_log(log_dir, "cloudflared")
779                .with_context(|| "unable to open cloudflared.log")?;
780            operator_log::info(
781                module_path!(),
782                format!("starting cloudflared log={}", cloudflared_log.display()),
783            );
784            let handle = cloudflared::start_quick_tunnel(&paths, &cfg, &cloudflared_log)?;
785            let mut domain_labels = Vec::new();
786            if discovery.domains.messaging {
787                domain_labels.push("messaging");
788            }
789            if discovery.domains.events {
790                domain_labels.push("events");
791            }
792            if discovery.domains.oauth {
793                domain_labels.push("oauth");
794            }
795            let domain_list = if domain_labels.is_empty() {
796                "none".to_string()
797            } else {
798                domain_labels.join(",")
799            };
800            operator_log::info(
801                module_path!(),
802                format!(
803                    "cloudflared ready domains={} url={} log={}",
804                    domain_list,
805                    handle.url,
806                    handle.log_path.display()
807                ),
808            );
809            if debug_enabled {
810                operator_log::debug(
811                    module_path!(),
812                    format!(
813                        "[demo dev] tenant={} team={} cloudflared domains={} url={} log={}",
814                        tenant,
815                        team,
816                        domain_list,
817                        handle.url,
818                        handle.log_path.display()
819                    ),
820                );
821            }
822            println!(
823                "{}",
824                crate::operator_i18n::trf(
825                    "demo.runtime.public_url_cloudflared_domains",
826                    "Public URL (service=cloudflared domains={}): {}",
827                    &[&domain_list, &handle.url]
828                )
829            );
830            service_tracker.record_with_log(
831                "cloudflared",
832                "cloudflared",
833                Some(&handle.log_path),
834            )?;
835            Some(handle.url)
836        }
837    } else if let Some(cfg) = ngrok {
838        if ingress_server.is_none() {
839            operator_log::warn(
840                module_path!(),
841                "ngrok requested but no local HTTP ingress listener is enabled; skipping tunnel startup",
842            );
843            None
844        } else {
845            let ngrok_log = operator_log::reserve_service_log(log_dir, "ngrok")
846                .with_context(|| "unable to open ngrok.log")?;
847            operator_log::info(
848                module_path!(),
849                format!("starting ngrok log={}", ngrok_log.display()),
850            );
851            let handle = ngrok::start_tunnel(&paths, &cfg, &ngrok_log)?;
852            let mut domain_labels = Vec::new();
853            if discovery.domains.messaging {
854                domain_labels.push("messaging");
855            }
856            if discovery.domains.events {
857                domain_labels.push("events");
858            }
859            let domain_list = if domain_labels.is_empty() {
860                "none".to_string()
861            } else {
862                domain_labels.join(",")
863            };
864            operator_log::info(
865                module_path!(),
866                format!(
867                    "ngrok ready domains={} url={} log={}",
868                    domain_list,
869                    handle.url,
870                    handle.log_path.display()
871                ),
872            );
873            if debug_enabled {
874                operator_log::debug(
875                    module_path!(),
876                    format!(
877                        "[demo dev] tenant={} team={} ngrok domains={} url={} log={}",
878                        tenant,
879                        team,
880                        domain_list,
881                        handle.url,
882                        handle.log_path.display()
883                    ),
884                );
885            }
886            println!(
887                "{}",
888                crate::operator_i18n::trf(
889                    "demo.runtime.public_url_ngrok_domains",
890                    "Public URL (service=ngrok domains={}): {}",
891                    &[&domain_list, &handle.url]
892                )
893            );
894            service_tracker.record_with_log("ngrok", "ngrok", Some(&handle.log_path))?;
895            Some(handle.url)
896        }
897    } else {
898        None
899    };
900
901    // Read previous public URL before it gets overwritten
902    let previous_public_url =
903        crate::webhook_updater::read_previous_public_url(&paths.runtime_root());
904
905    // Resolve public_base_url with fallback to local HTTP listener for local dev
906    let public_base_url = tunnel_public_base_url
907        .clone()
908        .or(configured_public_base_url.clone())
909        .or_else(|| {
910            // Fallback: derive from local HTTP listener if static routes are enabled
911            if ingress_server.is_some() && enable_static_routes {
912                let host = &config.services.gateway.listen_addr;
913                let port = config.services.gateway.port;
914                Some(format!("http://{}:{}", host, port))
915            } else {
916                None
917            }
918        });
919
920    // Auto-update webhooks if public URL changed
921    if let Some(ref new_url) = public_base_url
922        && let Err(err) = crate::webhook_updater::update_webhooks_if_url_changed(
923            config_dir,
924            &discovery,
925            &secrets_handle,
926            tenant,
927            team,
928            previous_public_url.as_deref(),
929            new_url,
930        )
931    {
932        operator_log::warn(
933            module_path!(),
934            format!("[webhook-updater] failed to update webhooks: {}", err),
935        );
936    }
937
938    // http_listener_enabled: true if HTTP ingress server started (not tied to NATS)
939    // asset_serving_enabled: true if bundle declares static routes we're enabling
940    let http_listener_enabled = ingress_server.is_some();
941    let asset_serving_enabled = enable_static_routes;
942    let runtime_config = if let Some(url) = tunnel_public_base_url {
943        Some(RuntimeConfig {
944            public_base_url: Some(RuntimePublicBaseUrl {
945                value: url,
946                source: RuntimePublicBaseUrlSource::Tunnel,
947            }),
948        })
949    } else if let Some(url) = configured_public_base_url {
950        Some(RuntimeConfig {
951            public_base_url: Some(RuntimePublicBaseUrl {
952                value: url,
953                source: RuntimePublicBaseUrlSource::Configured,
954            }),
955        })
956    } else {
957        public_base_url.clone().map(|url| RuntimeConfig {
958            public_base_url: Some(RuntimePublicBaseUrl {
959                value: url,
960                source: RuntimePublicBaseUrlSource::Derived,
961            }),
962        })
963    };
964
965    let startup_contract = resolve_startup_contract(
966        static_routes,
967        http_listener_enabled,
968        asset_serving_enabled,
969        public_base_url.clone(),
970        runtime_config,
971    )?;
972    write_json(
973        &paths.runtime_root().join("startup_contract.json"),
974        &startup_contract,
975    )?;
976
977    if should_restart(restart, "nats") {
978        let _ = supervisor::stop_pidfile(&paths.pid_path("nats"), 2_000);
979    }
980
981    let nats_url = if config.services.nats.enabled {
982        if config.services.nats.spawn.enabled {
983            let spec = build_service_spec(
984                config_dir,
985                "nats",
986                &config.services.nats.spawn.binary,
987                &config.services.nats.spawn.args,
988                &build_env(tenant, team, None, Some(&startup_contract)),
989            )?;
990            log_service_spec_debug("nats", "nats", &spec, tenant, team, debug_enabled);
991            let nats_log = operator_log::reserve_service_log(log_dir, "nats")
992                .with_context(|| "unable to open nats.log")?;
993            if let Some(handle) = spawn_if_needed(&paths, &spec, restart, Some(nats_log.clone()))? {
994                service_tracker
995                    .record_with_log("nats", "nats", Some(&handle.log_path))
996                    .with_context(|| "failed to record nats service")?;
997            }
998        }
999        Some(config.services.nats.url.clone())
1000    } else {
1001        None
1002    };
1003
1004    operator_log::info(
1005        module_path!(),
1006        "events provider packs run in-process; external events components are disabled",
1007    );
1008
1009    if run_gsm_services {
1010        if should_restart(restart, "gateway") {
1011            let _ = supervisor::stop_pidfile(&paths.pid_path("gateway"), 2_000);
1012        }
1013        let gateway_spec = build_service_spec(
1014            config_dir,
1015            "gateway",
1016            &config.services.gateway.binary,
1017            &config.services.gateway.args,
1018            &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
1019        )?;
1020        if let Some(handle) = spawn_if_needed(&paths, &gateway_spec, restart, None)? {
1021            service_tracker.record_with_log("gateway", "gateway", Some(&handle.log_path))?;
1022        }
1023
1024        if should_restart(restart, "egress") {
1025            let _ = supervisor::stop_pidfile(&paths.pid_path("egress"), 2_000);
1026        }
1027        let egress_spec = build_service_spec(
1028            config_dir,
1029            "egress",
1030            &config.services.egress.binary,
1031            &config.services.egress.args,
1032            &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
1033        )?;
1034        if let Some(handle) = spawn_if_needed(&paths, &egress_spec, restart, None)? {
1035            service_tracker.record_with_log("egress", "egress", Some(&handle.log_path))?;
1036        }
1037
1038        match config.services.subscriptions.mode {
1039            DemoSubscriptionsMode::LegacyGsm => {
1040                if config.services.subscriptions.msgraph.enabled {
1041                    if should_restart(restart, "subscriptions")
1042                        || should_restart(restart, "msgraph")
1043                    {
1044                        let _ = supervisor::stop_pidfile(&paths.pid_path("subscriptions"), 2_000);
1045                    }
1046                    let mut args = config.services.subscriptions.msgraph.args.clone();
1047                    if !config.services.subscriptions.msgraph.mode.is_empty() {
1048                        args.insert(0, config.services.subscriptions.msgraph.mode.clone());
1049                    }
1050                    let spec = build_service_spec(
1051                        config_dir,
1052                        "subscriptions",
1053                        &config.services.subscriptions.msgraph.binary,
1054                        &args,
1055                        &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
1056                    )?;
1057                    if let Some(handle) = spawn_if_needed(&paths, &spec, restart, None)? {
1058                        service_tracker.record_with_log(
1059                            "subscriptions",
1060                            "subscriptions",
1061                            Some(&handle.log_path),
1062                        )?;
1063                    }
1064                }
1065            }
1066            DemoSubscriptionsMode::UniversalOps => {
1067                spawn_universal_subscriptions_service(
1068                    config_dir,
1069                    config,
1070                    tenant,
1071                    team,
1072                    runner_binary.clone(),
1073                    &mut service_tracker,
1074                    log_dir,
1075                    debug_enabled,
1076                )?;
1077            }
1078        }
1079    } else {
1080        println!(
1081            "{}",
1082            crate::operator_i18n::tr(
1083                "demo.runtime.messaging_embedded",
1084                "messaging: running embedded runner (no gsm gateway/egress)"
1085            )
1086        );
1087        println!(
1088            "{}",
1089            crate::operator_i18n::tr(
1090                "demo.runtime.events_in_process",
1091                "events: handled in-process (HTTP ingress + timer scheduler)"
1092            )
1093        );
1094        operator_log::info(
1095            module_path!(),
1096            "demo running in embedded runner mode; gateway/egress disabled",
1097        );
1098        if debug_enabled {
1099            operator_log::debug(
1100                module_path!(),
1101                format!(
1102                    "[demo dev] embedded runner mode only tenant={} team={} (gateway/egress/subscriptions skipped)",
1103                    tenant, team
1104                ),
1105            );
1106        }
1107    }
1108
1109    let endpoints = DemoEndpoints {
1110        tenant: tenant.to_string(),
1111        team: team.to_string(),
1112        public_base_url: startup_contract.public_base_url.clone(),
1113        nats_url,
1114        gateway_listen_addr: config.services.gateway.listen_addr.clone(),
1115        gateway_port: config.services.gateway.port,
1116    };
1117    write_json(&paths.runtime_root().join("endpoints.json"), &endpoints)?;
1118    Ok(ForegroundRuntimeHandles { ingress_server })
1119}
1120
1121fn detect_http_ingress_domains(
1122    discovery: &crate::discovery::DiscoveryResult,
1123    runner_host: &DemoRunnerHost,
1124) -> Vec<Domain> {
1125    let mut domains = Vec::new();
1126    for domain in [Domain::Messaging, Domain::Events, Domain::OAuth] {
1127        let supported = discovery.providers.iter().any(|provider| {
1128            parse_domain_name(&provider.domain) == Some(domain)
1129                && runner_host.supports_op(domain, &provider.provider_id, "ingest_http")
1130        });
1131        let fallback_supported = matches!(domain, Domain::Events) && discovery.domains.events;
1132        if supported || fallback_supported {
1133            domains.push(domain);
1134        }
1135    }
1136    domains
1137}
1138
1139fn parse_domain_name(value: &str) -> Option<Domain> {
1140    match value {
1141        "messaging" => Some(Domain::Messaging),
1142        "events" => Some(Domain::Events),
1143        "oauth" => Some(Domain::OAuth),
1144        "secrets" => Some(Domain::Secrets),
1145        _ => None,
1146    }
1147}
1148
1149fn start_http_ingress_server(
1150    config: &DemoConfig,
1151    domains: &[Domain],
1152    runner_host: Arc<DemoRunnerHost>,
1153    enable_static_routes: bool,
1154) -> anyhow::Result<Option<HttpIngressServer>> {
1155    // Start HTTP server if we have ingress domains OR static routes to serve
1156    if domains.is_empty() && !enable_static_routes {
1157        return Ok(None);
1158    }
1159    let addr = format!(
1160        "{}:{}",
1161        config.services.gateway.listen_addr, config.services.gateway.port
1162    );
1163    let bind_addr = addr
1164        .parse()
1165        .with_context(|| format!("invalid gateway listen address {addr}"))?;
1166    let server = HttpIngressServer::start(HttpIngressConfig {
1167        bind_addr,
1168        domains: domains.to_vec(),
1169        runner_host,
1170        enable_static_routes,
1171    })?;
1172    println!(
1173        "HTTP ingress ready at http://{}:{}",
1174        config.services.gateway.listen_addr, config.services.gateway.port
1175    );
1176    Ok(Some(server))
1177}
1178
1179pub fn demo_status_runtime(
1180    state_dir: &Path,
1181    tenant: &str,
1182    team: &str,
1183    verbose: bool,
1184) -> anyhow::Result<()> {
1185    let paths = RuntimePaths::new(state_dir, tenant, team);
1186    let statuses = supervisor::read_status(&paths)?;
1187    if statuses.is_empty() {
1188        println!(
1189            "{}",
1190            crate::operator_i18n::tr("demo.runtime.none_running", "none running")
1191        );
1192        return Ok(());
1193    }
1194    for status in statuses {
1195        let state = if status.running {
1196            crate::operator_i18n::tr("demo.runtime.status_running", "running")
1197        } else {
1198            crate::operator_i18n::tr("demo.runtime.status_stopped", "stopped")
1199        };
1200        let pid = status
1201            .pid
1202            .map(|value| value.to_string())
1203            .unwrap_or_else(|| "-".to_string());
1204        if verbose {
1205            println!(
1206                "{}: {} (pid={}, log={})",
1207                status.id.as_str(),
1208                &state,
1209                pid,
1210                status.log_path.display()
1211            );
1212        } else {
1213            println!("{}: {} (pid={})", status.id.as_str(), &state, pid);
1214        }
1215    }
1216    Ok(())
1217}
1218
1219pub fn demo_logs_runtime(
1220    state_dir: &Path,
1221    log_dir: &Path,
1222    tenant: &str,
1223    team: &str,
1224    service: &str,
1225    tail: bool,
1226) -> anyhow::Result<()> {
1227    let log_dir = resolve_manifest_log_dir(state_dir, tenant, team, log_dir)?;
1228    let log_path = if service == "operator" {
1229        log_dir.join("operator.log")
1230    } else {
1231        let tenant_log_path = tenant_log_path(&log_dir, service, tenant, team)?;
1232        select_log_path(&log_dir, service, tenant, &tenant_log_path)
1233    };
1234    if tail {
1235        return services::tail_log(&log_path);
1236    }
1237    let lines = read_last_lines(&log_path, 200)?;
1238    if !lines.is_empty() {
1239        println!("{lines}");
1240    }
1241    Ok(())
1242}
1243
1244pub fn demo_down_runtime(
1245    state_dir: &Path,
1246    tenant: &str,
1247    team: &str,
1248    all: bool,
1249) -> anyhow::Result<()> {
1250    let timeout_ms = 2_000;
1251    let paths = RuntimePaths::new(state_dir, tenant, team);
1252    stop_started_nats(&paths, state_dir)?;
1253    // Kill any orphaned ngrok/cloudflared processes not tracked by pidfile
1254    ngrok::stop_ngrok();
1255    cloudflared::stop_cloudflared();
1256    if all {
1257        let pids_root = state_dir.join("pids");
1258        if !pids_root.exists() {
1259            println!(
1260                "{}",
1261                crate::operator_i18n::tr(
1262                    "demo.runtime.no_services_to_stop",
1263                    "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
1264                )
1265            );
1266            return Ok(());
1267        }
1268        for entry in std::fs::read_dir(&pids_root)? {
1269            let entry = entry?;
1270            if !entry.file_type()?.is_dir() {
1271                continue;
1272            }
1273            for pidfile in std::fs::read_dir(entry.path())? {
1274                let pidfile = pidfile?;
1275                if pidfile.path().extension().and_then(|ext| ext.to_str()) != Some("pid") {
1276                    continue;
1277                }
1278                let _ = supervisor::stop_pidfile(&pidfile.path(), timeout_ms);
1279            }
1280        }
1281        remove_service_manifest(&paths)?;
1282        println!(
1283            "{}",
1284            crate::operator_i18n::trf(
1285                "demo.runtime.stopped_all_under",
1286                "Stopped all services under {}",
1287                &[&pids_root.display().to_string()]
1288            )
1289        );
1290        return Ok(());
1291    }
1292
1293    if let Some(manifest) = read_service_manifest(&paths)? {
1294        if manifest.services.is_empty() {
1295            println!(
1296                "{}",
1297                crate::operator_i18n::tr(
1298                    "demo.runtime.no_services_to_stop",
1299                    "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
1300                )
1301            );
1302            return Ok(());
1303        }
1304        for entry in manifest.services.iter().rev() {
1305            let id = supervisor::ServiceId::new(entry.id.clone())?;
1306            if let Err(err) = supervisor::stop_service(&paths, &id, timeout_ms) {
1307                eprintln!(
1308                    "{}",
1309                    crate::operator_i18n::trf(
1310                        "demo.runtime.warn_failed_stop_service",
1311                        "Warning: failed to stop {}: {}",
1312                        &[&entry.id, &err.to_string()]
1313                    )
1314                );
1315            }
1316        }
1317        remove_service_manifest(&paths)?;
1318        return Ok(());
1319    }
1320
1321    let pids_dir = paths.pids_dir();
1322    if !pids_dir.exists() {
1323        println!(
1324            "{}",
1325            crate::operator_i18n::tr(
1326                "demo.runtime.no_services_to_stop",
1327                "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
1328            )
1329        );
1330        return Ok(());
1331    }
1332    for entry in std::fs::read_dir(&pids_dir)? {
1333        let entry = entry?;
1334        let path = entry.path();
1335        if path.extension().and_then(|ext| ext.to_str()) != Some("pid") {
1336            continue;
1337        }
1338        supervisor::stop_pidfile(&path, timeout_ms)?;
1339    }
1340    Ok(())
1341}
1342
1343fn select_log_path(log_dir: &Path, service: &str, tenant: &str, tenant_log: &Path) -> PathBuf {
1344    let candidates = [
1345        log_dir.join(format!("{service}.log")),
1346        log_dir.join(format!("{service}-{tenant}.log")),
1347        log_dir.join(format!("{service}.{tenant}.log")),
1348    ];
1349    for candidate in &candidates {
1350        if candidate.exists() {
1351            return candidate.clone();
1352        }
1353    }
1354    if tenant_log.exists() {
1355        return tenant_log.to_path_buf();
1356    }
1357    let _ = ensure_log_file(tenant_log);
1358    tenant_log.to_path_buf()
1359}
1360
1361fn tenant_log_path(
1362    log_dir: &Path,
1363    service: &str,
1364    tenant: &str,
1365    team: &str,
1366) -> anyhow::Result<PathBuf> {
1367    let tenant_dir = log_dir.join(format!("{tenant}.{team}"));
1368    let path = tenant_dir.join(format!("{service}.log"));
1369    ensure_log_file(&path)?;
1370    Ok(path)
1371}
1372
1373fn ensure_log_file(path: &Path) -> anyhow::Result<()> {
1374    if let Some(parent) = path.parent() {
1375        std::fs::create_dir_all(parent)?;
1376    }
1377    if !path.exists() {
1378        std::fs::File::create(path)?;
1379    }
1380    Ok(())
1381}
1382
1383fn resolve_manifest_log_dir(
1384    state_dir: &Path,
1385    tenant: &str,
1386    team: &str,
1387    default: &Path,
1388) -> anyhow::Result<PathBuf> {
1389    let paths = RuntimePaths::new(state_dir, tenant, team);
1390    if let Some(manifest) = read_service_manifest(&paths)?
1391        && let Some(dir) = manifest.log_dir
1392    {
1393        return Ok(PathBuf::from(dir));
1394    }
1395    Ok(default.to_path_buf())
1396}
1397
1398fn build_env(
1399    tenant: &str,
1400    team: &str,
1401    nats_url: Option<&str>,
1402    startup_contract: Option<&StartupContract>,
1403) -> BTreeMap<String, String> {
1404    let mut env = BTreeMap::new();
1405    env.insert("GREENTIC_TENANT".to_string(), tenant.to_string());
1406    env.insert("GREENTIC_TEAM".to_string(), team.to_string());
1407    if let Some(url) = nats_url {
1408        env.insert("NATS_URL".to_string(), url.to_string());
1409    }
1410    if let Some(contract) = startup_contract {
1411        contract.apply_env(&mut env);
1412    }
1413    env
1414}
1415
1416fn resolve_startup_contract(
1417    static_routes: &BundleStaticRoutesInspection,
1418    http_listener_enabled: bool,
1419    asset_serving_enabled: bool,
1420    public_base_url: Option<String>,
1421    runtime_config: Option<RuntimeConfig>,
1422) -> anyhow::Result<StartupContract> {
1423    crate::startup_contract::resolve(StartupContractInput {
1424        bundle_has_static_routes: static_routes.bundle_has_static_routes(),
1425        http_listener_enabled,
1426        asset_serving_enabled,
1427        public_base_url,
1428        runtime_config,
1429    })
1430}
1431
1432fn mark_nats_started(paths: &RuntimePaths) -> anyhow::Result<()> {
1433    let marker = nats_started_marker(paths);
1434    if let Some(parent) = marker.parent() {
1435        std::fs::create_dir_all(parent)?;
1436    }
1437    std::fs::write(marker, "started")?;
1438    Ok(())
1439}
1440
1441fn stop_started_nats(paths: &RuntimePaths, state_dir: &Path) -> anyhow::Result<()> {
1442    let marker = nats_started_marker(paths);
1443    if !marker.exists() {
1444        return Ok(());
1445    }
1446    let bundle_root = state_dir.parent().unwrap_or(state_dir);
1447    match services::stop_nats(bundle_root) {
1448        Ok(_) => {
1449            let _ = std::fs::remove_file(&marker);
1450        }
1451        Err(err) => {
1452            eprintln!(
1453                "{}",
1454                crate::operator_i18n::trf(
1455                    "demo.runtime.warn_failed_stop_nats",
1456                    "Warning: failed to stop nats: {}",
1457                    &[&err.to_string()]
1458                )
1459            );
1460        }
1461    }
1462    Ok(())
1463}
1464
1465fn nats_started_marker(paths: &RuntimePaths) -> PathBuf {
1466    paths.runtime_root().join("nats.started")
1467}
1468
1469fn build_service_spec(
1470    config_dir: &Path,
1471    service_id: &str,
1472    binary: &str,
1473    args: &[String],
1474    env: &BTreeMap<String, String>,
1475) -> anyhow::Result<supervisor::ServiceSpec> {
1476    let explicit = if looks_like_path(binary) {
1477        let path = Path::new(binary);
1478        Some(if path.is_absolute() {
1479            path.to_path_buf()
1480        } else {
1481            config_dir.join(path)
1482        })
1483    } else {
1484        None
1485    };
1486    let path = crate::bin_resolver::resolve_binary(
1487        binary,
1488        &crate::bin_resolver::ResolveCtx {
1489            config_dir: config_dir.to_path_buf(),
1490            explicit_path: explicit,
1491        },
1492    )?;
1493    let mut argv = vec![path.to_string_lossy().to_string()];
1494    argv.extend(args.iter().cloned());
1495    Ok(supervisor::ServiceSpec {
1496        id: supervisor::ServiceId::new(service_id)?,
1497        argv,
1498        cwd: None,
1499        env: env.clone(),
1500    })
1501}
1502
1503#[derive(serde::Serialize)]
1504struct DemoEndpoints {
1505    tenant: String,
1506    team: String,
1507    public_base_url: Option<String>,
1508    nats_url: Option<String>,
1509    gateway_listen_addr: String,
1510    gateway_port: u16,
1511}
1512
1513fn read_last_lines(path: &Path, count: usize) -> anyhow::Result<String> {
1514    if !path.exists() {
1515        return Err(anyhow::anyhow!(
1516            "Log file does not exist: {}",
1517            path.display()
1518        ));
1519    }
1520    let contents = std::fs::read_to_string(path)?;
1521    let mut lines: Vec<&str> = contents.lines().collect();
1522    if lines.len() > count {
1523        lines = lines.split_off(lines.len() - count);
1524    }
1525    Ok(lines.join("\n"))
1526}
1527
1528#[cfg(test)]
1529mod tests {
1530    use super::*;
1531    use std::fs;
1532    use tempfile::tempdir;
1533
1534    #[test]
1535    fn tenant_log_path_creates_file() -> anyhow::Result<()> {
1536        let dir = tempdir()?;
1537        let path = tenant_log_path(dir.path(), "messaging", "demo", "default")?;
1538        assert!(path.exists());
1539        Ok(())
1540    }
1541
1542    #[test]
1543    fn select_log_path_prefers_service_log_when_present() -> anyhow::Result<()> {
1544        let dir = tempdir()?;
1545        let tenant_path = tenant_log_path(dir.path(), "messaging", "demo", "default")?;
1546        let service_path = dir.path().join("messaging.log");
1547        fs::write(&service_path, "other")?;
1548        let selected = select_log_path(dir.path(), "messaging", "demo", &tenant_path);
1549        assert_eq!(selected, service_path);
1550        Ok(())
1551    }
1552
1553    #[test]
1554    fn demo_logs_runtime_reads_operator_log() -> anyhow::Result<()> {
1555        let dir = tempdir()?;
1556        let log = dir.path().join("operator.log");
1557        fs::write(&log, "operator ready")?;
1558        demo_logs_runtime(dir.path(), dir.path(), "demo", "default", "operator", false)?;
1559        Ok(())
1560    }
1561}