1#![allow(dead_code)]
2
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use std::collections::{BTreeMap, BTreeSet};
9
10use crate::domains::Domain;
11use crate::http_ingress::{HttpIngressConfig, HttpIngressServer};
12use crate::operator_log;
13use crate::runner_host::DemoRunnerHost;
14use crate::runtime_state::{
15 RuntimePaths, persist_service_manifest, read_service_manifest, remove_service_manifest,
16 write_json,
17};
18use crate::secrets_gate;
19use crate::services;
20use crate::startup_contract::{
21 BundleStaticRoutesInspection, RuntimeConfig, RuntimePublicBaseUrl, RuntimePublicBaseUrlSource,
22 StartupContract, StartupContractInput,
23};
24use crate::supervisor;
25use anyhow::Context;
26
27use crate::cloudflared::{self, CloudflaredConfig};
28use crate::config::{DemoConfig, DemoSubscriptionsMode};
29use crate::ngrok::{self, NgrokConfig};
30
31use crate::subscriptions_universal::{
32 build_runner, ensure_desired_subscriptions, scheduler::Scheduler, service::SubscriptionService,
33 state_root, store::SubscriptionStore,
34};
35
36#[derive(Default)]
37pub struct ForegroundRuntimeHandles {
38 pub ingress_server: Option<HttpIngressServer>,
39}
40
41impl ForegroundRuntimeHandles {
42 pub fn stop(mut self) -> anyhow::Result<()> {
43 if let Some(server) = self.ingress_server.take() {
44 server.stop()?;
45 }
46 Ok(())
47 }
48}
49
/// One started service's human-readable record, rendered into the
/// end-of-startup "Started services:" printout.
struct ServiceSummary {
    id: String,
    pid: Option<u32>,
    details: Vec<String>,
}

impl ServiceSummary {
    /// Build a summary with no extra detail strings.
    fn new(id: impl Into<String>, pid: Option<u32>) -> Self {
        Self::with_details(id, pid, Vec::new())
    }

    /// Build a summary with a pre-assembled list of detail strings.
    fn with_details(id: impl Into<String>, pid: Option<u32>, details: Vec<String>) -> Self {
        Self {
            id: id.into(),
            pid,
            details,
        }
    }

    /// Append a single detail string (e.g. `log=/path/to/file`).
    fn add_detail(&mut self, detail: impl Into<String>) {
        self.details.push(detail.into());
    }

    /// Render as `id (pid=N)` or, with details, `id (pid=N) [d1 | d2]`.
    /// A missing pid is shown as `-`.
    fn describe(&self) -> String {
        let pid_str = match self.pid {
            Some(pid) => pid.to_string(),
            None => "-".to_string(),
        };
        match self.details.is_empty() {
            true => format!("{} (pid={})", self.id, pid_str),
            false => format!(
                "{} (pid={}) [{}]",
                self.id,
                pid_str,
                self.details.join(" | ")
            ),
        }
    }
}
94
95struct ServiceTracker<'a> {
96 paths: &'a RuntimePaths,
97 manifest: crate::runtime_state::ServiceManifest,
98}
99
100impl<'a> ServiceTracker<'a> {
101 fn new(paths: &'a RuntimePaths, log_dir: Option<&Path>) -> anyhow::Result<Self> {
102 remove_service_manifest(paths)?;
103 let mut manifest = crate::runtime_state::ServiceManifest::default();
104 if let Some(dir) = log_dir {
105 manifest.log_dir = Some(dir.display().to_string());
106 }
107 persist_service_manifest(paths, &manifest)?;
108 Ok(Self { paths, manifest })
109 }
110
111 fn record(&mut self, entry: crate::runtime_state::ServiceEntry) -> anyhow::Result<()> {
112 self.manifest.services.push(entry);
113 persist_service_manifest(self.paths, &self.manifest)
114 }
115
116 fn record_with_log(
117 &mut self,
118 id: impl Into<String>,
119 kind: impl Into<String>,
120 log_path: Option<&Path>,
121 ) -> anyhow::Result<()> {
122 let entry = crate::runtime_state::ServiceEntry::new(id, kind, log_path);
123 self.record(entry)
124 }
125}
126
127fn log_service_spec_debug(
128 service_id: &str,
129 kind: &str,
130 spec: &supervisor::ServiceSpec,
131 tenant: &str,
132 team: &str,
133 debug_enabled: bool,
134) {
135 if !debug_enabled {
136 return;
137 }
138 let cwd = spec
139 .cwd
140 .as_ref()
141 .map(|path| path.display().to_string())
142 .unwrap_or_else(|| "<unset>".to_string());
143 let argv = spec.argv.join(" ");
144 let env_pairs = spec
145 .env
146 .iter()
147 .map(|(key, value)| format!("{}={}", key, value))
148 .collect::<Vec<_>>()
149 .join(" ");
150 let env_display = if env_pairs.is_empty() {
151 "<empty>".to_string()
152 } else {
153 env_pairs
154 };
155 operator_log::debug(
156 module_path!(),
157 format!(
158 "[demo dev] service {} kind={} tenant={} team={} cwd={} argv=\"{}\" env={}",
159 service_id, kind, tenant, team, cwd, argv, env_display
160 ),
161 );
162}
163
164#[allow(clippy::too_many_arguments)]
165fn spawn_supervised_service(
166 service_id: &str,
167 kind: &str,
168 spec: &supervisor::ServiceSpec,
169 log_dir: &Path,
170 paths: &RuntimePaths,
171 restart: &BTreeSet<String>,
172 tracker: &mut ServiceTracker,
173 tenant: &str,
174 team: &str,
175 debug_enabled: bool,
176) -> anyhow::Result<ServiceSummary> {
177 let log_path = operator_log::reserve_service_log(log_dir, service_id)?;
178 log_service_spec_debug(service_id, kind, spec, tenant, team, debug_enabled);
179 let handle = spawn_if_needed(paths, spec, restart, Some(log_path.clone()))?;
180 let pid = if let Some(handle) = &handle {
181 Some(handle.pid)
182 } else {
183 read_pid(&paths.pid_path(service_id))?
184 };
185 let actual_log = handle
186 .as_ref()
187 .map(|handle| handle.log_path.clone())
188 .unwrap_or(log_path.clone());
189 tracker.record_with_log(service_id, kind, Some(&actual_log))?;
190 operator_log::info(
191 module_path!(),
192 format!(
193 "service {} ready pid={:?} log={}",
194 service_id,
195 pid,
196 actual_log.display()
197 ),
198 );
199 let mut summary = ServiceSummary::new(service_id, pid);
200 summary.add_detail(format!("log={}", actual_log.display()));
201 Ok(summary)
202}
203
204fn print_service_summary(summaries: &[ServiceSummary]) {
205 if summaries.is_empty() {
206 return;
207 }
208 println!(
209 "\n{}",
210 crate::operator_i18n::tr("demo.runtime.started_services", "Started services:")
211 );
212 for summary in summaries {
213 println!("{}", summary.describe());
214 }
215}
216
217#[allow(clippy::too_many_arguments)]
218fn spawn_embedded_messaging(
219 bundle_root: &Path,
220 tenant: &str,
221 team: &str,
222 env: BTreeMap<String, String>,
223 log_dir: &Path,
224 restart: &BTreeSet<String>,
225 tracker: &mut ServiceTracker,
226 debug_enabled: bool,
227) -> anyhow::Result<ServiceSummary> {
228 let exe = std::env::current_exe()?;
229 let mut args = vec![
230 "dev".to_string(),
231 "embedded".to_string(),
232 "--project-root".to_string(),
233 bundle_root.display().to_string(),
234 "--no-nats".to_string(),
235 ];
236 let mut argv = vec![exe.to_string_lossy().to_string()];
237 argv.append(&mut args);
238
239 let spec = supervisor::ServiceSpec {
240 id: supervisor::ServiceId::new("messaging")?,
241 argv,
242 cwd: None,
243 env,
244 };
245
246 let mut summary = spawn_supervised_service(
247 "messaging",
248 "messaging",
249 &spec,
250 log_dir,
251 tracker.paths,
252 restart,
253 tracker,
254 tenant,
255 team,
256 debug_enabled,
257 )?;
258 summary.add_detail(format!("tenant={tenant} team={team}"));
259 summary.add_detail(format!(
260 "cmd=dev embedded --project-root {}",
261 bundle_root.display()
262 ));
263 Ok(summary)
264}
265
/// Run the "universal" subscriptions mode in-process: reconcile the desired
/// subscriptions once, then spawn a background thread that periodically
/// renews subscriptions that are close to expiry.
///
/// Unlike the other services this does not fork a child process, so the
/// returned summary has no pid; only a log file is reserved and recorded.
#[allow(clippy::too_many_arguments)]
fn spawn_universal_subscriptions_service(
    bundle_root: &Path,
    config: &DemoConfig,
    tenant: &str,
    team: &str,
    runner_binary: Option<PathBuf>,
    tracker: &mut ServiceTracker,
    log_dir: &Path,
    debug_enabled: bool,
) -> anyhow::Result<ServiceSummary> {
    // An empty/whitespace team means "no team override".
    let team_override = if team.trim().is_empty() {
        None
    } else {
        Some(team.to_string())
    };
    let log_path = operator_log::reserve_service_log(log_dir, "subscriptions")
        .with_context(|| "unable to open subscriptions log file")?;
    tracker.record_with_log("subscriptions-universal", "subscriptions", Some(&log_path))?;

    let desired = &config.services.subscriptions.universal.desired;
    let (runner_host, context) =
        build_runner(bundle_root, tenant, team_override.clone(), runner_binary)?;
    let store = SubscriptionStore::new(state_root(bundle_root));
    let scheduler = Scheduler::new(SubscriptionService::new(runner_host, context), store);

    // One-shot reconciliation before the renewal loop starts.
    ensure_desired_subscriptions(
        bundle_root,
        tenant,
        team_override.clone(),
        desired,
        &scheduler,
    )?;

    // Clamp config values to sane minimums: at least 1s between renewal
    // passes, at least 1 minute of renewal skew (converted to seconds).
    let renew_interval_secs = config
        .services
        .subscriptions
        .universal
        .renew_interval_seconds
        .max(1);
    let renew_skew_secs = config
        .services
        .subscriptions
        .universal
        .renew_skew_minutes
        .max(1)
        .saturating_mul(60);
    let interval = Duration::from_secs(renew_interval_secs);
    let skew = Duration::from_secs(renew_skew_secs);

    // Move the scheduler into a detached background thread; it runs for
    // the lifetime of the process. Renewal failures are logged, not fatal.
    let scheduler_handle = scheduler;
    thread::Builder::new()
        .name("subscriptions-universal".to_string())
        .spawn(move || {
            operator_log::info(
                module_path!(),
                format!(
                    "subscriptions-universal scheduler running interval={}s skew={}s",
                    renew_interval_secs, renew_skew_secs
                ),
            );
            loop {
                std::thread::sleep(interval);
                if let Err(err) = scheduler_handle.renew_due(skew) {
                    operator_log::error(
                        module_path!(),
                        format!("subscriptions-universal renew failed err={}", err),
                    );
                }
            }
        })?;

    let mut summary = ServiceSummary::new("subscriptions-universal", None);
    summary.add_detail(format!("log={}", log_path.display()));
    summary.add_detail(format!("renew_interval={}s", renew_interval_secs));
    summary.add_detail("mode=universal".to_string());
    if debug_enabled {
        operator_log::debug(
            module_path!(),
            format!(
                "[demo dev] tenant={} team={} universal subscriptions running",
                tenant, team
            ),
        );
    }
    Ok(summary)
}
353
/// Spawn `spec` under the supervisor unless a live instance already exists.
///
/// Returns `Ok(None)` when the service's pidfile names a running process
/// (the existing instance is kept), otherwise `Ok(Some(handle))` for the
/// freshly spawned child. When the service is in the `restart` set, any
/// existing instance is stopped first (best-effort, 2s grace).
fn spawn_if_needed(
    paths: &RuntimePaths,
    spec: &supervisor::ServiceSpec,
    restart: &BTreeSet<String>,
    log_path_override: Option<PathBuf>,
) -> anyhow::Result<Option<supervisor::ServiceHandle>> {
    if should_restart(restart, spec.id.as_str()) {
        // Best-effort stop; a failure here just means we fall through to
        // the already-running check below.
        let _ = supervisor::stop_service(paths, &spec.id, 2_000);
    }

    let pid_path = paths.pid_path(spec.id.as_str());
    // Let-chain: only short-circuit when the pidfile exists AND the pid is
    // actually alive (stale pidfiles are ignored).
    if let Some(pid) = read_pid(&pid_path)?
        && supervisor::is_running(pid)
    {
        println!(
            "{}",
            crate::operator_i18n::trf(
                "demo.runtime.service_already_running",
                "{}: already running (pid={})",
                &[spec.id.as_str(), &pid.to_string()]
            )
        );
        return Ok(None);
    }
    let handle = supervisor::spawn_service(paths, spec.clone(), log_path_override.clone())?;
    println!(
        "{}",
        crate::operator_i18n::trf(
            "demo.runtime.service_started",
            "{}: started (pid={})",
            &[spec.id.as_str(), &handle.pid.to_string()]
        )
    );
    // NATS gets an extra operator-log line because other components depend
    // on it coming up.
    if spec.id.as_str() == "nats" {
        operator_log::info(
            module_path!(),
            format!(
                "spawned nats pid={} log={}",
                handle.pid,
                handle.log_path.display()
            ),
        );
    }
    Ok(Some(handle))
}
399
400fn read_pid(path: &Path) -> anyhow::Result<Option<u32>> {
401 if !path.exists() {
402 return Ok(None);
403 }
404 let contents = std::fs::read_to_string(path)?;
405 let trimmed = contents.trim();
406 if trimmed.is_empty() {
407 return Ok(None);
408 }
409 Ok(Some(trimmed.parse()?))
410}
411
/// Heuristic: treat `value` as a filesystem path when it contains a path
/// separator (either flavor) or is an absolute path on this platform.
fn looks_like_path(value: &str) -> bool {
    if value.contains('/') || value.contains('\\') {
        return true;
    }
    Path::new(value).is_absolute()
}
415
/// A service should be restarted when it is named explicitly in `restart`
/// or the wildcard target "all" is present.
fn should_restart(restart: &BTreeSet<String>, service: &str) -> bool {
    ["all", service].iter().any(|key| restart.contains(*key))
}
419
/// Requested NATS handling for the demo runtime. `On` spawns a local NATS
/// server (see `demo_up`); `Off` and `External` skip the local spawn —
/// `External` presumably relies on a caller-supplied URL (verify against
/// callers).
///
/// The stray `#[allow(clippy::too_many_arguments)]` that previously sat on
/// this enum was removed: that lint applies to functions, so the attribute
/// had no effect here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NatsMode {
    Off,
    On,
    External,
}
427
/// Bring up the demo runtime rooted at `bundle_root` for one tenant/team:
/// optionally start a cloudflared OR ngrok tunnel (cloudflared wins when
/// both are given), optionally spawn a local NATS server (only when
/// `nats_mode` is `On` and no `nats_url` was supplied), and launch the
/// embedded messaging stack when both `messaging_enabled` and NATS-on hold.
/// Every spawned service is recorded in the on-disk service manifest, and a
/// summary of what started is printed at the end.
#[allow(clippy::too_many_arguments)]
pub fn demo_up(
    bundle_root: &Path,
    tenant: &str,
    team: Option<&str>,
    nats_url: Option<&str>,
    nats_mode: NatsMode,
    messaging_enabled: bool,
    cloudflared: Option<CloudflaredConfig>,
    ngrok: Option<NgrokConfig>,
    log_dir: &Path,
    debug_enabled: bool,
) -> anyhow::Result<()> {
    let team_id = team.unwrap_or("default");
    let state_dir = bundle_root.join("state");
    std::fs::create_dir_all(&state_dir)?;
    let paths = RuntimePaths::new(&state_dir, tenant, team_id);
    // Starts a fresh service manifest for this run.
    let mut service_tracker = ServiceTracker::new(&paths, Some(log_dir))?;
    let mut service_summaries = Vec::new();
    // No restart targets in this path: existing live services are reused.
    let restart_targets = BTreeSet::new();
    let mut public_base_url: Option<String> = None;
    if debug_enabled {
        operator_log::debug(
            module_path!(),
            format!(
                "[demo dev] demo_up tenant={} team={} nats_mode={:?} messaging_enabled={}",
                tenant, team_id, nats_mode, messaging_enabled
            ),
        );
    }
    // --- Optional public tunnel: cloudflared takes precedence over ngrok.
    if let Some(config) = cloudflared {
        let cloudflared_log = operator_log::reserve_service_log(log_dir, "cloudflared")
            .with_context(|| "unable to open cloudflared.log")?;
        operator_log::info(
            module_path!(),
            format!(
                "starting cloudflared log={} bundle={}",
                cloudflared_log.display(),
                bundle_root.display()
            ),
        );
        let handle = cloudflared::start_quick_tunnel(&paths, &config, &cloudflared_log)?;
        operator_log::info(
            module_path!(),
            format!(
                "cloudflared ready url={} log={}",
                handle.url,
                handle.log_path.display()
            ),
        );
        if debug_enabled {
            operator_log::debug(
                module_path!(),
                format!(
                    "[demo dev] tenant={} team={} cloudflared url={} log={}",
                    tenant,
                    team_id,
                    handle.url,
                    handle.log_path.display()
                ),
            );
        }
        let url = handle.url.clone();
        let log_path = handle.log_path.clone();
        service_tracker.record_with_log("cloudflared", "cloudflared", Some(&log_path))?;
        let summary = ServiceSummary::with_details(
            "cloudflared",
            Some(handle.pid),
            vec![
                format!("url={}", url),
                format!("log={}", log_path.display()),
            ],
        );
        service_summaries.push(summary);
        // Tunnel URL becomes the public base URL exported to messaging below.
        public_base_url = Some(url.clone());
        println!(
            "{}",
            crate::operator_i18n::trf(
                "demo.runtime.public_url_cloudflared",
                "Public URL (service=cloudflared): {}",
                &[&url]
            )
        );
    } else if let Some(config) = ngrok {
        let ngrok_log = operator_log::reserve_service_log(log_dir, "ngrok")
            .with_context(|| "unable to open ngrok.log")?;
        operator_log::info(
            module_path!(),
            format!(
                "starting ngrok log={} bundle={}",
                ngrok_log.display(),
                bundle_root.display()
            ),
        );
        let handle = ngrok::start_tunnel(&paths, &config, &ngrok_log)?;
        operator_log::info(
            module_path!(),
            format!(
                "ngrok ready url={} log={}",
                handle.url,
                handle.log_path.display()
            ),
        );
        if debug_enabled {
            operator_log::debug(
                module_path!(),
                format!(
                    "[demo dev] tenant={} team={} ngrok url={} log={}",
                    tenant,
                    team_id,
                    handle.url,
                    handle.log_path.display()
                ),
            );
        }
        let url = handle.url.clone();
        let log_path = handle.log_path.clone();
        service_tracker.record_with_log("ngrok", "ngrok", Some(&log_path))?;
        let summary = ServiceSummary::with_details(
            "ngrok",
            Some(handle.pid),
            vec![
                format!("url={}", url),
                format!("log={}", log_path.display()),
            ],
        );
        service_summaries.push(summary);
        public_base_url = Some(url.clone());
        println!(
            "{}",
            crate::operator_i18n::trf(
                "demo.runtime.public_url_ngrok",
                "Public URL (service=ngrok): {}",
                &[&url]
            )
        );
    }

    // --- NATS: spawn locally only when mode is On and no URL was supplied.
    // All failures here are warnings; the demo continues without NATS.
    let mut resolved_nats_url = nats_url.map(|value| value.to_string());
    if matches!(nats_mode, NatsMode::On) && resolved_nats_url.is_none() {
        match operator_log::reserve_service_log(log_dir, "nats") {
            Ok(nats_log) => {
                operator_log::info(
                    module_path!(),
                    format!("starting nats log={}", nats_log.display()),
                );
                match services::start_nats_with_log(bundle_root, Some(nats_log.clone())) {
                    Ok(state) => {
                        operator_log::info(
                            module_path!(),
                            format!("nats started state={:?} log={}", state, nats_log.display()),
                        );
                        if debug_enabled {
                            operator_log::debug(
                                module_path!(),
                                format!(
                                    "[demo dev] tenant={} team={} nats state={:?} log={}",
                                    tenant,
                                    team_id,
                                    state,
                                    nats_log.display()
                                ),
                            );
                        }
                        service_tracker
                            .record_with_log("nats", "nats", Some(&nats_log))
                            .with_context(|| "failed to record nats service state")?;
                        resolved_nats_url = Some(services::nats_url(bundle_root));
                        let pid = read_pid(&paths.pid_path("nats"))?;
                        let mut summary = ServiceSummary::new("nats", pid);
                        summary.add_detail(format!("state={:?}", state));
                        summary.add_detail(format!("url={}", services::nats_url(bundle_root)));
                        summary.add_detail(format!("log={}", nats_log.display()));
                        service_summaries.push(summary);
                        mark_nats_started(&paths)?;
                    }
                    Err(err) => {
                        // Non-fatal: warn and continue without NATS.
                        eprintln!(
                            "{}",
                            crate::operator_i18n::trf(
                                "demo.runtime.warn_failed_start_nats",
                                "Warning: failed to start NATS: {}",
                                &[&err.to_string()]
                            )
                        );
                        operator_log::error(
                            module_path!(),
                            format!("failed to start nats (log={}): {err}", nats_log.display()),
                        );
                    }
                }
            }
            Err(err) => {
                // Could not even reserve the log file; also non-fatal.
                eprintln!(
                    "{}",
                    crate::operator_i18n::trf(
                        "demo.runtime.warn_failed_prepare_nats_log",
                        "Warning: failed to prepare NATS log: {}",
                        &[&err.to_string()]
                    )
                );
                operator_log::error(module_path!(), format!("failed to open nats.log: {err}"));
            }
        }
    }

    // --- Messaging: the full GSM stack only runs alongside local NATS.
    let run_gsm_services = matches!(nats_mode, NatsMode::On);
    if messaging_enabled && run_gsm_services {
        let mut env_map = build_env(tenant, team_id, resolved_nats_url.as_deref(), None);
        if let Some(url) = public_base_url.as_deref() {
            // Propagate the tunnel URL so the child can build callback URLs.
            env_map.insert("PUBLIC_BASE_URL".to_string(), url.to_string());
        }
        if debug_enabled {
            operator_log::debug(
                module_path!(),
                format!(
                    "[demo dev] launching GSM gateway/egress/subscriptions tenant={} team={} envs={:?}",
                    tenant, team_id, env_map
                ),
            );
        }
        let mut messaging_summary = spawn_embedded_messaging(
            bundle_root,
            tenant,
            team_id,
            env_map,
            log_dir,
            &restart_targets,
            &mut service_tracker,
            debug_enabled,
        )?;
        messaging_summary.add_detail("embedded messaging stack".to_string());
        service_summaries.push(messaging_summary);
    } else {
        println!(
            "{}",
            crate::operator_i18n::tr(
                "demo.runtime.messaging_embedded",
                "messaging: running embedded runner (no gsm gateway/egress)"
            )
        );
    }

    println!(
        "{}",
        crate::operator_i18n::tr(
            "demo.runtime.events_in_process",
            "events: handled in-process (HTTP ingress + timer scheduler)"
        )
    );
    print_service_summary(&service_summaries);

    if !run_gsm_services {
        operator_log::info(
            module_path!(),
            "demo running in embedded runner mode; gateway/egress disabled",
        );
        if debug_enabled {
            operator_log::debug(
                module_path!(),
                format!(
                    "[demo dev] embedded runner mode only tenant={} team={} (gateway/egress/subscriptions skipped)",
                    tenant, team_id
                ),
            );
        }
    }

    Ok(())
}
698
/// Config-driven variant of demo startup: discovers providers, starts the
/// local HTTP ingress server, optionally starts a tunnel, resolves the
/// public base URL and startup contract, spawns the configured services
/// (NATS, gateway, egress, subscriptions), and persists `endpoints.json`.
/// Returns handles for the components that keep running in the foreground.
#[allow(clippy::too_many_arguments)]
pub fn demo_up_services(
    config_path: &Path,
    config: &DemoConfig,
    static_routes: &BundleStaticRoutesInspection,
    configured_public_base_url: Option<String>,
    cloudflared: Option<CloudflaredConfig>,
    ngrok: Option<NgrokConfig>,
    restart: &BTreeSet<String>,
    runner_binary: Option<PathBuf>,
    log_dir: &Path,
    debug_enabled: bool,
) -> anyhow::Result<ForegroundRuntimeHandles> {
    // The bundle root is the directory containing the config file.
    let config_dir = config_path
        .parent()
        .ok_or_else(|| anyhow::anyhow!("config path has no parent directory"))?;
    let state_dir = config_dir.join("state");
    let tenant = config.tenant.as_str();
    let team = config.team.as_str();
    let paths = RuntimePaths::new(&state_dir, tenant, team);
    let mut service_tracker = ServiceTracker::new(&paths, Some(log_dir))?;
    // Discover providers/domains in the bundle and persist the result.
    let discovery = crate::discovery::discover(config_dir)?;
    crate::discovery::persist(config_dir, tenant, &discovery)?;
    let secrets_handle = secrets_gate::resolve_secrets_manager(config_dir, tenant, Some(team))?;
    let runner_host = Arc::new(DemoRunnerHost::new(
        config_dir.to_path_buf(),
        &discovery,
        runner_binary.clone(),
        secrets_handle.clone(),
        debug_enabled,
    )?);
    // In-process HTTP ingress comes up first; the tunnels below refuse to
    // start without it.
    let ingress_domains = detect_http_ingress_domains(&discovery, runner_host.as_ref());
    let enable_static_routes = static_routes.bundle_has_static_routes();
    let ingress_server = start_http_ingress_server(
        config,
        &ingress_domains,
        runner_host.clone(),
        enable_static_routes,
    )
    .with_context(|| "failed to start local HTTP ingress server")?;
    let run_gsm_services = config.services.nats.enabled;
    operator_log::info(
        module_path!(),
        format!(
            "demo start services start bundle={} tenant={} team={} log_dir={}",
            config_path.display(),
            tenant,
            team,
            log_dir.display()
        ),
    );
    if debug_enabled {
        operator_log::debug(
            module_path!(),
            format!(
                "[demo verbose] bundle={} tenant={} team={} logging=debug",
                config_path.display(),
                tenant,
                team
            ),
        );
    }

    // Stop any tunnel processes the caller asked to restart (best-effort).
    if should_restart(restart, "cloudflared") {
        let _ = supervisor::stop_pidfile(&paths.pid_path("cloudflared"), 2_000);
    }
    if should_restart(restart, "ngrok") {
        let _ = supervisor::stop_pidfile(&paths.pid_path("ngrok"), 2_000);
    }

    // --- Optional tunnel; cloudflared takes precedence over ngrok. Either
    // is skipped (with a warning) when no local ingress listener exists.
    let tunnel_public_base_url = if let Some(cfg) = cloudflared {
        if ingress_server.is_none() {
            operator_log::warn(
                module_path!(),
                "cloudflared requested but no local HTTP ingress listener is enabled; skipping tunnel startup",
            );
            None
        } else {
            let cloudflared_log = operator_log::reserve_service_log(log_dir, "cloudflared")
                .with_context(|| "unable to open cloudflared.log")?;
            operator_log::info(
                module_path!(),
                format!("starting cloudflared log={}", cloudflared_log.display()),
            );
            let handle = cloudflared::start_quick_tunnel(&paths, &cfg, &cloudflared_log)?;
            // Labels are informational only (for logs/printout).
            let mut domain_labels = Vec::new();
            if discovery.domains.messaging {
                domain_labels.push("messaging");
            }
            if discovery.domains.events {
                domain_labels.push("events");
            }
            if discovery.domains.oauth {
                domain_labels.push("oauth");
            }
            let domain_list = if domain_labels.is_empty() {
                "none".to_string()
            } else {
                domain_labels.join(",")
            };
            operator_log::info(
                module_path!(),
                format!(
                    "cloudflared ready domains={} url={} log={}",
                    domain_list,
                    handle.url,
                    handle.log_path.display()
                ),
            );
            if debug_enabled {
                operator_log::debug(
                    module_path!(),
                    format!(
                        "[demo dev] tenant={} team={} cloudflared domains={} url={} log={}",
                        tenant,
                        team,
                        domain_list,
                        handle.url,
                        handle.log_path.display()
                    ),
                );
            }
            println!(
                "{}",
                crate::operator_i18n::trf(
                    "demo.runtime.public_url_cloudflared_domains",
                    "Public URL (service=cloudflared domains={}): {}",
                    &[&domain_list, &handle.url]
                )
            );
            service_tracker.record_with_log(
                "cloudflared",
                "cloudflared",
                Some(&handle.log_path),
            )?;
            Some(handle.url)
        }
    } else if let Some(cfg) = ngrok {
        if ingress_server.is_none() {
            operator_log::warn(
                module_path!(),
                "ngrok requested but no local HTTP ingress listener is enabled; skipping tunnel startup",
            );
            None
        } else {
            let ngrok_log = operator_log::reserve_service_log(log_dir, "ngrok")
                .with_context(|| "unable to open ngrok.log")?;
            operator_log::info(
                module_path!(),
                format!("starting ngrok log={}", ngrok_log.display()),
            );
            let handle = ngrok::start_tunnel(&paths, &cfg, &ngrok_log)?;
            // NOTE(review): unlike the cloudflared branch above, this label
            // list omits "oauth" — confirm whether that is intentional.
            let mut domain_labels = Vec::new();
            if discovery.domains.messaging {
                domain_labels.push("messaging");
            }
            if discovery.domains.events {
                domain_labels.push("events");
            }
            let domain_list = if domain_labels.is_empty() {
                "none".to_string()
            } else {
                domain_labels.join(",")
            };
            operator_log::info(
                module_path!(),
                format!(
                    "ngrok ready domains={} url={} log={}",
                    domain_list,
                    handle.url,
                    handle.log_path.display()
                ),
            );
            if debug_enabled {
                operator_log::debug(
                    module_path!(),
                    format!(
                        "[demo dev] tenant={} team={} ngrok domains={} url={} log={}",
                        tenant,
                        team,
                        domain_list,
                        handle.url,
                        handle.log_path.display()
                    ),
                );
            }
            println!(
                "{}",
                crate::operator_i18n::trf(
                    "demo.runtime.public_url_ngrok_domains",
                    "Public URL (service=ngrok domains={}): {}",
                    &[&domain_list, &handle.url]
                )
            );
            service_tracker.record_with_log("ngrok", "ngrok", Some(&handle.log_path))?;
            Some(handle.url)
        }
    } else {
        None
    };

    let previous_public_url =
        crate::webhook_updater::read_previous_public_url(&paths.runtime_root());

    // Public base URL precedence: tunnel > explicit config > local address
    // derived from the gateway listener (only when serving static routes).
    let public_base_url = tunnel_public_base_url
        .clone()
        .or(configured_public_base_url.clone())
        .or_else(|| {
            if ingress_server.is_some() && enable_static_routes {
                let host = &config.services.gateway.listen_addr;
                let port = config.services.gateway.port;
                Some(format!("http://{}:{}", host, port))
            } else {
                None
            }
        });

    // If the public URL changed since last run, re-point provider webhooks.
    // Failure is logged but never aborts startup.
    if let Some(ref new_url) = public_base_url
        && let Err(err) = crate::webhook_updater::update_webhooks_if_url_changed(
            config_dir,
            &discovery,
            &secrets_handle,
            tenant,
            team,
            previous_public_url.as_deref(),
            new_url,
        )
    {
        operator_log::warn(
            module_path!(),
            format!("[webhook-updater] failed to update webhooks: {}", err),
        );
    }

    let http_listener_enabled = ingress_server.is_some();
    let asset_serving_enabled = enable_static_routes;
    // Record where the public base URL came from, mirroring the precedence
    // used above (Tunnel > Configured > Derived).
    let runtime_config = if let Some(url) = tunnel_public_base_url {
        Some(RuntimeConfig {
            public_base_url: Some(RuntimePublicBaseUrl {
                value: url,
                source: RuntimePublicBaseUrlSource::Tunnel,
            }),
        })
    } else if let Some(url) = configured_public_base_url {
        Some(RuntimeConfig {
            public_base_url: Some(RuntimePublicBaseUrl {
                value: url,
                source: RuntimePublicBaseUrlSource::Configured,
            }),
        })
    } else {
        public_base_url.clone().map(|url| RuntimeConfig {
            public_base_url: Some(RuntimePublicBaseUrl {
                value: url,
                source: RuntimePublicBaseUrlSource::Derived,
            }),
        })
    };

    // Persist the startup contract before spawning services so children can
    // read it from their environment/state.
    let startup_contract = resolve_startup_contract(
        static_routes,
        http_listener_enabled,
        asset_serving_enabled,
        public_base_url.clone(),
        runtime_config,
    )?;
    write_json(
        &paths.runtime_root().join("startup_contract.json"),
        &startup_contract,
    )?;

    if should_restart(restart, "nats") {
        let _ = supervisor::stop_pidfile(&paths.pid_path("nats"), 2_000);
    }

    // --- NATS: spawn only when both nats.enabled and nats.spawn.enabled;
    // the URL comes from config either way (external server when spawn is
    // disabled).
    let nats_url = if config.services.nats.enabled {
        if config.services.nats.spawn.enabled {
            let spec = build_service_spec(
                config_dir,
                "nats",
                &config.services.nats.spawn.binary,
                &config.services.nats.spawn.args,
                &build_env(tenant, team, None, Some(&startup_contract)),
            )?;
            log_service_spec_debug("nats", "nats", &spec, tenant, team, debug_enabled);
            let nats_log = operator_log::reserve_service_log(log_dir, "nats")
                .with_context(|| "unable to open nats.log")?;
            if let Some(handle) = spawn_if_needed(&paths, &spec, restart, Some(nats_log.clone()))? {
                service_tracker
                    .record_with_log("nats", "nats", Some(&handle.log_path))
                    .with_context(|| "failed to record nats service")?;
            }
        }
        Some(config.services.nats.url.clone())
    } else {
        None
    };

    operator_log::info(
        module_path!(),
        "events provider packs run in-process; external events components are disabled",
    );

    // --- Gateway / egress / subscriptions only run alongside NATS.
    if run_gsm_services {
        if should_restart(restart, "gateway") {
            let _ = supervisor::stop_pidfile(&paths.pid_path("gateway"), 2_000);
        }
        let gateway_spec = build_service_spec(
            config_dir,
            "gateway",
            &config.services.gateway.binary,
            &config.services.gateway.args,
            &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
        )?;
        if let Some(handle) = spawn_if_needed(&paths, &gateway_spec, restart, None)? {
            service_tracker.record_with_log("gateway", "gateway", Some(&handle.log_path))?;
        }

        if should_restart(restart, "egress") {
            let _ = supervisor::stop_pidfile(&paths.pid_path("egress"), 2_000);
        }
        let egress_spec = build_service_spec(
            config_dir,
            "egress",
            &config.services.egress.binary,
            &config.services.egress.args,
            &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
        )?;
        if let Some(handle) = spawn_if_needed(&paths, &egress_spec, restart, None)? {
            service_tracker.record_with_log("egress", "egress", Some(&handle.log_path))?;
        }

        match config.services.subscriptions.mode {
            // Legacy mode: external msgraph subscriptions process.
            DemoSubscriptionsMode::LegacyGsm => {
                if config.services.subscriptions.msgraph.enabled {
                    // Either alias ("subscriptions" or "msgraph") forces a
                    // restart of the same process.
                    if should_restart(restart, "subscriptions")
                        || should_restart(restart, "msgraph")
                    {
                        let _ = supervisor::stop_pidfile(&paths.pid_path("subscriptions"), 2_000);
                    }
                    let mut args = config.services.subscriptions.msgraph.args.clone();
                    if !config.services.subscriptions.msgraph.mode.is_empty() {
                        // Mode string is the first CLI argument when set.
                        args.insert(0, config.services.subscriptions.msgraph.mode.clone());
                    }
                    let spec = build_service_spec(
                        config_dir,
                        "subscriptions",
                        &config.services.subscriptions.msgraph.binary,
                        &args,
                        &build_env(tenant, team, nats_url.as_deref(), Some(&startup_contract)),
                    )?;
                    if let Some(handle) = spawn_if_needed(&paths, &spec, restart, None)? {
                        service_tracker.record_with_log(
                            "subscriptions",
                            "subscriptions",
                            Some(&handle.log_path),
                        )?;
                    }
                }
            }
            // Universal mode: in-process scheduler thread (no child process).
            DemoSubscriptionsMode::UniversalOps => {
                spawn_universal_subscriptions_service(
                    config_dir,
                    config,
                    tenant,
                    team,
                    runner_binary.clone(),
                    &mut service_tracker,
                    log_dir,
                    debug_enabled,
                )?;
            }
        }
    } else {
        println!(
            "{}",
            crate::operator_i18n::tr(
                "demo.runtime.messaging_embedded",
                "messaging: running embedded runner (no gsm gateway/egress)"
            )
        );
        println!(
            "{}",
            crate::operator_i18n::tr(
                "demo.runtime.events_in_process",
                "events: handled in-process (HTTP ingress + timer scheduler)"
            )
        );
        operator_log::info(
            module_path!(),
            "demo running in embedded runner mode; gateway/egress disabled",
        );
        if debug_enabled {
            operator_log::debug(
                module_path!(),
                format!(
                    "[demo dev] embedded runner mode only tenant={} team={} (gateway/egress/subscriptions skipped)",
                    tenant, team
                ),
            );
        }
    }

    // Persist the resolved endpoints for other commands/tools to read.
    let endpoints = DemoEndpoints {
        tenant: tenant.to_string(),
        team: team.to_string(),
        public_base_url: startup_contract.public_base_url.clone(),
        nats_url,
        gateway_listen_addr: config.services.gateway.listen_addr.clone(),
        gateway_port: config.services.gateway.port,
    };
    write_json(&paths.runtime_root().join("endpoints.json"), &endpoints)?;
    Ok(ForegroundRuntimeHandles { ingress_server })
}
1120
1121fn detect_http_ingress_domains(
1122 discovery: &crate::discovery::DiscoveryResult,
1123 runner_host: &DemoRunnerHost,
1124) -> Vec<Domain> {
1125 let mut domains = Vec::new();
1126 for domain in [Domain::Messaging, Domain::Events, Domain::OAuth] {
1127 let supported = discovery.providers.iter().any(|provider| {
1128 parse_domain_name(&provider.domain) == Some(domain)
1129 && runner_host.supports_op(domain, &provider.provider_id, "ingest_http")
1130 });
1131 let fallback_supported = matches!(domain, Domain::Events) && discovery.domains.events;
1132 if supported || fallback_supported {
1133 domains.push(domain);
1134 }
1135 }
1136 domains
1137}
1138
1139fn parse_domain_name(value: &str) -> Option<Domain> {
1140 match value {
1141 "messaging" => Some(Domain::Messaging),
1142 "events" => Some(Domain::Events),
1143 "oauth" => Some(Domain::OAuth),
1144 "secrets" => Some(Domain::Secrets),
1145 _ => None,
1146 }
1147}
1148
1149fn start_http_ingress_server(
1150 config: &DemoConfig,
1151 domains: &[Domain],
1152 runner_host: Arc<DemoRunnerHost>,
1153 enable_static_routes: bool,
1154) -> anyhow::Result<Option<HttpIngressServer>> {
1155 if domains.is_empty() && !enable_static_routes {
1157 return Ok(None);
1158 }
1159 let addr = format!(
1160 "{}:{}",
1161 config.services.gateway.listen_addr, config.services.gateway.port
1162 );
1163 let bind_addr = addr
1164 .parse()
1165 .with_context(|| format!("invalid gateway listen address {addr}"))?;
1166 let server = HttpIngressServer::start(HttpIngressConfig {
1167 bind_addr,
1168 domains: domains.to_vec(),
1169 runner_host,
1170 enable_static_routes,
1171 })?;
1172 println!(
1173 "HTTP ingress ready at http://{}:{}",
1174 config.services.gateway.listen_addr, config.services.gateway.port
1175 );
1176 Ok(Some(server))
1177}
1178
1179pub fn demo_status_runtime(
1180 state_dir: &Path,
1181 tenant: &str,
1182 team: &str,
1183 verbose: bool,
1184) -> anyhow::Result<()> {
1185 let paths = RuntimePaths::new(state_dir, tenant, team);
1186 let statuses = supervisor::read_status(&paths)?;
1187 if statuses.is_empty() {
1188 println!(
1189 "{}",
1190 crate::operator_i18n::tr("demo.runtime.none_running", "none running")
1191 );
1192 return Ok(());
1193 }
1194 for status in statuses {
1195 let state = if status.running {
1196 crate::operator_i18n::tr("demo.runtime.status_running", "running")
1197 } else {
1198 crate::operator_i18n::tr("demo.runtime.status_stopped", "stopped")
1199 };
1200 let pid = status
1201 .pid
1202 .map(|value| value.to_string())
1203 .unwrap_or_else(|| "-".to_string());
1204 if verbose {
1205 println!(
1206 "{}: {} (pid={}, log={})",
1207 status.id.as_str(),
1208 &state,
1209 pid,
1210 status.log_path.display()
1211 );
1212 } else {
1213 println!("{}: {} (pid={})", status.id.as_str(), &state, pid);
1214 }
1215 }
1216 Ok(())
1217}
1218
1219pub fn demo_logs_runtime(
1220 state_dir: &Path,
1221 log_dir: &Path,
1222 tenant: &str,
1223 team: &str,
1224 service: &str,
1225 tail: bool,
1226) -> anyhow::Result<()> {
1227 let log_dir = resolve_manifest_log_dir(state_dir, tenant, team, log_dir)?;
1228 let log_path = if service == "operator" {
1229 log_dir.join("operator.log")
1230 } else {
1231 let tenant_log_path = tenant_log_path(&log_dir, service, tenant, team)?;
1232 select_log_path(&log_dir, service, tenant, &tenant_log_path)
1233 };
1234 if tail {
1235 return services::tail_log(&log_path);
1236 }
1237 let lines = read_last_lines(&log_path, 200)?;
1238 if !lines.is_empty() {
1239 println!("{lines}");
1240 }
1241 Ok(())
1242}
1243
/// Stop the demo runtime's supervised background services.
///
/// Teardown order: self-started NATS first (marker-gated), then ngrok and
/// cloudflared tunnels, then the supervised services. With `all == true`,
/// every pidfile under `<state_dir>/pids` is swept regardless of
/// tenant/team; otherwise only this tenant/team's services are stopped,
/// preferring the service manifest (reverse start order) and falling back
/// to the per-tenant pidfile directory.
pub fn demo_down_runtime(
    state_dir: &Path,
    tenant: &str,
    team: &str,
    all: bool,
) -> anyhow::Result<()> {
    // Grace period (ms) given to each service before a forced stop.
    let timeout_ms = 2_000;
    let paths = RuntimePaths::new(state_dir, tenant, team);
    // Shared infrastructure goes down first; tunnel stops are best-effort
    // (their helpers return no Result).
    stop_started_nats(&paths, state_dir)?;
    ngrok::stop_ngrok();
    cloudflared::stop_cloudflared();
    if all {
        // Global sweep: every `<state_dir>/pids/<tenant-dir>/*.pid`.
        let pids_root = state_dir.join("pids");
        if !pids_root.exists() {
            println!(
                "{}",
                crate::operator_i18n::tr(
                    "demo.runtime.no_services_to_stop",
                    "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
                )
            );
            return Ok(());
        }
        for entry in std::fs::read_dir(&pids_root)? {
            let entry = entry?;
            if !entry.file_type()?.is_dir() {
                continue;
            }
            for pidfile in std::fs::read_dir(entry.path())? {
                let pidfile = pidfile?;
                if pidfile.path().extension().and_then(|ext| ext.to_str()) != Some("pid") {
                    continue;
                }
                // Best effort in the global sweep: a single stuck service
                // must not abort stopping the rest.
                let _ = supervisor::stop_pidfile(&pidfile.path(), timeout_ms);
            }
        }
        remove_service_manifest(&paths)?;
        println!(
            "{}",
            crate::operator_i18n::trf(
                "demo.runtime.stopped_all_under",
                "Stopped all services under {}",
                &[&pids_root.display().to_string()]
            )
        );
        return Ok(());
    }

    // Preferred path: the manifest records start order; stop in reverse.
    if let Some(manifest) = read_service_manifest(&paths)? {
        if manifest.services.is_empty() {
            println!(
                "{}",
                crate::operator_i18n::tr(
                    "demo.runtime.no_services_to_stop",
                    "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
                )
            );
            return Ok(());
        }
        for entry in manifest.services.iter().rev() {
            let id = supervisor::ServiceId::new(entry.id.clone())?;
            // Warn-and-continue so one failed stop doesn't strand the rest.
            if let Err(err) = supervisor::stop_service(&paths, &id, timeout_ms) {
                eprintln!(
                    "{}",
                    crate::operator_i18n::trf(
                        "demo.runtime.warn_failed_stop_service",
                        "Warning: failed to stop {}: {}",
                        &[&entry.id, &err.to_string()]
                    )
                );
            }
        }
        remove_service_manifest(&paths)?;
        return Ok(());
    }

    // Fallback: no manifest — stop whatever pidfiles exist for this
    // tenant/team. Note this branch propagates stop errors (`?`), unlike
    // the best-effort global sweep above.
    let pids_dir = paths.pids_dir();
    if !pids_dir.exists() {
        println!(
            "{}",
            crate::operator_i18n::tr(
                "demo.runtime.no_services_to_stop",
                "No supervised background services to stop. If runtime was started in the foreground, stop it in the original terminal with Ctrl+C."
            )
        );
        return Ok(());
    }
    for entry in std::fs::read_dir(&pids_dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("pid") {
            continue;
        }
        supervisor::stop_pidfile(&path, timeout_ms)?;
    }
    Ok(())
}
1342
1343fn select_log_path(log_dir: &Path, service: &str, tenant: &str, tenant_log: &Path) -> PathBuf {
1344 let candidates = [
1345 log_dir.join(format!("{service}.log")),
1346 log_dir.join(format!("{service}-{tenant}.log")),
1347 log_dir.join(format!("{service}.{tenant}.log")),
1348 ];
1349 for candidate in &candidates {
1350 if candidate.exists() {
1351 return candidate.clone();
1352 }
1353 }
1354 if tenant_log.exists() {
1355 return tenant_log.to_path_buf();
1356 }
1357 let _ = ensure_log_file(tenant_log);
1358 tenant_log.to_path_buf()
1359}
1360
1361fn tenant_log_path(
1362 log_dir: &Path,
1363 service: &str,
1364 tenant: &str,
1365 team: &str,
1366) -> anyhow::Result<PathBuf> {
1367 let tenant_dir = log_dir.join(format!("{tenant}.{team}"));
1368 let path = tenant_dir.join(format!("{service}.log"));
1369 ensure_log_file(&path)?;
1370 Ok(path)
1371}
1372
1373fn ensure_log_file(path: &Path) -> anyhow::Result<()> {
1374 if let Some(parent) = path.parent() {
1375 std::fs::create_dir_all(parent)?;
1376 }
1377 if !path.exists() {
1378 std::fs::File::create(path)?;
1379 }
1380 Ok(())
1381}
1382
1383fn resolve_manifest_log_dir(
1384 state_dir: &Path,
1385 tenant: &str,
1386 team: &str,
1387 default: &Path,
1388) -> anyhow::Result<PathBuf> {
1389 let paths = RuntimePaths::new(state_dir, tenant, team);
1390 if let Some(manifest) = read_service_manifest(&paths)?
1391 && let Some(dir) = manifest.log_dir
1392 {
1393 return Ok(PathBuf::from(dir));
1394 }
1395 Ok(default.to_path_buf())
1396}
1397
1398fn build_env(
1399 tenant: &str,
1400 team: &str,
1401 nats_url: Option<&str>,
1402 startup_contract: Option<&StartupContract>,
1403) -> BTreeMap<String, String> {
1404 let mut env = BTreeMap::new();
1405 env.insert("GREENTIC_TENANT".to_string(), tenant.to_string());
1406 env.insert("GREENTIC_TEAM".to_string(), team.to_string());
1407 if let Some(url) = nats_url {
1408 env.insert("NATS_URL".to_string(), url.to_string());
1409 }
1410 if let Some(contract) = startup_contract {
1411 contract.apply_env(&mut env);
1412 }
1413 env
1414}
1415
1416fn resolve_startup_contract(
1417 static_routes: &BundleStaticRoutesInspection,
1418 http_listener_enabled: bool,
1419 asset_serving_enabled: bool,
1420 public_base_url: Option<String>,
1421 runtime_config: Option<RuntimeConfig>,
1422) -> anyhow::Result<StartupContract> {
1423 crate::startup_contract::resolve(StartupContractInput {
1424 bundle_has_static_routes: static_routes.bundle_has_static_routes(),
1425 http_listener_enabled,
1426 asset_serving_enabled,
1427 public_base_url,
1428 runtime_config,
1429 })
1430}
1431
1432fn mark_nats_started(paths: &RuntimePaths) -> anyhow::Result<()> {
1433 let marker = nats_started_marker(paths);
1434 if let Some(parent) = marker.parent() {
1435 std::fs::create_dir_all(parent)?;
1436 }
1437 std::fs::write(marker, "started")?;
1438 Ok(())
1439}
1440
1441fn stop_started_nats(paths: &RuntimePaths, state_dir: &Path) -> anyhow::Result<()> {
1442 let marker = nats_started_marker(paths);
1443 if !marker.exists() {
1444 return Ok(());
1445 }
1446 let bundle_root = state_dir.parent().unwrap_or(state_dir);
1447 match services::stop_nats(bundle_root) {
1448 Ok(_) => {
1449 let _ = std::fs::remove_file(&marker);
1450 }
1451 Err(err) => {
1452 eprintln!(
1453 "{}",
1454 crate::operator_i18n::trf(
1455 "demo.runtime.warn_failed_stop_nats",
1456 "Warning: failed to stop nats: {}",
1457 &[&err.to_string()]
1458 )
1459 );
1460 }
1461 }
1462 Ok(())
1463}
1464
1465fn nats_started_marker(paths: &RuntimePaths) -> PathBuf {
1466 paths.runtime_root().join("nats.started")
1467}
1468
1469fn build_service_spec(
1470 config_dir: &Path,
1471 service_id: &str,
1472 binary: &str,
1473 args: &[String],
1474 env: &BTreeMap<String, String>,
1475) -> anyhow::Result<supervisor::ServiceSpec> {
1476 let explicit = if looks_like_path(binary) {
1477 let path = Path::new(binary);
1478 Some(if path.is_absolute() {
1479 path.to_path_buf()
1480 } else {
1481 config_dir.join(path)
1482 })
1483 } else {
1484 None
1485 };
1486 let path = crate::bin_resolver::resolve_binary(
1487 binary,
1488 &crate::bin_resolver::ResolveCtx {
1489 config_dir: config_dir.to_path_buf(),
1490 explicit_path: explicit,
1491 },
1492 )?;
1493 let mut argv = vec![path.to_string_lossy().to_string()];
1494 argv.extend(args.iter().cloned());
1495 Ok(supervisor::ServiceSpec {
1496 id: supervisor::ServiceId::new(service_id)?,
1497 argv,
1498 cwd: None,
1499 env: env.clone(),
1500 })
1501}
1502
/// Snapshot of the demo runtime's reachable endpoints, serialized to
/// `endpoints.json` under the runtime root for other tooling to read.
#[derive(serde::Serialize)]
struct DemoEndpoints {
    // Tenant and team this runtime instance was started for.
    tenant: String,
    team: String,
    // Externally reachable base URL from the startup contract, when set.
    public_base_url: Option<String>,
    // NATS connection URL, when messaging infrastructure is running.
    nats_url: Option<String>,
    // Local HTTP ingress/gateway bind address and port.
    gateway_listen_addr: String,
    gateway_port: u16,
}
1512
1513fn read_last_lines(path: &Path, count: usize) -> anyhow::Result<String> {
1514 if !path.exists() {
1515 return Err(anyhow::anyhow!(
1516 "Log file does not exist: {}",
1517 path.display()
1518 ));
1519 }
1520 let contents = std::fs::read_to_string(path)?;
1521 let mut lines: Vec<&str> = contents.lines().collect();
1522 if lines.len() > count {
1523 lines = lines.split_off(lines.len() - count);
1524 }
1525 Ok(lines.join("\n"))
1526}
1527
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::tempdir;

    // `tenant_log_path` must create the log file (and its parent
    // directories) as a side effect, not merely compute the path.
    #[test]
    fn tenant_log_path_creates_file() -> anyhow::Result<()> {
        let dir = tempdir()?;
        let path = tenant_log_path(dir.path(), "messaging", "demo", "default")?;
        assert!(path.exists());
        Ok(())
    }

    // A flat `<service>.log` directly in the log dir takes precedence
    // over the per-tenant log path when both exist.
    #[test]
    fn select_log_path_prefers_service_log_when_present() -> anyhow::Result<()> {
        let dir = tempdir()?;
        let tenant_path = tenant_log_path(dir.path(), "messaging", "demo", "default")?;
        let service_path = dir.path().join("messaging.log");
        fs::write(&service_path, "other")?;
        let selected = select_log_path(dir.path(), "messaging", "demo", &tenant_path);
        assert_eq!(selected, service_path);
        Ok(())
    }

    // The special "operator" service reads `operator.log` straight from
    // the log dir; this just checks the happy path doesn't error.
    #[test]
    fn demo_logs_runtime_reads_operator_log() -> anyhow::Result<()> {
        let dir = tempdir()?;
        let log = dir.path().join("operator.log");
        fs::write(&log, "operator ready")?;
        demo_logs_runtime(dir.path(), dir.path(), "demo", "default", "operator", false)?;
        Ok(())
    }
}