1use ranvier_core::bus::Bus;
15use ranvier_core::outcome::Outcome;
16use ranvier_core::schematic::{Edge, EdgeType, Node, NodeKind, Schematic};
17use ranvier_core::timeline::{Timeline, TimelineEvent};
18use ranvier_core::transition::Transition;
19use std::any::type_name;
20use std::ffi::OsString;
21use std::fs;
22use std::future::Future;
23use std::path::{Path, PathBuf};
24use std::pin::Pin;
25use std::sync::{Arc, Mutex, OnceLock};
26use std::time::{SystemTime, UNIX_EPOCH};
27use tracing::Instrument;
28
29pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
31
32pub type Executor<In, Out, E, Res> =
36 Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
37
38fn type_name_of<T: ?Sized>() -> String {
40 let full = type_name::<T>();
41 full.split("::").last().unwrap_or(full).to_string()
42}
43
44pub struct Axon<In, Out, E, Res = ()> {
64 pub schematic: Schematic,
66 executor: Executor<In, Out, E, Res>,
68}
69
70#[derive(Debug, Clone)]
72pub struct SchematicExportRequest {
73 pub output: Option<PathBuf>,
75}
76
77impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
78 fn clone(&self) -> Self {
79 Self {
80 schematic: self.schematic.clone(),
81 executor: self.executor.clone(),
82 }
83 }
84}
85
86impl<In, E, Res> Axon<In, In, E, Res>
87where
88 In: Send + Sync + 'static,
89 E: Send + 'static,
90 Res: ranvier_core::transition::ResourceRequirement,
91{
92 pub fn new(label: &str) -> Self {
95 Self::start(label)
96 }
97
98 pub fn start(label: &str) -> Self {
101 let node_id = uuid::Uuid::new_v4().to_string();
102 let node = Node {
103 id: node_id,
104 kind: NodeKind::Ingress,
105 label: label.to_string(),
106 description: None,
107 input_type: "void".to_string(),
108 output_type: type_name_of::<In>(),
109 resource_type: type_name_of::<Res>(),
110 metadata: Default::default(),
111 source_location: None,
112 };
113
114 let mut schematic = Schematic::new(label);
115 schematic.nodes.push(node);
116
117 let executor: Executor<In, In, E, Res> =
118 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
119
120 Self {
121 schematic,
122 executor,
123 }
124 }
125}
126
127impl<In, Out, E, Res> Axon<In, Out, E, Res>
128where
129 In: Send + Sync + 'static,
130 Out: Send + Sync + 'static,
131 E: Send + 'static,
132 Res: ranvier_core::transition::ResourceRequirement,
133{
134 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
138 where
139 Next: Send + Sync + 'static,
140 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
141 {
142 let Axon {
144 mut schematic,
145 executor: prev_executor,
146 } = self;
147
148 let next_node_id = uuid::Uuid::new_v4().to_string();
150 let next_node = Node {
151 id: next_node_id.clone(),
152 kind: NodeKind::Atom,
153 label: transition.label(),
154 description: transition.description(),
155 input_type: type_name_of::<Out>(),
156 output_type: type_name_of::<Next>(),
157 resource_type: type_name_of::<Res>(),
158 metadata: Default::default(),
159 source_location: None,
160 };
161
162 let last_node_id = schematic
163 .nodes
164 .last()
165 .map(|n| n.id.clone())
166 .unwrap_or_default();
167
168 schematic.nodes.push(next_node);
169 schematic.edges.push(Edge {
170 from: last_node_id,
171 to: next_node_id.clone(),
172 kind: EdgeType::Linear,
173 label: Some("Next".to_string()),
174 });
175
176 let node_id_for_exec = next_node_id.clone();
178 let node_label_for_exec = transition.label();
179 let next_executor: Executor<In, Next, E, Res> = Arc::new(
180 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
181 let prev = prev_executor.clone();
182 let trans = transition.clone();
183 let timeline_node_id = node_id_for_exec.clone();
184 let timeline_node_label = node_label_for_exec.clone();
185
186 Box::pin(async move {
187 let prev_result = prev(input, res, bus).await;
189
190 let state = match prev_result {
192 Outcome::Next(t) => t,
193 other => return other.map(|_| unreachable!()),
194 };
195
196 let label = trans.label();
198 let res_type = std::any::type_name::<Res>()
199 .split("::")
200 .last()
201 .unwrap_or("unknown");
202
203 let enter_ts = now_ms();
204 if let Some(timeline) = bus.read_mut::<Timeline>() {
205 timeline.push(TimelineEvent::NodeEnter {
206 node_id: timeline_node_id.clone(),
207 node_label: timeline_node_label.clone(),
208 timestamp: enter_ts,
209 });
210 }
211
212 let started = std::time::Instant::now();
213 let result = trans
214 .run(state, res, bus)
215 .instrument(tracing::info_span!(
216 "Node",
217 ranvier.node = %label,
218 ranvier.resource_type = %res_type
219 ))
220 .await;
221 let duration_ms = started.elapsed().as_millis() as u64;
222 let exit_ts = now_ms();
223
224 if let Some(timeline) = bus.read_mut::<Timeline>() {
225 timeline.push(TimelineEvent::NodeExit {
226 node_id: timeline_node_id.clone(),
227 outcome_type: outcome_type_name(&result),
228 duration_ms,
229 timestamp: exit_ts,
230 });
231
232 if let Outcome::Branch(branch_id, _) = &result {
233 timeline.push(TimelineEvent::Branchtaken {
234 branch_id: branch_id.clone(),
235 timestamp: exit_ts,
236 });
237 }
238 }
239
240 result
241 })
242 },
243 );
244
245 Axon {
246 schematic,
247 executor: next_executor,
248 }
249 }
250
251 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
253 let branch_id_str = branch_id.into();
254 let last_node_id = self
255 .schematic
256 .nodes
257 .last()
258 .map(|n| n.id.clone())
259 .unwrap_or_default();
260
261 let branch_node = Node {
262 id: uuid::Uuid::new_v4().to_string(),
263 kind: NodeKind::Synapse,
264 label: label.to_string(),
265 description: None,
266 input_type: type_name_of::<Out>(),
267 output_type: type_name_of::<Out>(),
268 resource_type: type_name_of::<Res>(),
269 metadata: Default::default(),
270 source_location: None,
271 };
272
273 self.schematic.nodes.push(branch_node);
274 self.schematic.edges.push(Edge {
275 from: last_node_id,
276 to: branch_id_str.clone(),
277 kind: EdgeType::Branch(branch_id_str),
278 label: Some("Branch".to_string()),
279 });
280
281 self
282 }
283
284 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
286 let should_capture = should_attach_timeline(bus);
287 let inserted_timeline = if should_capture {
288 ensure_timeline(bus)
289 } else {
290 false
291 };
292 let ingress_started = std::time::Instant::now();
293 let ingress_enter_ts = now_ms();
294 if should_capture
295 && let (Some(timeline), Some(ingress)) =
296 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
297 {
298 timeline.push(TimelineEvent::NodeEnter {
299 node_id: ingress.id.clone(),
300 node_label: ingress.label.clone(),
301 timestamp: ingress_enter_ts,
302 });
303 }
304
305 let label = self.schematic.name.clone();
306 let outcome = (self.executor)(input, resources, bus)
307 .instrument(tracing::info_span!("Circuit", ranvier.circuit = %label))
308 .await;
309
310 let ingress_exit_ts = now_ms();
311 if should_capture
312 && let (Some(timeline), Some(ingress)) =
313 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
314 {
315 timeline.push(TimelineEvent::NodeExit {
316 node_id: ingress.id.clone(),
317 outcome_type: outcome_type_name(&outcome),
318 duration_ms: ingress_started.elapsed().as_millis() as u64,
319 timestamp: ingress_exit_ts,
320 });
321 }
322
323 if should_capture {
324 maybe_export_timeline(bus, &outcome);
325 }
326 if inserted_timeline {
327 let _ = bus.remove::<Timeline>();
328 }
329
330 outcome
331 }
332
333 pub fn serve_inspector(self, port: u16) -> Self {
336 if !inspector_enabled_from_env() {
337 tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
338 return self;
339 }
340
341 let schematic = self.schematic.clone();
342 tokio::spawn(async move {
343 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
344 .with_projection_files_from_env()
345 .with_mode_from_env()
346 .with_auth_policy_from_env()
347 .serve()
348 .await
349 {
350 tracing::error!("Inspector server failed: {}", e);
351 }
352 });
353 self
354 }
355
356 pub fn schematic(&self) -> &Schematic {
358 &self.schematic
359 }
360
361 pub fn into_schematic(self) -> Schematic {
363 self.schematic
364 }
365
366 pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
377 schematic_export_request_from_process()
378 }
379
380 pub fn maybe_export_and_exit(
392 &self,
393 ) -> anyhow::Result<bool> {
394 self.maybe_export_and_exit_with(|_| ())
395 }
396
397 pub fn maybe_export_and_exit_with<F>(
402 &self,
403 on_before_exit: F,
404 ) -> anyhow::Result<bool>
405 where
406 F: FnOnce(&SchematicExportRequest),
407 {
408 let Some(request) = self.schematic_export_request() else {
409 return Ok(false);
410 };
411 on_before_exit(&request);
412 self.export_schematic(&request)?;
413 Ok(true)
414 }
415
416 pub fn export_schematic(
418 &self,
419 request: &SchematicExportRequest,
420 ) -> anyhow::Result<()> {
421 let json = serde_json::to_string_pretty(self.schematic())?;
422 if let Some(path) = &request.output {
423 if let Some(parent) = path.parent() {
424 if !parent.as_os_str().is_empty() {
425 fs::create_dir_all(parent)?;
426 }
427 }
428 fs::write(path, json.as_bytes())?;
429 return Ok(());
430 }
431 println!("{}", json);
432 Ok(())
433 }
434}
435
436fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
437 let args: Vec<OsString> = std::env::args_os().skip(1).collect();
438 let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
439 let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
440
441 let mut i = 0;
442 while i < args.len() {
443 let arg = args[i].to_string_lossy();
444
445 if arg == "--schematic" {
446 enabled = true;
447 i += 1;
448 continue;
449 }
450
451 if arg == "--schematic-output" || arg == "--output" {
452 if let Some(next) = args.get(i + 1) {
453 output = Some(PathBuf::from(next));
454 i += 2;
455 continue;
456 }
457 } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
458 output = Some(PathBuf::from(value));
459 } else if let Some(value) = arg.strip_prefix("--output=") {
460 output = Some(PathBuf::from(value));
461 }
462
463 i += 1;
464 }
465
466 if enabled {
467 Some(SchematicExportRequest { output })
468 } else {
469 None
470 }
471}
472
473fn env_flag_is_true(key: &str) -> bool {
474 match std::env::var(key) {
475 Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
476 Err(_) => false,
477 }
478}
479
480fn inspector_enabled_from_env() -> bool {
481 match std::env::var("RANVIER_INSPECTOR") {
482 Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
483 Err(_) => true,
484 }
485}
486
487fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
488 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
489 Ok(v) if !v.trim().is_empty() => v,
490 _ => return,
491 };
492
493 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
494 let policy = timeline_adaptive_policy();
495 let forced = should_force_export(outcome, &policy);
496 let should_export = sampled || forced;
497 if !should_export {
498 record_sampling_stats(false, sampled, forced, "none", &policy);
499 return;
500 }
501
502 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
503 timeline.sort();
504
505 let mode = std::env::var("RANVIER_TIMELINE_MODE")
506 .unwrap_or_else(|_| "overwrite".to_string())
507 .to_ascii_lowercase();
508
509 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
510 tracing::warn!(
511 "Failed to persist timeline file {} (mode={}): {}",
512 path,
513 mode,
514 err
515 );
516 record_sampling_stats(false, sampled, forced, &mode, &policy);
517 } else {
518 record_sampling_stats(true, sampled, forced, &mode, &policy);
519 }
520}
521
522fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
523 match outcome {
524 Outcome::Next(_) => "Next".to_string(),
525 Outcome::Branch(id, _) => format!("Branch:{}", id),
526 Outcome::Jump(id, _) => format!("Jump:{}", id),
527 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
528 Outcome::Fault(_) => "Fault".to_string(),
529 }
530}
531
532fn ensure_timeline(bus: &mut Bus) -> bool {
533 if bus.has::<Timeline>() {
534 false
535 } else {
536 bus.insert(Timeline::new());
537 true
538 }
539}
540
541fn should_attach_timeline(bus: &Bus) -> bool {
542 if bus.has::<Timeline>() {
544 return true;
545 }
546
547 has_timeline_output_path()
549}
550
551fn has_timeline_output_path() -> bool {
552 std::env::var("RANVIER_TIMELINE_OUTPUT")
553 .ok()
554 .map(|v| !v.trim().is_empty())
555 .unwrap_or(false)
556}
557
558fn timeline_sample_rate() -> f64 {
559 std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
560 .ok()
561 .and_then(|v| v.parse::<f64>().ok())
562 .map(|v| v.clamp(0.0, 1.0))
563 .unwrap_or(1.0)
564}
565
566fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
567 if rate <= 0.0 {
568 return false;
569 }
570 if rate >= 1.0 {
571 return true;
572 }
573 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
574 bucket < rate
575}
576
577fn timeline_adaptive_policy() -> String {
578 std::env::var("RANVIER_TIMELINE_ADAPTIVE")
579 .unwrap_or_else(|_| "fault_branch".to_string())
580 .to_ascii_lowercase()
581}
582
583fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
584 match policy {
585 "off" => false,
586 "fault_only" => matches!(outcome, Outcome::Fault(_)),
587 "fault_branch_emit" => {
588 matches!(
589 outcome,
590 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
591 )
592 }
593 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
594 }
595}
596
597#[derive(Default, Clone)]
598struct SamplingStats {
599 total_decisions: u64,
600 exported: u64,
601 skipped: u64,
602 sampled_exports: u64,
603 forced_exports: u64,
604 last_mode: String,
605 last_policy: String,
606 last_updated_ms: u64,
607}
608
609static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
610
611fn stats_cell() -> &'static Mutex<SamplingStats> {
612 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
613}
614
615fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
616 let snapshot = {
617 let mut stats = match stats_cell().lock() {
618 Ok(guard) => guard,
619 Err(_) => return,
620 };
621
622 stats.total_decisions += 1;
623 if exported {
624 stats.exported += 1;
625 } else {
626 stats.skipped += 1;
627 }
628 if sampled && exported {
629 stats.sampled_exports += 1;
630 }
631 if forced && exported {
632 stats.forced_exports += 1;
633 }
634 stats.last_mode = mode.to_string();
635 stats.last_policy = policy.to_string();
636 stats.last_updated_ms = now_ms();
637 stats.clone()
638 };
639
640 tracing::debug!(
641 ranvier.timeline.total_decisions = snapshot.total_decisions,
642 ranvier.timeline.exported = snapshot.exported,
643 ranvier.timeline.skipped = snapshot.skipped,
644 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
645 ranvier.timeline.forced_exports = snapshot.forced_exports,
646 ranvier.timeline.mode = %snapshot.last_mode,
647 ranvier.timeline.policy = %snapshot.last_policy,
648 "Timeline sampling stats updated"
649 );
650
651 if let Some(path) = timeline_stats_output_path() {
652 let payload = serde_json::json!({
653 "total_decisions": snapshot.total_decisions,
654 "exported": snapshot.exported,
655 "skipped": snapshot.skipped,
656 "sampled_exports": snapshot.sampled_exports,
657 "forced_exports": snapshot.forced_exports,
658 "last_mode": snapshot.last_mode,
659 "last_policy": snapshot.last_policy,
660 "last_updated_ms": snapshot.last_updated_ms
661 });
662 if let Some(parent) = Path::new(&path).parent() {
663 let _ = fs::create_dir_all(parent);
664 }
665 if let Err(err) = fs::write(&path, payload.to_string()) {
666 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
667 }
668 }
669}
670
671fn timeline_stats_output_path() -> Option<String> {
672 std::env::var("RANVIER_TIMELINE_STATS_OUTPUT")
673 .ok()
674 .filter(|v| !v.trim().is_empty())
675}
676
677fn write_timeline_with_policy(path: &str, mode: &str, mut timeline: Timeline) -> Result<(), String> {
678 match mode {
679 "append" => {
680 if Path::new(path).exists() {
681 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
682 match serde_json::from_str::<Timeline>(&content) {
683 Ok(mut existing) => {
684 existing.events.append(&mut timeline.events);
685 existing.sort();
686 if let Some(max_events) = max_events_limit() {
687 truncate_timeline_events(&mut existing, max_events);
688 }
689 write_timeline_json(path, &existing)
690 }
691 Err(_) => {
692 if let Some(max_events) = max_events_limit() {
694 truncate_timeline_events(&mut timeline, max_events);
695 }
696 write_timeline_json(path, &timeline)
697 }
698 }
699 } else {
700 if let Some(max_events) = max_events_limit() {
701 truncate_timeline_events(&mut timeline, max_events);
702 }
703 write_timeline_json(path, &timeline)
704 }
705 }
706 "rotate" => {
707 let rotated_path = rotated_path(path, now_ms());
708 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
709 if let Some(keep) = rotate_keep_limit() {
710 cleanup_rotated_files(path, keep)?;
711 }
712 Ok(())
713 }
714 _ => write_timeline_json(path, &timeline),
715 }
716}
717
718fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
719 if let Some(parent) = Path::new(path).parent() {
720 if !parent.as_os_str().is_empty() {
721 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
722 }
723 }
724 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
725 fs::write(path, json).map_err(|e| e.to_string())
726}
727
728fn rotated_path(path: &str, suffix: u64) -> PathBuf {
729 let p = Path::new(path);
730 let parent = p.parent().unwrap_or_else(|| Path::new(""));
731 let stem = p
732 .file_stem()
733 .and_then(|s| s.to_str())
734 .unwrap_or("timeline");
735 let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
736 parent.join(format!("{}_{}.{}", stem, suffix, ext))
737}
738
739fn max_events_limit() -> Option<usize> {
740 std::env::var("RANVIER_TIMELINE_MAX_EVENTS")
741 .ok()
742 .and_then(|v| v.parse::<usize>().ok())
743 .filter(|v| *v > 0)
744}
745
746fn rotate_keep_limit() -> Option<usize> {
747 std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
748 .ok()
749 .and_then(|v| v.parse::<usize>().ok())
750 .filter(|v| *v > 0)
751}
752
753fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
754 let len = timeline.events.len();
755 if len > max_events {
756 let keep_from = len - max_events;
757 timeline.events = timeline.events.split_off(keep_from);
758 }
759}
760
761fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
762 let p = Path::new(base_path);
763 let parent = p.parent().unwrap_or_else(|| Path::new("."));
764 let stem = p
765 .file_stem()
766 .and_then(|s| s.to_str())
767 .unwrap_or("timeline");
768 let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
769 let prefix = format!("{}_", stem);
770 let suffix = format!(".{}", ext);
771
772 let mut files = fs::read_dir(parent)
773 .map_err(|e| e.to_string())?
774 .filter_map(|entry| entry.ok())
775 .filter(|entry| {
776 let name = entry.file_name();
777 let name = name.to_string_lossy();
778 name.starts_with(&prefix) && name.ends_with(&suffix)
779 })
780 .filter_map(|entry| {
781 let modified = entry
782 .metadata()
783 .ok()
784 .and_then(|m| m.modified().ok())
785 .unwrap_or(SystemTime::UNIX_EPOCH);
786 Some((entry.path(), modified))
787 })
788 .collect::<Vec<_>>();
789
790 files.sort_by(|a, b| b.1.cmp(&a.1));
791 for (path, _) in files.into_iter().skip(keep) {
792 let _ = fs::remove_file(path);
793 }
794 Ok(())
795}
796
797fn now_ms() -> u64 {
798 SystemTime::now()
799 .duration_since(UNIX_EPOCH)
800 .map(|d| d.as_millis() as u64)
801 .unwrap_or(0)
802}
803
804#[cfg(test)]
805mod tests {
806 use super::{sampled_by_bus_id, should_force_export};
807 use ranvier_core::Outcome;
808 use uuid::Uuid;
809
810 #[test]
811 fn adaptive_policy_force_export_matrix() {
812 let next = Outcome::<(), &'static str>::Next(());
813 let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
814 let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
815 let fault = Outcome::<(), &'static str>::Fault("boom");
816
817 assert!(!should_force_export(&next, "off"));
818 assert!(!should_force_export(&fault, "off"));
819
820 assert!(!should_force_export(&branch, "fault_only"));
821 assert!(should_force_export(&fault, "fault_only"));
822
823 assert!(should_force_export(&branch, "fault_branch"));
824 assert!(!should_force_export(&emit, "fault_branch"));
825 assert!(should_force_export(&fault, "fault_branch"));
826
827 assert!(should_force_export(&branch, "fault_branch_emit"));
828 assert!(should_force_export(&emit, "fault_branch_emit"));
829 assert!(should_force_export(&fault, "fault_branch_emit"));
830 }
831
832 #[test]
833 fn sampling_and_adaptive_combination_decisions() {
834 let bus_id = Uuid::nil();
835 let next = Outcome::<(), &'static str>::Next(());
836 let fault = Outcome::<(), &'static str>::Fault("boom");
837
838 let sampled_never = sampled_by_bus_id(bus_id, 0.0);
839 assert!(!sampled_never);
840 assert!(!(sampled_never || should_force_export(&next, "off")));
841 assert!(sampled_never || should_force_export(&fault, "fault_only"));
842
843 let sampled_always = sampled_by_bus_id(bus_id, 1.0);
844 assert!(sampled_always);
845 assert!(sampled_always || should_force_export(&next, "off"));
846 assert!(sampled_always || should_force_export(&fault, "off"));
847 }
848}