1use ranvier_core::bus::Bus;
15use ranvier_core::outcome::Outcome;
16use ranvier_core::schematic::{Edge, EdgeType, Node, NodeKind, Schematic};
17use ranvier_core::timeline::{Timeline, TimelineEvent};
18use ranvier_core::transition::Transition;
19use std::fs;
20use std::any::type_name;
21use std::future::Future;
22use std::path::{Path, PathBuf};
23use std::pin::Pin;
24use std::sync::{Arc, Mutex, OnceLock};
25use std::time::{SystemTime, UNIX_EPOCH};
26use tracing::Instrument;
27
/// Heap-allocated, pinned, `Send` future that resolves to `T` and may borrow data for `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
30
/// Shared, type-erased async callable that an [`Axon`] stores as its executor.
///
/// It receives the input by value together with borrows of the resources and the [`Bus`],
/// and returns a [`BoxFuture`] (tied to those borrows) that resolves to an [`Outcome`].
pub type Executor<In, Out, E, Res> =
    Arc<dyn for<'a> Fn(In, &'a Res, &'a mut Bus) -> BoxFuture<'a, Outcome<Out, E>> + Send + Sync>;
36
/// Returns a short, human-readable name for `T` with module paths removed.
///
/// Every path segment is shortened independently, so generic arguments stay intact:
/// `alloc::vec::Vec<alloc::string::String>` becomes `Vec<String>`. (Taking only the text
/// after the last `::` would yield the unbalanced `String>` for that type.)
fn type_name_of<T: ?Sized>() -> String {
    let full = type_name::<T>();
    let mut short = String::with_capacity(full.len());
    // Byte offset in `short` where the identifier currently being read started.
    let mut segment_start = 0;
    let mut chars = full.chars().peekable();

    while let Some(c) = chars.next() {
        if c == ':' && chars.peek() == Some(&':') {
            // A `::` separator: what was accumulated for this segment was a module path
            // component, so discard it and keep reading the next component.
            chars.next();
            short.truncate(segment_start);
        } else {
            short.push(c);
            if !(c.is_alphanumeric() || c == '_') {
                // Punctuation such as `<`, `,`, ` `, `(` or `&` starts a fresh path.
                segment_start = short.len();
            }
        }
    }

    short
}
42
/// A pipeline under construction: the [`Schematic`] describing its nodes and edges, paired
/// with the executor that runs the steps added so far.
///
/// `In` is the value fed into the pipeline, `Out` the value produced by the last step,
/// `E` the fault type and `Res` the resource type handed to every step (defaults to `()`).
pub struct Axon<In, Out, E, Res = ()> {
    /// Structural description of the pipeline (nodes and edges) built alongside the executor.
    pub schematic: Schematic,
    /// Type-erased async function that runs the pipeline from input to outcome.
    executor: Executor<In, Out, E, Res>,
}
68
69impl<In, Out, E, Res> Clone for Axon<In, Out, E, Res> {
70 fn clone(&self) -> Self {
71 Self {
72 schematic: self.schematic.clone(),
73 executor: self.executor.clone(),
74 }
75 }
76}
77
78impl<In, E, Res> Axon<In, In, E, Res>
79where
80 In: Send + Sync + 'static,
81 E: Send + 'static,
82 Res: ranvier_core::transition::ResourceRequirement,
83{
84 pub fn new(label: &str) -> Self {
87 Self::start(label)
88 }
89
90 pub fn start(label: &str) -> Self {
93 let node_id = uuid::Uuid::new_v4().to_string();
94 let node = Node {
95 id: node_id,
96 kind: NodeKind::Ingress,
97 label: label.to_string(),
98 description: None,
99 input_type: "void".to_string(),
100 output_type: type_name_of::<In>(),
101 resource_type: type_name_of::<Res>(),
102 metadata: Default::default(),
103 source_location: None,
104 };
105
106 let mut schematic = Schematic::new(label);
107 schematic.nodes.push(node);
108
109 let executor: Executor<In, In, E, Res> =
110 Arc::new(move |input, _res, _bus| Box::pin(std::future::ready(Outcome::Next(input))));
111
112 Self {
113 schematic,
114 executor,
115 }
116 }
117}
118
119impl<In, Out, E, Res> Axon<In, Out, E, Res>
120where
121 In: Send + Sync + 'static,
122 Out: Send + Sync + 'static,
123 E: Send + 'static,
124 Res: ranvier_core::transition::ResourceRequirement,
125{
126 pub fn then<Next, Trans>(self, transition: Trans) -> Axon<In, Next, E, Res>
130 where
131 Next: Send + Sync + 'static,
132 Trans: Transition<Out, Next, Resources = Res, Error = E> + Clone + Send + Sync + 'static,
133 {
134 let Axon {
136 mut schematic,
137 executor: prev_executor,
138 } = self;
139
140 let next_node_id = uuid::Uuid::new_v4().to_string();
142 let next_node = Node {
143 id: next_node_id.clone(),
144 kind: NodeKind::Atom,
145 label: transition.label(),
146 description: transition.description(),
147 input_type: type_name_of::<Out>(),
148 output_type: type_name_of::<Next>(),
149 resource_type: type_name_of::<Res>(),
150 metadata: Default::default(),
151 source_location: None,
152 };
153
154 let last_node_id = schematic
155 .nodes
156 .last()
157 .map(|n| n.id.clone())
158 .unwrap_or_default();
159
160 schematic.nodes.push(next_node);
161 schematic.edges.push(Edge {
162 from: last_node_id,
163 to: next_node_id.clone(),
164 kind: EdgeType::Linear,
165 label: Some("Next".to_string()),
166 });
167
168 let node_id_for_exec = next_node_id.clone();
170 let node_label_for_exec = transition.label();
171 let next_executor: Executor<In, Next, E, Res> = Arc::new(
172 move |input: In, res: &Res, bus: &mut Bus| -> BoxFuture<'_, Outcome<Next, E>> {
173 let prev = prev_executor.clone();
174 let trans = transition.clone();
175 let timeline_node_id = node_id_for_exec.clone();
176 let timeline_node_label = node_label_for_exec.clone();
177
178 Box::pin(async move {
179 let prev_result = prev(input, res, bus).await;
181
182 let state = match prev_result {
184 Outcome::Next(t) => t,
185 other => return other.map(|_| unreachable!()),
186 };
187
188 let label = trans.label();
190 let res_type = std::any::type_name::<Res>()
191 .split("::")
192 .last()
193 .unwrap_or("unknown");
194
195 let enter_ts = now_ms();
196 if let Some(timeline) = bus.read_mut::<Timeline>() {
197 timeline.push(TimelineEvent::NodeEnter {
198 node_id: timeline_node_id.clone(),
199 node_label: timeline_node_label.clone(),
200 timestamp: enter_ts,
201 });
202 }
203
204 let started = std::time::Instant::now();
205 let result = trans
206 .run(state, res, bus)
207 .instrument(tracing::info_span!(
208 "Node",
209 ranvier.node = %label,
210 ranvier.resource_type = %res_type
211 ))
212 .await;
213 let duration_ms = started.elapsed().as_millis() as u64;
214 let exit_ts = now_ms();
215
216 if let Some(timeline) = bus.read_mut::<Timeline>() {
217 timeline.push(TimelineEvent::NodeExit {
218 node_id: timeline_node_id.clone(),
219 outcome_type: outcome_type_name(&result),
220 duration_ms,
221 timestamp: exit_ts,
222 });
223
224 if let Outcome::Branch(branch_id, _) = &result {
225 timeline.push(TimelineEvent::Branchtaken {
226 branch_id: branch_id.clone(),
227 timestamp: exit_ts,
228 });
229 }
230 }
231
232 result
233 })
234 },
235 );
236
237 Axon {
238 schematic,
239 executor: next_executor,
240 }
241 }
242
243 pub fn branch(mut self, branch_id: impl Into<String>, label: &str) -> Self {
245 let branch_id_str = branch_id.into();
246 let last_node_id = self
247 .schematic
248 .nodes
249 .last()
250 .map(|n| n.id.clone())
251 .unwrap_or_default();
252
253 let branch_node = Node {
254 id: uuid::Uuid::new_v4().to_string(),
255 kind: NodeKind::Synapse,
256 label: label.to_string(),
257 description: None,
258 input_type: type_name_of::<Out>(),
259 output_type: type_name_of::<Out>(),
260 resource_type: type_name_of::<Res>(),
261 metadata: Default::default(),
262 source_location: None,
263 };
264
265 self.schematic.nodes.push(branch_node);
266 self.schematic.edges.push(Edge {
267 from: last_node_id,
268 to: branch_id_str.clone(),
269 kind: EdgeType::Branch(branch_id_str),
270 label: Some("Branch".to_string()),
271 });
272
273 self
274 }
275
276 pub async fn execute(&self, input: In, resources: &Res, bus: &mut Bus) -> Outcome<Out, E> {
278 let should_capture = should_attach_timeline(bus);
279 let inserted_timeline = if should_capture {
280 ensure_timeline(bus)
281 } else {
282 false
283 };
284 let ingress_started = std::time::Instant::now();
285 let ingress_enter_ts = now_ms();
286 if should_capture
287 && let (Some(timeline), Some(ingress)) =
288 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
289 {
290 timeline.push(TimelineEvent::NodeEnter {
291 node_id: ingress.id.clone(),
292 node_label: ingress.label.clone(),
293 timestamp: ingress_enter_ts,
294 });
295 }
296
297 let label = self.schematic.name.clone();
298 let outcome = (self.executor)(input, resources, bus)
299 .instrument(tracing::info_span!("Circuit", ranvier.circuit = %label))
300 .await;
301
302 let ingress_exit_ts = now_ms();
303 if should_capture
304 && let (Some(timeline), Some(ingress)) =
305 (bus.read_mut::<Timeline>(), self.schematic.nodes.first())
306 {
307 timeline.push(TimelineEvent::NodeExit {
308 node_id: ingress.id.clone(),
309 outcome_type: outcome_type_name(&outcome),
310 duration_ms: ingress_started.elapsed().as_millis() as u64,
311 timestamp: ingress_exit_ts,
312 });
313 }
314
315 if should_capture {
316 maybe_export_timeline(bus, &outcome);
317 }
318 if inserted_timeline {
319 let _ = bus.remove::<Timeline>();
320 }
321
322 outcome
323 }
324
325 pub fn serve_inspector(self, port: u16) -> Self {
328 let schematic = self.schematic.clone();
329 tokio::spawn(async move {
330 if let Err(e) = ranvier_inspector::Inspector::new(schematic, port)
331 .with_projection_files_from_env()
332 .serve()
333 .await
334 {
335 tracing::error!("Inspector server failed: {}", e);
336 }
337 });
338 self
339 }
340
341 pub fn schematic(&self) -> &Schematic {
343 &self.schematic
344 }
345
346 pub fn into_schematic(self) -> Schematic {
348 self.schematic
349 }
350}
351
352fn maybe_export_timeline<Out, E>(bus: &mut Bus, outcome: &Outcome<Out, E>) {
353 let path = match std::env::var("RANVIER_TIMELINE_OUTPUT") {
354 Ok(v) if !v.trim().is_empty() => v,
355 _ => return,
356 };
357
358 let sampled = sampled_by_bus_id(bus.id, timeline_sample_rate());
359 let policy = timeline_adaptive_policy();
360 let forced = should_force_export(outcome, &policy);
361 let should_export = sampled || forced;
362 if !should_export {
363 record_sampling_stats(false, sampled, forced, "none", &policy);
364 return;
365 }
366
367 let mut timeline = bus.read::<Timeline>().cloned().unwrap_or_default();
368 timeline.sort();
369
370 let mode = std::env::var("RANVIER_TIMELINE_MODE")
371 .unwrap_or_else(|_| "overwrite".to_string())
372 .to_ascii_lowercase();
373
374 if let Err(err) = write_timeline_with_policy(&path, &mode, timeline) {
375 tracing::warn!(
376 "Failed to persist timeline file {} (mode={}): {}",
377 path,
378 mode,
379 err
380 );
381 record_sampling_stats(false, sampled, forced, &mode, &policy);
382 } else {
383 record_sampling_stats(true, sampled, forced, &mode, &policy);
384 }
385}
386
387fn outcome_type_name<Out, E>(outcome: &Outcome<Out, E>) -> String {
388 match outcome {
389 Outcome::Next(_) => "Next".to_string(),
390 Outcome::Branch(id, _) => format!("Branch:{}", id),
391 Outcome::Jump(id, _) => format!("Jump:{}", id),
392 Outcome::Emit(event_type, _) => format!("Emit:{}", event_type),
393 Outcome::Fault(_) => "Fault".to_string(),
394 }
395}
396
397fn ensure_timeline(bus: &mut Bus) -> bool {
398 if bus.has::<Timeline>() {
399 false
400 } else {
401 bus.insert(Timeline::new());
402 true
403 }
404}
405
406fn should_attach_timeline(bus: &Bus) -> bool {
407 if bus.has::<Timeline>() {
409 return true;
410 }
411
412 has_timeline_output_path()
414}
415
/// Reports whether `RANVIER_TIMELINE_OUTPUT` is set to something other than whitespace.
fn has_timeline_output_path() -> bool {
    match std::env::var("RANVIER_TIMELINE_OUTPUT") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
422
/// Reads the timeline sample rate from `RANVIER_TIMELINE_SAMPLE_RATE`.
///
/// The value is trimmed, parsed as `f64` and clamped to `0.0..=1.0`. When the variable is
/// unset, unparseable or `NaN`, the rate defaults to `1.0` (sample everything).
fn timeline_sample_rate() -> f64 {
    std::env::var("RANVIER_TIMELINE_SAMPLE_RATE")
        .ok()
        .and_then(|raw| raw.trim().parse::<f64>().ok())
        // "NaN" parses successfully and `clamp` passes NaN through unchanged; a NaN rate
        // then compares false against every threshold and would silently disable
        // sampling, so treat it like any other invalid value.
        .filter(|rate| !rate.is_nan())
        .map(|rate| rate.clamp(0.0, 1.0))
        .unwrap_or(1.0)
}
430
431fn sampled_by_bus_id(bus_id: uuid::Uuid, rate: f64) -> bool {
432 if rate <= 0.0 {
433 return false;
434 }
435 if rate >= 1.0 {
436 return true;
437 }
438 let bucket = (bus_id.as_u128() % 10_000) as f64 / 10_000.0;
439 bucket < rate
440}
441
/// Reads the adaptive export policy from `RANVIER_TIMELINE_ADAPTIVE`, lower-cased.
/// Falls back to `"fault_branch"` when the variable cannot be read.
fn timeline_adaptive_policy() -> String {
    match std::env::var("RANVIER_TIMELINE_ADAPTIVE") {
        Ok(raw) => raw.to_ascii_lowercase(),
        Err(_) => String::from("fault_branch"),
    }
}
447
448fn should_force_export<Out, E>(outcome: &Outcome<Out, E>, policy: &str) -> bool {
449 match policy {
450 "off" => false,
451 "fault_only" => matches!(outcome, Outcome::Fault(_)),
452 "fault_branch_emit" => {
453 matches!(
454 outcome,
455 Outcome::Fault(_) | Outcome::Branch(_, _) | Outcome::Emit(_, _)
456 )
457 }
458 _ => matches!(outcome, Outcome::Fault(_) | Outcome::Branch(_, _)),
459 }
460}
461
/// Running counters describing timeline export decisions, updated by
/// `record_sampling_stats`.
#[derive(Default, Clone)]
struct SamplingStats {
    /// Number of export decisions recorded so far.
    total_decisions: u64,
    /// Decisions recorded as exported.
    exported: u64,
    /// Decisions recorded as not exported.
    skipped: u64,
    /// Exports for which the sampling check had passed.
    sampled_exports: u64,
    /// Exports for which the adaptive policy had forced the export.
    forced_exports: u64,
    /// Mode string passed with the most recent decision.
    last_mode: String,
    /// Policy string passed with the most recent decision.
    last_policy: String,
    /// Wall-clock time of the most recent update, in milliseconds since the Unix epoch.
    last_updated_ms: u64,
}
473
/// Lazily initialised, mutex-guarded sampling statistics; access it through `stats_cell`.
static TIMELINE_SAMPLING_STATS: OnceLock<Mutex<SamplingStats>> = OnceLock::new();
475
476fn stats_cell() -> &'static Mutex<SamplingStats> {
477 TIMELINE_SAMPLING_STATS.get_or_init(|| Mutex::new(SamplingStats::default()))
478}
479
480fn record_sampling_stats(exported: bool, sampled: bool, forced: bool, mode: &str, policy: &str) {
481 let snapshot = {
482 let mut stats = match stats_cell().lock() {
483 Ok(guard) => guard,
484 Err(_) => return,
485 };
486
487 stats.total_decisions += 1;
488 if exported {
489 stats.exported += 1;
490 } else {
491 stats.skipped += 1;
492 }
493 if sampled && exported {
494 stats.sampled_exports += 1;
495 }
496 if forced && exported {
497 stats.forced_exports += 1;
498 }
499 stats.last_mode = mode.to_string();
500 stats.last_policy = policy.to_string();
501 stats.last_updated_ms = now_ms();
502 stats.clone()
503 };
504
505 tracing::debug!(
506 ranvier.timeline.total_decisions = snapshot.total_decisions,
507 ranvier.timeline.exported = snapshot.exported,
508 ranvier.timeline.skipped = snapshot.skipped,
509 ranvier.timeline.sampled_exports = snapshot.sampled_exports,
510 ranvier.timeline.forced_exports = snapshot.forced_exports,
511 ranvier.timeline.mode = %snapshot.last_mode,
512 ranvier.timeline.policy = %snapshot.last_policy,
513 "Timeline sampling stats updated"
514 );
515
516 if let Some(path) = timeline_stats_output_path() {
517 let payload = serde_json::json!({
518 "total_decisions": snapshot.total_decisions,
519 "exported": snapshot.exported,
520 "skipped": snapshot.skipped,
521 "sampled_exports": snapshot.sampled_exports,
522 "forced_exports": snapshot.forced_exports,
523 "last_mode": snapshot.last_mode,
524 "last_policy": snapshot.last_policy,
525 "last_updated_ms": snapshot.last_updated_ms
526 });
527 if let Some(parent) = Path::new(&path).parent() {
528 let _ = fs::create_dir_all(parent);
529 }
530 if let Err(err) = fs::write(&path, payload.to_string()) {
531 tracing::warn!("Failed to write timeline sampling stats {}: {}", path, err);
532 }
533 }
534}
535
/// Returns the value of `RANVIER_TIMELINE_STATS_OUTPUT` when it is set and not blank.
fn timeline_stats_output_path() -> Option<String> {
    match std::env::var("RANVIER_TIMELINE_STATS_OUTPUT") {
        Ok(path) if !path.trim().is_empty() => Some(path),
        _ => None,
    }
}
541
542fn write_timeline_with_policy(path: &str, mode: &str, mut timeline: Timeline) -> Result<(), String> {
543 match mode {
544 "append" => {
545 if Path::new(path).exists() {
546 let content = fs::read_to_string(path).map_err(|e| e.to_string())?;
547 match serde_json::from_str::<Timeline>(&content) {
548 Ok(mut existing) => {
549 existing.events.append(&mut timeline.events);
550 existing.sort();
551 if let Some(max_events) = max_events_limit() {
552 truncate_timeline_events(&mut existing, max_events);
553 }
554 write_timeline_json(path, &existing)
555 }
556 Err(_) => {
557 if let Some(max_events) = max_events_limit() {
559 truncate_timeline_events(&mut timeline, max_events);
560 }
561 write_timeline_json(path, &timeline)
562 }
563 }
564 } else {
565 if let Some(max_events) = max_events_limit() {
566 truncate_timeline_events(&mut timeline, max_events);
567 }
568 write_timeline_json(path, &timeline)
569 }
570 }
571 "rotate" => {
572 let rotated_path = rotated_path(path, now_ms());
573 write_timeline_json(rotated_path.to_string_lossy().as_ref(), &timeline)?;
574 if let Some(keep) = rotate_keep_limit() {
575 cleanup_rotated_files(path, keep)?;
576 }
577 Ok(())
578 }
579 _ => write_timeline_json(path, &timeline),
580 }
581}
582
583fn write_timeline_json(path: &str, timeline: &Timeline) -> Result<(), String> {
584 if let Some(parent) = Path::new(path).parent() {
585 if !parent.as_os_str().is_empty() {
586 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
587 }
588 }
589 let json = serde_json::to_string_pretty(timeline).map_err(|e| e.to_string())?;
590 fs::write(path, json).map_err(|e| e.to_string())
591}
592
/// Builds the sibling path `<stem>_<suffix>.<ext>` next to `path`.
///
/// A missing or non-UTF-8 stem falls back to `timeline`, a missing or non-UTF-8 extension
/// to `json`.
fn rotated_path(path: &str, suffix: u64) -> PathBuf {
    let original = Path::new(path);
    let stem = original
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = original
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("json");
    let file_name = format!("{stem}_{suffix}.{ext}");

    match original.parent() {
        Some(dir) => dir.join(file_name),
        None => PathBuf::from(file_name),
    }
}
603
/// Reads the optional event cap from `RANVIER_TIMELINE_MAX_EVENTS`.
/// Yields `None` when the variable is unset, not a valid `usize`, or zero.
fn max_events_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_MAX_EVENTS").ok()?;
    match raw.parse::<usize>() {
        Ok(limit) if limit > 0 => Some(limit),
        _ => None,
    }
}
610
/// Reads how many rotated files to retain from `RANVIER_TIMELINE_ROTATE_KEEP`.
/// Yields `None` when the variable is unset, not a valid `usize`, or zero.
fn rotate_keep_limit() -> Option<usize> {
    let raw = std::env::var("RANVIER_TIMELINE_ROTATE_KEEP").ok()?;
    let keep: usize = raw.parse().ok()?;
    (keep > 0).then_some(keep)
}
617
618fn truncate_timeline_events(timeline: &mut Timeline, max_events: usize) {
619 let len = timeline.events.len();
620 if len > max_events {
621 let keep_from = len - max_events;
622 timeline.events = timeline.events.split_off(keep_from);
623 }
624}
625
/// Deletes old rotated timeline files, keeping the `keep` most recent ones.
///
/// Rotated files live next to `base_path` and are named `<stem>_<number>.<ext>` (the shape
/// produced by `rotated_path`). Only names whose middle part is a plain number are
/// considered, so unrelated files that merely share the prefix and extension (for example
/// `timeline_stats.json`) are never removed. Files are ranked newest first by modification
/// time, with the numeric suffix breaking ties.
///
/// # Errors
/// Returns the error text when the directory cannot be listed. Failing to delete an
/// individual file is ignored (cleanup is best effort).
fn cleanup_rotated_files(base_path: &str, keep: usize) -> Result<(), String> {
    let base = Path::new(base_path);
    // `Path::parent` yields `Some("")` for a bare file name, and listing "" fails, so an
    // empty parent has to be mapped to the current directory as well.
    let dir = match base.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        _ => Path::new("."),
    };
    let stem = base
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("timeline");
    let ext = base.extension().and_then(|e| e.to_str()).unwrap_or("json");
    let prefix = format!("{}_", stem);
    let suffix = format!(".{}", ext);

    let mut rotated = fs::read_dir(dir)
        .map_err(|e| e.to_string())?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let file_name = file_name.to_string_lossy();
            // Accept only `<prefix><number><suffix>`; anything else is not ours to delete.
            let stamp: u64 = file_name
                .strip_prefix(prefix.as_str())?
                .strip_suffix(suffix.as_str())?
                .parse()
                .ok()?;
            let modified = entry
                .metadata()
                .ok()
                .and_then(|m| m.modified().ok())
                .unwrap_or(SystemTime::UNIX_EPOCH);
            Some((modified, stamp, entry.path()))
        })
        .collect::<Vec<_>>();

    // Newest first.
    rotated.sort_by(|a, b| (b.0, b.1).cmp(&(a.0, a.1)));
    for (_, _, path) in rotated.into_iter().skip(keep) {
        // Best effort: a file that cannot be removed now is retried on the next rotation.
        let _ = fs::remove_file(path);
    }
    Ok(())
}
661
/// Current wall-clock time in milliseconds since the Unix epoch, or `0` if the system
/// clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}