Skip to main content

ranvier_runtime/
axon.rs

//! # Axon: Executable Decision Tree
//!
//! The `Axon` is the **runtime execution path** of a Typed Decision Tree.
//! It functions as a reusable, typed decision flow (`Axon<In, Out, E, Res>`).
//!
//! ## Design Philosophy
//!
//! * **Axon flows, Schematic shows**: Axon executes; Schematic describes
//! * **Builder pattern**: `Axon::new("label").then(step_a).then(step_b)`
//! * **Schematic extraction**: Every Axon carries its structural metadata
//!
//! "Axon is the flowing thing, Schematic is the visible thing."
13
14use ranvier_core::bus::Bus;
15use ranvier_core::outcome::Outcome;
16use ranvier_core::schematic::{Edge, EdgeType, Node, NodeKind, Schematic};
17use ranvier_core::timeline::{Timeline, TimelineEvent};
18use ranvier_core::transition::Transition;
19use std::fs;
20use std::any::type_name;
21use std::future::Future;
22use std::path::{Path, PathBuf};
23use std::pin::Pin;
24use std::sync::{Arc, Mutex, OnceLock};
25use std::time::{SystemTime, UNIX_EPOCH};
26use tracing::Instrument;
27
/// Boxed, pinned, `Send` future resolving to `T` that may borrow data for `'a`.
///
/// This is the future type returned by every [`Executor`] invocation.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
30
/// Executor type for Axon steps.
///
/// A shared closure that takes the input state `In` by value, a borrowed
/// resource bundle `Res` and a mutably borrowed [`Bus`], and returns a
/// [`BoxFuture`] resolving to `Outcome<Out, E>`. The returned future may borrow
/// from both the resources and the bus for the same lifetime `'a`.
///
/// The closure is wrapped in an `Arc` and must be `Send + Sync` so it can be
/// reused across threads and clones.
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
36
/// Returns a readable name for `T` with module paths removed.
///
/// Every path segment is shortened individually, so generic arguments keep
/// their structure: `alloc::vec::Vec<alloc::string::String>` becomes
/// `Vec<String>` rather than the truncated `String>` that a plain
/// "text after the last `::`" split would produce.
///
/// The exact text comes from [`std::any::type_name`], which is intended for
/// diagnostics only, so the result should be used for display purposes.
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the path segment currently being read began.
    let mut segment_start = 0;

    let mut chars = full.chars().peekable();
    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // Hit a `::` separator: what we collected for this segment was a
            // module name, so throw it away and keep reading.
            chars.next();
            short.truncate(segment_start);
        } else {
            short.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation such as `<`, `,`, ` `, `(` or `&` starts a new path.
                segment_start = short.len();
            }
        }
    }

    short
}
42
/// The Axon Builder and Runtime.
///
/// `Axon` represents an executable decision tree that maps an input `In` to an
/// `Outcome<Out, E>` while borrowing a resource bundle `Res`.
/// It is reusable: the executor is stored behind an `Arc` and is `Send + Sync`,
/// and `execute` only needs `&self`.
///
/// ## Example
///
/// ```rust,ignore
/// use ranvier_core::prelude::*;
/// // ...
/// // Start with an identity Axon (In -> In)
/// let axon = Axon::<(), (), _>::new("My Axon")
///     .then(StepA)
///     .then(StepB);
///
/// // Execute multiple times; every run takes the resources and its own bus.
/// let res1 = axon.execute((), &resources, &mut bus1).await;
/// let res2 = axon.execute((), &resources, &mut bus2).await;
/// ```
pub struct Axon<In, Out, E, Res = ()> {
    /// The static structure (for visualization/analysis)
    pub schematic: Schematic,
    /// The runtime executor
    executor: Executor<In, Out, E, Res>,
}
68
69impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
70    fn clone(&self) -> Self {
71        Self {
72            schematic: self.schematic.clone(),
73            executor: self.executor.clone(),
74        }
75    }
76}
77
78impl<In, E, Res> Axon<In, In, E, Res>
79where
80    In: Send + Sync + 'static,
81    E: Send + 'static,
82    Res: ranvier_core::transition::ResourceRequirement,
83{
84    /// Create a new Axon flow with the given label.
85    /// This is the preferred entry point per Flat API guidelines.
86    pub fn new(label: &str) -> Self {
87        Self::start(label)
88    }
89
90    /// Start defining a new Axon flow.
91    /// This creates an Identity Axon (In -> In) with no initial resource requirements.
92    pub fn start(label: &str) -> Self {
93        let node_id = uuid::Uuid::new_v4().to_string();
94        let node = Node {
95            id: node_id,
96            kind: NodeKind::Ingress,
97            label: label.to_string(),
98            description: None,
99            input_type: "void".to_string(),
100            output_type: type_name_of::<In>(),
101            resource_type: type_name_of::<Res>(),
102            metadata: Default::default(),
103            source_location: None,
104        };
105
106        let mut schematic = Schematic::new(label);
107        schematic.nodes.push(node);
108
109        let executor: Executor<In, In, E, Res> =
110            Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
111
112        Self {
113            schematic,
114            executor,
115        }
116    }
117}
118
119impl<In, Out, E, Res> Axon<In, Out, E, Res>
120where
121    In: Send + Sync + 'static,
122    Out: Send + Sync + 'static,
123    E: Send + 'static,
124    Res: ranvier_core::transition::ResourceRequirement,
125{
126    /// Chain a transition to this Axon.
127    ///
128    /// Requires the transition to use the SAME resource bundle as the previous steps.
129    pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
130    where
131        Next: Send + Sync + 'static,
132        Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
133    {
134        // Decompose self to avoid partial move issues
135        let Axon {
136            mut schematic,
137            executor: prev_executor,
138        } = self;
139
140        // Update Schematic
141        let next_node_id = uuid::Uuid::new_v4().to_string();
142        let next_node = Node {
143            id: next_node_id.clone(),
144            kind: NodeKind::Atom,
145            label: transition.label(),
146            description: transition.description(),
147            input_type: type_name_of::<Out>(),
148            output_type: type_name_of::<Next>(),
149            resource_type: type_name_of::<Res>(),
150            metadata: Default::default(),
151            source_location: None,
152        };
153
154        let last_node_id = schematic
155            .nodes
156            .last()
157            .map(|n| n.id.clone())
158            .unwrap_or_default();
159
160        schematic.nodes.push(next_node);
161        schematic.edges.push(Edge {
162            from: last_node_id,
163            to: next_node_id.clone(),
164            kind: EdgeType::Linear,
165            label: Some("Next".to_string()),
166        });
167
168        // Compose Executor
169        let node_id_for_exec = next_node_id.clone();
170        let node_label_for_exec = transition.label();
171        let next_executor: Executor<In, Next, E, Res> = Arc::new(
172            move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
173                let prev = prev_executor.clone();
174                let trans = transition.clone();
175                let timeline_node_id = node_id_for_exec.clone();
176                let timeline_node_label = node_label_for_exec.clone();
177
178                Box::pin(async move {
179                    // Run previous step
180                    let prev_result = prev(input, res, bus).await;
181
182                    // Unpack
183                    let state = match prev_result {
184                        Outcome::Next(t) => t,
185                        other => return other.map(|_| unreachable!()),
186                    };
187
188                    // Run this step with automatic instrumentation
189                    let label = trans.label();
190                    let res_type = std::any::type_name::<Res>()
191                        .split("::")
192                        .last()
193                        .unwrap_or("unknown");
194
195                    let enter_ts = now_ms();
196                    if let Some(timeline) = bus.read_mut::<Timeline>() {
197                        timeline.push(TimelineEvent::NodeEnter {
198                            node_id: timeline_node_id.clone(),
199                            node_label: timeline_node_label.clone(),
200                            timestamp: enter_ts,
201                        });
202                    }
203
204                    let started = std::time::Instant::now();
205                    let result = trans
206                        .run(state, res, bus)
207                        .instrument(tracing::info_span!(
208                            "Node",
209                            ranvier.node = %label,
210                            ranvier.resource_type = %res_type
211                        ))
212                        .await;
213                    let duration_ms = started.elapsed().as_millis() as u64;
214                    let exit_ts = now_ms();
215
216                    if let Some(timeline) = bus.read_mut::<Timeline>() {
217                        timeline.push(TimelineEvent::NodeExit {
218                            node_id: timeline_node_id.clone(),
219                            outcome_type: outcome_type_name(&result),
220                            duration_ms,
221                            timestamp: exit_ts,
222                        });
223
224                        if let Outcome::Branch(branch_id, _) = &result {
225                            timeline.push(TimelineEvent::Branchtaken {
226                                branch_id: branch_id.clone(),
227                                timestamp: exit_ts,
228                            });
229                        }
230                    }
231
232                    result
233                })
234            },
235        );
236
237        Axon {
238            schematic,
239            executor: next_executor,
240        }
241    }
242
243    /// Add a branch point
244    pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
245        let branch_id_str = branch_id.into();
246        let last_node_id = self
247            .schematic
248            .nodes
249            .last()
250            .map(|n| n.id.clone())
251            .unwrap_or_default();
252
253        let branch_node = Node {
254            id: uuid::Uuid::new_v4().to_string(),
255            kind: NodeKind::Synapse,
256            label: label.to_string(),
257            description: None,
258            input_type: type_name_of::<Out>(),
259            output_type: type_name_of::<Out>(),
260            resource_type: type_name_of::<Res>(),
261            metadata: Default::default(),
262            source_location: None,
263        };
264
265        self.schematic.nodes.push(branch_node);
266        self.schematic.edges.push(Edge {
267            from: last_node_id,
268            to: branch_id_str.clone(),
269            kind: EdgeType::Branch(branch_id_str),
270            label: Some("Branch".to_string()),
271        });
272
273        self
274    }
275
276    /// Execute the Axon with the given input and resources.
277    pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
278        let should_capture = should_attach_timeline(bus);
279        let inserted_timeline = if should_capture {
280            ensure_timeline(bus)
281        } else {
282            false
283        };
284        let ingress_started = std::time::Instant::now();
285        let ingress_enter_ts = now_ms();
286        if should_capture
287            && let (Some(timeline), Some(ingress)) =
288            (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
289        {
290            timeline.push(TimelineEvent::NodeEnter {
291                node_id: ingress.id.clone(),
292                node_label: ingress.label.clone(),
293                timestamp: ingress_enter_ts,
294            });
295        }
296
297        let label = self.schematic.name.clone();
298        let outcome = (self.executor)(input, resources, bus)
299            .instrument(tracing::info_span!("Circuit", ranvier.circuit = %label))
300            .await;
301
302        let ingress_exit_ts = now_ms();
303        if should_capture
304            && let (Some(timeline), Some(ingress)) =
305            (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
306        {
307            timeline.push(TimelineEvent::NodeExit {
308                node_id: ingress.id.clone(),
309                outcome_type: outcome_type_name(&outcome),
310                duration_ms: ingress_started.elapsed().as_millis() as u64,
311                timestamp: ingress_exit_ts,
312            });
313        }
314
315        if should_capture {
316            maybe_export_timeline(bus, &outcome);
317        }
318        if inserted_timeline {
319            let _ = bus.remove::<Timeline>();
320        }
321
322        outcome
323    }
324
325    /// Starts the Ranvier Inspector for this Axon on the specified port.
326    /// This spawns a background task to serve the Schematic.
327    pub fn serve_inspector(self, port: u16) -> Self {
328        let schematic = self.schematic.clone();
329        tokio::spawn(async move {
330            if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
331                .with_projection_files_from_env()
332                .serve()
333                .await
334            {
335                tracing::error!("Inspector server failed: {}", e);
336            }
337        });
338        self
339    }
340
341    /// Get a reference to the Schematic (structural view).
342    pub fn schematic(&self) -> &Schematic {
343        &self.schematic
344    }
345
346    /// Consume and return the Schematic.
347    pub fn into_schematic(self) -> Schematic {
348        self.schematic
349    }
350}
351
352fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
353    let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
354        Ok(v) if !v.trim().is_empty() => v,
355        _ => return,
356    };
357
358    let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
359    let policy = timeline_adaptive_policy();
360    let forced = should_force_export(outcome, &policy);
361    let should_export = sampled || forced;
362    if !should_export {
363        record_sampling_stats(false, sampled, forced, "none", &policy);
364        return;
365    }
366
367    let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
368    timeline.sort();
369
370    let mode = std::env::var("RANVIER_TIMELINE_MODE")
371        .unwrap_or_else(|_| "overwrite".to_string())
372        .to_ascii_lowercase();
373
374    if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
375        tracing::warn!(
376            "Failed to persist timeline file {} (mode={}): {}",
377            path,
378            mode,
379            err
380        );
381        record_sampling_stats(false, sampled, forced, &mode, &policy);
382    } else {
383        record_sampling_stats(true, sampled, forced, &mode, &policy);
384    }
385}
386
387fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
388    match outcome {
389        Outcome::Next(_) => "Next".to_string(),
390        Outcome::Branch(id, _) => format!("Branch:{}", id),
391        Outcome::Jump(id, _) => format!("Jump:{}", id),
392        Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
393        Outcome::Fault(_) => "Fault".to_string(),
394    }
395}
396
397fn ensure_timeline(bus: &mut Bus) -> bool {
398    if bus.has::<Timeline>() {
399        false
400    } else {
401        bus.insert(Timeline::new());
402        true
403    }
404}
405
406fn should_attach_timeline(bus: &Bus) -> bool {
407    // Respect explicitly provided timeline collector from caller.
408    if bus.has::<Timeline>() {
409        return true;
410    }
411
412    // Attach timeline when runtime export path exists.
413    has_timeline_output_path()
414}
415
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to a non-blank value.
fn has_timeline_output_path() -> bool {
    matches!(
        std::env::var("RANVIER_TIMELINE_OUTPUT"),
        Ok(value) if !value.trim().is_empty()
    )
}
422
/// Reads the timeline sampling rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped to `0.0..=1.0`.
/// A missing, unparsable or NaN value falls back to `1.0` (export every run).
/// NaN is rejected explicitly because it parses successfully and passes
/// through `clamp` unchanged, which would make every later `<` / `>=`
/// comparison false and silently turn sampling off.
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        .filter(|rate| !rate.is_nan())
        .map_or(1.0, |rate| rate.clamp(0.0, 1.0))
}
430
431fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
432    if rate <= 0.0 {
433        return false;
434    }
435    if rate >= 1.0 {
436        return true;
437    }
438    let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
439    bucket < rate
440}
441
/// Reads the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`.
///
/// The value is lower-cased as is (no trimming); when the variable cannot be
/// read the policy defaults to `"fault_branch"`.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => String::from("fault_branch"),
    }
}
447
448fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
449    match policy {
450        "off" => false,
451        "fault_only" => matches!(outcome, Outcome::Fault(_)),
452        "fault_branch_emit" => {
453            matches!(
454                outcome,
455                Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
456            )
457        }
458        _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
459    }
460}
461
/// Process-wide counters describing timeline export decisions.
#[derive(Clone, Default)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions that ended with the timeline written out.
    exported: u64,
    /// Decisions that did not end in a written timeline.
    skipped: u64,
    /// Written timelines that were selected by sampling.
    sampled_exports: u64,
    /// Written timelines that were forced by the adaptive policy.
    forced_exports: u64,
    /// Write mode reported with the most recent decision.
    last_mode: String,
    /// Adaptive policy reported with the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent decision, in Unix milliseconds.
    last_updated_ms: u64,
}
473
474static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
475
476fn stats_cell() -> &'static Mutex<SamplingStats> {
477    TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
478}
479
480fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
481    let snapshot = {
482        let mut stats = match stats_cell().lock() {
483            Ok(guard) => guard,
484            Err(_) => return,
485        };
486
487        stats.total_decisions += 1;
488        if exported {
489            stats.exported += 1;
490        } else {
491            stats.skipped += 1;
492        }
493        if sampled && exported {
494            stats.sampled_exports += 1;
495        }
496        if forced && exported {
497            stats.forced_exports += 1;
498        }
499        stats.last_mode = mode.to_string();
500        stats.last_policy = policy.to_string();
501        stats.last_updated_ms = now_ms();
502        stats.clone()
503    };
504
505    tracing::debug!(
506        ranvier.timeline.total_decisions = snapshot.total_decisions,
507        ranvier.timeline.exported = snapshot.exported,
508        ranvier.timeline.skipped = snapshot.skipped,
509        ranvier.timeline.sampled_exports = snapshot.sampled_exports,
510        ranvier.timeline.forced_exports = snapshot.forced_exports,
511        ranvier.timeline.mode = %snapshot.last_mode,
512        ranvier.timeline.policy = %snapshot.last_policy,
513        "Timeline sampling stats updated"
514    );
515
516    if let Some(path) = timeline_stats_output_path() {
517        let payload = serde_json::json!({
518            "total_decisions": snapshot.total_decisions,
519            "exported": snapshot.exported,
520            "skipped": snapshot.skipped,
521            "sampled_exports": snapshot.sampled_exports,
522            "forced_exports": snapshot.forced_exports,
523            "last_mode": snapshot.last_mode,
524            "last_policy": snapshot.last_policy,
525            "last_updated_ms": snapshot.last_updated_ms
526        });
527        if let Some(parent) = Path::new(&path).parent() {
528            let _ = fs::create_dir_all(parent);
529        }
530        if let Err(err) = fs::write(&path, payload.to_string()) {
531            tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
532        }
533    }
534}
535
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT`, or `None` when the
/// variable is unset or blank. The value is returned untrimmed.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
541
542fn write_timeline_with_policy(path: &str, mode: &str, mut timeline: Timeline) -> Result<(), String> {
543    match mode {
544        "append" => {
545            if Path::new(path).exists() {
546                let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
547                match serde_json::from_str::<Timeline>(&content) {
548                    Ok(mut existing) => {
549                        existing.events.append(&mut timeline.events);
550                        existing.sort();
551                        if let Some(max_events) = max_events_limit() {
552                            truncate_timeline_events(&mut existing, max_events);
553                        }
554                        write_timeline_json(path, &existing)
555                    }
556                    Err(_) => {
557                        // Fallback: if existing is invalid, replace with current timeline
558                        if let Some(max_events) = max_events_limit() {
559                            truncate_timeline_events(&mut timeline, max_events);
560                        }
561                        write_timeline_json(path, &timeline)
562                    }
563                }
564            } else {
565                if let Some(max_events) = max_events_limit() {
566                    truncate_timeline_events(&mut timeline, max_events);
567                }
568                write_timeline_json(path, &timeline)
569            }
570        }
571        "rotate" => {
572            let rotated_path = rotated_path(path, now_ms());
573            write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
574            if let Some(keep) = rotate_keep_limit() {
575                cleanup_rotated_files(path, keep)?;
576            }
577            Ok(())
578        }
579        _ => write_timeline_json(path, &timeline),
580    }
581}
582
583fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
584    if let Some(parent) = Path::new(path).parent() {
585        if !parent.as_os_str().is_empty() {
586            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
587        }
588    }
589    let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
590    fs::write(path, json).map_err(|e| e.to_string())
591}
592
/// Builds the sibling path `<stem>_<suffix>.<ext>` for a rotated copy of `path`.
///
/// A missing (or non-UTF-8) stem falls back to `"timeline"` and a missing
/// extension to `"json"`; the directory part of `path` is kept.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let source = Path::new(path);
    let stem = source
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = source
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");

    match source.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
603
/// Reads the event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
///
/// Returns `None` when the variable is unset, is not a valid `usize`, or is `0`.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
610
/// Reads the number of rotated files to keep from `RANVIER_TIMELINE_ROTATE_KEEP`.
///
/// Returns `None` when the variable is unset, is not a valid `usize`, or is `0`.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    match raw.parse::<usize>() {
        Ok(keep) if keep > 0 => Some(keep),
        _ => None,
    }
}
617
618fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
619    let len = timeline.events.len();
620    if len > max_events {
621        let keep_from = len - max_events;
622        timeline.events = timeline.events.split_off(keep_from);
623    }
624}
625
/// Deletes old rotated timeline files next to `base_path`, keeping the `keep`
/// most recent ones.
///
/// A rotated file is one named `<stem>_<digits>.<ext>`, which is the shape
/// `rotated_path` produces. Requiring a purely numeric middle part keeps
/// unrelated neighbours that merely share the prefix (for example
/// `timeline_stats.json`) from being counted or deleted.
///
/// Files are ordered newest first by modification time; ties, which are likely
/// on file systems with coarse timestamps, are broken by the numeric suffix.
///
/// # Errors
///
/// Returns the display string of the I/O error when the directory cannot be
/// listed. Failing to delete an individual file is ignored (best effort).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` yields `Some("")`, not `None`, for a bare file name, and
    // an empty path cannot be listed, so map that case to the current directory.
    let dir = base
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| Path::new("."));
    let stem = base
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut rotated = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let name = entry.file_name();
            let name = name.to_string_lossy();
            let stamp = name
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?;
            // `parse` alone would also accept a leading `+`; insist on digits.
            if stamp.is_empty() || !stamp.bytes().all(|b| b.is_ascii_digit()) {
                return None;
            }
            let stamp = stamp.parse::<u64>().ok()?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((entry.path(), modified, stamp))
        })
        .collect::<Vec<_>>();

    // Newest first.
    rotated.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| b.2.cmp(&a.2)));
    for (path, _, _) in rotated.into_iter().skip(keep) {
        // Best effort: a file that cannot be removed now is retried next time.
        let _ = fs::remove_file(path);
    }
    Ok(())
}
661
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}