Skip to main content

ranvier_runtime/
axon.rs

1//! # Axon: Executable Decision Tree
2//!
3//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
4//! It functions as a reusable Typed Decision flow (Axon<In, Out, E>).
5//!
6//! ## Design Philosophy
7//!
8//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
9//! * **Builder pattern**: `Axon::start().then().then()`
10//! * **Schematic extraction**: Every Axon carries its structural metadata
11//!
12//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use ranvier_core::bus::Bus;
15use ranvier_core::outcome::Outcome;
16use ranvier_core::schematic::{Edge, EdgeType, Node, NodeKind, Schematic};
17use ranvier_core::timeline::{Timeline, TimelineEvent};
18use ranvier_core::transition::Transition;
19use std::any::type_name;
20use std::ffi::OsString;
21use std::fs;
22use std::future::Future;
23use std::path::{Path, PathBuf};
24use std::pin::Pin;
25use std::sync::{Arc, Mutex, OnceLock};
26use std::time::{SystemTime, UNIX_EPOCH};
27use tracing::Instrument;
28
29/// Type alias for async boxed futures used in Axon execution.
30pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
31
32/// Executor type for Axon steps.
33/// Now takes an input state `In`, a resource bundle `Res`, and returns an `Outcome<Out, E>`.
34/// Must be Send + Sync to be reusable across threads and clones.
35pub type Executor<In, Out, E, Res> =
36    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
37
/// Returns a readable name for `T` with every module path removed.
///
/// Each path-qualified identifier inside the name is shortened to its last segment, so
/// generic types stay well-formed: `alloc::vec::Vec<alloc::string::String>` becomes
/// `Vec<String>`. (Cutting the whole string at its last `::` would yield `String>`.)
///
/// The exact text produced by [`std::any::type_name`] is not guaranteed to be stable across
/// compiler versions, so the result is meant for display only.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the path segment currently being read begins.
    let mut segment_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // A `::` separator: what was collected since the last delimiter was a module
            // (or parent type) name, so discard it and keep reading the next segment.
            chars.next();
            short.truncate(segment_start);
        } else {
            short.push(c);
            // Anything that cannot be part of an identifier (`<`, `,`, ` `, `&`, `(` ...)
            // ends the current path; the next identifier starts right after it.
            if !(c.is_alphanumeric() || c == '_') {
                segment_start = short.len();
            }
        }
    }

    short
}
43
44/// The Axon Builder and Runtime.
45///
46/// `Axon` represents an executable decision tree.
47/// It is reusable and thread-safe.
48///
49/// ## Example
50///
51/// ```rust,ignore
52/// use ranvier_core::prelude::*;
53/// // ...
54/// // Start with an identity Axon (In -> In)
55/// let axon = Axon::<(), (), _>::new("My Axon")
56///     .then(StepA)
57///     .then(StepB);
58///
59/// // Execute multiple times
60/// let res1 = axon.execute((), &mut bus1).await;
61/// let res2 = axon.execute((), &mut bus2).await;
62/// ```
63pub struct Axon<In, Out, E, Res = ()> {
64    /// The static structure (for visualization/analysis)
65    pub schematic: Schematic,
66    /// The runtime executor
67    executor: Executor<In, Out, E, Res>,
68}
69
/// Describes where an exported schematic should go, as derived from command-line
/// arguments and environment variables.
#[derive(Clone, Debug)]
pub struct SchematicExportRequest {
    /// Destination file for the schematic. `None` means "write to stdout".
    pub output: Option<PathBuf>,
}
76
77impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
78    fn clone(&self) -> Self {
79        Self {
80            schematic: self.schematic.clone(),
81            executor: self.executor.clone(),
82        }
83    }
84}
85
86impl<In, E, Res> Axon<In, In, E, Res>
87where
88    In: Send + Sync + 'static,
89    E: Send + 'static,
90    Res: ranvier_core::transition::ResourceRequirement,
91{
92    /// Create a new Axon flow with the given label.
93    /// This is the preferred entry point per Flat API guidelines.
94    pub fn new(label: &str) -> Self {
95        Self::start(label)
96    }
97
98    /// Start defining a new Axon flow.
99    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
100    pub fn start(label: &str) -> Self {
101        let node_id = uuid::Uuid::new_v4().to_string();
102        let node = Node {
103            id: node_id,
104            kind: NodeKind::Ingress,
105            label: label.to_string(),
106            description: None,
107            input_type: "void".to_string(),
108            output_type: type_name_of::<In>(),
109            resource_type: type_name_of::<Res>(),
110            metadata: Default::default(),
111            source_location: None,
112        };
113
114        let mut schematic = Schematic::new(label);
115        schematic.nodes.push(node);
116
117        let executor: Executor<In, In, E, Res> =
118            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
119
120        Self {
121            schematic,
122            executor,
123        }
124    }
125}
126
127impl<In, Out, E, Res> Axon<In, Out, E, Res>
128where
129    In: Send + Sync + 'static,
130    Out: Send + Sync + 'static,
131    E: Send + 'static,
132    Res: ranvier_core::transition::ResourceRequirement,
133{
134    /// Chain a transition to this Axon.
135    ///
136    /// Requires the transition to use the SAME resource bundle as the previous steps.
137    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
138    where
139        Next: Send + Sync + 'static,
140        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
141    {
142        // Decompose self to avoid partial move issues
143        let Axon {
144            mut schematic,
145            executor: prev_executor,
146        } = self;
147
148        // Update Schematic
149        let next_node_id = uuid::Uuid::new_v4().to_string();
150        let next_node = Node {
151            id: next_node_id.clone(),
152            kind: NodeKind::Atom,
153            label: transition.label(),
154            description: transition.description(),
155            input_type: type_name_of::<Out>(),
156            output_type: type_name_of::<Next>(),
157            resource_type: type_name_of::<Res>(),
158            metadata: Default::default(),
159            source_location: None,
160        };
161
162        let last_node_id = schematic
163            .nodes
164            .last()
165            .map(|n| n.id.clone())
166            .unwrap_or_default();
167
168        schematic.nodes.push(next_node);
169        schematic.edges.push(Edge {
170            from: last_node_id,
171            to: next_node_id.clone(),
172            kind: EdgeType::Linear,
173            label: Some("Next".to_string()),
174        });
175
176        // Compose Executor
177        let node_id_for_exec = next_node_id.clone();
178        let node_label_for_exec = transition.label();
179        let next_executor: Executor<In, Next, E, Res> = Arc::new(
180            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
181                let prev = prev_executor.clone();
182                let trans = transition.clone();
183                let timeline_node_id = node_id_for_exec.clone();
184                let timeline_node_label = node_label_for_exec.clone();
185
186                Box::pin(async move {
187                    // Run previous step
188                    let prev_result = prev(input, res, bus).await;
189
190                    // Unpack
191                    let state = match prev_result {
192                        Outcome::Next(t) => t,
193                        other => return other.map(|_| unreachable!()),
194                    };
195
196                    // Run this step with automatic instrumentation
197                    let label = trans.label();
198                    let res_type = std::any::type_name::<Res>()
199                        .split("::")
200                        .last()
201                        .unwrap_or("unknown");
202
203                    let enter_ts = now_ms();
204                    if let Some(timeline) = bus.read_mut::<Timeline>() {
205                        timeline.push(TimelineEvent::NodeEnter {
206                            node_id: timeline_node_id.clone(),
207                            node_label: timeline_node_label.clone(),
208                            timestamp: enter_ts,
209                        });
210                    }
211
212                    let started = std::time::Instant::now();
213                    let result = trans
214                        .run(state, res, bus)
215                        .instrument(tracing::info_span!(
216                            "Node",
217                            ranvier.node = %label,
218                            ranvier.resource_type = %res_type
219                        ))
220                        .await;
221                    let duration_ms = started.elapsed().as_millis() as u64;
222                    let exit_ts = now_ms();
223
224                    if let Some(timeline) = bus.read_mut::<Timeline>() {
225                        timeline.push(TimelineEvent::NodeExit {
226                            node_id: timeline_node_id.clone(),
227                            outcome_type: outcome_type_name(&result),
228                            duration_ms,
229                            timestamp: exit_ts,
230                        });
231
232                        if let Outcome::Branch(branch_id, _) = &result {
233                            timeline.push(TimelineEvent::Branchtaken {
234                                branch_id: branch_id.clone(),
235                                timestamp: exit_ts,
236                            });
237                        }
238                    }
239
240                    result
241                })
242            },
243        );
244
245        Axon {
246            schematic,
247            executor: next_executor,
248        }
249    }
250
251    /// Add a branch point
252    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
253        let branch_id_str = branch_id.into();
254        let last_node_id = self
255            .schematic
256            .nodes
257            .last()
258            .map(|n| n.id.clone())
259            .unwrap_or_default();
260
261        let branch_node = Node {
262            id: uuid::Uuid::new_v4().to_string(),
263            kind: NodeKind::Synapse,
264            label: label.to_string(),
265            description: None,
266            input_type: type_name_of::<Out>(),
267            output_type: type_name_of::<Out>(),
268            resource_type: type_name_of::<Res>(),
269            metadata: Default::default(),
270            source_location: None,
271        };
272
273        self.schematic.nodes.push(branch_node);
274        self.schematic.edges.push(Edge {
275            from: last_node_id,
276            to: branch_id_str.clone(),
277            kind: EdgeType::Branch(branch_id_str),
278            label: Some("Branch".to_string()),
279        });
280
281        self
282    }
283
284    /// Execute the Axon with the given input and resources.
285    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
286        let should_capture = should_attach_timeline(bus);
287        let inserted_timeline = if should_capture {
288            ensure_timeline(bus)
289        } else {
290            false
291        };
292        let ingress_started = std::time::Instant::now();
293        let ingress_enter_ts = now_ms();
294        if should_capture
295            && let (Some(timeline), Some(ingress)) =
296            (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
297        {
298            timeline.push(TimelineEvent::NodeEnter {
299                node_id: ingress.id.clone(),
300                node_label: ingress.label.clone(),
301                timestamp: ingress_enter_ts,
302            });
303        }
304
305        let label = self.schematic.name.clone();
306        let outcome = (self.executor)(input, resources, bus)
307            .instrument(tracing::info_span!("Circuit", ranvier.circuit = %label))
308            .await;
309
310        let ingress_exit_ts = now_ms();
311        if should_capture
312            && let (Some(timeline), Some(ingress)) =
313            (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
314        {
315            timeline.push(TimelineEvent::NodeExit {
316                node_id: ingress.id.clone(),
317                outcome_type: outcome_type_name(&outcome),
318                duration_ms: ingress_started.elapsed().as_millis() as u64,
319                timestamp: ingress_exit_ts,
320            });
321        }
322
323        if should_capture {
324            maybe_export_timeline(bus, &outcome);
325        }
326        if inserted_timeline {
327            let _ = bus.remove::<Timeline>();
328        }
329
330        outcome
331    }
332
333    /// Starts the Ranvier Inspector for this Axon on the specified port.
334    /// This spawns a background task to serve the Schematic.
335    pub fn serve_inspector(self, port: u16) -> Self {
336        if !inspector_enabled_from_env() {
337            tracing::info!("Inspector disabled by RANVIER_INSPECTOR");
338            return self;
339        }
340
341        let schematic = self.schematic.clone();
342        tokio::spawn(async move {
343            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
344                .with_projection_files_from_env()
345                .with_mode_from_env()
346                .with_auth_policy_from_env()
347                .serve()
348                .await
349            {
350                tracing::error!("Inspector server failed: {}", e);
351            }
352        });
353        self
354    }
355
356    /// Get a reference to the Schematic (structural view).
357    pub fn schematic(&self) -> &Schematic {
358        &self.schematic
359    }
360
361    /// Consume and return the Schematic.
362    pub fn into_schematic(self) -> Schematic {
363        self.schematic
364    }
365
366    /// Detect schematic export mode from runtime flags.
367    ///
368    /// Supported triggers:
369    /// - `RANVIER_SCHEMATIC=1|true|on|yes`
370    /// - `--schematic`
371    ///
372    /// Optional output path:
373    /// - `RANVIER_SCHEMATIC_OUTPUT=<path>`
374    /// - `--schematic-output <path>` / `--schematic-output=<path>`
375    /// - `--output <path>` / `--output=<path>` (only relevant in schematic mode)
376    pub fn schematic_export_request(&self) -> Option<SchematicExportRequest> {
377        schematic_export_request_from_process()
378    }
379
380    /// Export schematic and return `true` when schematic mode is active.
381    ///
382    /// Use this once after circuit construction and before server/custom loops:
383    ///
384    /// ```rust,ignore
385    /// let axon = build_axon();
386    /// if axon.maybe_export_and_exit()? {
387    ///     return Ok(());
388    /// }
389    /// // Normal runtime path...
390    /// ```
391    pub fn maybe_export_and_exit(
392        &self,
393    ) -> anyhow::Result<bool> {
394        self.maybe_export_and_exit_with(|_| ())
395    }
396
397    /// Same as [`Self::maybe_export_and_exit`] but allows a custom hook right before export/exit.
398    ///
399    /// This is useful when your app has custom loop/bootstrap behavior and you want
400    /// to skip or cleanup that logic in schematic mode.
401    pub fn maybe_export_and_exit_with<F>(
402        &self,
403        on_before_exit: F,
404    ) -> anyhow::Result<bool>
405    where
406        F: FnOnce(&SchematicExportRequest),
407    {
408        let Some(request) = self.schematic_export_request() else {
409            return Ok(false);
410        };
411        on_before_exit(&request);
412        self.export_schematic(&request)?;
413        Ok(true)
414    }
415
416    /// Export schematic according to the provided request.
417    pub fn export_schematic(
418        &self,
419        request: &SchematicExportRequest,
420    ) -> anyhow::Result<()> {
421        let json = serde_json::to_string_pretty(self.schematic())?;
422        if let Some(path) = &request.output {
423            if let Some(parent) = path.parent() {
424                if !parent.as_os_str().is_empty() {
425                    fs::create_dir_all(parent)?;
426                }
427            }
428            fs::write(path, json.as_bytes())?;
429            return Ok(());
430        }
431        println!("{}", json);
432        Ok(())
433    }
434}
435
436fn schematic_export_request_from_process() -> Option<SchematicExportRequest> {
437    let args: Vec<OsString> = std::env::args_os().skip(1).collect();
438    let mut enabled = env_flag_is_true("RANVIER_SCHEMATIC");
439    let mut output = std::env::var_os("RANVIER_SCHEMATIC_OUTPUT").map(PathBuf::from);
440
441    let mut i = 0;
442    while i < args.len() {
443        let arg = args[i].to_string_lossy();
444
445        if arg == "--schematic" {
446            enabled = true;
447            i += 1;
448            continue;
449        }
450
451        if arg == "--schematic-output" || arg == "--output" {
452            if let Some(next) = args.get(i + 1) {
453                output = Some(PathBuf::from(next));
454                i += 2;
455                continue;
456            }
457        } else if let Some(value) = arg.strip_prefix("--schematic-output=") {
458            output = Some(PathBuf::from(value));
459        } else if let Some(value) = arg.strip_prefix("--output=") {
460            output = Some(PathBuf::from(value));
461        }
462
463        i += 1;
464    }
465
466    if enabled {
467        Some(SchematicExportRequest { output })
468    } else {
469        None
470    }
471}
472
/// Returns `true` when the environment variable `key` holds `1`, `true`, `on` or `yes`
/// (ASCII case-insensitive). Unset or unreadable variables count as `false`.
fn env_flag_is_true(key: &str) -> bool {
    std::env::var(key)
        .map(|raw| {
            let value = raw.to_ascii_lowercase();
            ["1", "true", "on", "yes"].contains(&value.as_str())
        })
        .unwrap_or(false)
}
479
/// Tells whether the inspector may be started.
///
/// The inspector is on by default: an unset (or unreadable) `RANVIER_INSPECTOR` yields
/// `true`. When the variable is set, only `1`, `true`, `on` or `yes` (ASCII
/// case-insensitive) keep it enabled; any other value disables it.
fn inspector_enabled_from_env() -> bool {
    let Ok(raw) = std::env::var("RANVIER_INSPECTOR") else {
        return true;
    };
    let value = raw.to_ascii_lowercase();
    ["1", "true", "on", "yes"].contains(&value.as_str())
}
486
487fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
488    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
489        Ok(v) if !v.trim().is_empty() => v,
490        _ => return,
491    };
492
493    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
494    let policy = timeline_adaptive_policy();
495    let forced = should_force_export(outcome, &policy);
496    let should_export = sampled || forced;
497    if !should_export {
498        record_sampling_stats(false, sampled, forced, "none", &policy);
499        return;
500    }
501
502    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
503    timeline.sort();
504
505    let mode = std::env::var("RANVIER_TIMELINE_MODE")
506        .unwrap_or_else(|_| "overwrite".to_string())
507        .to_ascii_lowercase();
508
509    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
510        tracing::warn!(
511            "Failed to persist timeline file {} (mode={}): {}",
512            path,
513            mode,
514            err
515        );
516        record_sampling_stats(false, sampled, forced, &mode, &policy);
517    } else {
518        record_sampling_stats(true, sampled, forced, &mode, &policy);
519    }
520}
521
522fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
523    match outcome {
524        Outcome::Next(_) => "Next".to_string(),
525        Outcome::Branch(id, _) => format!("Branch:{}", id),
526        Outcome::Jump(id, _) => format!("Jump:{}", id),
527        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
528        Outcome::Fault(_) => "Fault".to_string(),
529    }
530}
531
532fn ensure_timeline(bus: &mut Bus) -> bool {
533    if bus.has::<Timeline>() {
534        false
535    } else {
536        bus.insert(Timeline::new());
537        true
538    }
539}
540
541fn should_attach_timeline(bus: &Bus) -> bool {
542    // Respect explicitly provided timeline collector from caller.
543    if bus.has::<Timeline>() {
544        return true;
545    }
546
547    // Attach timeline when runtime export path exists.
548    has_timeline_output_path()
549}
550
/// Returns `true` when `RANVIER_TIMELINE_OUTPUT` is set to something other than blank text.
fn has_timeline_output_path() -> bool {
    matches!(
        std::env::var("RANVIER_TIMELINE_OUTPUT"),
        Ok(value) if !value.trim().is_empty()
    )
}
557
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is parsed as `f64` (surrounding whitespace is ignored) and clamped to
/// `0.0..=1.0`. An unset, unparsable or NaN value falls back to `1.0`, i.e. "sample
/// everything".
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and `f64::clamp` returns NaN unchanged. A NaN rate
        // compares false against every threshold, which would silently disable sampling
        // instead of using the documented default, so it is treated as invalid input.
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
565
566fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
567    if rate <= 0.0 {
568        return false;
569    }
570    if rate >= 1.0 {
571        return true;
572    }
573    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
574    bucket < rate
575}
576
/// Reads the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`, lower-cased.
///
/// Falls back to `"fault_branch"` when the variable is unset or unreadable.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => String::from("fault_branch"),
    }
}
582
583fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
584    match policy {
585        "off" => false,
586        "fault_only" => matches!(outcome, Outcome::Fault(_)),
587        "fault_branch_emit" => {
588            matches!(
589                outcome,
590                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
591            )
592        }
593        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
594    }
595}
596
/// Running counters describing timeline export decisions made by this process.
#[derive(Clone, Default)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that ended with the timeline written.
    exported: u64,
    /// Decisions that ended without a written timeline (not selected, or write failed).
    skipped: u64,
    /// Exports whose run was picked by the sampling rate.
    sampled_exports: u64,
    /// Exports whose run was forced by the adaptive policy.
    forced_exports: u64,
    /// Write mode seen by the most recent decision.
    last_mode: String,
    /// Adaptive policy seen by the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent decision, in milliseconds since the Unix epoch.
    last_updated_ms: u64,
}
608
609static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
610
611fn stats_cell() -> &'static Mutex<SamplingStats> {
612    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
613}
614
615fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
616    let snapshot = {
617        let mut stats = match stats_cell().lock() {
618            Ok(guard) => guard,
619            Err(_) => return,
620        };
621
622        stats.total_decisions += 1;
623        if exported {
624            stats.exported += 1;
625        } else {
626            stats.skipped += 1;
627        }
628        if sampled && exported {
629            stats.sampled_exports += 1;
630        }
631        if forced && exported {
632            stats.forced_exports += 1;
633        }
634        stats.last_mode = mode.to_string();
635        stats.last_policy = policy.to_string();
636        stats.last_updated_ms = now_ms();
637        stats.clone()
638    };
639
640    tracing::debug!(
641        ranvier.timeline.total_decisions = snapshot.total_decisions,
642        ranvier.timeline.exported = snapshot.exported,
643        ranvier.timeline.skipped = snapshot.skipped,
644        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
645        ranvier.timeline.forced_exports = snapshot.forced_exports,
646        ranvier.timeline.mode = %snapshot.last_mode,
647        ranvier.timeline.policy = %snapshot.last_policy,
648        "Timeline sampling stats updated"
649    );
650
651    if let Some(path) = timeline_stats_output_path() {
652        let payload = serde_json::json!({
653            "total_decisions": snapshot.total_decisions,
654            "exported": snapshot.exported,
655            "skipped": snapshot.skipped,
656            "sampled_exports": snapshot.sampled_exports,
657            "forced_exports": snapshot.forced_exports,
658            "last_mode": snapshot.last_mode,
659            "last_policy": snapshot.last_policy,
660            "last_updated_ms": snapshot.last_updated_ms
661        });
662        if let Some(parent) = Path::new(&path).parent() {
663            let _ = fs::create_dir_all(parent);
664        }
665        if let Err(err) = fs::write(&path, payload.to_string()) {
666            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
667        }
668    }
669}
670
/// Returns the stats file path from `RANVIER_TIMELINE_STATS_OUTPUT`, or `None` when the
/// variable is unset, unreadable or blank.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
676
677fn write_timeline_with_policy(path: &str, mode: &str, mut timeline: Timeline) -> Result<(), String> {
678    match mode {
679        "append" => {
680            if Path::new(path).exists() {
681                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
682                match serde_json::from_str::<Timeline>(&content) {
683                    Ok(mut existing) => {
684                        existing.events.append(&mut timeline.events);
685                        existing.sort();
686                        if let Some(max_events) = max_events_limit() {
687                            truncate_timeline_events(&mut existing, max_events);
688                        }
689                        write_timeline_json(path, &existing)
690                    }
691                    Err(_) => {
692                        // Fallback: if existing is invalid, replace with current timeline
693                        if let Some(max_events) = max_events_limit() {
694                            truncate_timeline_events(&mut timeline, max_events);
695                        }
696                        write_timeline_json(path, &timeline)
697                    }
698                }
699            } else {
700                if let Some(max_events) = max_events_limit() {
701                    truncate_timeline_events(&mut timeline, max_events);
702                }
703                write_timeline_json(path, &timeline)
704            }
705        }
706        "rotate" => {
707            let rotated_path = rotated_path(path, now_ms());
708            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
709            if let Some(keep) = rotate_keep_limit() {
710                cleanup_rotated_files(path, keep)?;
711            }
712            Ok(())
713        }
714        _ => write_timeline_json(path, &timeline),
715    }
716}
717
718fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
719    if let Some(parent) = Path::new(path).parent() {
720        if !parent.as_os_str().is_empty() {
721            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
722        }
723    }
724    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
725    fs::write(path, json).map_err(|e| e.to_string())
726}
727
/// Builds the sibling path `<stem>_<suffix>.<ext>` next to `path`.
///
/// A missing (or non-UTF-8) stem defaults to `timeline`, a missing extension to `json`.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = match original.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "timeline",
    };
    let ext = match original.extension().and_then(|e| e.to_str()) {
        Some(ext) => ext,
        None => "json",
    };
    let file_name = format!("{}_{}.{}", stem, suffix, ext);

    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
738
/// Reads the event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `None` when the variable is unset, not a valid `usize`, or zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
745
/// Reads how many rotated timeline files to keep from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `None` when the variable is unset, not a valid `usize`, or zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(keep) if keep > 0 => Some(keep),
        _ => None,
    }
}
752
753fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
754    let len = timeline.events.len();
755    if len > max_events {
756        let keep_from = len - max_events;
757        timeline.events = timeline.events.split_off(keep_from);
758    }
759}
760
/// Deletes old rotated timeline files next to `base_path`, keeping the `keep` newest.
///
/// Rotated files are those named `<stem>_<digits>.<ext>` in the directory of `base_path`,
/// i.e. the names `rotated_path` produces with a numeric suffix. Files that merely share
/// the prefix and extension (for example `<stem>_backup.<ext>`) are never touched. Newest
/// is decided by modification time, with the numeric suffix breaking ties.
///
/// # Errors
///
/// Returns the display string of the I/O error when the directory cannot be listed.
/// Failures to delete individual files are ignored (best effort).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` yields `Some("")` rather than `None` for a bare file name, and listing
    // the empty path fails; such a file lives in the current directory.
    let dir = match base.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };
    let stem = base
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut rotated = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let name = entry.file_name();
            let name = name.to_string_lossy();
            // Only accept names whose middle part is a plain decimal stamp.
            let stamp = name
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((modified, stamp, entry.path()))
        })
        .collect::<Vec<_>>();

    // Newest first; files written within the same mtime tick are ordered by their stamp.
    rotated.sort_by(|a, b| (b.0, b.1).cmp(&(a.0, a.1)));
    for (_, _, path) in rotated.into_iter().skip(keep) {
        let _ = fs::remove_file(path);
    }
    Ok(())
}
796
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
803
#[cfg(test)]
mod tests {
    use super::{sampled_by_bus_id, should_force_export};
    use ranvier_core::Outcome;
    use uuid::Uuid;

    /// Outcome shape shared by every case below.
    type Sample = Outcome<(), &'static str>;

    /// Each adaptive policy forces export for exactly its own set of outcome kinds.
    #[test]
    fn adaptive_policy_force_export_matrix() {
        let next = Sample::Next(());
        let branch = Sample::Branch("declined".to_string(), None);
        let emit = Sample::Emit("audit".to_string(), None);
        let fault = Sample::Fault("boom");

        // (policy, outcome, export forced?)
        let matrix = [
            ("off", &next, false),
            ("off", &fault, false),
            ("fault_only", &branch, false),
            ("fault_only", &fault, true),
            ("fault_branch", &branch, true),
            ("fault_branch", &emit, false),
            ("fault_branch", &fault, true),
            ("fault_branch_emit", &branch, true),
            ("fault_branch_emit", &emit, true),
            ("fault_branch_emit", &fault, true),
        ];

        for (policy, outcome, expected) in matrix.iter() {
            assert_eq!(
                should_force_export(*outcome, *policy),
                *expected,
                "policy={}",
                policy
            );
        }
    }

    /// Sampling and forced export combine with a logical OR.
    #[test]
    fn sampling_and_adaptive_combination_decisions() {
        let bus_id = Uuid::nil();
        let next = Sample::Next(());
        let fault = Sample::Fault("boom");

        // Rate 0.0: nothing is sampled, so only a forced export can trigger.
        let never = sampled_by_bus_id(bus_id, 0.0);
        assert!(!never);
        assert!(!(never || should_force_export(&next, "off")));
        assert!(never || should_force_export(&fault, "fault_only"));

        // Rate 1.0: everything is sampled, whatever the policy says.
        let always = sampled_by_bus_id(bus_id, 1.0);
        assert!(always);
        assert!(always || should_force_export(&next, "off"));
        assert!(always || should_force_export(&fault, "off"));
    }
}