1use ranvier_core::bus::Bus;
15use ranvier_core::outcome::Outcome;
16use ranvier_core::schematic::{Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation};
17use ranvier_core::timeline::{Timeline, TimelineEvent};
18use ranvier_core::transition::Transition;
19use std::any::type_name;
20use std::ffi::OsString;
21use std::fs;
22use std::future::Future;
23use std::panic::Location;
24use std::path::{Path, PathBuf};
25use std::pin::Pin;
26use std::sync::{Arc, Mutex, OnceLock};
27use std::time::{SystemTime, UNIX_EPOCH};
28use tracing::Instrument;
29
/// A pinned, heap-allocated, `Send` future yielding `T` that may borrow data for `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
32
/// Shared, thread-safe callable that runs a circuit.
///
/// It receives the input value, a shared borrow of the resources and a mutable borrow of
/// the [`Bus`], and returns a [`BoxFuture`] resolving to an [`Outcome`]. Both borrows
/// share the lifetime `'a` of the returned future.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
38
/// Returns a short, human-readable name for `T` with every module path removed.
///
/// `std::any::type_name` yields fully qualified paths such as
/// `alloc::vec::Vec<alloc::string::String>`. Keeping only the text after the last `::`
/// would turn that into `String>`, so the path prefix is stripped from *each* path inside
/// the name instead, giving `Vec<String>`. For non-generic types the result is simply the
/// last path segment.
///
/// The exact text produced by `type_name` is not guaranteed to be stable across compiler
/// versions, so the result is meant for display/diagnostics only.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the path segment currently being read begins.
    let mut segment_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // A `::` separator: what was accumulated for this path so far was a module
            // (or parent) name, so discard it and start over with the next segment.
            chars.next();
            short.truncate(segment_start);
        } else {
            short.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation such as `<`, `,`, ` `, `&` or `(` ends the current path;
                // whatever follows starts a new one.
                segment_start = short.len();
            }
        }
    }

    short
}
44
/// A typed execution chain from `In` to `Out` (failing with `E`), paired with the
/// [`Schematic`] that describes its nodes and edges.
///
/// `Res` is the resource type handed to the executor on every run; it defaults to `()`.
pub struct Axon<In, Out, E, Res = ()> {
    /// Structural description of the chain (nodes and edges) built up alongside the executor.
    pub schematic: Schematic,
    /// Composed callable that actually runs the chain.
    executor: Executor<In, Out, E, Res>,
}
70
/// Describes a request to export the schematic as JSON.
#[derive(Debug, Clone)]
pub struct SchematicExportRequest {
    /// Destination file. When `None`, `Axon::export_schematic` prints the JSON to stdout.
    pub output: Option<PathBuf>,
}
77
78impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
79 fn clone(&self) -> Self {
80 Self {
81 schematic: self.schematic.clone(),
82 executor: self.executor.clone(),
83 }
84 }
85}
86
87impl<In, E, Res> Axon<In, In, E, Res>
88where
89 In: Send + Sync + 'static,
90 E: Send + 'static,
91 Res: ranvier_core::transition::ResourceRequirement,
92{
93 #[track_caller]
96 pub fn new(label: &str) -> Self {
97 let caller = Location::caller();
98 Self::start_with_source(label, caller)
99 }
100
101 #[track_caller]
104 pub fn start(label: &str) -> Self {
105 let caller = Location::caller();
106 Self::start_with_source(label, caller)
107 }
108
109 fn start_with_source(
110 label: &str,
111 caller: &'static Location<'static>,
112 ) -> Self {
113 let node_id = uuid::Uuid::new_v4().to_string();
114 let node = Node {
115 id: node_id,
116 kind: NodeKind::Ingress,
117 label: label.to_string(),
118 description: None,
119 input_type: "void".to_string(),
120 output_type: type_name_of::<In>(),
121 resource_type: type_name_of::<Res>(),
122 metadata: Default::default(),
123 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
124 };
125
126 let mut schematic = Schematic::new(label);
127 schematic.nodes.push(node);
128
129 let executor: Executor<In, In, E, Res> =
130 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
131
132 Self {
133 schematic,
134 executor,
135 }
136 }
137}
138
139impl<In, Out, E, Res> Axon<In, Out, E, Res>
140where
141 In: Send + Sync + 'static,
142 Out: Send + Sync + 'static,
143 E: Send + 'static,
144 Res: ranvier_core::transition::ResourceRequirement,
145{
146 #[track_caller]
150 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
151 where
152 Next: Send + Sync + 'static,
153 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
154 {
155 let caller = Location::caller();
156 let Axon {
158 mut schematic,
159 executor: prev_executor,
160 } = self;
161
162 let next_node_id = uuid::Uuid::new_v4().to_string();
164 let next_node = Node {
165 id: next_node_id.clone(),
166 kind: NodeKind::Atom,
167 label: transition.label(),
168 description: transition.description(),
169 input_type: type_name_of::<Out>(),
170 output_type: type_name_of::<Next>(),
171 resource_type: type_name_of::<Res>(),
172 metadata: Default::default(),
173 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
174 };
175
176 let last_node_id = schematic
177 .nodes
178 .last()
179 .map(|n| n.id.clone())
180 .unwrap_or_default();
181
182 schematic.nodes.push(next_node);
183 schematic.edges.push(Edge {
184 from: last_node_id,
185 to: next_node_id.clone(),
186 kind: EdgeType::Linear,
187 label: Some("Next".to_string()),
188 });
189
190 let node_id_for_exec = next_node_id.clone();
192 let node_label_for_exec = transition.label();
193 let next_executor: Executor<In, Next, E, Res> = Arc::new(
194 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
195 let prev = prev_executor.clone();
196 let trans = transition.clone();
197 let timeline_node_id = node_id_for_exec.clone();
198 let timeline_node_label = node_label_for_exec.clone();
199
200 Box::pin(async move {
201 let prev_result = prev(input, res, bus).await;
203
204 let state = match prev_result {
206 Outcome::Next(t) => t,
207 other => return other.map(|_| unreachable!()),
208 };
209
210 let label = trans.label();
212 let res_type = std::any::type_name::<Res>()
213 .split("::")
214 .last()
215 .unwrap_or("unknown");
216
217 let enter_ts = now_ms();
218 if let Some(timeline) = bus.read_mut::<Timeline>() {
219 timeline.push(TimelineEvent::NodeEnter {
220 node_id: timeline_node_id.clone(),
221 node_label: timeline_node_label.clone(),
222 timestamp: enter_ts,
223 });
224 }
225
226 let started = std::time::Instant::now();
227 let result = trans
228 .run(state, res, bus)
229 .instrument(tracing::info_span!(
230 "Node",
231 ranvier.node = %label,
232 ranvier.resource_type = %res_type
233 ))
234 .await;
235 let duration_ms = started.elapsed().as_millis() as u64;
236 let exit_ts = now_ms();
237
238 if let Some(timeline) = bus.read_mut::<Timeline>() {
239 timeline.push(TimelineEvent::NodeExit {
240 node_id: timeline_node_id.clone(),
241 outcome_type: outcome_type_name(&result),
242 duration_ms,
243 timestamp: exit_ts,
244 });
245
246 if let Outcome::Branch(branch_id, _) = &result {
247 timeline.push(TimelineEvent::Branchtaken {
248 branch_id: branch_id.clone(),
249 timestamp: exit_ts,
250 });
251 }
252 }
253
254 result
255 })
256 },
257 );
258
259 Axon {
260 schematic,
261 executor: next_executor,
262 }
263 }
264
265 #[track_caller]
267 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
268 let caller = Location::caller();
269 let branch_id_str = branch_id.into();
270 let last_node_id = self
271 .schematic
272 .nodes
273 .last()
274 .map(|n| n.id.clone())
275 .unwrap_or_default();
276
277 let branch_node = Node {
278 id: uuid::Uuid::new_v4().to_string(),
279 kind: NodeKind::Synapse,
280 label: label.to_string(),
281 description: None,
282 input_type: type_name_of::<Out>(),
283 output_type: type_name_of::<Out>(),
284 resource_type: type_name_of::<Res>(),
285 metadata: Default::default(),
286 source_location: Some(SourceLocation::new(caller.file(), caller.line())),
287 };
288
289 self.schematic.nodes.push(branch_node);
290 self.schematic.edges.push(Edge {
291 from: last_node_id,
292 to: branch_id_str.clone(),
293 kind: EdgeType::Branch(branch_id_str),
294 label: Some("Branch".to_string()),
295 });
296
297 self
298 }
299
300 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
302 let should_capture = should_attach_timeline(bus);
303 let inserted_timeline = if should_capture {
304 ensure_timeline(bus)
305 } else {
306 false
307 };
308 let ingress_started = std::time::Instant::now();
309 let ingress_enter_ts = now_ms();
310 if should_capture
311 && let (Some(timeline), Some(ingress)) =
312 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
313 {
314 timeline.push(TimelineEvent::NodeEnter {
315 node_id: ingress.id.clone(),
316 node_label: ingress.label.clone(),
317 timestamp: ingress_enter_ts,
318 });
319 }
320
321 let label = self.schematic.name.clone();
322 let outcome = (self.executor)(input, resources, bus)
323 .instrument(tracing::info_span!("Circuit", ranvier.circuit = %label))
324 .await;
325
326 let ingress_exit_ts = now_ms();
327 if should_capture
328 && let (Some(timeline), Some(ingress)) =
329 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
330 {
331 timeline.push(TimelineEvent::NodeExit {
332 node_id: ingress.id.clone(),
333 outcome_type: outcome_type_name(&outcome),
334 duration_ms: ingress_started.elapsed().as_millis() as u64,
335 timestamp: ingress_exit_ts,
336 });
337 }
338
339 if should_capture {
340 maybe_export_timeline(bus, &outcome);
341 }
342 if inserted_timeline {
343 let _ = bus.remove::<Timeline>();
344 }
345
346 outcome
347 }
348
349 pub fn serve_inspector(self, port: u16) -> Self {
352 if !inspector_enabled_from_env() {
353 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
354 return self;
355 }
356
357 let schematic = self.schematic.clone();
358 tokio::spawn(async move {
359 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
360 .with_projection_files_from_env()
361 .with_mode_from_env()
362 .with_auth_policy_from_env()
363 .serve()
364 .await
365 {
366 tracing::error!("Inspector server failed: {}", e);
367 }
368 });
369 self
370 }
371
/// Borrows the schematic describing this chain.
pub fn schematic(&self) -> &Schematic {
    &self.schematic
}
376
/// Consumes the chain and returns its schematic, dropping the executor.
pub fn into_schematic(self) -> Schematic {
    self.schematic
}
381
/// Returns the schematic export request derived from the current process's environment
/// variables and command-line arguments, or `None` when no export was requested.
///
/// This does not depend on `self`; it delegates to `schematic_export_request_from_process`.
pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
    schematic_export_request_from_process()
}
395
/// Exports the schematic if the process was asked to do so.
///
/// Returns `Ok(true)` when an export was performed and `Ok(false)` when none was
/// requested. Despite the name, this function does not terminate the process itself;
/// the caller decides what to do with the returned flag.
///
/// # Errors
///
/// Propagates any error from serializing or writing the schematic.
pub fn maybe_export_and_exit(
    &self,
) -> anyhow::Result<bool> {
    // No pre-export hook: delegate with a no-op callback.
    self.maybe_export_and_exit_with(|_| ())
}
412
413 pub fn maybe_export_and_exit_with<F>(
418 &self,
419 on_before_exit: F,
420 ) -> anyhow::Result<bool>
421 where
422 F: FnOnce(&SchematicExportRequest),
423 {
424 let Some(request) = self.schematic_export_request() else {
425 return Ok(false);
426 };
427 on_before_exit(&request);
428 self.export_schematic(&request)?;
429 Ok(true)
430 }
431
432 pub fn export_schematic(
434 &self,
435 request: &SchematicExportRequest,
436 ) -> anyhow::Result<()> {
437 let json = serde_json::to_string_pretty(self.schematic())?;
438 if let Some(path) = &request.output {
439 if let Some(parent) = path.parent() {
440 if !parent.as_os_str().is_empty() {
441 fs::create_dir_all(parent)?;
442 }
443 }
444 fs::write(path, json.as_bytes())?;
445 return Ok(());
446 }
447 println!("{}", json);
448 Ok(())
449 }
450}
451
452fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
453 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
454 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
455 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
456
457 let mut i = 0;
458 while i < args.len() {
459 let arg = args[i].to_string_lossy();
460
461 if arg == "--schematic" {
462 enabled = true;
463 i += 1;
464 continue;
465 }
466
467 if arg == "--schematic-output" || arg == "--output" {
468 if let Some(next) = args.get(i + 1) {
469 output = Some(PathBuf::from(next));
470 i += 2;
471 continue;
472 }
473 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
474 output = Some(PathBuf::from(value));
475 } else if let Some(value) = arg.strip_prefix("--output=") {
476 output = Some(PathBuf::from(value));
477 }
478
479 i += 1;
480 }
481
482 if enabled {
483 Some(SchematicExportRequest { output })
484 } else {
485 None
486 }
487}
488
/// Returns `true` when the environment variable `key` is set to `1`, `true`, `on` or
/// `yes` (ASCII case-insensitive). Unset, unreadable or any other value yields `false`.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key)
        .map(|raw| matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"))
        .unwrap_or(false)
}
495
/// Reports whether the inspector should run, based on `RANVIER_INSPECTOR`.
///
/// The inspector is enabled by default: an unset (or unreadable) variable yields `true`.
/// When the variable is set, only `1`, `true`, `on` or `yes` (ASCII case-insensitive)
/// keep it enabled; every other value disables it.
fn inspector_enabled_from_env() -> bool {
    std::env::var("RANVIER_INSPECTOR").map_or(true, |raw| {
        matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes")
    })
}
502
503fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
504 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
505 Ok(v) if !v.trim().is_empty() => v,
506 _ => return,
507 };
508
509 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
510 let policy = timeline_adaptive_policy();
511 let forced = should_force_export(outcome, &policy);
512 let should_export = sampled || forced;
513 if !should_export {
514 record_sampling_stats(false, sampled, forced, "none", &policy);
515 return;
516 }
517
518 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
519 timeline.sort();
520
521 let mode = std::env::var("RANVIER_TIMELINE_MODE")
522 .unwrap_or_else(|_| "overwrite".to_string())
523 .to_ascii_lowercase();
524
525 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
526 tracing::warn!(
527 "Failed to persist timeline file {} (mode={}): {}",
528 path,
529 mode,
530 err
531 );
532 record_sampling_stats(false, sampled, forced, &mode, &policy);
533 } else {
534 record_sampling_stats(true, sampled, forced, &mode, &policy);
535 }
536}
537
538fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
539 match outcome {
540 Outcome::Next(_) => "Next".to_string(),
541 Outcome::Branch(id, _) => format!("Branch:{}", id),
542 Outcome::Jump(id, _) => format!("Jump:{}", id),
543 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
544 Outcome::Fault(_) => "Fault".to_string(),
545 }
546}
547
548fn ensure_timeline(bus: &mut Bus) -> bool {
549 if bus.has::<Timeline>() {
550 false
551 } else {
552 bus.insert(Timeline::new());
553 true
554 }
555}
556
557fn should_attach_timeline(bus: &Bus) -> bool {
558 if bus.has::<Timeline>() {
560 return true;
561 }
562
563 has_timeline_output_path()
565}
566
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to something other than
/// whitespace.
fn has_timeline_output_path() -> bool {
    matches!(
        std::env::var("RANVIER_TIMELINE_OUTPUT"),
        Ok(value) if !value.trim().is_empty()
    )
}
573
/// Reads the timeline sample rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is parsed as a float (surrounding whitespace is ignored) and clamped to
/// `0.0..=1.0`. An unset, unparsable or NaN value falls back to `1.0`, i.e. every run is
/// sampled.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and survives `clamp`, after which every comparison
        // against it is false and sampling would be silently disabled. Treat it like any
        // other invalid value instead.
        .filter(|rate| !rate.is_nan())
        .map_or(1.0, |rate| rate.clamp(0.0, 1.0))
}
581
582fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
583 if rate <= 0.0 {
584 return false;
585 }
586 if rate >= 1.0 {
587 return true;
588 }
589 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
590 bucket < rate
591}
592
/// Returns the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`, lower-cased.
/// Defaults to `"fault_branch"` when the variable is unset or unreadable.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => String::from("fault_branch"),
    }
}
598
599fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
600 match policy {
601 "off" => false,
602 "fault_only" => matches!(outcome, Outcome::Fault(_)),
603 "fault_branch_emit" => {
604 matches!(
605 outcome,
606 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
607 )
608 }
609 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
610 }
611}
612
/// Process-wide counters describing timeline export decisions.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Number of export decisions recorded (exported + skipped).
    total_decisions: u64,
    /// Decisions recorded as exported.
    exported: u64,
    /// Decisions recorded as not exported.
    skipped: u64,
    /// Exports for which the sampling check passed.
    sampled_exports: u64,
    /// Exports for which the adaptive policy forced the export.
    forced_exports: u64,
    /// Write mode passed with the most recent decision.
    last_mode: String,
    /// Adaptive policy passed with the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent update, in milliseconds since the Unix epoch.
    last_updated_ms: u64,
}
624
/// Lazily initialised, mutex-guarded storage for the process-wide sampling statistics.
static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
626
627fn stats_cell() -> &'static Mutex<SamplingStats> {
628 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
629}
630
631fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
632 let snapshot = {
633 let mut stats = match stats_cell().lock() {
634 Ok(guard) => guard,
635 Err(_) => return,
636 };
637
638 stats.total_decisions += 1;
639 if exported {
640 stats.exported += 1;
641 } else {
642 stats.skipped += 1;
643 }
644 if sampled && exported {
645 stats.sampled_exports += 1;
646 }
647 if forced && exported {
648 stats.forced_exports += 1;
649 }
650 stats.last_mode = mode.to_string();
651 stats.last_policy = policy.to_string();
652 stats.last_updated_ms = now_ms();
653 stats.clone()
654 };
655
656 tracing::debug!(
657 ranvier.timeline.total_decisions = snapshot.total_decisions,
658 ranvier.timeline.exported = snapshot.exported,
659 ranvier.timeline.skipped = snapshot.skipped,
660 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
661 ranvier.timeline.forced_exports = snapshot.forced_exports,
662 ranvier.timeline.mode = %snapshot.last_mode,
663 ranvier.timeline.policy = %snapshot.last_policy,
664 "Timeline sampling stats updated"
665 );
666
667 if let Some(path) = timeline_stats_output_path() {
668 let payload = serde_json::json!({
669 "total_decisions": snapshot.total_decisions,
670 "exported": snapshot.exported,
671 "skipped": snapshot.skipped,
672 "sampled_exports": snapshot.sampled_exports,
673 "forced_exports": snapshot.forced_exports,
674 "last_mode": snapshot.last_mode,
675 "last_policy": snapshot.last_policy,
676 "last_updated_ms": snapshot.last_updated_ms
677 });
678 if let Some(parent) = Path::new(&path).parent() {
679 let _ = fs::create_dir_all(parent);
680 }
681 if let Err(err) = fs::write(&path, payload.to_string()) {
682 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
683 }
684 }
685}
686
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` unless it is unset, unreadable
/// or consists only of whitespace. The value is returned as-is (not trimmed).
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
692
693fn write_timeline_with_policy(path: &str, mode: &str, mut timeline: Timeline) -> Result<(), String> {
694 match mode {
695 "append" => {
696 if Path::new(path).exists() {
697 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
698 match serde_json::from_str::<Timeline>(&content) {
699 Ok(mut existing) => {
700 existing.events.append(&mut timeline.events);
701 existing.sort();
702 if let Some(max_events) = max_events_limit() {
703 truncate_timeline_events(&mut existing, max_events);
704 }
705 write_timeline_json(path, &existing)
706 }
707 Err(_) => {
708 if let Some(max_events) = max_events_limit() {
710 truncate_timeline_events(&mut timeline, max_events);
711 }
712 write_timeline_json(path, &timeline)
713 }
714 }
715 } else {
716 if let Some(max_events) = max_events_limit() {
717 truncate_timeline_events(&mut timeline, max_events);
718 }
719 write_timeline_json(path, &timeline)
720 }
721 }
722 "rotate" => {
723 let rotated_path = rotated_path(path, now_ms());
724 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
725 if let Some(keep) = rotate_keep_limit() {
726 cleanup_rotated_files(path, keep)?;
727 }
728 Ok(())
729 }
730 _ => write_timeline_json(path, &timeline),
731 }
732}
733
734fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
735 if let Some(parent) = Path::new(path).parent() {
736 if !parent.as_os_str().is_empty() {
737 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
738 }
739 }
740 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
741 fs::write(path, json).map_err(|e| e.to_string())
742}
743
/// Derives the rotated file name for `path`: `<stem>_<suffix>.<ext>` in the same
/// directory.
///
/// A missing (or non-UTF-8) stem falls back to `timeline` and a missing (or non-UTF-8)
/// extension falls back to `json`.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = original
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = original
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");

    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
754
/// Reads the maximum number of timeline events to keep from
/// `RANVIER_TIMELINE_MAX_EVENTS`. Returns `None` when the variable is unset, is not a
/// valid `usize`, or is zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
761
/// Reads how many rotated timeline files to keep from `RANVIER_TIMELINE_ROTATE_KEEP`.
/// Returns `None` when the variable is unset, is not a valid `usize`, or is zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(keep) if keep > 0 => Some(keep),
        _ => None,
    }
}
768
769fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
770 let len = timeline.events.len();
771 if len > max_events {
772 let keep_from = len - max_events;
773 timeline.events = timeline.events.split_off(keep_from);
774 }
775}
776
/// Deletes old rotated timeline files, keeping the `keep` most recently modified ones.
///
/// Rotated files live next to `base_path` and are named `<stem>_<digits>.<ext>`, where
/// stem and extension come from `base_path` (falling back to `timeline` / `json`). Files
/// whose middle part is not purely numeric - for example `timeline_notes.json` - are not
/// considered rotated files and are never touched.
///
/// # Errors
///
/// Returns the stringified error if the directory cannot be listed. Failures to delete
/// individual files are ignored (best effort).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // For a bare file name `parent()` is `Some("")`, not `None`; listing "" fails, so
    // an empty parent has to be mapped to the current directory as well.
    let dir = match base.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };
    let stem = base
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    // A rotated file is `<prefix><digits><suffix>`; requiring digits keeps unrelated
    // files that merely share the prefix from being deleted.
    let is_rotated = |name: &str| {
        name.strip_prefix(prefix.as_str())
            .and_then(|rest| rest.strip_suffix(suffix.as_str()))
            .map_or(false, |stamp| {
                !stamp.is_empty() && stamp.bytes().all(|b| b.is_ascii_digit())
            })
    };

    let mut rotated = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter(|entry| is_rotated(&entry.file_name().to_string_lossy()))
        .map(|entry| {
            // Files whose modification time cannot be read sort as oldest.
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            (entry.path(), modified)
        })
        .collect::<Vec<_>>();

    // Newest first, so everything past the first `keep` entries is surplus.
    rotated.sort_by(|a, b| b.1.cmp(&a.1));
    for (path, _) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
812
/// Current wall-clock time in milliseconds since the Unix epoch, or `0` if the system
/// clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
819
/// Unit tests for the pure decision helpers (sampling and adaptive export policy).
#[cfg(test)]
mod tests {
    use super::{sampled_by_bus_id, should_force_export};
    use ranvier_core::Outcome;
    use uuid::Uuid;

    /// Checks which outcome variants force an export under each adaptive policy.
    #[test]
    fn adaptive_policy_force_export_matrix() {
        let next = Outcome::<(), &'static str>::Next(());
        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
        let fault = Outcome::<(), &'static str>::Fault("boom");

        // "off": nothing is forced, not even faults.
        assert!(!should_force_export(&next, "off"));
        assert!(!should_force_export(&fault, "off"));

        // "fault_only": faults only.
        assert!(!should_force_export(&branch, "fault_only"));
        assert!(should_force_export(&fault, "fault_only"));

        // "fault_branch": faults and branches, but not emits.
        assert!(should_force_export(&branch, "fault_branch"));
        assert!(!should_force_export(&emit, "fault_branch"));
        assert!(should_force_export(&fault, "fault_branch"));

        // "fault_branch_emit": faults, branches and emits.
        assert!(should_force_export(&branch, "fault_branch_emit"));
        assert!(should_force_export(&emit, "fault_branch_emit"));
        assert!(should_force_export(&fault, "fault_branch_emit"));
    }

    /// Checks the combined "sampled OR forced" export decision at the two extreme
    /// sample rates.
    #[test]
    fn sampling_and_adaptive_combination_decisions() {
        let bus_id = Uuid::nil();
        let next = Outcome::<(), &'static str>::Next(());
        let fault = Outcome::<(), &'static str>::Fault("boom");

        // Rate 0.0 never samples, so only a forcing policy can trigger an export.
        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
        assert!(!sampled_never);
        assert!(!(sampled_never || should_force_export(&next, "off")));
        assert!(sampled_never || should_force_export(&fault, "fault_only"));

        // Rate 1.0 always samples, so the policy no longer matters.
        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
        assert!(sampled_always);
        assert!(sampled_always || should_force_export(&next, "off"));
        assert!(sampled_always || should_force_export(&fault, "off"));
    }
}