Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
4//! It functions as a reusable Typed Decision flow (Axon<In, Out, E>).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use ranvier_core::bus::Bus;
15use ranvier_core::outcome::Outcome;
16use ranvier_core::schematic::{Edge, EdgeType, Node, NodeKind, Schematic, SourceLocation};
17use ranvier_core::timeline::{Timeline, TimelineEvent};
18use ranvier_core::transition::Transition;
19use std::any::type_name;
20use std::ffi::OsString;
21use std::fs;
22use std::future::Future;
23use std::panic::Location;
24use std::path::{Path, PathBuf};
25use std::pin::Pin;
26use std::sync::{Arc, Mutex, OnceLock};
27use std::time::{SystemTime, UNIX_EPOCH};
28use tracing::Instrument;
29
30/// Type alias for async boxed futures used in Axon execution.
31pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
32
33/// Executor type for Axon steps.
34/// Now takes an input state `In`, a resource bundle `Res`, and returns an `Outcome<Out, E>`.
35/// Must be Send + Sync to be reusable across threads and clones.
36pub type Executor<In, Out, E, Res> =
37    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
38
39/// Helper to extract a readable type name from a type.
40fn type_name_of<T: ?Sized>() -> String {
41    let full = type_name::<T>();
42    full.split("::").last().unwrap_or(full).to_string()
43}
44
45/// The Axon Builder and Runtime.
46///
47/// `Axon` represents an executable decision tree.
48/// It is reusable and thread-safe.
49///
50/// ## Example
51///
52/// ```rust,ignore
53/// use ranvier_core::prelude::*;
54/// // ...
55/// // Start with an identity Axon (In -> In)
56/// let axon = Axon::<(), (), _>::new("My Axon")
57///     .then(StepA)
58///     .then(StepB);
59///
60/// // Execute multiple times
61/// let res1 = axon.execute((), &mut bus1).await;
62/// let res2 = axon.execute((), &mut bus2).await;
63/// ```
64pub struct Axon<In, Out, E, Res = ()> {
65    /// The static structure (for visualization/analysis)
66    pub schematic: Schematic,
67    /// The runtime executor
68    executor: Executor<In, Out, E, Res>,
69}
70
71/// Schematic export request derived from command-line args/env.
72#[derive(Debug, Clone)]
73pub struct SchematicExportRequest {
74    /// Optional output file path. If omitted, schematic is written to stdout.
75    pub output: Option<PathBuf>,
76}
77
78impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
79    fn clone(&self) -> Self {
80        Self {
81            schematic: self.schematic.clone(),
82            executor: self.executor.clone(),
83        }
84    }
85}
86
87impl<In, E, Res> Axon<In, In, E, Res>
88where
89    In: Send + Sync + 'static,
90    E: Send + 'static,
91    Res: ranvier_core::transition::ResourceRequirement,
92{
93    /// Create a new Axon flow with the given label.
94    /// This is the preferred entry point per Flat API guidelines.
95    #[track_caller]
96    pub fn new(label: &str) -> Self {
97        let caller = Location::caller();
98        Self::start_with_source(label, caller)
99    }
100
101    /// Start defining a new Axon flow.
102    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
103    #[track_caller]
104    pub fn start(label: &str) -> Self {
105        let caller = Location::caller();
106        Self::start_with_source(label, caller)
107    }
108
109    fn start_with_source(
110        label: &str,
111        caller: &'static Location<'static>,
112    ) -> Self {
113        let node_id = uuid::Uuid::new_v4().to_string();
114        let node = Node {
115            id: node_id,
116            kind: NodeKind::Ingress,
117            label: label.to_string(),
118            description: None,
119            input_type: "void".to_string(),
120            output_type: type_name_of::<In>(),
121            resource_type: type_name_of::<Res>(),
122            metadata: Default::default(),
123            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
124        };
125
126        let mut schematic = Schematic::new(label);
127        schematic.nodes.push(node);
128
129        let executor: Executor<In, In, E, Res> =
130            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
131
132        Self {
133            schematic,
134            executor,
135        }
136    }
137}
138
139impl<In, Out, E, Res> Axon<In, Out, E, Res>
140where
141    In: Send + Sync + 'static,
142    Out: Send + Sync + 'static,
143    E: Send + 'static,
144    Res: ranvier_core::transition::ResourceRequirement,
145{
146    /// Chain a transition to this Axon.
147    ///
148    /// Requires the transition to use the SAME resource bundle as the previous steps.
149    #[track_caller]
150    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
151    where
152        Next: Send + Sync + 'static,
153        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
154    {
155        let caller = Location::caller();
156        // Decompose self to avoid partial move issues
157        let Axon {
158            mut schematic,
159            executor: prev_executor,
160        } = self;
161
162        // Update Schematic
163        let next_node_id = uuid::Uuid::new_v4().to_string();
164        let next_node = Node {
165            id: next_node_id.clone(),
166            kind: NodeKind::Atom,
167            label: transition.label(),
168            description: transition.description(),
169            input_type: type_name_of::<Out>(),
170            output_type: type_name_of::<Next>(),
171            resource_type: type_name_of::<Res>(),
172            metadata: Default::default(),
173            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
174        };
175
176        let last_node_id = schematic
177            .nodes
178            .last()
179            .map(|n| n.id.clone())
180            .unwrap_or_default();
181
182        schematic.nodes.push(next_node);
183        schematic.edges.push(Edge {
184            from: last_node_id,
185            to: next_node_id.clone(),
186            kind: EdgeType::Linear,
187            label: Some("Next".to_string()),
188        });
189
190        // Compose Executor
191        let node_id_for_exec = next_node_id.clone();
192        let node_label_for_exec = transition.label();
193        let next_executor: Executor<In, Next, E, Res> = Arc::new(
194            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
195                let prev = prev_executor.clone();
196                let trans = transition.clone();
197                let timeline_node_id = node_id_for_exec.clone();
198                let timeline_node_label = node_label_for_exec.clone();
199
200                Box::pin(async move {
201                    // Run previous step
202                    let prev_result = prev(input, res, bus).await;
203
204                    // Unpack
205                    let state = match prev_result {
206                        Outcome::Next(t) => t,
207                        other => return other.map(|_| unreachable!()),
208                    };
209
210                    // Run this step with automatic instrumentation
211                    let label = trans.label();
212                    let res_type = std::any::type_name::<Res>()
213                        .split("::")
214                        .last()
215                        .unwrap_or("unknown");
216
217                    let enter_ts = now_ms();
218                    if let Some(timeline) = bus.read_mut::<Timeline>() {
219                        timeline.push(TimelineEvent::NodeEnter {
220                            node_id: timeline_node_id.clone(),
221                            node_label: timeline_node_label.clone(),
222                            timestamp: enter_ts,
223                        });
224                    }
225
226                    let started = std::time::Instant::now();
227                    let result = trans
228                        .run(state, res, bus)
229                        .instrument(tracing::info_span!(
230                            "Node",
231                            ranvier.node = %label,
232                            ranvier.resource_type = %res_type
233                        ))
234                        .await;
235                    let duration_ms = started.elapsed().as_millis() as u64;
236                    let exit_ts = now_ms();
237
238                    if let Some(timeline) = bus.read_mut::<Timeline>() {
239                        timeline.push(TimelineEvent::NodeExit {
240                            node_id: timeline_node_id.clone(),
241                            outcome_type: outcome_type_name(&result),
242                            duration_ms,
243                            timestamp: exit_ts,
244                        });
245
246                        if let Outcome::Branch(branch_id, _) = &result {
247                            timeline.push(TimelineEvent::Branchtaken {
248                                branch_id: branch_id.clone(),
249                                timestamp: exit_ts,
250                            });
251                        }
252                    }
253
254                    result
255                })
256            },
257        );
258
259        Axon {
260            schematic,
261            executor: next_executor,
262        }
263    }
264
265    /// Add a branch point
266    #[track_caller]
267    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
268        let caller = Location::caller();
269        let branch_id_str = branch_id.into();
270        let last_node_id = self
271            .schematic
272            .nodes
273            .last()
274            .map(|n| n.id.clone())
275            .unwrap_or_default();
276
277        let branch_node = Node {
278            id: uuid::Uuid::new_v4().to_string(),
279            kind: NodeKind::Synapse,
280            label: label.to_string(),
281            description: None,
282            input_type: type_name_of::<Out>(),
283            output_type: type_name_of::<Out>(),
284            resource_type: type_name_of::<Res>(),
285            metadata: Default::default(),
286            source_location: Some(SourceLocation::new(caller.file(), caller.line())),
287        };
288
289        self.schematic.nodes.push(branch_node);
290        self.schematic.edges.push(Edge {
291            from: last_node_id,
292            to: branch_id_str.clone(),
293            kind: EdgeType::Branch(branch_id_str),
294            label: Some("Branch".to_string()),
295        });
296
297        self
298    }
299
300    /// Execute the Axon with the given input and resources.
301    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
302        let should_capture = should_attach_timeline(bus);
303        let inserted_timeline = if should_capture {
304            ensure_timeline(bus)
305        } else {
306            false
307        };
308        let ingress_started = std::time::Instant::now();
309        let ingress_enter_ts = now_ms();
310        if should_capture
311            && let (Some(timeline), Some(ingress)) =
312            (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
313        {
314            timeline.push(TimelineEvent::NodeEnter {
315                node_id: ingress.id.clone(),
316                node_label: ingress.label.clone(),
317                timestamp: ingress_enter_ts,
318            });
319        }
320
321        let label = self.schematic.name.clone();
322        let outcome = (self.executor)(input, resources, bus)
323            .instrument(tracing::info_span!("Circuit", ranvier.circuit = %label))
324            .await;
325
326        let ingress_exit_ts = now_ms();
327        if should_capture
328            && let (Some(timeline), Some(ingress)) =
329            (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
330        {
331            timeline.push(TimelineEvent::NodeExit {
332                node_id: ingress.id.clone(),
333                outcome_type: outcome_type_name(&outcome),
334                duration_ms: ingress_started.elapsed().as_millis() as u64,
335                timestamp: ingress_exit_ts,
336            });
337        }
338
339        if should_capture {
340            maybe_export_timeline(bus, &outcome);
341        }
342        if inserted_timeline {
343            let _ = bus.remove::<Timeline>();
344        }
345
346        outcome
347    }
348
349    /// Starts the Ranvier Inspector for this Axon on the specified port.
350    /// This spawns a background task to serve the Schematic.
351    pub fn serve_inspector(self, port: u16) -> Self {
352        if !inspector_enabled_from_env() {
353            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
354            return self;
355        }
356
357        let schematic = self.schematic.clone();
358        tokio::spawn(async move {
359            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
360                .with_projection_files_from_env()
361                .with_mode_from_env()
362                .with_auth_policy_from_env()
363                .serve()
364                .await
365            {
366                tracing::error!("Inspector server failed: {}", e);
367            }
368        });
369        self
370    }
371
372    /// Get a reference to the Schematic (structural view).
373    pub fn schematic(&self) -> &Schematic {
374        &self.schematic
375    }
376
377    /// Consume and return the Schematic.
378    pub fn into_schematic(self) -> Schematic {
379        self.schematic
380    }
381
382    /// Detect schematic export mode from runtime flags.
383    ///
384    /// Supported triggers:
385    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
386    /// - `--schematic`
387    ///
388    /// Optional output path:
389    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
390    /// - `--schematic-output <path>` / `--schematic-output=<path>`
391    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
392    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
393        schematic_export_request_from_process()
394    }
395
396    /// Export schematic and return `true` when schematic mode is active.
397    ///
398    /// Use this once after circuit construction and before server/custom loops:
399    ///
400    /// ```rust,ignore
401    /// let axon = build_axon();
402    /// if axon.maybe_export_and_exit()? {
403    ///     return Ok(());
404    /// }
405    /// // Normal runtime path...
406    /// ```
407    pub fn maybe_export_and_exit(
408        &self,
409    ) -> anyhow::Result<bool> {
410        self.maybe_export_and_exit_with(|_| ())
411    }
412
413    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
414    ///
415    /// This is useful when your app has custom loop/bootstrap behavior and you want
416    /// to skip or cleanup that logic in schematic mode.
417    pub fn maybe_export_and_exit_with<F>(
418        &self,
419        on_before_exit: F,
420    ) -> anyhow::Result<bool>
421    where
422        F: FnOnce(&SchematicExportRequest),
423    {
424        let Some(request) = self.schematic_export_request() else {
425            return Ok(false);
426        };
427        on_before_exit(&request);
428        self.export_schematic(&request)?;
429        Ok(true)
430    }
431
432    /// Export schematic according to the provided request.
433    pub fn export_schematic(
434        &self,
435        request: &SchematicExportRequest,
436    ) -> anyhow::Result<()> {
437        let json = serde_json::to_string_pretty(self.schematic())?;
438        if let Some(path) = &request.output {
439            if let Some(parent) = path.parent() {
440                if !parent.as_os_str().is_empty() {
441                    fs::create_dir_all(parent)?;
442                }
443            }
444            fs::write(path, json.as_bytes())?;
445            return Ok(());
446        }
447        println!("{}", json);
448        Ok(())
449    }
450}
451
452fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
453    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
454    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
455    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
456
457    let mut i = 0;
458    while i < args.len() {
459        let arg = args[i].to_string_lossy();
460
461        if arg == "--schematic" {
462            enabled = true;
463            i += 1;
464            continue;
465        }
466
467        if arg == "--schematic-output" || arg == "--output" {
468            if let Some(next) = args.get(i + 1) {
469                output = Some(PathBuf::from(next));
470                i += 2;
471                continue;
472            }
473        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
474            output = Some(PathBuf::from(value));
475        } else if let Some(value) = arg.strip_prefix("--output=") {
476            output = Some(PathBuf::from(value));
477        }
478
479        i += 1;
480    }
481
482    if enabled {
483        Some(SchematicExportRequest { output })
484    } else {
485        None
486    }
487}
488
489fn env_flag_is_true(key: &str) -> bool {
490    match std::env::var(key) {
491        Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
492        Err(_) => false,
493    }
494}
495
496fn inspector_enabled_from_env() -> bool {
497    match std::env::var("RANVIER_INSPECTOR") {
498        Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "on" | "yes"),
499        Err(_) => true,
500    }
501}
502
503fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
504    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
505        Ok(v) if !v.trim().is_empty() => v,
506        _ => return,
507    };
508
509    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
510    let policy = timeline_adaptive_policy();
511    let forced = should_force_export(outcome, &policy);
512    let should_export = sampled || forced;
513    if !should_export {
514        record_sampling_stats(false, sampled, forced, "none", &policy);
515        return;
516    }
517
518    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
519    timeline.sort();
520
521    let mode = std::env::var("RANVIER_TIMELINE_MODE")
522        .unwrap_or_else(|_| "overwrite".to_string())
523        .to_ascii_lowercase();
524
525    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
526        tracing::warn!(
527            "Failed to persist timeline file {} (mode={}): {}",
528            path,
529            mode,
530            err
531        );
532        record_sampling_stats(false, sampled, forced, &mode, &policy);
533    } else {
534        record_sampling_stats(true, sampled, forced, &mode, &policy);
535    }
536}
537
538fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
539    match outcome {
540        Outcome::Next(_) => "Next".to_string(),
541        Outcome::Branch(id, _) => format!("Branch:{}", id),
542        Outcome::Jump(id, _) => format!("Jump:{}", id),
543        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
544        Outcome::Fault(_) => "Fault".to_string(),
545    }
546}
547
548fn ensure_timeline(bus: &mut Bus) -> bool {
549    if bus.has::<Timeline>() {
550        false
551    } else {
552        bus.insert(Timeline::new());
553        true
554    }
555}
556
557fn should_attach_timeline(bus: &Bus) -> bool {
558    // Respect explicitly provided timeline collector from caller.
559    if bus.has::<Timeline>() {
560        return true;
561    }
562
563    // Attach timeline when runtime export path exists.
564    has_timeline_output_path()
565}
566
567fn has_timeline_output_path() -> bool {
568    std::env::var("RANVIER_TIMELINE_OUTPUT")
569        .ok()
570        .map(|v| !v.trim().is_empty())
571        .unwrap_or(false)
572}
573
574fn timeline_sample_rate() -> f64 {
575    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
576        .ok()
577        .and_then(|v| v.parse::<f64>().ok())
578        .map(|v| v.clamp(0.0, 1.0))
579        .unwrap_or(1.0)
580}
581
582fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
583    if rate <= 0.0 {
584        return false;
585    }
586    if rate >= 1.0 {
587        return true;
588    }
589    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
590    bucket < rate
591}
592
593fn timeline_adaptive_policy() -> String {
594    std::env::var("RANVIER_TIMELINE_ADAPTIVE")
595        .unwrap_or_else(|_| "fault_branch".to_string())
596        .to_ascii_lowercase()
597}
598
599fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
600    match policy {
601        "off" => false,
602        "fault_only" => matches!(outcome, Outcome::Fault(_)),
603        "fault_branch_emit" => {
604            matches!(
605                outcome,
606                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
607            )
608        }
609        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
610    }
611}
612
613#[derive(Default, Clone)]
614struct SamplingStats {
615    total_decisions: u64,
616    exported: u64,
617    skipped: u64,
618    sampled_exports: u64,
619    forced_exports: u64,
620    last_mode: String,
621    last_policy: String,
622    last_updated_ms: u64,
623}
624
625static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
626
627fn stats_cell() -> &'static Mutex<SamplingStats> {
628    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
629}
630
631fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
632    let snapshot = {
633        let mut stats = match stats_cell().lock() {
634            Ok(guard) => guard,
635            Err(_) => return,
636        };
637
638        stats.total_decisions += 1;
639        if exported {
640            stats.exported += 1;
641        } else {
642            stats.skipped += 1;
643        }
644        if sampled && exported {
645            stats.sampled_exports += 1;
646        }
647        if forced && exported {
648            stats.forced_exports += 1;
649        }
650        stats.last_mode = mode.to_string();
651        stats.last_policy = policy.to_string();
652        stats.last_updated_ms = now_ms();
653        stats.clone()
654    };
655
656    tracing::debug!(
657        ranvier.timeline.total_decisions = snapshot.total_decisions,
658        ranvier.timeline.exported = snapshot.exported,
659        ranvier.timeline.skipped = snapshot.skipped,
660        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
661        ranvier.timeline.forced_exports = snapshot.forced_exports,
662        ranvier.timeline.mode = %snapshot.last_mode,
663        ranvier.timeline.policy = %snapshot.last_policy,
664        "Timeline sampling stats updated"
665    );
666
667    if let Some(path) = timeline_stats_output_path() {
668        let payload = serde_json::json!({
669            "total_decisions": snapshot.total_decisions,
670            "exported": snapshot.exported,
671            "skipped": snapshot.skipped,
672            "sampled_exports": snapshot.sampled_exports,
673            "forced_exports": snapshot.forced_exports,
674            "last_mode": snapshot.last_mode,
675            "last_policy": snapshot.last_policy,
676            "last_updated_ms": snapshot.last_updated_ms
677        });
678        if let Some(parent) = Path::new(&path).parent() {
679            let _ = fs::create_dir_all(parent);
680        }
681        if let Err(err) = fs::write(&path, payload.to_string()) {
682            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
683        }
684    }
685}
686
687fn timeline_stats_output_path() -> Option<String> {
688    std::env::var("RANVIER_TIMELINE_STATS_OUTPUT")
689        .ok()
690        .filter(|v| !v.trim().is_empty())
691}
692
693fn write_timeline_with_policy(path: &str, mode: &str, mut timeline: Timeline) -> Result<(), String> {
694    match mode {
695        "append" => {
696            if Path::new(path).exists() {
697                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
698                match serde_json::from_str::<Timeline>(&content) {
699                    Ok(mut existing) => {
700                        existing.events.append(&mut timeline.events);
701                        existing.sort();
702                        if let Some(max_events) = max_events_limit() {
703                            truncate_timeline_events(&mut existing, max_events);
704                        }
705                        write_timeline_json(path, &existing)
706                    }
707                    Err(_) => {
708                        // Fallback: if existing is invalid, replace with current timeline
709                        if let Some(max_events) = max_events_limit() {
710                            truncate_timeline_events(&mut timeline, max_events);
711                        }
712                        write_timeline_json(path, &timeline)
713                    }
714                }
715            } else {
716                if let Some(max_events) = max_events_limit() {
717                    truncate_timeline_events(&mut timeline, max_events);
718                }
719                write_timeline_json(path, &timeline)
720            }
721        }
722        "rotate" => {
723            let rotated_path = rotated_path(path, now_ms());
724            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
725            if let Some(keep) = rotate_keep_limit() {
726                cleanup_rotated_files(path, keep)?;
727            }
728            Ok(())
729        }
730        _ => write_timeline_json(path, &timeline),
731    }
732}
733
734fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
735    if let Some(parent) = Path::new(path).parent() {
736        if !parent.as_os_str().is_empty() {
737            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
738        }
739    }
740    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
741    fs::write(path, json).map_err(|e| e.to_string())
742}
743
744fn rotated_path(path: &str, suffix: u64) -> PathBuf {
745    let p = Path::new(path);
746    let parent = p.parent().unwrap_or_else(|| Path::new(""));
747    let stem = p
748        .file_stem()
749        .and_then(|s| s.to_str())
750        .unwrap_or("timeline");
751    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
752    parent.join(format!("{}_{}.{}", stem, suffix, ext))
753}
754
755fn max_events_limit() -> Option<usize> {
756    std::env::var("RANVIER_TIMELINE_MAX_EVENTS")
757        .ok()
758        .and_then(|v| v.parse::<usize>().ok())
759        .filter(|v| *v > 0)
760}
761
762fn rotate_keep_limit() -> Option<usize> {
763    std::env::var("RANVIER_TIMELINE_ROTATE_KEEP")
764        .ok()
765        .and_then(|v| v.parse::<usize>().ok())
766        .filter(|v| *v > 0)
767}
768
769fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
770    let len = timeline.events.len();
771    if len > max_events {
772        let keep_from = len - max_events;
773        timeline.events = timeline.events.split_off(keep_from);
774    }
775}
776
777fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
778    let p = Path::new(base_path);
779    let parent = p.parent().unwrap_or_else(|| Path::new("."));
780    let stem = p
781        .file_stem()
782        .and_then(|s| s.to_str())
783        .unwrap_or("timeline");
784    let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("json");
785    let prefix = format!("{}_", stem);
786    let suffix = format!(".{}", ext);
787
788    let mut files = fs::read_dir(parent)
789        .map_err(|e| e.to_string())?
790        .filter_map(|entry| entry.ok())
791        .filter(|entry| {
792            let name = entry.file_name();
793            let name = name.to_string_lossy();
794            name.starts_with(&prefix) && name.ends_with(&suffix)
795        })
796        .filter_map(|entry| {
797            let modified = entry
798                .metadata()
799                .ok()
800                .and_then(|m| m.modified().ok())
801                .unwrap_or(SystemTime::UNIX_EPOCH);
802            Some((entry.path(), modified))
803        })
804        .collect::<Vec<_>>();
805
806    files.sort_by(|a, b| b.1.cmp(&a.1));
807    for (path, _) in files.into_iter().skip(keep) {
808        let _ = fs::remove_file(path);
809    }
810    Ok(())
811}
812
813fn now_ms() -> u64 {
814    SystemTime::now()
815        .duration_since(UNIX_EPOCH)
816        .map(|d| d.as_millis() as u64)
817        .unwrap_or(0)
818}
819
820#[cfg(test)]
821mod tests {
822    use super::{sampled_by_bus_id, should_force_export};
823    use ranvier_core::Outcome;
824    use uuid::Uuid;
825
826    #[test]
827    fn adaptive_policy_force_export_matrix() {
828        let next = Outcome::<(), &'static str>::Next(());
829        let branch = Outcome::<(), &'static str>::Branch("declined".to_string(), None);
830        let emit = Outcome::<(), &'static str>::Emit("audit".to_string(), None);
831        let fault = Outcome::<(), &'static str>::Fault("boom");
832
833        assert!(!should_force_export(&next, "off"));
834        assert!(!should_force_export(&fault, "off"));
835
836        assert!(!should_force_export(&branch, "fault_only"));
837        assert!(should_force_export(&fault, "fault_only"));
838
839        assert!(should_force_export(&branch, "fault_branch"));
840        assert!(!should_force_export(&emit, "fault_branch"));
841        assert!(should_force_export(&fault, "fault_branch"));
842
843        assert!(should_force_export(&branch, "fault_branch_emit"));
844        assert!(should_force_export(&emit, "fault_branch_emit"));
845        assert!(should_force_export(&fault, "fault_branch_emit"));
846    }
847
848    #[test]
849    fn sampling_and_adaptive_combination_decisions() {
850        let bus_id = Uuid::nil();
851        let next = Outcome::<(), &'static str>::Next(());
852        let fault = Outcome::<(), &'static str>::Fault("boom");
853
854        let sampled_never = sampled_by_bus_id(bus_id, 0.0);
855        assert!(!sampled_never);
856        assert!(!(sampled_never || should_force_export(&next, "off")));
857        assert!(sampled_never || should_force_export(&fault, "fault_only"));
858
859        let sampled_always = sampled_by_bus_id(bus_id, 1.0);
860        assert!(sampled_always);
861        assert!(sampled_always || should_force_export(&next, "off"));
862        assert!(sampled_always || should_force_export(&fault, "off"));
863    }
864}