1#![doc = include_str!("../README.md")]
2#![warn(missing_docs)]
3
4use std::fs;
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex, Weak};
16use std::time::Duration;
17
18use serde::{Deserialize, Serialize};
19use tailtriage_core::{
20 BuildError, CaptureLimitsOverride, CaptureMode, InflightGuard, Outcome, OwnedRequestCompletion,
21 OwnedRequestHandle, QueueTimer, RequestOptions, RunEndReason, StageTimer, Tailtriage,
22};
23use tailtriage_tokio::{RuntimeSampler, SamplerStartError};
24
/// Builder for [`TailtriageController`].
///
/// Collects the service name, an optional TOML config file, and the
/// activation settings every enabled generation is created from. Obtain one
/// via [`TailtriageControllerBuilder::new`] or [`TailtriageController::builder`].
#[derive(Debug, Clone)]
pub struct TailtriageControllerBuilder {
    /// Service name for generated runs; rejected at build time if blank.
    service_name: String,
    /// Optional controller config file; when set, its activation section
    /// replaces the activation fields below during `build`.
    config_path: Option<PathBuf>,
    /// Whether `build` enables the first generation immediately.
    initially_enabled: bool,
    /// Sink used to derive each generation's artifact path.
    sink_template: ControllerSinkTemplate,
    /// Capture-limit overrides passed to each generation's run builder.
    capture_limits_override: CaptureLimitsOverride,
    /// Strict-lifecycle flag passed to each generation's run builder.
    strict_lifecycle: bool,
    /// Runtime-sampler settings for armed generations.
    runtime_sampler: RuntimeSamplerTemplate,
    /// What a generation does once capture limits are hit.
    run_end_policy: RunEndPolicy,
}
37
38impl TailtriageControllerBuilder {
39 #[must_use]
41 pub fn new(service_name: impl Into<String>) -> Self {
42 Self {
43 service_name: service_name.into(),
44 config_path: None,
45 initially_enabled: false,
46 sink_template: ControllerSinkTemplate::LocalJson {
47 output_path: PathBuf::from("tailtriage-run.json"),
48 },
49 capture_limits_override: CaptureLimitsOverride::default(),
50 strict_lifecycle: false,
51 runtime_sampler: RuntimeSamplerTemplate::default(),
52 run_end_policy: RunEndPolicy::ContinueAfterLimitsHit,
53 }
54 }
55
56 #[must_use]
58 pub fn config_path(mut self, config_path: impl AsRef<Path>) -> Self {
59 self.config_path = Some(config_path.as_ref().to_path_buf());
60 self
61 }
62
63 #[must_use]
68 pub const fn initially_enabled(mut self, initially_enabled: bool) -> Self {
69 self.initially_enabled = initially_enabled;
70 self
71 }
72
73 #[must_use]
75 pub fn output(mut self, output_path: impl AsRef<Path>) -> Self {
76 self.sink_template = ControllerSinkTemplate::LocalJson {
77 output_path: output_path.as_ref().to_path_buf(),
78 };
79 self
80 }
81
82 #[must_use]
84 pub const fn capture_limits_override(
85 mut self,
86 capture_limits_override: CaptureLimitsOverride,
87 ) -> Self {
88 self.capture_limits_override = capture_limits_override;
89 self
90 }
91
92 #[must_use]
94 pub const fn strict_lifecycle(mut self, strict_lifecycle: bool) -> Self {
95 self.strict_lifecycle = strict_lifecycle;
96 self
97 }
98
99 #[must_use]
101 pub const fn runtime_sampler(mut self, runtime_sampler: RuntimeSamplerTemplate) -> Self {
102 self.runtime_sampler = runtime_sampler;
103 self
104 }
105
106 #[must_use]
108 pub const fn run_end_policy(mut self, run_end_policy: RunEndPolicy) -> Self {
109 self.run_end_policy = run_end_policy;
110 self
111 }
112
113 pub fn build(self) -> Result<TailtriageController, ControllerBuildError> {
130 let mut service_name = self.service_name;
131 let mut initially_enabled = self.initially_enabled;
132 let mut sink_template = self.sink_template;
133 let mut selected_mode = CaptureMode::Light;
134 let mut capture_limits_override = self.capture_limits_override;
135 let mut strict_lifecycle = self.strict_lifecycle;
136 let mut runtime_sampler = self.runtime_sampler;
137 let mut run_end_policy = self.run_end_policy;
138
139 if let Some(config_path) = self.config_path.as_ref() {
140 let loaded = TailtriageController::load_config_from_path(config_path)
141 .map_err(ControllerBuildError::ConfigLoad)?;
142 let activation = loaded.activation_template;
143 service_name = loaded.service_name.unwrap_or(service_name);
144 initially_enabled = loaded.initially_enabled.unwrap_or(initially_enabled);
145 sink_template = activation.sink_template;
146 selected_mode = activation.selected_mode;
147 capture_limits_override = activation.capture_limits_override;
148 strict_lifecycle = activation.strict_lifecycle;
149 runtime_sampler = activation.runtime_sampler;
150 run_end_policy = activation.run_end_policy;
151 }
152
153 if service_name.trim().is_empty() {
154 return Err(ControllerBuildError::EmptyServiceName);
155 }
156
157 let template = TailtriageControllerTemplate {
158 service_name,
159 config_path: self.config_path,
160 sink_template,
161 selected_mode: CaptureMode::Light,
162 capture_limits_override,
163 strict_lifecycle,
164 runtime_sampler,
165 run_end_policy,
166 };
167 let template = TailtriageControllerTemplate {
168 selected_mode,
169 ..template
170 };
171
172 let inner = Arc::new(ControllerInner {
173 template: Mutex::new(template),
174 lifecycle: Mutex::new(ControllerLifecycle::Disabled { next_generation: 1 }),
175 inert_request_seq: AtomicU64::new(1),
176 });
177
178 let controller = TailtriageController { inner };
179 if initially_enabled {
180 controller
181 .enable()
182 .map_err(ControllerBuildError::InitialEnable)?;
183 }
184
185 Ok(controller)
186 }
187}
188
/// Controller that arms and disarms tailtriage capture in numbered generations.
///
/// Each [`TailtriageController::enable`] starts a new generation with its own
/// [`Tailtriage`] run and artifact path. [`TailtriageController::disable`] or
/// [`TailtriageController::shutdown`] closes that generation. Clones share the
/// same state through an `Arc`.
#[derive(Debug, Clone)]
pub struct TailtriageController {
    /// Shared controller state.
    inner: Arc<ControllerInner>,
}
194
/// State shared by all clones of a [`TailtriageController`].
#[derive(Debug)]
struct ControllerInner {
    /// Template the next enabled generation is built from.
    template: Mutex<TailtriageControllerTemplate>,
    /// Whether a generation is active, plus the next generation id.
    lifecycle: Mutex<ControllerLifecycle>,
    /// Counter used to mint `inert-N` request ids for uncaptured requests.
    inert_request_seq: AtomicU64,
}
201
/// Live bookkeeping for one enabled generation.
#[derive(Debug)]
struct ActiveGenerationRuntime {
    /// Immutable facts captured at enable time; the flag fields inside are
    /// not updated — live values come from the atomics below.
    state: ActiveGenerationState,
    /// Artifact path the generation's run writes to.
    artifact_path: PathBuf,
    /// Core capture run for this generation.
    run: Arc<Tailtriage>,
    /// Cleared once the generation stops admitting new requests.
    accepting_new: AtomicBool,
    /// Set once the generation should finalize when in-flight requests drain.
    closing: AtomicBool,
    /// Number of admitted requests whose completion has not been recorded.
    inflight_captured: AtomicU64,
    /// Guards finalization so only one caller runs it at a time.
    finalize_started: AtomicBool,
    /// Message from the most recent failed finalization attempt, if any.
    last_finalize_error: Mutex<Option<String>>,
    /// Runtime sampler started for this generation, if enabled.
    runtime_sampler: Mutex<Option<RuntimeSampler>>,
}
214
215impl ActiveGenerationRuntime {
216 fn snapshot(&self) -> ActiveGenerationState {
217 ActiveGenerationState {
218 generation_id: self.state.generation_id,
219 started_at_unix_ms: self.state.started_at_unix_ms,
220 artifact_path: self.artifact_path.clone(),
221 accepting_new_admissions: self.accepting_new.load(Ordering::Relaxed),
222 closing: self.closing.load(Ordering::Relaxed),
223 inflight_captured_requests: self.inflight_captured.load(Ordering::Relaxed),
224 finalization_in_progress: self.finalize_started.load(Ordering::Relaxed),
225 last_finalize_error: self
226 .last_finalize_error
227 .lock()
228 .unwrap_or_else(std::sync::PoisonError::into_inner)
229 .clone(),
230 activation_config: self.state.activation_config.clone(),
231 }
232 }
233
234 fn clear_finalize_error(&self) {
235 let mut last_error = self
236 .last_finalize_error
237 .lock()
238 .unwrap_or_else(std::sync::PoisonError::into_inner);
239 *last_error = None;
240 }
241
242 fn record_finalize_error(&self, error: &DisableError) {
243 let mut last_error = self
244 .last_finalize_error
245 .lock()
246 .unwrap_or_else(std::sync::PoisonError::into_inner);
247 *last_error = Some(error.to_string());
248 }
249}
250
251impl TailtriageController {
252 fn validate_template(template: &TailtriageControllerTemplate) -> Result<(), BuildError> {
253 let artifact_path = generated_artifact_path(&template.sink_template, 1);
254 let run_id = format!("{}-generation-1", template.service_name);
255
256 let mut builder = Tailtriage::builder(template.service_name.clone())
257 .run_id(run_id)
258 .output(&artifact_path);
259 builder = match template.selected_mode {
260 CaptureMode::Light => builder.light(),
261 CaptureMode::Investigation => builder.investigation(),
262 };
263 builder = builder.capture_limits_override(template.capture_limits_override);
264 builder = builder.strict_lifecycle(template.strict_lifecycle);
265 let _ = builder.build()?;
266 Ok(())
267 }
268
269 fn next_inert_request_id(&self) -> String {
270 let id = self.inner.inert_request_seq.fetch_add(1, Ordering::Relaxed);
271 format!("inert-{id}")
272 }
273
274 #[must_use]
276 pub fn builder(service_name: impl Into<String>) -> TailtriageControllerBuilder {
277 TailtriageControllerBuilder::new(service_name)
278 }
279
280 pub fn load_config_from_path(
289 path: impl AsRef<Path>,
290 ) -> Result<LoadedControllerConfig, ConfigLoadError> {
291 let path = path.as_ref();
292 let file = ControllerConfigFile::from_path(path)?;
293 Ok(file.into_loaded())
294 }
295
296 #[must_use]
299 pub fn status(&self) -> TailtriageControllerStatus {
300 let template = self
301 .inner
302 .template
303 .lock()
304 .unwrap_or_else(std::sync::PoisonError::into_inner);
305 let lifecycle = self
306 .inner
307 .lifecycle
308 .lock()
309 .unwrap_or_else(std::sync::PoisonError::into_inner);
310
311 TailtriageControllerStatus {
312 template: template.clone(),
313 generation: lifecycle.snapshot(),
314 }
315 }
316
317 pub fn reload_template(&self, next_template: TailtriageControllerTemplate) {
326 self.try_reload_template(next_template)
327 .expect("invalid template for reload_template");
328 }
329
330 pub fn try_reload_template(
342 &self,
343 next_template: TailtriageControllerTemplate,
344 ) -> Result<(), ReloadTemplateError> {
345 Self::validate_template(&next_template).map_err(ReloadTemplateError::Validate)?;
346 let mut template = self
347 .inner
348 .template
349 .lock()
350 .unwrap_or_else(std::sync::PoisonError::into_inner);
351 *template = next_template;
352 Ok(())
353 }
354
355 pub fn reload_config(&self) -> Result<(), ReloadConfigError> {
366 let (config_path, service_name) = {
367 let template = self
368 .inner
369 .template
370 .lock()
371 .unwrap_or_else(std::sync::PoisonError::into_inner);
372 let Some(config_path) = template.config_path.clone() else {
373 return Err(ReloadConfigError::MissingConfigPath);
374 };
375 (config_path, template.service_name.clone())
376 };
377
378 let loaded = TailtriageController::load_config_from_path(&config_path)
379 .map_err(ReloadConfigError::Load)?;
380 let activation = loaded.activation_template;
381 let validated = TailtriageControllerTemplate {
382 service_name: loaded.service_name.unwrap_or(service_name),
383 config_path: Some(config_path),
384 sink_template: activation.sink_template,
385 selected_mode: activation.selected_mode,
386 capture_limits_override: activation.capture_limits_override,
387 strict_lifecycle: activation.strict_lifecycle,
388 runtime_sampler: activation.runtime_sampler,
389 run_end_policy: activation.run_end_policy,
390 };
391
392 Self::validate_template(&validated).map_err(ReloadConfigError::Validate)?;
393
394 let mut template = self
395 .inner
396 .template
397 .lock()
398 .unwrap_or_else(std::sync::PoisonError::into_inner);
399 *template = validated;
400
401 Ok(())
402 }
403
404 pub fn enable(&self) -> Result<ActiveGenerationState, EnableError> {
420 let template = self
421 .inner
422 .template
423 .lock()
424 .unwrap_or_else(std::sync::PoisonError::into_inner)
425 .clone();
426
427 let mut lifecycle = self
428 .inner
429 .lifecycle
430 .lock()
431 .unwrap_or_else(std::sync::PoisonError::into_inner);
432
433 let next_generation = match *lifecycle {
434 ControllerLifecycle::Disabled { next_generation } => next_generation,
435 ControllerLifecycle::Active { ref active, .. } => {
436 return Err(EnableError::AlreadyActive {
437 generation_id: active.state.generation_id,
438 });
439 }
440 };
441
442 let artifact_path = generated_artifact_path(&template.sink_template, next_generation);
443 let run_id = format!("{}-generation-{next_generation}", template.service_name);
444
445 let mut builder = Tailtriage::builder(template.service_name.clone())
446 .run_id(run_id)
447 .output(&artifact_path);
448
449 builder = match template.selected_mode {
450 CaptureMode::Light => builder.light(),
451 CaptureMode::Investigation => builder.investigation(),
452 };
453 builder = builder.capture_limits_override(template.capture_limits_override);
454 builder = builder.strict_lifecycle(template.strict_lifecycle);
455
456 let run = Arc::new(builder.build().map_err(EnableError::Build)?);
457 let runtime = Arc::new(ActiveGenerationRuntime {
458 state: ActiveGenerationState {
459 generation_id: next_generation,
460 started_at_unix_ms: tailtriage_core::unix_time_ms(),
461 artifact_path: artifact_path.clone(),
462 accepting_new_admissions: true,
463 closing: false,
464 inflight_captured_requests: 0,
465 finalization_in_progress: false,
466 last_finalize_error: None,
467 activation_config: ControllerActivationTemplate {
468 sink_template: template.sink_template.clone(),
469 selected_mode: template.selected_mode,
470 capture_limits_override: template.capture_limits_override,
471 strict_lifecycle: template.strict_lifecycle,
472 runtime_sampler: template.runtime_sampler,
473 run_end_policy: template.run_end_policy,
474 },
475 },
476 artifact_path,
477 run: Arc::clone(&run),
478 accepting_new: AtomicBool::new(true),
479 closing: AtomicBool::new(false),
480 inflight_captured: AtomicU64::new(0),
481 finalize_started: AtomicBool::new(false),
482 last_finalize_error: Mutex::new(None),
483 runtime_sampler: Mutex::new(None),
484 });
485 if template.run_end_policy == RunEndPolicy::AutoSealOnLimitsHit {
486 let active = Arc::downgrade(&runtime);
487 let inner = Arc::downgrade(&self.inner);
488 let listener: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
489 TailtriageController::on_limits_hit_signal(&inner, &active);
490 });
491 runtime.run.set_limits_hit_listener(Some(listener));
492 }
493
494 if template.runtime_sampler.enabled_for_armed_runs {
495 let _ = tokio::runtime::Handle::try_current()
496 .map_err(|_| EnableError::MissingTokioRuntimeForSampler)?;
497 let mut sampler_builder = RuntimeSampler::builder(Arc::clone(&run));
498 if let Some(mode_override) = template.runtime_sampler.mode_override {
499 sampler_builder = sampler_builder.mode(mode_override);
500 }
501 if let Some(interval_ms) = template.runtime_sampler.interval_ms {
502 sampler_builder = sampler_builder.interval(Duration::from_millis(interval_ms));
503 }
504 if let Some(max_runtime_snapshots) = template.runtime_sampler.max_runtime_snapshots {
505 sampler_builder = sampler_builder.max_runtime_snapshots(max_runtime_snapshots);
506 }
507 let runtime_sampler = sampler_builder
508 .start()
509 .map_err(EnableError::StartRuntimeSampler)?;
510 let mut sampler_slot = runtime
511 .runtime_sampler
512 .lock()
513 .unwrap_or_else(std::sync::PoisonError::into_inner);
514 *sampler_slot = Some(runtime_sampler);
515 }
516
517 *lifecycle = ControllerLifecycle::Active {
518 active: Arc::clone(&runtime),
519 next_generation: next_generation.saturating_add(1),
520 };
521
522 Ok(runtime.snapshot())
523 }
524
525 pub fn disable(&self) -> Result<DisableOutcome, DisableError> {
536 let (active, next_generation, generation_id) = {
537 let lifecycle = self
538 .inner
539 .lifecycle
540 .lock()
541 .unwrap_or_else(std::sync::PoisonError::into_inner);
542
543 let ControllerLifecycle::Active {
544 ref active,
545 next_generation,
546 } = *lifecycle
547 else {
548 return Ok(DisableOutcome::AlreadyDisabled);
549 };
550
551 active
552 .run
553 .set_run_end_reason_if_absent(RunEndReason::ManualDisarm);
554 active.accepting_new.store(false, Ordering::Relaxed);
555 active.closing.store(true, Ordering::Relaxed);
556
557 if active.inflight_captured.load(Ordering::Relaxed) == 0 {
558 (
559 Some(Arc::clone(active)),
560 Some(next_generation),
561 active.state.generation_id,
562 )
563 } else {
564 return Ok(DisableOutcome::Closing {
565 generation_id: active.state.generation_id,
566 inflight_captured_requests: active.inflight_captured.load(Ordering::Relaxed),
567 });
568 }
569 };
570
571 if let (Some(active), Some(next_generation)) = (active, next_generation) {
572 Self::finalize_active(&self.inner, &active, next_generation)?;
573 }
574
575 Ok(DisableOutcome::Finalized { generation_id })
576 }
577
578 pub fn begin_request_with(
591 &self,
592 route: impl Into<String>,
593 options: RequestOptions,
594 ) -> ControllerStartedRequest {
595 let route = route.into();
596 if let Some(started) = self.try_begin_request_with(route.clone(), options.clone()) {
597 return started;
598 }
599
600 ControllerStartedRequest {
601 handle: ControllerRequestHandle::Inert(InertControllerRequestHandle::new(
602 route,
603 options,
604 self.next_inert_request_id(),
605 )),
606 completion: ControllerRequestCompletion {
607 kind: ControllerCompletionKind::Inert,
608 },
609 }
610 }
611
612 pub fn begin_request(&self, route: impl Into<String>) -> ControllerStartedRequest {
614 self.begin_request_with(route, RequestOptions::new())
615 }
616
617 #[must_use]
628 pub fn try_begin_request_with(
629 &self,
630 route: impl Into<String>,
631 options: RequestOptions,
632 ) -> Option<ControllerStartedRequest> {
633 let active = {
634 let lifecycle = self
635 .inner
636 .lifecycle
637 .lock()
638 .unwrap_or_else(std::sync::PoisonError::into_inner);
639
640 match *lifecycle {
641 ControllerLifecycle::Active { ref active, .. } => Arc::clone(active),
642 ControllerLifecycle::Disabled { .. } => return None,
643 }
644 };
645
646 if !active.accepting_new.load(Ordering::Acquire) {
647 return None;
648 }
649
650 if active.state.activation_config.run_end_policy == RunEndPolicy::AutoSealOnLimitsHit
651 && active.run.snapshot().truncation.limits_hit
652 {
653 active
654 .run
655 .set_run_end_reason_if_absent(RunEndReason::AutoSealOnLimitsHit);
656 active.accepting_new.store(false, Ordering::Release);
657 active.closing.store(true, Ordering::Release);
658 if active.inflight_captured.load(Ordering::Acquire) == 0 {
659 let _ = self.force_finalize_generation(&active);
660 }
661 return None;
662 }
663
664 active.inflight_captured.fetch_add(1, Ordering::AcqRel);
665 if !active.accepting_new.load(Ordering::Acquire) {
666 active.inflight_captured.fetch_sub(1, Ordering::AcqRel);
667 return None;
668 }
669
670 let started = active.run.begin_request_with_owned(route, options);
674 Self::apply_run_end_policy_if_limits_hit(&active);
675
676 Some(ControllerStartedRequest {
677 handle: ControllerRequestHandle::Active(started.handle),
678 completion: ControllerRequestCompletion {
679 kind: ControllerCompletionKind::Active(ActiveControllerCompletion {
680 completion: Some(started.completion),
681 admission_generation_id: active.state.generation_id,
682 admitted_generation: Arc::downgrade(&active),
683 inner: Arc::downgrade(&self.inner),
684 run_end_policy: active.state.activation_config.run_end_policy,
685 inflight_recorded: true,
686 }),
687 },
688 })
689 }
690
691 #[must_use]
695 pub fn try_begin_request(&self, route: impl Into<String>) -> Option<ControllerStartedRequest> {
696 self.try_begin_request_with(route, RequestOptions::new())
697 }
698
699 pub fn shutdown(&self) -> Result<(), ShutdownError> {
710 let maybe_active = {
711 let lifecycle = self
712 .inner
713 .lifecycle
714 .lock()
715 .unwrap_or_else(std::sync::PoisonError::into_inner);
716 match *lifecycle {
717 ControllerLifecycle::Active { ref active, .. } => Some(Arc::clone(active)),
718 ControllerLifecycle::Disabled { .. } => None,
719 }
720 };
721
722 if let Some(active) = maybe_active {
723 active
724 .run
725 .set_run_end_reason_if_absent(RunEndReason::Shutdown);
726 active.accepting_new.store(false, Ordering::Relaxed);
727 active.closing.store(true, Ordering::Relaxed);
728 self.force_finalize_generation(&active)
729 .map_err(ShutdownError::Finalize)?;
730 }
731
732 Ok(())
733 }
734
735 fn force_finalize_generation(
736 &self,
737 active: &Arc<ActiveGenerationRuntime>,
738 ) -> Result<(), DisableError> {
739 Self::finalize_generation_shared(&self.inner, active)
740 }
741
742 fn finalize_generation_shared(
743 inner: &Arc<ControllerInner>,
744 active: &Arc<ActiveGenerationRuntime>,
745 ) -> Result<(), DisableError> {
746 let next_generation = {
747 let lifecycle = inner
748 .lifecycle
749 .lock()
750 .unwrap_or_else(std::sync::PoisonError::into_inner);
751 match *lifecycle {
752 ControllerLifecycle::Active {
753 active: ref current_active,
754 next_generation,
755 } if current_active.state.generation_id == active.state.generation_id => {
756 next_generation
757 }
758 _ => return Ok(()),
759 }
760 };
761
762 Self::finalize_active(inner, active, next_generation)
763 }
764
765 fn finalize_active(
766 inner: &Arc<ControllerInner>,
767 active: &Arc<ActiveGenerationRuntime>,
768 next_generation: u64,
769 ) -> Result<(), DisableError> {
770 if active
771 .finalize_started
772 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
773 .is_err()
774 {
775 return Ok(());
776 }
777
778 active.clear_finalize_error();
779 Self::stop_runtime_sampler(active);
780 if let Err(source) = active.run.shutdown() {
781 let error = DisableError::Finalize(source);
782 active.record_finalize_error(&error);
783 active.finalize_started.store(false, Ordering::Release);
784 return Err(error);
785 }
786
787 let mut lifecycle = inner
788 .lifecycle
789 .lock()
790 .unwrap_or_else(std::sync::PoisonError::into_inner);
791
792 if matches!(
793 *lifecycle,
794 ControllerLifecycle::Active {
795 active: ref current_active,
796 next_generation: ng,
797 } if current_active.state.generation_id == active.state.generation_id && ng == next_generation
798 ) {
799 *lifecycle = ControllerLifecycle::Disabled { next_generation };
800 }
801
802 Ok(())
803 }
804
805 fn stop_runtime_sampler(active: &Arc<ActiveGenerationRuntime>) {
806 let sampler = active
807 .runtime_sampler
808 .lock()
809 .unwrap_or_else(std::sync::PoisonError::into_inner)
810 .take();
811 if let Some(sampler) = sampler {
812 let shutdown_thread = std::thread::spawn(move || {
813 let runtime = tokio::runtime::Builder::new_current_thread()
814 .enable_all()
815 .build()
816 .expect("sampler shutdown runtime should build");
817 runtime.block_on(sampler.shutdown());
818 });
819 let _ = shutdown_thread.join();
820 }
821 }
822
823 fn apply_run_end_policy_if_limits_hit(active: &Arc<ActiveGenerationRuntime>) {
824 if active.state.activation_config.run_end_policy != RunEndPolicy::AutoSealOnLimitsHit {
825 return;
826 }
827
828 if !active.run.snapshot().truncation.limits_hit {
829 return;
830 }
831
832 active
833 .run
834 .set_run_end_reason_if_absent(RunEndReason::AutoSealOnLimitsHit);
835 active.accepting_new.store(false, Ordering::Release);
836 active.closing.store(true, Ordering::Release);
837 }
838
839 fn on_limits_hit_signal(inner: &Weak<ControllerInner>, active: &Weak<ActiveGenerationRuntime>) {
840 let Some(active) = active.upgrade() else {
841 return;
842 };
843 active
844 .run
845 .set_run_end_reason_if_absent(RunEndReason::AutoSealOnLimitsHit);
846 active.accepting_new.store(false, Ordering::Release);
847 active.closing.store(true, Ordering::Release);
848
849 if active.inflight_captured.load(Ordering::Acquire) > 0 {
850 return;
851 }
852
853 let Some(inner) = inner.upgrade() else {
854 return;
855 };
856 let _ = TailtriageController::finalize_generation_shared(&inner, &active);
857 }
858}
859
/// A request started through the controller: a handle for instrumentation
/// plus a completion that must be finished explicitly.
#[must_use = "request completion must be finished explicitly"]
#[derive(Debug)]
pub struct ControllerStartedRequest {
    /// Handle used to time queues and stages, and to track in-flight gauges.
    pub handle: ControllerRequestHandle,
    /// Completion that records the request's outcome when finished.
    pub completion: ControllerRequestCompletion,
}
869
/// Completion half of a started request.
///
/// Finish it with `finish`, `finish_ok`, or `finish_result`. For captured
/// requests, finishing also releases the request's in-flight slot on its
/// generation.
#[must_use = "request completion must be finished explicitly"]
#[derive(Debug)]
pub struct ControllerRequestCompletion {
    /// Whether this completion belongs to a captured or an inert request.
    kind: ControllerCompletionKind,
}
876
877impl ControllerRequestCompletion {
878 pub fn finish(mut self, outcome: Outcome) {
880 if let ControllerCompletionKind::Active(active) = &mut self.kind {
881 if let Some(completion) = active.completion.take() {
882 completion.finish(outcome);
883 active.mark_finished();
884 }
885 }
886 }
887
888 pub fn finish_ok(self) {
890 self.finish(Outcome::Ok);
891 }
892
893 pub fn finish_result<T, E>(mut self, result: Result<T, E>) -> Result<T, E> {
900 if let ControllerCompletionKind::Active(active) = &mut self.kind {
901 if let Some(completion) = active.completion.take() {
902 completion.finish(if result.is_ok() {
903 Outcome::Ok
904 } else {
905 Outcome::Error
906 });
907 active.mark_finished();
908 }
909 }
910 result
911 }
912}
913
/// Backing state of a [`ControllerRequestCompletion`].
#[derive(Debug)]
enum ControllerCompletionKind {
    /// Completion for a request admitted into an active generation.
    Active(ActiveControllerCompletion),
    /// Completion for an uncaptured request; finishing it is a no-op.
    Inert,
}
919
/// Controller-side completion state for a captured request.
#[derive(Debug)]
struct ActiveControllerCompletion {
    /// Core completion; taken when the request is finished.
    completion: Option<OwnedRequestCompletion>,
    /// Id of the generation that admitted this request.
    admission_generation_id: u64,
    /// Generation that admitted this request. The reference is weak, so an
    /// outstanding completion does not keep that generation alive.
    admitted_generation: Weak<ActiveGenerationRuntime>,
    /// Controller state, used to finalize the generation when this request
    /// is the last one in flight.
    inner: Weak<ControllerInner>,
    /// Run-end policy of the admitting generation, copied at admission time.
    run_end_policy: RunEndPolicy,
    /// True until this request's in-flight slot has been released.
    inflight_recorded: bool,
}
939
940impl ActiveControllerCompletion {
941 fn mark_finished(&mut self) {
942 if !self.inflight_recorded {
943 return;
944 }
945
946 self.inflight_recorded = false;
947
948 let Some(active) = self.admitted_generation.upgrade() else {
949 return;
950 };
951
952 debug_assert_eq!(
953 active.state.generation_id, self.admission_generation_id,
954 "controller completion generation binding should remain stable"
955 );
956
957 if self.run_end_policy == RunEndPolicy::AutoSealOnLimitsHit
958 && active.run.snapshot().truncation.limits_hit
959 {
960 active
961 .run
962 .set_run_end_reason_if_absent(RunEndReason::AutoSealOnLimitsHit);
963 active.accepting_new.store(false, Ordering::Release);
964 active.closing.store(true, Ordering::Release);
965 }
966
967 let remaining = active
968 .inflight_captured
969 .fetch_sub(1, Ordering::AcqRel)
970 .saturating_sub(1);
971
972 if remaining == 0 && active.closing.load(Ordering::Acquire) {
973 self.try_finalize_bound_generation(&active);
974 }
975 }
976
977 fn try_finalize_bound_generation(&self, active: &Arc<ActiveGenerationRuntime>) {
978 let Some(inner) = self.inner.upgrade() else {
979 return;
980 };
981 let _ = TailtriageController::finalize_generation_shared(&inner, active);
982 }
983}
984
/// Instrumentation handle for a request started through the controller.
#[derive(Debug, Clone)]
pub enum ControllerRequestHandle {
    /// Handle for a request captured by the active generation.
    Active(OwnedRequestHandle),
    /// Handle for an uncaptured request; its timers and guards record nothing.
    Inert(InertControllerRequestHandle),
}
993
994impl ControllerRequestHandle {
995 #[must_use]
997 pub fn request_id(&self) -> &str {
998 match self {
999 Self::Active(handle) => handle.request_id(),
1000 Self::Inert(handle) => handle.request_id(),
1001 }
1002 }
1003
1004 #[must_use]
1006 pub fn route(&self) -> &str {
1007 match self {
1008 Self::Active(handle) => handle.route(),
1009 Self::Inert(handle) => handle.route(),
1010 }
1011 }
1012
1013 #[must_use]
1015 pub fn kind(&self) -> Option<&str> {
1016 match self {
1017 Self::Active(handle) => handle.kind(),
1018 Self::Inert(handle) => handle.kind(),
1019 }
1020 }
1021
1022 #[must_use]
1024 pub fn queue(&self, queue: impl Into<String>) -> ControllerQueueTimer<'_> {
1025 match self {
1026 Self::Active(handle) => ControllerQueueTimer::Active(handle.queue(queue)),
1027 Self::Inert(_) => ControllerQueueTimer::Inert,
1028 }
1029 }
1030
1031 #[must_use]
1033 pub fn stage(&self, stage: impl Into<String>) -> ControllerStageTimer<'_> {
1034 match self {
1035 Self::Active(handle) => ControllerStageTimer::Active(handle.stage(stage)),
1036 Self::Inert(_) => ControllerStageTimer::Inert,
1037 }
1038 }
1039
1040 #[must_use]
1042 pub fn inflight(&self, gauge: impl Into<String>) -> ControllerInflightGuard<'_> {
1043 match self {
1044 Self::Active(handle) => ControllerInflightGuard::Active(handle.inflight(gauge)),
1045 Self::Inert(_) => ControllerInflightGuard::Inert,
1046 }
1047 }
1048}
1049
/// Identity of a request that the controller did not capture.
#[derive(Debug, Clone)]
pub struct InertControllerRequestHandle {
    /// Caller-supplied request id, or a generated `inert-N` id.
    request_id: String,
    /// Route the request was started with.
    route: String,
    /// Optional request kind from the start options.
    kind: Option<String>,
}
1057
1058impl InertControllerRequestHandle {
1059 fn new(route: String, options: RequestOptions, fallback_request_id: String) -> Self {
1060 Self {
1061 request_id: options.request_id.unwrap_or(fallback_request_id),
1062 route,
1063 kind: options.kind,
1064 }
1065 }
1066
1067 fn request_id(&self) -> &str {
1068 &self.request_id
1069 }
1070
1071 fn route(&self) -> &str {
1072 &self.route
1073 }
1074
1075 fn kind(&self) -> Option<&str> {
1076 self.kind.as_deref()
1077 }
1078}
1079
/// Queue-wait timer returned by [`ControllerRequestHandle::queue`].
#[derive(Debug)]
pub enum ControllerQueueTimer<'a> {
    /// Timer that records into the active generation's run.
    Active(QueueTimer<'a>),
    /// No-op timer for uncaptured requests.
    Inert,
}
1088
1089impl ControllerQueueTimer<'_> {
1090 #[must_use]
1092 pub fn with_depth_at_start(self, depth_at_start: u64) -> Self {
1093 match self {
1094 Self::Active(timer) => Self::Active(timer.with_depth_at_start(depth_at_start)),
1095 Self::Inert => Self::Inert,
1096 }
1097 }
1098
1099 pub async fn await_on<Fut, T>(self, fut: Fut) -> T
1101 where
1102 Fut: std::future::Future<Output = T>,
1103 {
1104 match self {
1105 Self::Active(timer) => timer.await_on(fut).await,
1106 Self::Inert => fut.await,
1107 }
1108 }
1109}
1110
/// Stage timer returned by [`ControllerRequestHandle::stage`].
#[derive(Debug)]
pub enum ControllerStageTimer<'a> {
    /// Timer that records into the active generation's run.
    Active(StageTimer<'a>),
    /// No-op timer for uncaptured requests.
    Inert,
}
1119
1120impl ControllerStageTimer<'_> {
1121 pub async fn await_on<Fut, T, E>(self, fut: Fut) -> Result<T, E>
1127 where
1128 Fut: std::future::Future<Output = Result<T, E>>,
1129 {
1130 match self {
1131 Self::Active(timer) => timer.await_on(fut).await,
1132 Self::Inert => fut.await,
1133 }
1134 }
1135
1136 pub async fn await_value<Fut, T>(self, fut: Fut) -> T
1138 where
1139 Fut: std::future::Future<Output = T>,
1140 {
1141 match self {
1142 Self::Active(timer) => timer.await_value(fut).await,
1143 Self::Inert => fut.await,
1144 }
1145 }
1146}
1147
/// In-flight gauge guard returned by [`ControllerRequestHandle::inflight`].
#[derive(Debug)]
pub enum ControllerInflightGuard<'a> {
    /// Guard that records into the active generation's run.
    Active(InflightGuard<'a>),
    /// No-op guard for uncaptured requests.
    Inert,
}
1156
/// Settings from which each newly enabled generation is built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailtriageControllerTemplate {
    /// Service name for generated runs; also the prefix of their run ids.
    pub service_name: String,
    /// Config file re-read by `reload_config`, if any.
    pub config_path: Option<PathBuf>,
    /// Sink that each generation's artifact path is derived from.
    pub sink_template: ControllerSinkTemplate,
    /// Capture mode for generated runs.
    pub selected_mode: CaptureMode,
    /// Capture-limit overrides for generated runs.
    pub capture_limits_override: CaptureLimitsOverride,
    /// Strict-lifecycle flag for generated runs.
    pub strict_lifecycle: bool,
    /// Runtime-sampler settings for generated runs.
    pub runtime_sampler: RuntimeSamplerTemplate,
    /// Behavior once a generation hits its capture limits.
    pub run_end_policy: RunEndPolicy,
}
1177
/// Where generation artifacts are written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControllerSinkTemplate {
    /// Local JSON file output.
    LocalJson {
        /// Base path from which each generation's artifact path is derived.
        output_path: PathBuf,
    },
}
1187
/// Runtime-sampler settings applied when a generation is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct RuntimeSamplerTemplate {
    /// Start a runtime sampler for each enabled generation. Enabling a
    /// generation then requires a current Tokio runtime.
    pub enabled_for_armed_runs: bool,
    /// Capture mode for the sampler, overriding its default when set.
    pub mode_override: Option<CaptureMode>,
    /// Sampling interval in milliseconds, when set.
    pub interval_ms: Option<u64>,
    /// Cap on recorded runtime snapshots, when set.
    pub max_runtime_snapshots: Option<usize>,
}
1200
/// What a generation does once its run reports that capture limits were hit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunEndPolicy {
    /// Keep admitting requests after limits are hit.
    ContinueAfterLimitsHit,
    /// Stop admitting requests and finalize once in-flight requests drain.
    AutoSealOnLimitsHit,
}
1209
/// Snapshot returned by `TailtriageController::status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailtriageControllerStatus {
    /// Template that the next enabled generation will use.
    pub template: TailtriageControllerTemplate,
    /// Current generation state.
    pub generation: GenerationState,
}
1218
/// Whether a generation is currently active.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GenerationState {
    /// No active generation.
    Disabled {
        /// Id the next enabled generation will receive.
        next_generation: u64,
    },
    /// A generation is active.
    Active(Box<ActiveGenerationState>),
}
1230
/// Point-in-time view of an active generation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActiveGenerationState {
    /// Generation id, assigned when the generation was enabled.
    pub generation_id: u64,
    /// Start time in Unix milliseconds, from `tailtriage_core::unix_time_ms()`.
    pub started_at_unix_ms: u64,
    /// Artifact path the generation's run writes to.
    pub artifact_path: PathBuf,
    /// Whether new requests are still admitted.
    pub accepting_new_admissions: bool,
    /// Whether the generation will finalize once in-flight requests drain.
    pub closing: bool,
    /// Number of captured requests not yet finished.
    pub inflight_captured_requests: u64,
    /// Whether a finalization attempt is currently running.
    pub finalization_in_progress: bool,
    /// Message from the most recent failed finalization attempt, cleared when
    /// a new attempt starts.
    pub last_finalize_error: Option<String>,
    /// Activation settings the generation was enabled with.
    pub activation_config: ControllerActivationTemplate,
}
1257
/// Per-activation settings, as loaded from config or captured at enable time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ControllerActivationTemplate {
    /// Sink that artifact paths are derived from.
    pub sink_template: ControllerSinkTemplate,
    /// Capture mode.
    pub selected_mode: CaptureMode,
    /// Capture-limit overrides.
    pub capture_limits_override: CaptureLimitsOverride,
    /// Strict-lifecycle flag.
    pub strict_lifecycle: bool,
    /// Runtime-sampler settings.
    pub runtime_sampler: RuntimeSamplerTemplate,
    /// Behavior once capture limits are hit.
    pub run_end_policy: RunEndPolicy,
}
1274
/// Internal lifecycle state guarded by `ControllerInner::lifecycle`.
#[derive(Debug)]
enum ControllerLifecycle {
    /// No generation is active.
    Disabled {
        /// Id the next enabled generation will receive.
        next_generation: u64,
    },
    /// A generation is active.
    Active {
        /// The active generation's runtime state.
        active: Arc<ActiveGenerationRuntime>,
        /// Id for the generation after this one; restored on finalization.
        next_generation: u64,
    },
}
1285
1286impl ControllerLifecycle {
1287 fn snapshot(&self) -> GenerationState {
1288 match self {
1289 Self::Disabled { next_generation } => GenerationState::Disabled {
1290 next_generation: *next_generation,
1291 },
1292 Self::Active { active, .. } => GenerationState::Active(Box::new(active.snapshot())),
1293 }
1294 }
1295}
1296
/// Top-level shape of the controller TOML file: a single `[controller]` table.
#[derive(Debug, Clone, Deserialize)]
struct ControllerConfigFile {
    /// The `[controller]` table.
    controller: ControllerConfigToml,
}
1301
1302impl ControllerConfigFile {
1303 fn from_path(path: &Path) -> Result<Self, ConfigLoadError> {
1304 let raw = fs::read_to_string(path).map_err(|source| ConfigLoadError::Io {
1305 path: path.to_path_buf(),
1306 source,
1307 })?;
1308 toml::from_str(&raw).map_err(|source| ConfigLoadError::Parse {
1309 path: path.to_path_buf(),
1310 source: Box::new(source),
1311 })
1312 }
1313
1314 fn into_loaded(self) -> LoadedControllerConfig {
1315 let activation = self.controller.activation;
1316 let run_end_policy = activation.run_end_policy();
1317 LoadedControllerConfig {
1318 service_name: self.controller.service_name,
1319 initially_enabled: self.controller.initially_enabled,
1320 activation_template: ControllerActivationTemplate {
1321 sink_template: activation.sink.into_template(),
1322 selected_mode: activation.mode,
1323 capture_limits_override: activation.capture_limits_override,
1324 strict_lifecycle: activation.strict_lifecycle,
1325 runtime_sampler: activation.runtime_sampler,
1326 run_end_policy,
1327 },
1328 }
1329 }
1330}
1331
#[derive(Debug, Clone, PartialEq, Eq)]
/// Controller settings loaded from a TOML config file.
pub struct LoadedControllerConfig {
    /// `service_name` from the file, if the key is present.
    pub service_name: Option<String>,
    /// `initially_enabled` from the file, if the key is present.
    pub initially_enabled: Option<bool>,
    /// Activation template built from the file's `[controller.activation]` table.
    pub activation_template: ControllerActivationTemplate,
}
1342
#[derive(Debug, Clone, Deserialize)]
/// Raw `[controller]` table as deserialized from TOML.
struct ControllerConfigToml {
    /// Optional service name.
    service_name: Option<String>,
    /// Optional flag asking for an initial generation at build time.
    initially_enabled: Option<bool>,
    /// Required `[controller.activation]` table.
    activation: ControllerActivationConfigToml,
}
1349
#[derive(Debug, Clone, Deserialize)]
/// Raw `[controller.activation]` table as deserialized from TOML.
struct ControllerActivationConfigToml {
    /// Capture mode (required).
    mode: CaptureMode,
    /// Capture limit overrides; `CaptureLimitsOverride::default()` when omitted.
    #[serde(default)]
    capture_limits_override: CaptureLimitsOverride,
    /// Strict lifecycle flag; `false` when omitted.
    #[serde(default)]
    strict_lifecycle: bool,
    /// Sink description (required).
    sink: ControllerSinkTemplateToml,
    /// Runtime sampler settings; `RuntimeSamplerTemplate::default()` when omitted.
    #[serde(default)]
    runtime_sampler: RuntimeSamplerTemplate,
    /// Run-end policy; `continue_after_limits_hit` when omitted.
    #[serde(default)]
    run_end_policy: RunEndPolicyConfigToml,
}
1363
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
/// Sink table of the activation config, selected by its `type` key.
enum ControllerSinkTemplateToml {
    /// `type = "local_json"`: per-generation JSON artifacts derived from `output_path`.
    LocalJson { output_path: PathBuf },
}
1369
1370impl ControllerSinkTemplateToml {
1371 fn into_template(self) -> ControllerSinkTemplate {
1372 match self {
1373 Self::LocalJson { output_path } => ControllerSinkTemplate::LocalJson { output_path },
1374 }
1375 }
1376}
1377
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
/// Run-end policy as written in TOML, selected by its `kind` key.
enum RunEndPolicyConfigToml {
    /// `kind = "continue_after_limits_hit"`; also used when the table is omitted.
    #[default]
    ContinueAfterLimitsHit,
    /// `kind = "auto_seal_on_limits_hit"`.
    AutoSealOnLimitsHit,
}
1385
1386impl From<RunEndPolicyConfigToml> for RunEndPolicy {
1387 fn from(value: RunEndPolicyConfigToml) -> Self {
1388 match value {
1389 RunEndPolicyConfigToml::ContinueAfterLimitsHit => Self::ContinueAfterLimitsHit,
1390 RunEndPolicyConfigToml::AutoSealOnLimitsHit => Self::AutoSealOnLimitsHit,
1391 }
1392 }
1393}
1394
1395impl ControllerActivationConfigToml {
1396 fn run_end_policy(&self) -> RunEndPolicy {
1397 self.run_end_policy.clone().into()
1398 }
1399}
1400
#[derive(Debug)]
/// Error loading the controller's TOML config file.
pub enum ConfigLoadError {
    /// The config file could not be read.
    Io {
        /// Path that was read.
        path: PathBuf,
        /// Underlying I/O error.
        source: std::io::Error,
    },
    /// The file was read but its contents are not a valid controller config.
    Parse {
        /// Path whose contents failed to parse.
        path: PathBuf,
        /// Underlying TOML deserialization error.
        source: Box<toml::de::Error>,
    },
}
1419
1420impl std::fmt::Display for ConfigLoadError {
1421 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1422 match self {
1423 Self::Io { path, source } => {
1424 write!(
1425 f,
1426 "failed to read controller config {}: {source}",
1427 path.display()
1428 )
1429 }
1430 Self::Parse { path, source } => {
1431 write!(
1432 f,
1433 "failed to parse controller config TOML {}: {source}",
1434 path.display()
1435 )
1436 }
1437 }
1438 }
1439}
1440
1441impl std::error::Error for ConfigLoadError {
1442 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1443 match self {
1444 Self::Io { source, .. } => Some(source),
1445 Self::Parse { source, .. } => Some(source),
1446 }
1447 }
1448}
1449
#[derive(Debug)]
/// Error building a [`TailtriageController`].
pub enum ControllerBuildError {
    /// The service name was empty.
    EmptyServiceName,
    /// Loading the configured TOML config file failed.
    ConfigLoad(ConfigLoadError),
    /// Starting the initial generation (when initially enabled) failed.
    InitialEnable(EnableError),
}
1460
1461impl std::fmt::Display for ControllerBuildError {
1462 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1463 match self {
1464 Self::EmptyServiceName => write!(f, "service_name cannot be empty"),
1465 Self::ConfigLoad(err) => write!(f, "failed to load config for build: {err}"),
1466 Self::InitialEnable(err) => write!(f, "failed to start initial generation: {err}"),
1467 }
1468 }
1469}
1470
1471impl std::error::Error for ControllerBuildError {}
1472
#[derive(Debug)]
/// Error reloading the activation template from the controller's config file.
pub enum ReloadConfigError {
    /// The controller has no config path, so there is nothing to reload.
    MissingConfigPath,
    /// Reading or parsing the config file failed.
    Load(ConfigLoadError),
    /// The reloaded config did not produce a valid template.
    Validate(BuildError),
}
1483
1484impl std::fmt::Display for ReloadConfigError {
1485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1486 match self {
1487 Self::MissingConfigPath => write!(f, "controller has no config_path; cannot reload"),
1488 Self::Load(err) => write!(f, "failed to reload controller config: {err}"),
1489 Self::Validate(err) => {
1490 write!(f, "reloaded config did not produce a valid template: {err}")
1491 }
1492 }
1493 }
1494}
1495
1496impl std::error::Error for ReloadConfigError {
1497 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1498 match self {
1499 Self::MissingConfigPath => None,
1500 Self::Load(err) => Some(err),
1501 Self::Validate(err) => Some(err),
1502 }
1503 }
1504}
1505
#[derive(Debug)]
/// Error returned when reloading a supplied activation template that fails validation.
pub enum ReloadTemplateError {
    /// The template failed validation.
    Validate(BuildError),
}
1512
1513impl std::fmt::Display for ReloadTemplateError {
1514 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1515 match self {
1516 Self::Validate(err) => write!(f, "template is invalid: {err}"),
1517 }
1518 }
1519}
1520
1521impl std::error::Error for ReloadTemplateError {
1522 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1523 match self {
1524 Self::Validate(err) => Some(err),
1525 }
1526 }
1527}
1528
#[derive(Debug)]
/// Error starting a new generation.
pub enum EnableError {
    /// A generation is already active; only one may be active at a time.
    AlreadyActive {
        /// Id of the generation that is currently active.
        generation_id: u64,
    },
    /// Building the generation's run failed.
    Build(BuildError),
    /// The runtime sampler is enabled but no Tokio runtime is available.
    MissingTokioRuntimeForSampler,
    /// Starting the runtime sampler for the generation failed.
    StartRuntimeSampler(SamplerStartError),
}
1544
1545impl std::fmt::Display for EnableError {
1546 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1547 match self {
1548 Self::AlreadyActive { generation_id } => {
1549 write!(f, "generation {generation_id} is already active")
1550 }
1551 Self::Build(err) => write!(f, "failed to build generation run: {err}"),
1552 Self::MissingTokioRuntimeForSampler => {
1553 write!(f, "runtime sampler requires an active Tokio runtime")
1554 }
1555 Self::StartRuntimeSampler(err) => {
1556 write!(f, "failed to start runtime sampler for generation: {err}")
1557 }
1558 }
1559 }
1560}
1561
1562impl std::error::Error for EnableError {}
1563
#[derive(Debug)]
/// Error disabling the active generation.
pub enum DisableError {
    /// Finalizing the generation through its sink failed.
    Finalize(tailtriage_core::SinkError),
}
1570
1571impl std::fmt::Display for DisableError {
1572 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1573 match self {
1574 Self::Finalize(err) => write!(f, "failed to finalize generation: {err}"),
1575 }
1576 }
1577}
1578
// Uses the default `source` (returns `None`); the wrapped sink error is only
// visible through `Display`.
// NOTE(review): expose `tailtriage_core::SinkError` via `source` if it implements
// `std::error::Error`.
impl std::error::Error for DisableError {}
1580
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Result of a successful disable call.
pub enum DisableOutcome {
    /// No generation was active.
    AlreadyDisabled,
    /// Admissions stopped, but captured requests are still in flight.
    ///
    /// The generation is finalized once they complete.
    Closing {
        /// Id of the closing generation.
        generation_id: u64,
        /// Captured requests still in flight at the time of the call.
        inflight_captured_requests: u64,
    },
    /// No captured requests were in flight, so the generation was finalized immediately.
    Finalized {
        /// Id of the finalized generation.
        generation_id: u64,
    },
}
1599
#[derive(Debug)]
/// Error shutting down the controller.
pub enum ShutdownError {
    /// Finalizing a generation during shutdown failed.
    Finalize(DisableError),
}
1606
1607impl std::fmt::Display for ShutdownError {
1608 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1609 match self {
1610 Self::Finalize(err) => write!(f, "shutdown finalization failed: {err}"),
1611 }
1612 }
1613}
1614
1615impl std::error::Error for ShutdownError {}
1616
1617fn generated_artifact_path(template: &ControllerSinkTemplate, generation_id: u64) -> PathBuf {
1618 match template {
1619 ControllerSinkTemplate::LocalJson { output_path } => {
1620 let parent = output_path
1621 .parent()
1622 .map(Path::to_path_buf)
1623 .unwrap_or_default();
1624 let stem = output_path
1625 .file_stem()
1626 .and_then(std::ffi::OsStr::to_str)
1627 .unwrap_or("tailtriage-run");
1628 let extension = output_path.extension().and_then(std::ffi::OsStr::to_str);
1629 let filename = match extension {
1630 Some(ext) if !ext.is_empty() => format!("{stem}-generation-{generation_id}.{ext}"),
1631 _ => format!("{stem}-generation-{generation_id}.json"),
1632 };
1633 parent.join(filename)
1634 }
1635 }
1636}
1637
1638#[cfg(test)]
1639mod tests {
1640 use std::fs;
1641 use std::path::{Path, PathBuf};
1642 use std::sync::Arc;
1643 use std::time::Duration;
1644
1645 use super::{
1646 ControllerBuildError, ControllerSinkTemplate, DisableOutcome, EnableError, GenerationState,
1647 ReloadConfigError, ReloadTemplateError, RunEndPolicy, RuntimeSamplerTemplate,
1648 TailtriageController, TailtriageControllerTemplate,
1649 };
1650 use serde::Serialize;
1651 use tailtriage_core::{
1652 CaptureLimitsOverride, CaptureMode, RequestOptions, Run, RuntimeSnapshot,
1653 };
1654
    #[derive(Serialize)]
    /// Test-side mirror of the config file's top level, serialized with `toml`.
    struct TestControllerConfigToml {
        controller: TestControllerConfigBodyToml,
    }
1659
    #[derive(Serialize)]
    /// Test-side `[controller]` table; `None` keys are omitted from the output.
    struct TestControllerConfigBodyToml {
        #[serde(skip_serializing_if = "Option::is_none")]
        service_name: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        initially_enabled: Option<bool>,
        activation: TestActivationToml,
    }
1668
    #[derive(Serialize)]
    /// Test-side `[controller.activation]` table.
    ///
    /// Optional keys set to `None` are omitted, so the controller's serde defaults apply.
    struct TestActivationToml {
        mode: &'static str,
        #[serde(skip_serializing_if = "Option::is_none")]
        capture_limits_override: Option<TestCaptureLimitsOverrideToml>,
        #[serde(skip_serializing_if = "Option::is_none")]
        strict_lifecycle: Option<bool>,
        sink: TestSinkToml,
        #[serde(skip_serializing_if = "Option::is_none")]
        runtime_sampler: Option<TestRuntimeSamplerToml>,
        #[serde(skip_serializing_if = "Option::is_none")]
        run_end_policy: Option<TestRunEndPolicyToml>,
    }
1682
    #[derive(Serialize)]
    /// Subset of capture limit overrides exercised by these tests.
    struct TestCaptureLimitsOverrideToml {
        #[serde(skip_serializing_if = "Option::is_none")]
        max_requests: Option<u64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        max_stages: Option<u64>,
    }
1690
    #[derive(Serialize)]
    /// Test-side sink table; `sink_type` is written as the `type` tag key.
    struct TestSinkToml {
        #[serde(rename = "type")]
        sink_type: &'static str,
        output_path: PathBuf,
    }
1697
    #[derive(Serialize)]
    /// Test-side runtime sampler table; every key is always written.
    struct TestRuntimeSamplerToml {
        enabled_for_armed_runs: bool,
        mode_override: &'static str,
        interval_ms: u64,
        max_runtime_snapshots: u64,
    }
1705
    #[derive(Serialize)]
    /// Test-side run-end policy table, selected by its `kind` key.
    struct TestRunEndPolicyToml {
        kind: &'static str,
    }
1710
1711 fn test_output(base: &str) -> std::path::PathBuf {
1712 let unique = format!(
1713 "tailtriage-controller-{base}-{}-{}.json",
1714 std::process::id(),
1715 tailtriage_core::unix_time_ms()
1716 );
1717 std::env::temp_dir().join(unique)
1718 }
1719
1720 fn read_artifact(path: &std::path::Path) -> String {
1721 fs::read_to_string(path).expect("artifact should be readable")
1722 }
1723
1724 fn read_run(path: &std::path::Path) -> Run {
1725 let artifact = read_artifact(path);
1726 serde_json::from_str(&artifact).expect("artifact should parse as Run")
1727 }
1728
1729 fn active_runtime(controller: &TailtriageController) -> Arc<super::ActiveGenerationRuntime> {
1730 let lifecycle = controller
1731 .inner
1732 .lifecycle
1733 .lock()
1734 .expect("controller lifecycle lock poisoned");
1735 let super::ControllerLifecycle::Active { active, .. } = &*lifecycle else {
1736 panic!("expected active generation");
1737 };
1738 Arc::clone(active)
1739 }
1740
1741 fn test_config_path(base: &str) -> std::path::PathBuf {
1742 let unique = format!(
1743 "tailtriage-controller-config-{base}-{}-{}.toml",
1744 std::process::id(),
1745 tailtriage_core::unix_time_ms()
1746 );
1747 std::env::temp_dir().join(unique)
1748 }
1749
1750 fn write_config(
1751 path: &Path,
1752 output: &Path,
1753 mode: &'static str,
1754 strict: bool,
1755 sampler_enabled: bool,
1756 ) {
1757 let content = toml::to_string(&TestControllerConfigToml {
1758 controller: TestControllerConfigBodyToml {
1759 service_name: None,
1760 initially_enabled: Some(false),
1761 activation: TestActivationToml {
1762 mode,
1763 capture_limits_override: Some(TestCaptureLimitsOverrideToml {
1764 max_requests: Some(17),
1765 max_stages: Some(18),
1766 }),
1767 strict_lifecycle: Some(strict),
1768 sink: TestSinkToml {
1769 sink_type: "local_json",
1770 output_path: output.to_path_buf(),
1771 },
1772 runtime_sampler: Some(TestRuntimeSamplerToml {
1773 enabled_for_armed_runs: sampler_enabled,
1774 mode_override: "investigation",
1775 interval_ms: 250,
1776 max_runtime_snapshots: 123,
1777 }),
1778 run_end_policy: Some(TestRunEndPolicyToml {
1779 kind: "auto_seal_on_limits_hit",
1780 }),
1781 },
1782 },
1783 })
1784 .expect("config TOML serialization should succeed");
1785 fs::write(path, content).expect("config write should succeed");
1786 }
1787
1788 fn write_initially_enabled_config(path: &Path, output: &Path) {
1789 let content = toml::to_string(&TestControllerConfigToml {
1790 controller: TestControllerConfigBodyToml {
1791 service_name: Some("toml-service-name".to_owned()),
1792 initially_enabled: Some(true),
1793 activation: TestActivationToml {
1794 mode: "investigation",
1795 capture_limits_override: Some(TestCaptureLimitsOverrideToml {
1796 max_requests: Some(9),
1797 max_stages: None,
1798 }),
1799 strict_lifecycle: Some(true),
1800 sink: TestSinkToml {
1801 sink_type: "local_json",
1802 output_path: output.to_path_buf(),
1803 },
1804 runtime_sampler: None,
1805 run_end_policy: Some(TestRunEndPolicyToml {
1806 kind: "auto_seal_on_limits_hit",
1807 }),
1808 },
1809 },
1810 })
1811 .expect("config TOML serialization should succeed");
1812 fs::write(path, content).expect("config write should succeed");
1813 }
1814
1815 fn write_sparse_config(path: &Path, output: &Path, mode: &'static str) {
1816 let content = toml::to_string(&TestControllerConfigToml {
1817 controller: TestControllerConfigBodyToml {
1818 service_name: None,
1819 initially_enabled: None,
1820 activation: TestActivationToml {
1821 mode,
1822 capture_limits_override: None,
1823 strict_lifecycle: None,
1824 sink: TestSinkToml {
1825 sink_type: "local_json",
1826 output_path: output.to_path_buf(),
1827 },
1828 runtime_sampler: None,
1829 run_end_policy: None,
1830 },
1831 },
1832 })
1833 .expect("config TOML serialization should succeed");
1834 fs::write(path, content).expect("config write should succeed");
1835 }
1836
1837 fn write_config_with_optional_service_name(
1838 path: &Path,
1839 output: &Path,
1840 service_name: Option<&str>,
1841 ) {
1842 let content = toml::to_string(&TestControllerConfigToml {
1843 controller: TestControllerConfigBodyToml {
1844 service_name: service_name.map(str::to_owned),
1845 initially_enabled: Some(false),
1846 activation: TestActivationToml {
1847 mode: "light",
1848 capture_limits_override: None,
1849 strict_lifecycle: None,
1850 sink: TestSinkToml {
1851 sink_type: "local_json",
1852 output_path: output.to_path_buf(),
1853 },
1854 runtime_sampler: None,
1855 run_end_policy: None,
1856 },
1857 },
1858 })
1859 .expect("config TOML serialization should succeed");
1860 fs::write(path, content).expect("config write should succeed");
1861 }
1862
1863 fn write_raw_config(path: &std::path::Path, content: &str) {
1864 fs::write(path, content).expect("config write should succeed");
1865 }
1866
    /// Enable, capture one completed request, disable: the generation finalizes into
    /// `<stem>-generation-1.json` next to the configured output.
    #[test]
    fn enable_capture_disable_finalizes_generation() {
        let output = test_output("enable-capture-disable");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .build()
            .expect("build should succeed");

        let active = controller.enable().expect("enable should succeed");
        let started = controller.begin_request("/checkout");
        started.completion.finish_ok();

        // No requests are in flight, so disable finalizes immediately.
        let disable = controller.disable().expect("disable should succeed");
        assert!(matches!(
            disable,
            DisableOutcome::Finalized {
                generation_id: id
            } if id == active.generation_id
        ));

        // Artifact name is derived independently here to pin the naming scheme.
        let expected = output.with_file_name(format!(
            "{}-generation-1.json",
            output
                .file_stem()
                .and_then(std::ffi::OsStr::to_str)
                .expect("stem")
        ));
        assert!(expected.exists());

        fs::remove_file(expected).expect("cleanup should succeed");
    }
1898
    /// `initially_enabled(true)` makes `build` start generation 1.
    #[test]
    fn initially_enabled_build_starts_first_active_generation() {
        let output = test_output("initially-enabled");
        let controller = TailtriageController::builder("checkout-service")
            .initially_enabled(true)
            .output(&output)
            .build()
            .expect("build should succeed");

        let status = controller.status();
        let active = match status.generation {
            GenerationState::Active(active) => active,
            disabled @ GenerationState::Disabled { .. } => {
                panic!("expected active generation after build, got {disabled:?}")
            }
        };
        assert_eq!(active.generation_id, 1);

        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id: 1 })
        ));
        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
1923
1924 #[test]
1925 fn disabled_status_reports_next_generation() {
1926 let controller = TailtriageController::builder("checkout-service")
1927 .build()
1928 .expect("build should succeed");
1929
1930 assert!(matches!(
1931 controller.status().generation,
1932 GenerationState::Disabled { next_generation: 1 }
1933 ));
1934 }
1935
    /// Re-enabling after a finalized disable yields the next generation id and a
    /// different artifact path.
    #[test]
    fn enable_disable_reenable_creates_distinct_generation_and_artifact() {
        let output = test_output("reenable");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .build()
            .expect("build should succeed");

        let first = controller.enable().expect("first enable should succeed");
        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id: 1 })
        ));

        let second = controller.enable().expect("second enable should succeed");
        assert_eq!(first.generation_id + 1, second.generation_id);
        assert_ne!(first.artifact_path, second.artifact_path);

        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id: 2 })
        ));

        fs::remove_file(first.artifact_path).expect("cleanup first artifact should succeed");
        fs::remove_file(second.artifact_path).expect("cleanup second artifact should succeed");
    }
1962
    /// Disabling with one admitted request in flight reports `Closing`. Finishing that
    /// request afterwards finalizes the generation and writes its artifact.
    #[test]
    fn request_started_before_disable_can_finish_after_disable() {
        let output = test_output("finish-after-disable");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .build()
            .expect("build should succeed");

        let active = controller.enable().expect("enable should succeed");
        // Left unfinished so the generation cannot finalize during `disable`.
        let started = controller.begin_request("/checkout");

        let disable = controller.disable().expect("disable should succeed");
        assert!(matches!(
            disable,
            DisableOutcome::Closing {
                generation_id,
                inflight_captured_requests: 1
            } if generation_id == active.generation_id
        ));

        // Draining the last in-flight request triggers finalization.
        started.completion.finish_ok();

        let status = controller.status();
        assert!(matches!(
            status.generation,
            GenerationState::Disabled { next_generation: 2 }
        ));
        assert!(active.artifact_path.exists());

        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
1994
1995 #[test]
1996 fn no_new_admissions_after_disable() {
1997 let output = test_output("no-admissions");
1998 let controller = TailtriageController::builder("checkout-service")
1999 .output(&output)
2000 .build()
2001 .expect("build should succeed");
2002
2003 let active = controller.enable().expect("enable should succeed");
2004 let started = controller.begin_request("/checkout");
2005
2006 let _ = controller.disable().expect("disable should succeed");
2007
2008 controller.begin_request("/checkout").completion.finish_ok();
2009
2010 started.completion.finish_ok();
2011 fs::remove_file(active.artifact_path).expect("cleanup should succeed");
2012 }
2013
    /// Under the default `ContinueAfterLimitsHit` policy, requests beyond `max_requests`
    /// are dropped while the generation stays active and admitting.
    ///
    /// A manual disable then records `ManualDisarm` as the run end reason.
    #[test]
    fn default_policy_preserves_cheap_drop_after_saturation() {
        let output = test_output("default-policy-cheap-drop");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .capture_limits_override(CaptureLimitsOverride {
                max_requests: Some(1),
                ..CaptureLimitsOverride::default()
            })
            .build()
            .expect("build should succeed");

        let active = controller.enable().expect("enable should succeed");
        // First request is captured; the next two exceed `max_requests = 1`.
        controller.begin_request("/checkout").completion.finish_ok();
        controller.begin_request("/checkout").completion.finish_ok();
        controller.begin_request("/checkout").completion.finish_ok();

        let status = controller.status();
        let GenerationState::Active(active_status) = status.generation else {
            panic!("default policy should keep generation active after saturation");
        };
        assert!(active_status.accepting_new_admissions);
        assert!(!active_status.closing);

        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id }) if generation_id == active.generation_id
        ));

        let run = read_run(&active.artifact_path);
        assert!(run.truncation.limits_hit);
        assert_eq!(run.truncation.dropped_requests, 2);
        assert_eq!(
            run.metadata.run_end_reason,
            Some(tailtriage_core::RunEndReason::ManualDisarm)
        );

        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
2053
    /// With `AutoSealOnLimitsHit`, exceeding `max_requests` ends the generation on its own.
    ///
    /// The artifact records `AutoSealOnLimitsHit` as the run end reason.
    #[test]
    fn auto_seal_policy_ends_generation_after_limits_hit() {
        let output = test_output("auto-seal-policy");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .run_end_policy(RunEndPolicy::AutoSealOnLimitsHit)
            .capture_limits_override(CaptureLimitsOverride {
                max_requests: Some(1),
                ..CaptureLimitsOverride::default()
            })
            .build()
            .expect("build should succeed");

        let active = controller.enable().expect("enable should succeed");
        controller.begin_request("/checkout").completion.finish_ok();
        // Second request exceeds the limit and triggers the auto-seal.
        controller.begin_request("/checkout").completion.finish_ok();

        let status = controller.status();
        assert!(matches!(
            status.generation,
            GenerationState::Disabled { next_generation: 2 }
        ));

        let run = read_run(&active.artifact_path);
        assert!(run.truncation.limits_hit);
        assert!(run.truncation.dropped_requests > 0);
        assert_eq!(
            run.metadata.run_end_reason,
            Some(tailtriage_core::RunEndReason::AutoSealOnLimitsHit)
        );

        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
2087
    /// Exceeding `max_runtime_snapshots` also triggers the auto-seal policy.
    ///
    /// Snapshots are recorded directly on the active runtime's run.
    #[test]
    fn runtime_snapshot_saturation_triggers_auto_seal() {
        let output = test_output("auto-seal-runtime-snapshot");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .run_end_policy(RunEndPolicy::AutoSealOnLimitsHit)
            .capture_limits_override(CaptureLimitsOverride {
                max_runtime_snapshots: Some(1),
                ..CaptureLimitsOverride::default()
            })
            .build()
            .expect("build should succeed");

        let active = controller.enable().expect("enable should succeed");
        let runtime = active_runtime(&controller);

        // First snapshot fits the limit of 1.
        runtime.run.record_runtime_snapshot(RuntimeSnapshot {
            at_unix_ms: tailtriage_core::unix_time_ms(),
            alive_tasks: Some(1),
            global_queue_depth: Some(1),
            local_queue_depth: Some(1),
            blocking_queue_depth: Some(0),
            remote_schedule_count: Some(1),
        });
        // Second snapshot is dropped and should seal the generation.
        runtime.run.record_runtime_snapshot(RuntimeSnapshot {
            at_unix_ms: tailtriage_core::unix_time_ms(),
            alive_tasks: Some(2),
            global_queue_depth: Some(2),
            local_queue_depth: Some(2),
            blocking_queue_depth: Some(0),
            remote_schedule_count: Some(2),
        });

        assert!(matches!(
            controller.status().generation,
            GenerationState::Disabled { next_generation: 2 }
        ));
        let run = read_run(&active.artifact_path);
        assert!(run.truncation.limits_hit);
        assert!(run.truncation.dropped_runtime_snapshots > 0);
        assert_eq!(
            run.metadata.run_end_reason,
            Some(tailtriage_core::RunEndReason::AutoSealOnLimitsHit)
        );

        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
2135
    /// Exceeding `max_queues` under auto-seal makes the generation close.
    ///
    /// It stops admitting and stays `Active` while the admitted request is still in
    /// flight. It finalizes only after that request completes.
    #[tokio::test(flavor = "current_thread")]
    async fn queue_saturation_triggers_auto_seal_and_waits_for_inflight_drain() {
        let output = test_output("auto-seal-queue-saturation");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .run_end_policy(RunEndPolicy::AutoSealOnLimitsHit)
            .capture_limits_override(CaptureLimitsOverride {
                max_queues: Some(1),
                ..CaptureLimitsOverride::default()
            })
            .build()
            .expect("build should succeed");

        let active = controller.enable().expect("enable should succeed");
        let started = controller.begin_request("/checkout");
        let request = started.handle.clone();
        // First queue wait fits `max_queues = 1`; the second exceeds it.
        request
            .queue("primary")
            .with_depth_at_start(1)
            .await_on(async {})
            .await;
        request
            .queue("primary")
            .with_depth_at_start(2)
            .await_on(async {})
            .await;

        let status = controller.status();
        let GenerationState::Active(active_status) = status.generation else {
            panic!("generation should remain active while admitted request is still in-flight");
        };
        assert!(active_status.closing);
        assert!(!active_status.accepting_new_admissions);

        // Completing the only in-flight request lets the sealed generation finalize.
        started.completion.finish_ok();

        assert!(matches!(
            controller.status().generation,
            GenerationState::Disabled { next_generation: 2 }
        ));
        let run = read_run(&active.artifact_path);
        assert!(run.truncation.limits_hit);
        assert!(run.truncation.dropped_queues > 0);
        assert_eq!(
            run.metadata.run_end_reason,
            Some(tailtriage_core::RunEndReason::AutoSealOnLimitsHit)
        );

        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
2186
    /// After an auto-seal, the next enable starts a fresh generation with the next id.
    ///
    /// That generation can then be finalized normally.
    #[test]
    fn auto_seal_then_next_enable_creates_fresh_generation() {
        let output = test_output("auto-seal-next-generation");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .run_end_policy(RunEndPolicy::AutoSealOnLimitsHit)
            .capture_limits_override(CaptureLimitsOverride {
                max_requests: Some(1),
                ..CaptureLimitsOverride::default()
            })
            .build()
            .expect("build should succeed");

        let first = controller.enable().expect("first enable should succeed");
        controller.begin_request("/checkout").completion.finish_ok();
        controller.begin_request("/checkout").completion.finish_ok();
        assert!(matches!(
            controller.status().generation,
            GenerationState::Disabled { next_generation: 2 }
        ));

        let second = controller.enable().expect("second enable should succeed");
        assert_eq!(second.generation_id, first.generation_id + 1);
        // One request stays under the limit, so generation 2 is not auto-sealed.
        controller.begin_request("/checkout").completion.finish_ok();
        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id }) if generation_id == second.generation_id
        ));

        fs::remove_file(first.artifact_path).expect("cleanup first should succeed");
        fs::remove_file(second.artifact_path).expect("cleanup second should succeed");
    }
2219
2220 #[test]
2221 fn one_active_generation_at_a_time() {
2222 let controller = TailtriageController::builder("checkout-service")
2223 .build()
2224 .expect("build should succeed");
2225
2226 let first = controller.enable().expect("first enable should succeed");
2227 let err = controller
2228 .enable()
2229 .expect_err("second enable should fail while first generation active");
2230
2231 assert!(matches!(
2232 err,
2233 EnableError::AlreadyActive {
2234 generation_id
2235 } if generation_id == first.generation_id
2236 ));
2237
2238 assert!(matches!(
2239 controller.disable(),
2240 Ok(DisableOutcome::Finalized { .. })
2241 ));
2242 fs::remove_file(first.artifact_path).expect("cleanup should succeed");
2243 }
2244
    /// Requests are recorded only in the generation that admitted them.
    ///
    /// Generation A's request finishes after A is disabled (while A is closing). B is
    /// enabled after A has drained. Each artifact contains only its own request id.
    #[test]
    fn request_completion_remains_bound_to_original_generation_after_reenable() {
        let output = test_output("generation-binding");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .build()
            .expect("build should succeed");

        let gen_a = controller.enable().expect("generation A should enable");
        let started_a = controller.begin_request_with(
            "/checkout",
            RequestOptions::new().request_id("req-generation-a"),
        );

        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Closing {
                generation_id,
                inflight_captured_requests: 1
            }) if generation_id == gen_a.generation_id
        ));

        // Finishing A's last in-flight request finalizes generation A.
        started_a.completion.finish_ok();

        let gen_b = controller.enable().expect("generation B should enable");
        let started_b = controller.begin_request_with(
            "/checkout",
            RequestOptions::new().request_id("req-generation-b"),
        );
        started_b.completion.finish_ok();
        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id })
                if generation_id == gen_b.generation_id
        ));

        let run_a = read_artifact(&gen_a.artifact_path);
        let run_b = read_artifact(&gen_b.artifact_path);
        assert!(run_a.contains("req-generation-a"));
        assert!(!run_a.contains("req-generation-b"));
        assert!(run_b.contains("req-generation-b"));
        assert!(!run_b.contains("req-generation-a"));

        fs::remove_file(gen_a.artifact_path).expect("cleanup generation A should succeed");
        fs::remove_file(gen_b.artifact_path).expect("cleanup generation B should succeed");
    }
2291
    /// A request begun while disabled is inert: it keeps its explicit request id but
    /// never appears in a later generation's artifact.
    #[test]
    fn disabled_begin_request_is_inert_and_never_joins_later_generation() {
        let output = test_output("disabled-admission");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .build()
            .expect("build should succeed");

        let disabled_started = controller.begin_request_with(
            "/checkout",
            RequestOptions::new().request_id("req-disabled"),
        );
        assert_eq!(disabled_started.handle.request_id(), "req-disabled");
        disabled_started.completion.finish_ok();

        let active = controller.enable().expect("enable should succeed");
        let started = controller
            .begin_request_with("/checkout", RequestOptions::new().request_id("req-enabled"));
        started.completion.finish_ok();
        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id }) if generation_id == active.generation_id
        ));

        let run = read_artifact(&active.artifact_path);
        assert!(run.contains("req-enabled"));
        assert!(!run.contains("req-disabled"));

        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
2322
    /// On an inert (disabled) request, handle accessors still report the given options.
    ///
    /// Instrumentation calls and completion have no effect on capture. `finish_result`
    /// passes the caller's result through, and nothing leaks into a later generation.
    #[test]
    fn disabled_handle_and_completion_operations_are_noop() {
        let output = test_output("disabled-noop");
        let controller = TailtriageController::builder("checkout-service")
            .output(&output)
            .build()
            .expect("build should succeed");

        let started = controller.begin_request_with(
            "/checkout",
            RequestOptions::new()
                .request_id("req-disabled-noop")
                .kind("http"),
        );

        assert_eq!(started.handle.request_id(), "req-disabled-noop");
        assert_eq!(started.handle.route(), "/checkout");
        assert_eq!(started.handle.kind(), Some("http"));
        let request = started.handle.clone();
        // Bound to `_name` (not `_`) so these values live until the end of the test.
        let _inflight = request.inflight("inflight-disabled");
        let _queue = request.queue("queue-disabled");
        let _stage = request.stage("stage-disabled");
        started
            .completion
            .finish_result::<(), &str>(Err("disabled-result"))
            .expect_err("disabled result should pass through unchanged");

        let active = controller.enable().expect("enable should succeed");
        let enabled_started = controller
            .begin_request_with("/checkout", RequestOptions::new().request_id("req-enabled"));
        enabled_started.completion.finish_ok();
        assert!(matches!(
            controller.disable(),
            Ok(DisableOutcome::Finalized { generation_id }) if generation_id == active.generation_id
        ));

        let run = read_artifact(&active.artifact_path);
        assert!(run.contains("req-enabled"));
        assert!(!run.contains("req-disabled-noop"));

        fs::remove_file(active.artifact_path).expect("cleanup should succeed");
    }
2365
2366 #[test]
2367 fn inert_disabled_request_id_contract_preserves_explicit_and_generates_fallback() {
2368 let output = test_output("inert-disabled-request-id");
2369 let controller = TailtriageController::builder("checkout-service")
2370 .output(&output)
2371 .build()
2372 .expect("build should succeed");
2373
2374 let explicit = controller.begin_request_with(
2375 "/checkout",
2376 RequestOptions::new().request_id("req-disabled-explicit"),
2377 );
2378 assert_eq!(explicit.handle.request_id(), "req-disabled-explicit");
2379
2380 let implicit_a = controller.begin_request("/checkout");
2381 let implicit_b = controller.begin_request("/checkout");
2382 assert!(implicit_a.handle.request_id().starts_with("inert-"));
2383 assert!(implicit_b.handle.request_id().starts_with("inert-"));
2384 assert_ne!(
2385 implicit_a.handle.request_id(),
2386 implicit_b.handle.request_id()
2387 );
2388 }
2389
2390 #[test]
2391 fn inert_closing_request_id_contract_preserves_explicit_and_generates_fallback() {
2392 let output = test_output("inert-closing-request-id");
2393 let controller = TailtriageController::builder("checkout-service")
2394 .output(&output)
2395 .build()
2396 .expect("build should succeed");
2397
2398 let active = controller.enable().expect("enable should succeed");
2399 let admitted = controller.begin_request("/checkout");
2400 assert!(matches!(
2401 controller.disable(),
2402 Ok(DisableOutcome::Closing { .. })
2403 ));
2404
2405 let explicit = controller.begin_request_with(
2406 "/checkout",
2407 RequestOptions::new().request_id("req-closing-explicit"),
2408 );
2409 assert_eq!(explicit.handle.request_id(), "req-closing-explicit");
2410
2411 let implicit = controller.begin_request("/checkout");
2412 assert!(implicit.handle.request_id().starts_with("inert-"));
2413
2414 admitted.completion.finish_ok();
2415 assert!(matches!(
2416 controller.status().generation,
2417 GenerationState::Disabled { .. }
2418 ));
2419 fs::remove_file(active.artifact_path).expect("cleanup should succeed");
2420 }
2421
2422 #[test]
2423 fn rapid_enable_disable_boundaries_keep_generation_isolation() {
2424 let output = test_output("rapid-boundaries");
2425 let controller = TailtriageController::builder("checkout-service")
2426 .output(&output)
2427 .build()
2428 .expect("build should succeed");
2429
2430 let mut artifacts = Vec::new();
2431 for generation in 1..=3 {
2432 let active = controller.enable().expect("enable should succeed");
2433 assert_eq!(active.generation_id, generation);
2434
2435 let started = controller.begin_request_with(
2436 "/checkout",
2437 RequestOptions::new().request_id(format!("req-gen-{generation}")),
2438 );
2439
2440 assert!(matches!(
2441 controller.disable(),
2442 Ok(DisableOutcome::Closing {
2443 generation_id,
2444 inflight_captured_requests: 1
2445 }) if generation_id == generation
2446 ));
2447
2448 assert!(
2449 matches!(
2450 controller.enable(),
2451 Err(EnableError::AlreadyActive { generation_id }) if generation_id == generation
2452 ),
2453 "controller must not start next generation before admitted requests drain"
2454 );
2455
2456 started.completion.finish_ok();
2457 artifacts.push(active.artifact_path);
2458 }
2459
2460 for (idx, artifact) in artifacts.iter().enumerate() {
2461 let run = read_artifact(artifact);
2462 assert!(run.contains(&format!("req-gen-{}", idx + 1)));
2463 fs::remove_file(artifact).expect("cleanup should succeed");
2464 }
2465 }
2466
2467 #[test]
2468 fn completion_drain_finalizes_once_without_duplicate_side_effects() {
2469 let output = test_output("single-finalize");
2470 let controller = TailtriageController::builder("checkout-service")
2471 .output(&output)
2472 .build()
2473 .expect("build should succeed");
2474
2475 let active = controller.enable().expect("enable should succeed");
2476 let started = controller
2477 .begin_request_with("/checkout", RequestOptions::new().request_id("req-once"));
2478
2479 assert!(matches!(
2480 controller.disable(),
2481 Ok(DisableOutcome::Closing {
2482 generation_id,
2483 inflight_captured_requests: 1
2484 }) if generation_id == active.generation_id
2485 ));
2486
2487 started.completion.finish_ok();
2488 assert!(matches!(
2489 controller.disable(),
2490 Ok(DisableOutcome::AlreadyDisabled)
2491 ));
2492 assert!(matches!(controller.shutdown(), Ok(())));
2493
2494 let run = read_artifact(&active.artifact_path);
2495 assert_eq!(run.matches("req-once").count(), 1);
2496
2497 fs::remove_file(active.artifact_path).expect("cleanup should succeed");
2498 }
2499
2500 #[test]
2501 fn shutdown_active_generation_finalizes_and_disables_even_with_inflight_request() {
2502 let output = test_output("shutdown-active");
2503 let controller = TailtriageController::builder("checkout-service")
2504 .output(&output)
2505 .build()
2506 .expect("build should succeed");
2507
2508 let active = controller.enable().expect("enable should succeed");
2509 let started = controller.begin_request_with(
2510 "/checkout",
2511 RequestOptions::new().request_id("req-inflight-shutdown"),
2512 );
2513
2514 controller.shutdown().expect("shutdown should succeed");
2515 assert!(matches!(
2516 controller.status().generation,
2517 GenerationState::Disabled { next_generation: 2 }
2518 ));
2519 assert!(active.artifact_path.exists());
2520
2521 let run = read_run(&active.artifact_path);
2522 assert_eq!(
2523 run.metadata.run_end_reason,
2524 Some(tailtriage_core::RunEndReason::Shutdown)
2525 );
2526
2527 controller
2528 .begin_request_with(
2529 "/checkout",
2530 RequestOptions::new().request_id("req-post-shutdown"),
2531 )
2532 .completion
2533 .finish_ok();
2534
2535 let run_after = read_artifact(&active.artifact_path);
2536 assert!(!run_after.contains("req-post-shutdown"));
2537
2538 started.completion.finish_ok();
2539 fs::remove_file(active.artifact_path).expect("cleanup should succeed");
2540 }
2541
2542 #[test]
2543 fn drain_finalization_sink_failure_is_observable_and_retriable() {
2544 let output = std::env::temp_dir().join(format!(
2545 "tailtriage-controller-missing-dir-{}-{}",
2546 std::process::id(),
2547 tailtriage_core::unix_time_ms()
2548 ));
2549 let missing_output = output.join("artifact.json");
2550 let controller = TailtriageController::builder("checkout-service")
2551 .output(&missing_output)
2552 .build()
2553 .expect("build should succeed");
2554
2555 let active = controller.enable().expect("enable should succeed");
2556 let started = controller.begin_request("/checkout");
2557 assert!(matches!(
2558 controller.disable(),
2559 Ok(DisableOutcome::Closing {
2560 generation_id,
2561 inflight_captured_requests: 1
2562 }) if generation_id == active.generation_id
2563 ));
2564
2565 started.completion.finish_ok();
2566
2567 let status = controller.status();
2568 let GenerationState::Active(active_state) = status.generation else {
2569 panic!("generation should stay active after failed drain finalization");
2570 };
2571 assert!(active_state.closing);
2572 assert!(!active_state.accepting_new_admissions);
2573 assert!(!active_state.finalization_in_progress);
2574 let first_error = active_state
2575 .last_finalize_error
2576 .expect("failed drain finalization should be recorded");
2577 assert!(
2578 first_error.contains("failed to finalize generation"),
2579 "unexpected error message: {first_error}"
2580 );
2581
2582 let disable_retry = controller.disable();
2583 assert!(
2584 matches!(disable_retry, Err(super::DisableError::Finalize(_))),
2585 "disable should return finalization failure after prior failed drain finalization"
2586 );
2587
2588 let shutdown_retry = controller.shutdown();
2589 assert!(
2590 matches!(
2591 shutdown_retry,
2592 Err(super::ShutdownError::Finalize(
2593 super::DisableError::Finalize(_)
2594 ))
2595 ),
2596 "shutdown should return finalization failure after prior failed drain finalization"
2597 );
2598 }
2599
2600 #[test]
2601 fn drain_finalization_strict_lifecycle_failure_is_observable_and_retriable() {
2602 let output = test_output("strict-drain-failure");
2603 let controller = TailtriageController::builder("checkout-service")
2604 .output(&output)
2605 .strict_lifecycle(true)
2606 .build()
2607 .expect("build should succeed");
2608 let active = controller.enable().expect("enable should succeed");
2609
2610 let runtime = active_runtime(&controller);
2611 let leaked = runtime.run.begin_request("/leaked");
2612 let started = controller.begin_request("/checkout");
2613 assert!(matches!(
2614 controller.disable(),
2615 Ok(DisableOutcome::Closing {
2616 generation_id,
2617 inflight_captured_requests: 1
2618 }) if generation_id == active.generation_id
2619 ));
2620
2621 started.completion.finish_ok();
2622
2623 let status = controller.status();
2624 let GenerationState::Active(active_state) = status.generation else {
2625 panic!("strict lifecycle drain failure should keep generation active");
2626 };
2627 assert!(active_state.closing);
2628 assert_eq!(active_state.inflight_captured_requests, 0);
2629 let error = active_state
2630 .last_finalize_error
2631 .expect("strict lifecycle error should be reported");
2632 assert!(
2633 error.contains("strict lifecycle validation failed"),
2634 "unexpected strict lifecycle error message: {error}"
2635 );
2636
2637 assert!(matches!(
2638 controller.disable(),
2639 Err(super::DisableError::Finalize(
2640 tailtriage_core::SinkError::Lifecycle {
2641 unfinished_count: 1
2642 }
2643 ))
2644 ));
2645
2646 leaked.completion.finish_ok();
2647 assert!(matches!(
2648 controller.disable(),
2649 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == active.generation_id
2650 ));
2651 fs::remove_file(active.artifact_path).expect("cleanup should succeed");
2652 }
2653
2654 #[test]
2655 fn drain_finalization_failure_allows_recovery_after_environment_fix() {
2656 let output_dir = std::env::temp_dir().join(format!(
2657 "tailtriage-controller-recovery-dir-{}-{}",
2658 std::process::id(),
2659 tailtriage_core::unix_time_ms()
2660 ));
2661 let output = output_dir.join("artifact.json");
2662 let controller = TailtriageController::builder("checkout-service")
2663 .output(&output)
2664 .build()
2665 .expect("build should succeed");
2666
2667 let active = controller.enable().expect("enable should succeed");
2668 let started = controller.begin_request("/checkout");
2669 assert!(matches!(
2670 controller.disable(),
2671 Ok(DisableOutcome::Closing {
2672 generation_id,
2673 inflight_captured_requests: 1
2674 }) if generation_id == active.generation_id
2675 ));
2676 started.completion.finish_ok();
2677
2678 let status_before_retry = controller.status();
2679 let GenerationState::Active(active_before_retry) = status_before_retry.generation else {
2680 panic!("failed drain finalization should keep generation active");
2681 };
2682 assert!(active_before_retry.last_finalize_error.is_some());
2683
2684 fs::create_dir_all(&output_dir).expect("create output directory for retry should succeed");
2685
2686 assert!(matches!(
2687 controller.disable(),
2688 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == active.generation_id
2689 ));
2690 assert!(output_dir.join("artifact-generation-1.json").exists());
2691 fs::remove_file(output_dir.join("artifact-generation-1.json"))
2692 .expect("cleanup artifact should succeed");
2693 fs::remove_dir(output_dir).expect("cleanup output dir should succeed");
2694 }
2695
2696 #[test]
2697 fn toml_parsing_success_and_failure() {
2698 let output = test_output("toml-parse");
2699 let config = test_config_path("toml-parse");
2700 write_config(&config, &output, "light", false, true);
2701
2702 let loaded =
2703 TailtriageController::load_config_from_path(&config).expect("valid TOML should parse");
2704 assert_eq!(loaded.activation_template.selected_mode, CaptureMode::Light);
2705 assert_eq!(
2706 loaded.activation_template.capture_limits_override,
2707 CaptureLimitsOverride {
2708 max_requests: Some(17),
2709 max_stages: Some(18),
2710 max_queues: None,
2711 max_inflight_snapshots: None,
2712 max_runtime_snapshots: None,
2713 }
2714 );
2715 assert!(
2716 loaded
2717 .activation_template
2718 .runtime_sampler
2719 .enabled_for_armed_runs
2720 );
2721 assert_eq!(
2722 loaded.activation_template.run_end_policy,
2723 RunEndPolicy::AutoSealOnLimitsHit
2724 );
2725
2726 fs::write(&config, "[controller\n").expect("invalid TOML write should succeed");
2727 assert!(TailtriageController::load_config_from_path(&config).is_err());
2728
2729 fs::remove_file(config).expect("config cleanup should succeed");
2730 }
2731
2732 #[test]
2733 fn toml_parses_windows_style_escaped_output_path() {
2734 let config_toml = r#"[controller]
2735
2736[controller.activation]
2737mode = "light"
2738
2739[controller.activation.sink]
2740type = "local_json"
2741output_path = "C:\\Users\\someone\\AppData\\Local\\Temp\\tailtriage.json"
2742"#;
2743
2744 let parsed: super::ControllerConfigFile =
2745 toml::from_str(config_toml).expect("escaped Windows path should parse in TOML");
2746
2747 let loaded = parsed.into_loaded();
2748 assert_eq!(
2749 loaded.activation_template.sink_template,
2750 ControllerSinkTemplate::LocalJson {
2751 output_path: PathBuf::from(r"C:\Users\someone\AppData\Local\Temp\tailtriage.json"),
2752 }
2753 );
2754 }
2755
2756 #[test]
2757 fn reload_updates_next_activation_template_only() {
2758 let output_before = test_output("reload-template-before");
2759 let output_after = test_output("reload-template-after");
2760 let config = test_config_path("reload-template");
2761 write_config(&config, &output_before, "light", false, false);
2762
2763 let controller = TailtriageController::builder("checkout-service")
2764 .config_path(&config)
2765 .build()
2766 .expect("build should succeed");
2767 assert_eq!(
2768 controller.status().template.selected_mode,
2769 CaptureMode::Light
2770 );
2771
2772 write_config(&config, &output_after, "investigation", true, false);
2773 controller.reload_config().expect("reload should succeed");
2774
2775 let status = controller.status();
2776 assert_eq!(status.template.selected_mode, CaptureMode::Investigation);
2777 assert!(status.template.strict_lifecycle);
2778 assert_eq!(
2779 status.template.run_end_policy,
2780 RunEndPolicy::AutoSealOnLimitsHit
2781 );
2782
2783 fs::remove_file(config).expect("config cleanup should succeed");
2784 }
2785
2786 #[test]
2787 fn try_reload_template_validates_before_enable() {
2788 let output = test_output("try-reload-template-validate");
2789 let controller = TailtriageController::builder("checkout-service")
2790 .output(&output)
2791 .build()
2792 .expect("build should succeed");
2793
2794 let invalid = TailtriageControllerTemplate {
2795 service_name: String::new(),
2796 config_path: None,
2797 sink_template: ControllerSinkTemplate::LocalJson {
2798 output_path: output,
2799 },
2800 selected_mode: CaptureMode::Light,
2801 capture_limits_override: CaptureLimitsOverride::default(),
2802 strict_lifecycle: false,
2803 runtime_sampler: RuntimeSamplerTemplate::default(),
2804 run_end_policy: RunEndPolicy::ContinueAfterLimitsHit,
2805 };
2806
2807 assert!(matches!(
2808 controller.try_reload_template(invalid),
2809 Err(ReloadTemplateError::Validate(_))
2810 ));
2811 }
2812
2813 #[test]
2814 fn reload_config_validates_template_before_enable() {
2815 let output = test_output("reload-config-validate");
2816 let config = test_config_path("reload-config-validate");
2817 write_config(&config, &output, "light", false, false);
2818
2819 let controller = TailtriageController::builder("checkout-service")
2820 .config_path(&config)
2821 .build()
2822 .expect("build should succeed");
2823
2824 fs::write(
2825 &config,
2826 r#"[controller]
2827service_name = ""
2828
2829[controller.activation]
2830mode = "light"
2831strict_lifecycle = false
2832
2833[controller.activation.capture_limits_override]
2834max_requests = 17
2835max_stages = 18
2836
2837[controller.activation.sink]
2838type = "local_json"
2839output_path = "tailtriage-run.json"
2840
2841[controller.activation.runtime_sampler]
2842enabled_for_armed_runs = false
2843
2844[controller.activation.run_end_policy]
2845kind = "continue_after_limits_hit"
2846"#,
2847 )
2848 .expect("invalid config write should succeed");
2849
2850 assert!(matches!(
2851 controller.reload_config(),
2852 Err(ReloadConfigError::Validate(_))
2853 ));
2854
2855 fs::remove_file(config).expect("config cleanup should succeed");
2856 }
2857
2858 #[test]
2859 fn controller_recovers_after_poisoned_lifecycle_lock() {
2860 let output = test_output("poisoned-lock-recovery");
2861 let controller = TailtriageController::builder("checkout-service")
2862 .output(&output)
2863 .build()
2864 .expect("build should succeed");
2865
2866 let _ = std::panic::catch_unwind({
2867 let controller = controller.clone();
2868 move || {
2869 let _guard = controller
2870 .inner
2871 .lifecycle
2872 .lock()
2873 .unwrap_or_else(std::sync::PoisonError::into_inner);
2874 panic!("intentional poison");
2875 }
2876 });
2877
2878 let status = controller.status();
2879 assert_eq!(status.template.service_name, "checkout-service");
2880 assert!(matches!(
2881 status.generation,
2882 GenerationState::Disabled { .. }
2883 ));
2884 }
2885
2886 #[test]
2887 fn active_generation_keeps_original_config_after_reload() {
2888 let output_before = test_output("active-keeps-before");
2889 let output_after = test_output("active-keeps-after");
2890 let config = test_config_path("active-keeps");
2891 write_config(&config, &output_before, "light", false, false);
2892
2893 let controller = TailtriageController::builder("checkout-service")
2894 .config_path(&config)
2895 .build()
2896 .expect("build should succeed");
2897
2898 let gen1 = controller.enable().expect("first enable should succeed");
2899 assert_eq!(gen1.activation_config.selected_mode, CaptureMode::Light);
2900 assert_eq!(
2901 gen1.activation_config.sink_template,
2902 super::ControllerSinkTemplate::LocalJson {
2903 output_path: output_before.clone()
2904 }
2905 );
2906
2907 write_config(&config, &output_after, "investigation", true, false);
2908 controller.reload_config().expect("reload should succeed");
2909
2910 let GenerationState::Active(active_after_reload) = controller.status().generation else {
2911 panic!("expected active generation");
2912 };
2913 assert_eq!(
2914 active_after_reload.activation_config.selected_mode,
2915 CaptureMode::Light
2916 );
2917 assert!(!active_after_reload.activation_config.strict_lifecycle);
2918
2919 let started = controller.begin_request("/checkout");
2920 started.completion.finish_ok();
2921 assert!(matches!(
2922 controller.disable(),
2923 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == gen1.generation_id
2924 ));
2925
2926 let gen2 = controller.enable().expect("second enable should succeed");
2927 assert_eq!(
2928 gen2.activation_config.selected_mode,
2929 CaptureMode::Investigation
2930 );
2931 assert!(gen2.activation_config.strict_lifecycle);
2932 assert_eq!(
2933 gen2.activation_config.sink_template,
2934 super::ControllerSinkTemplate::LocalJson {
2935 output_path: output_after.clone()
2936 }
2937 );
2938
2939 assert!(matches!(
2940 controller.disable(),
2941 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == gen2.generation_id
2942 ));
2943
2944 fs::remove_file(gen1.artifact_path).expect("cleanup gen1 should succeed");
2945 fs::remove_file(gen2.artifact_path).expect("cleanup gen2 should succeed");
2946 fs::remove_file(config).expect("config cleanup should succeed");
2947 }
2948
2949 #[test]
2950 fn build_from_toml_initially_enabled_starts_generation_with_toml_activation_settings() {
2951 let output = test_output("toml-initially-enabled");
2952 let config = test_config_path("toml-initially-enabled");
2953 write_initially_enabled_config(&config, &output);
2954
2955 let controller = TailtriageController::builder("builder-service-name")
2956 .initially_enabled(false)
2957 .strict_lifecycle(false)
2958 .config_path(&config)
2959 .build()
2960 .expect("build should succeed");
2961
2962 let status = controller.status();
2963 let GenerationState::Active(active) = status.generation else {
2964 panic!("config with initially_enabled=true should start generation 1");
2965 };
2966 assert_eq!(active.generation_id, 1);
2967 assert_eq!(
2968 active.activation_config.selected_mode,
2969 CaptureMode::Investigation
2970 );
2971 assert!(active.activation_config.strict_lifecycle);
2972 assert_eq!(
2973 active.activation_config.run_end_policy,
2974 RunEndPolicy::AutoSealOnLimitsHit
2975 );
2976 assert_eq!(
2977 active.activation_config.sink_template,
2978 ControllerSinkTemplate::LocalJson {
2979 output_path: output.clone()
2980 }
2981 );
2982 assert_eq!(
2983 active.activation_config.runtime_sampler,
2984 RuntimeSamplerTemplate::default()
2985 );
2986 assert_eq!(
2987 active.activation_config.capture_limits_override,
2988 CaptureLimitsOverride {
2989 max_requests: Some(9),
2990 ..CaptureLimitsOverride::default()
2991 }
2992 );
2993 assert_eq!(status.template.service_name, "toml-service-name");
2994 assert_eq!(
2995 active.artifact_path,
2996 output.with_file_name(format!(
2997 "{}-generation-1.json",
2998 output
2999 .file_stem()
3000 .and_then(std::ffi::OsStr::to_str)
3001 .expect("stem")
3002 ))
3003 );
3004
3005 assert!(matches!(
3006 controller.disable(),
3007 Ok(DisableOutcome::Finalized { generation_id: 1 })
3008 ));
3009 let run = read_run(&active.artifact_path);
3010 assert_eq!(
3011 run.metadata.run_end_reason,
3012 Some(tailtriage_core::RunEndReason::ManualDisarm)
3013 );
3014
3015 fs::remove_file(active.artifact_path).expect("artifact cleanup should succeed");
3016 fs::remove_file(config).expect("config cleanup should succeed");
3017 }
3018
3019 #[test]
3020 fn enable_with_sampler_without_tokio_runtime_returns_missing_runtime_error() {
3021 let output = test_output("missing-runtime");
3022 let expected_artifact = output.with_file_name(format!(
3023 "{}-generation-1.json",
3024 output
3025 .file_stem()
3026 .and_then(std::ffi::OsStr::to_str)
3027 .expect("stem")
3028 ));
3029 let controller = TailtriageController::builder("checkout-service")
3030 .output(&output)
3031 .runtime_sampler(RuntimeSamplerTemplate {
3032 enabled_for_armed_runs: true,
3033 mode_override: None,
3034 interval_ms: Some(20),
3035 max_runtime_snapshots: Some(10),
3036 })
3037 .build()
3038 .expect("build should succeed");
3039
3040 let err = controller
3041 .enable()
3042 .expect_err("enable should fail without runtime");
3043 assert!(matches!(err, EnableError::MissingTokioRuntimeForSampler));
3044 assert!(matches!(
3045 controller.status().generation,
3046 GenerationState::Disabled { next_generation: 1 }
3047 ));
3048 assert!(!expected_artifact.exists());
3049 }
3050
3051 #[test]
3052 fn sparse_toml_uses_builder_fallbacks_and_activation_defaults() {
3053 let output = test_output("sparse-toml-defaults");
3054 let config = test_config_path("sparse-toml-defaults");
3055 write_sparse_config(&config, &output, "investigation");
3056
3057 let controller = TailtriageController::builder("builder-service-name")
3058 .initially_enabled(true)
3059 .config_path(&config)
3060 .build()
3061 .expect("build should succeed");
3062
3063 let status = controller.status();
3064 assert_eq!(status.template.service_name, "builder-service-name");
3065 let GenerationState::Active(active) = status.generation else {
3066 panic!("builder initially_enabled should be preserved when TOML omits it");
3067 };
3068 assert_eq!(active.generation_id, 1);
3069 assert_eq!(
3070 active.activation_config.selected_mode,
3071 CaptureMode::Investigation
3072 );
3073 assert!(!active.activation_config.strict_lifecycle);
3074 assert_eq!(
3075 active.activation_config.runtime_sampler,
3076 RuntimeSamplerTemplate::default()
3077 );
3078 assert_eq!(
3079 active.activation_config.run_end_policy,
3080 RunEndPolicy::ContinueAfterLimitsHit
3081 );
3082 assert_eq!(
3083 active.activation_config.capture_limits_override,
3084 CaptureLimitsOverride::default()
3085 );
3086 assert_eq!(
3087 active.activation_config.sink_template,
3088 ControllerSinkTemplate::LocalJson {
3089 output_path: output.clone()
3090 }
3091 );
3092
3093 assert!(matches!(
3094 controller.disable(),
3095 Ok(DisableOutcome::Finalized { generation_id: 1 })
3096 ));
3097 fs::remove_file(active.artifact_path).expect("artifact cleanup should succeed");
3098 fs::remove_file(config).expect("config cleanup should succeed");
3099 }
3100
3101 #[test]
3102 fn build_with_missing_config_path_returns_config_load_error() {
3103 let config = test_config_path("missing-config-build");
3104 assert!(!config.exists());
3105
3106 let err = TailtriageController::builder("checkout-service")
3107 .config_path(&config)
3108 .build()
3109 .expect_err("build should fail for missing config path");
3110 assert!(matches!(
3111 err,
3112 ControllerBuildError::ConfigLoad(super::ConfigLoadError::Io { .. })
3113 ));
3114 }
3115
3116 #[test]
3117 fn config_service_name_overrides_builder_service_name_when_present() {
3118 let output = test_output("build-config-service-name-overrides");
3119 let config = test_config_path("build-config-service-name-overrides");
3120 write_config_with_optional_service_name(&config, &output, Some("toml-service-name"));
3121
3122 let controller = TailtriageController::builder("builder-service-name")
3123 .config_path(&config)
3124 .build()
3125 .expect("build should succeed");
3126 assert_eq!(
3127 controller.status().template.service_name,
3128 "toml-service-name"
3129 );
3130
3131 fs::remove_file(config).expect("config cleanup should succeed");
3132 }
3133
3134 #[test]
3135 fn blank_builder_service_name_uses_non_blank_toml_service_name() {
3136 let output = test_output("build-blank-builder-uses-toml");
3137 let config = test_config_path("build-blank-builder-uses-toml");
3138 write_config_with_optional_service_name(&config, &output, Some("toml-service-name"));
3139
3140 let controller = TailtriageController::builder(" ")
3141 .config_path(&config)
3142 .build()
3143 .expect("build should succeed");
3144 assert_eq!(
3145 controller.status().template.service_name,
3146 "toml-service-name"
3147 );
3148
3149 fs::remove_file(config).expect("config cleanup should succeed");
3150 }
3151
3152 #[test]
3153 fn blank_builder_service_name_without_config_fails_build() {
3154 let err = TailtriageController::builder(" ")
3155 .build()
3156 .expect_err("blank builder service_name without config should fail");
3157 assert!(matches!(err, ControllerBuildError::EmptyServiceName));
3158 }
3159
3160 #[test]
3161 fn blank_builder_and_blank_toml_service_name_fail_build() {
3162 let output = test_output("build-blank-builder-blank-toml");
3163 let config = test_config_path("build-blank-builder-blank-toml");
3164 write_config_with_optional_service_name(&config, &output, Some(""));
3165
3166 let err = TailtriageController::builder(" ")
3167 .config_path(&config)
3168 .build()
3169 .expect_err("blank builder and blank TOML service_name should fail");
3170 assert!(matches!(err, ControllerBuildError::EmptyServiceName));
3171
3172 fs::remove_file(config).expect("config cleanup should succeed");
3173 }
3174
3175 #[test]
3176 fn build_from_toml_with_blank_service_name_returns_empty_service_name_error() {
3177 let config = test_config_path("toml-empty-service-name");
3178 write_raw_config(
3179 &config,
3180 r#"[controller]
3181service_name = ""
3182
3183[controller.activation]
3184mode = "light"
3185
3186[controller.activation.sink]
3187type = "local_json"
3188output_path = "tailtriage-run.json"
3189"#,
3190 );
3191
3192 let err = TailtriageController::builder("fallback-service-name")
3193 .config_path(&config)
3194 .build()
3195 .expect_err("blank TOML service_name should fail build");
3196 assert!(matches!(err, ControllerBuildError::EmptyServiceName));
3197
3198 fs::remove_file(config).expect("config cleanup should succeed");
3199 }
3200
3201 #[test]
3202 fn build_from_toml_with_invalid_mode_returns_parse_error() {
3203 let config = test_config_path("toml-invalid-mode");
3204 write_raw_config(
3205 &config,
3206 r#"[controller]
3207
3208[controller.activation]
3209mode = "not-a-real-mode"
3210
3211[controller.activation.sink]
3212type = "local_json"
3213output_path = "tailtriage-run.json"
3214"#,
3215 );
3216
3217 let err = TailtriageController::builder("checkout-service")
3218 .config_path(&config)
3219 .build()
3220 .expect_err("invalid mode should fail build");
3221 assert!(matches!(
3222 err,
3223 ControllerBuildError::ConfigLoad(super::ConfigLoadError::Parse { .. })
3224 ));
3225
3226 fs::remove_file(config).expect("config cleanup should succeed");
3227 }
3228
3229 #[test]
3230 fn build_from_toml_with_invalid_run_end_policy_kind_returns_parse_error() {
3231 let config = test_config_path("toml-invalid-run-end-policy");
3232 write_raw_config(
3233 &config,
3234 r#"[controller]
3235
3236[controller.activation]
3237mode = "light"
3238
3239[controller.activation.sink]
3240type = "local_json"
3241output_path = "tailtriage-run.json"
3242
3243[controller.activation.run_end_policy]
3244kind = "not-a-real-policy"
3245"#,
3246 );
3247
3248 let err = TailtriageController::builder("checkout-service")
3249 .config_path(&config)
3250 .build()
3251 .expect_err("invalid run_end_policy.kind should fail build");
3252 assert!(matches!(
3253 err,
3254 ControllerBuildError::ConfigLoad(super::ConfigLoadError::Parse { .. })
3255 ));
3256
3257 fs::remove_file(config).expect("config cleanup should succeed");
3258 }
3259
3260 #[test]
3261 fn build_from_toml_with_run_end_policy_table_missing_kind_returns_parse_error() {
3262 let config = test_config_path("toml-run-end-policy-missing-kind");
3263 write_raw_config(
3264 &config,
3265 r#"[controller]
3266
3267[controller.activation]
3268mode = "light"
3269
3270[controller.activation.sink]
3271type = "local_json"
3272output_path = "tailtriage-run.json"
3273
3274[controller.activation.run_end_policy]
3275"#,
3276 );
3277
3278 let err = TailtriageController::builder("checkout-service")
3279 .config_path(&config)
3280 .build()
3281 .expect_err("run_end_policy table without kind should fail build");
3282 assert!(matches!(
3283 err,
3284 ControllerBuildError::ConfigLoad(super::ConfigLoadError::Parse { .. })
3285 ));
3286
3287 fs::remove_file(config).expect("config cleanup should succeed");
3288 }
3289
3290 #[test]
3291 fn build_from_toml_with_invalid_sink_type_returns_parse_error() {
3292 let config = test_config_path("toml-invalid-sink-type");
3293 write_raw_config(
3294 &config,
3295 r#"[controller]
3296
3297[controller.activation]
3298mode = "light"
3299
3300[controller.activation.sink]
3301type = "not-a-real-sink"
3302output_path = "tailtriage-run.json"
3303"#,
3304 );
3305
3306 let err = TailtriageController::builder("checkout-service")
3307 .config_path(&config)
3308 .build()
3309 .expect_err("invalid sink.type should fail build");
3310 assert!(matches!(
3311 err,
3312 ControllerBuildError::ConfigLoad(super::ConfigLoadError::Parse { .. })
3313 ));
3314
3315 fs::remove_file(config).expect("config cleanup should succeed");
3316 }
3317
3318 #[test]
3319 fn build_from_toml_initially_enabled_sampler_without_runtime_returns_initial_enable_error() {
3320 let config = test_config_path("toml-initially-enabled-missing-runtime");
3321 write_raw_config(
3322 &config,
3323 r#"[controller]
3324initially_enabled = true
3325
3326[controller.activation]
3327mode = "light"
3328
3329[controller.activation.sink]
3330type = "local_json"
3331output_path = "tailtriage-run.json"
3332
3333[controller.activation.runtime_sampler]
3334enabled_for_armed_runs = true
3335interval_ms = 20
3336max_runtime_snapshots = 10
3337"#,
3338 );
3339
3340 let err = TailtriageController::builder("checkout-service")
3341 .config_path(&config)
3342 .build()
3343 .expect_err("initially_enabled with sampler should fail outside Tokio runtime");
3344 assert!(matches!(
3345 err,
3346 ControllerBuildError::InitialEnable(EnableError::MissingTokioRuntimeForSampler)
3347 ));
3348
3349 fs::remove_file(config).expect("config cleanup should succeed");
3350 }
3351
3352 #[test]
3353 fn reload_config_after_config_file_deleted_returns_load_error() {
3354 let output = test_output("reload-deleted-config");
3355 let config = test_config_path("reload-deleted-config");
3356 write_config(&config, &output, "light", false, false);
3357
3358 let controller = TailtriageController::builder("checkout-service")
3359 .config_path(&config)
3360 .build()
3361 .expect("build should succeed");
3362
3363 fs::remove_file(&config).expect("config delete should succeed");
3364 assert!(matches!(
3365 controller.reload_config(),
3366 Err(ReloadConfigError::Load(super::ConfigLoadError::Io { .. }))
3367 ));
3368 }
3369
3370 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3371 async fn armed_generation_with_sampler_enabled_records_effective_metadata() {
3372 let output = test_output("sampler-enabled");
3373 let controller = TailtriageController::builder("checkout-service")
3374 .output(&output)
3375 .runtime_sampler(RuntimeSamplerTemplate {
3376 enabled_for_armed_runs: true,
3377 mode_override: Some(CaptureMode::Investigation),
3378 interval_ms: Some(15),
3379 max_runtime_snapshots: Some(8),
3380 })
3381 .capture_limits_override(CaptureLimitsOverride {
3382 max_runtime_snapshots: Some(3),
3383 ..CaptureLimitsOverride::default()
3384 })
3385 .build()
3386 .expect("build should succeed");
3387
3388 let active = controller.enable().expect("enable should succeed");
3389 tokio::time::sleep(Duration::from_millis(40)).await;
3390 assert!(matches!(
3391 controller.disable(),
3392 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == active.generation_id
3393 ));
3394
3395 let run = read_run(&active.artifact_path);
3396 let config = run
3397 .metadata
3398 .effective_tokio_sampler_config
3399 .expect("sampler metadata should be set");
3400 assert_eq!(config.inherited_mode, CaptureMode::Light);
3401 assert_eq!(
3402 config.explicit_mode_override,
3403 Some(CaptureMode::Investigation)
3404 );
3405 assert_eq!(config.resolved_mode, CaptureMode::Investigation);
3406 assert_eq!(config.resolved_sampler_cadence_ms, 15);
3407 assert_eq!(config.resolved_runtime_snapshot_retention, 3);
3408
3409 fs::remove_file(active.artifact_path).expect("cleanup should succeed");
3410 }
3411
3412 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3413 async fn armed_generation_with_sampler_disabled_keeps_sampler_metadata_empty() {
3414 let output = test_output("sampler-disabled");
3415 let controller = TailtriageController::builder("checkout-service")
3416 .output(&output)
3417 .runtime_sampler(RuntimeSamplerTemplate {
3418 enabled_for_armed_runs: false,
3419 mode_override: Some(CaptureMode::Investigation),
3420 interval_ms: Some(5),
3421 max_runtime_snapshots: Some(100),
3422 })
3423 .build()
3424 .expect("build should succeed");
3425
3426 let active = controller.enable().expect("enable should succeed");
3427 tokio::time::sleep(Duration::from_millis(20)).await;
3428 assert!(matches!(
3429 controller.disable(),
3430 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == active.generation_id
3431 ));
3432
3433 let run = read_run(&active.artifact_path);
3434 assert!(run.metadata.effective_tokio_sampler_config.is_none());
3435 assert!(run.runtime_snapshots.is_empty());
3436
3437 fs::remove_file(active.artifact_path).expect("cleanup should succeed");
3438 }
3439
3440 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3441 async fn sampler_stops_on_disarm_and_reenable_uses_fresh_generation_sampler_lifecycle() {
3442 let output = test_output("sampler-reenable");
3443 let controller = TailtriageController::builder("checkout-service")
3444 .output(&output)
3445 .runtime_sampler(RuntimeSamplerTemplate {
3446 enabled_for_armed_runs: true,
3447 mode_override: None,
3448 interval_ms: Some(10),
3449 max_runtime_snapshots: Some(32),
3450 })
3451 .build()
3452 .expect("build should succeed");
3453
3454 let first = controller.enable().expect("first enable should succeed");
3455 tokio::time::sleep(Duration::from_millis(35)).await;
3456 assert!(matches!(
3457 controller.disable(),
3458 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == first.generation_id
3459 ));
3460 tokio::time::sleep(Duration::from_millis(30)).await;
3461
3462 let first_run = read_run(&first.artifact_path);
3463 assert!(!first_run.runtime_snapshots.is_empty());
3464 let first_metadata = first_run
3465 .metadata
3466 .effective_tokio_sampler_config
3467 .expect("first generation sampler metadata should exist");
3468
3469 let second = controller.enable().expect("second enable should succeed");
3470 assert_eq!(second.generation_id, first.generation_id + 1);
3471 tokio::time::sleep(Duration::from_millis(35)).await;
3472 assert!(matches!(
3473 controller.disable(),
3474 Ok(DisableOutcome::Finalized { generation_id }) if generation_id == second.generation_id
3475 ));
3476
3477 let second_run = read_run(&second.artifact_path);
3478 assert!(!second_run.runtime_snapshots.is_empty());
3479 let second_metadata = second_run
3480 .metadata
3481 .effective_tokio_sampler_config
3482 .expect("second generation sampler metadata should exist");
3483
3484 assert_eq!(first_metadata.resolved_sampler_cadence_ms, 10);
3485 assert_eq!(second_metadata.resolved_sampler_cadence_ms, 10);
3486 assert_ne!(first.artifact_path, second.artifact_path);
3487
3488 fs::remove_file(first.artifact_path).expect("cleanup first should succeed");
3489 fs::remove_file(second.artifact_path).expect("cleanup second should succeed");
3490 }
3491}