1use std::any::{Any, TypeId, type_name};
2use std::collections::{HashMap, HashSet};
3
4use crate::stats::calculators::EmittedEvent;
5use crate::*;
6
7#[derive(Clone, Copy)]
8pub struct AnalysisDependency {
9 state_type_id: TypeId,
10 state_type_name: &'static str,
11 source: AnalysisDependencySource,
12}
13
14#[derive(Clone, Copy)]
15enum AnalysisDependencySource {
16 DefaultFactory(fn() -> Box<dyn AnalysisNodeDyn>),
17 External,
18}
19
20impl AnalysisDependency {
21 pub fn required<T: 'static>() -> Self {
22 Self {
23 state_type_id: TypeId::of::<T>(),
24 state_type_name: type_name::<T>(),
25 source: AnalysisDependencySource::External,
26 }
27 }
28
29 pub fn with_default<T: 'static>(default_factory: fn() -> Box<dyn AnalysisNodeDyn>) -> Self {
30 Self {
31 state_type_id: TypeId::of::<T>(),
32 state_type_name: type_name::<T>(),
33 source: AnalysisDependencySource::DefaultFactory(default_factory),
34 }
35 }
36
37 pub fn state_type_id(&self) -> TypeId {
38 self.state_type_id
39 }
40
41 pub fn state_type_name(&self) -> &'static str {
42 self.state_type_name
43 }
44
45 fn default_factory(&self) -> fn() -> Box<dyn AnalysisNodeDyn> {
46 match self.source {
47 AnalysisDependencySource::DefaultFactory(default_factory) => default_factory,
48 AnalysisDependencySource::External => panic!(
49 "analysis dependency for {} has no default factory",
50 self.state_type_name
51 ),
52 }
53 }
54
55 fn is_external(&self) -> bool {
56 matches!(self.source, AnalysisDependencySource::External)
57 }
58}
59
60pub struct AnalysisStateContext<'a> {
61 states: HashMap<TypeId, &'a dyn Any>,
62}
63
64pub struct AnalysisStateRef<'a> {
65 type_id: TypeId,
66 type_name: &'static str,
67 state: &'a dyn Any,
68}
69
70impl<'a> AnalysisStateRef<'a> {
71 pub fn of<T: 'static>(state: &'a T) -> Self {
72 Self {
73 type_id: TypeId::of::<T>(),
74 type_name: type_name::<T>(),
75 state,
76 }
77 }
78
79 fn type_id(&self) -> TypeId {
80 self.type_id
81 }
82
83 fn type_name(&self) -> &'static str {
84 self.type_name
85 }
86
87 fn state(&self) -> &'a dyn Any {
88 self.state
89 }
90}
91
92impl<'a> AnalysisStateContext<'a> {
93 fn from_parts(
94 root_states: &'a HashMap<TypeId, Box<dyn Any>>,
95 input_states: &'a [AnalysisStateRef<'a>],
96 before: &'a [Box<dyn AnalysisNodeDyn>],
97 ) -> Self {
98 let mut states =
99 HashMap::with_capacity(root_states.len() + input_states.len() + before.len());
100 for (type_id, state) in root_states {
101 states.insert(*type_id, state.as_ref());
102 }
103 for input_state in input_states {
104 states.insert(input_state.type_id(), input_state.state());
105 }
106 for node in before {
107 states.insert(node.provides_state_type_id(), node.state_any());
108 }
109 Self { states }
110 }
111
112 pub fn get<T: 'static>(&self) -> SubtrActorResult<&'a T> {
113 self.maybe_get::<T>().ok_or_else(|| {
114 analysis_node_graph_error(format!(
115 "Missing state {} in analysis context",
116 type_name::<T>()
117 ))
118 })
119 }
120
121 pub fn maybe_get<T: 'static>(&self) -> Option<&'a T> {
122 self.states
123 .get(&TypeId::of::<T>())
124 .and_then(|state| state.downcast_ref::<T>())
125 }
126}
127
128pub trait AnalysisNode: 'static {
140 type State: 'static;
142
143 fn name(&self) -> &'static str;
146
147 fn emitted_events(&self) -> &'static [EmittedEvent] {
154 &[]
155 }
156
157 fn on_replay_meta(&mut self, _meta: &ReplayMeta) -> SubtrActorResult<()> {
158 Ok(())
159 }
160
161 fn dependencies(&self) -> Vec<AnalysisDependency> {
162 Vec::new()
163 }
164
165 fn evaluate(&mut self, ctx: &AnalysisStateContext<'_>) -> SubtrActorResult<()>;
166
167 fn finish(&mut self, _ctx: &AnalysisStateContext<'_>) -> SubtrActorResult<()> {
168 Ok(())
169 }
170
171 fn state(&self) -> &Self::State;
172}
173
174pub trait AnalysisNodeDyn: 'static {
175 fn name(&self) -> &'static str;
176
177 fn emitted_events(&self) -> &'static [EmittedEvent];
178
179 fn provides_state_type_id(&self) -> TypeId;
180
181 fn provides_state_type_name(&self) -> &'static str;
182
183 fn on_replay_meta(&mut self, meta: &ReplayMeta) -> SubtrActorResult<()>;
184
185 fn dependencies(&self) -> Vec<AnalysisDependency>;
186
187 fn evaluate(&mut self, ctx: &AnalysisStateContext<'_>) -> SubtrActorResult<()>;
188
189 fn finish(&mut self, ctx: &AnalysisStateContext<'_>) -> SubtrActorResult<()>;
190
191 fn state_any(&self) -> &dyn Any;
192}
193
194impl<N> AnalysisNodeDyn for N
195where
196 N: AnalysisNode,
197{
198 fn name(&self) -> &'static str {
199 AnalysisNode::name(self)
200 }
201
202 fn emitted_events(&self) -> &'static [EmittedEvent] {
203 AnalysisNode::emitted_events(self)
204 }
205
206 fn provides_state_type_id(&self) -> TypeId {
207 TypeId::of::<N::State>()
208 }
209
210 fn provides_state_type_name(&self) -> &'static str {
211 type_name::<N::State>()
212 }
213
214 fn on_replay_meta(&mut self, meta: &ReplayMeta) -> SubtrActorResult<()> {
215 AnalysisNode::on_replay_meta(self, meta)
216 }
217
218 fn dependencies(&self) -> Vec<AnalysisDependency> {
219 AnalysisNode::dependencies(self)
220 }
221
222 fn evaluate(&mut self, ctx: &AnalysisStateContext<'_>) -> SubtrActorResult<()> {
223 AnalysisNode::evaluate(self, ctx)
224 }
225
226 fn finish(&mut self, ctx: &AnalysisStateContext<'_>) -> SubtrActorResult<()> {
227 AnalysisNode::finish(self, ctx)
228 }
229
230 fn state_any(&self) -> &dyn Any {
231 self.state()
232 }
233}
234
235#[derive(Default)]
244pub struct AnalysisGraph {
245 nodes: Vec<Box<dyn AnalysisNodeDyn>>,
246 evaluation_order: Vec<usize>,
247 declared_root_states: HashMap<TypeId, &'static str>,
248 declared_input_states: HashMap<TypeId, &'static str>,
249 root_states: HashMap<TypeId, Box<dyn Any>>,
250 resolved: bool,
251}
252
253impl AnalysisGraph {
254 pub fn new() -> Self {
255 Self::default()
256 }
257
258 pub fn with_root_state_type<T: 'static>(mut self) -> Self {
259 self.register_root_state::<T>();
260 self
261 }
262
263 pub fn register_root_state<T: 'static>(&mut self) {
264 self.declared_root_states
265 .insert(TypeId::of::<T>(), type_name::<T>());
266 }
267
268 pub fn with_input_state_type<T: 'static>(mut self) -> Self {
269 self.register_input_state::<T>();
270 self
271 }
272
273 pub fn register_input_state<T: 'static>(&mut self) {
274 self.declared_input_states
275 .insert(TypeId::of::<T>(), type_name::<T>());
276 }
277
278 pub fn set_root_state<T: 'static>(&mut self, value: T) {
279 self.register_root_state::<T>();
280 self.root_states.insert(TypeId::of::<T>(), Box::new(value));
281 }
282
283 pub fn with_node<N>(mut self, node: N) -> Self
284 where
285 N: AnalysisNode,
286 {
287 self.push_node(node);
288 self
289 }
290
291 pub fn with_boxed_node(mut self, node: Box<dyn AnalysisNodeDyn>) -> Self {
292 self.push_boxed_node(node);
293 self
294 }
295
296 pub fn push_node<N>(&mut self, node: N)
297 where
298 N: AnalysisNode,
299 {
300 self.push_boxed_node(Box::new(node));
301 }
302
303 pub fn push_boxed_node(&mut self, node: Box<dyn AnalysisNodeDyn>) {
304 self.nodes.push(node);
305 self.resolved = false;
306 }
307
308 pub fn ensure_dependency(&mut self, dependency: AnalysisDependency) -> SubtrActorResult<()> {
309 let providers = self.provider_index_by_type()?;
310 if providers.contains_key(&dependency.state_type_id())
311 || self
312 .declared_root_states
313 .contains_key(&dependency.state_type_id())
314 || self
315 .declared_input_states
316 .contains_key(&dependency.state_type_id())
317 {
318 return Ok(());
319 }
320 if dependency.is_external() {
321 return Err(analysis_node_graph_error(format!(
322 "Required state {} has no provider",
323 dependency.state_type_name(),
324 )));
325 }
326
327 self.push_boxed_node((dependency.default_factory())());
328 Ok(())
329 }
330
331 pub fn ensure_dependencies<I>(&mut self, dependencies: I) -> SubtrActorResult<()>
332 where
333 I: IntoIterator<Item = AnalysisDependency>,
334 {
335 for dependency in dependencies {
336 self.ensure_dependency(dependency)?;
337 }
338 Ok(())
339 }
340
341 pub fn resolve(&mut self) -> SubtrActorResult<()> {
342 if self.resolved {
343 return Ok(());
344 }
345
346 loop {
347 let providers = self.provider_index_by_type()?;
348 let mut additions = Vec::new();
349 let mut queued_types = HashSet::new();
350
351 for node in &self.nodes {
352 for dependency in node.dependencies() {
353 if providers.contains_key(&dependency.state_type_id())
354 || self
355 .declared_root_states
356 .contains_key(&dependency.state_type_id())
357 || self
358 .declared_input_states
359 .contains_key(&dependency.state_type_id())
360 {
361 continue;
362 }
363 if dependency.is_external() {
364 return Err(analysis_node_graph_error(format!(
365 "Node '{}' requires state {} with no provider",
366 node.name(),
367 dependency.state_type_name(),
368 )));
369 }
370 let default_factory = dependency.default_factory();
371 if queued_types.insert(dependency.state_type_id()) {
372 additions.push(default_factory());
373 }
374 }
375 }
376
377 if additions.is_empty() {
378 break;
379 }
380
381 self.nodes.extend(additions);
382 }
383
384 let providers = self.provider_index_by_type()?;
385 let mut visiting = HashSet::new();
386 let mut visited = HashSet::new();
387 let mut order = Vec::with_capacity(self.nodes.len());
388
389 for index in 0..self.nodes.len() {
390 self.visit_node(
391 index,
392 &providers,
393 &mut visiting,
394 &mut visited,
395 &mut order,
396 &mut Vec::new(),
397 )?;
398 }
399
400 let mut ordered_nodes = Vec::with_capacity(self.nodes.len());
401 let mut original_nodes: Vec<Option<Box<dyn AnalysisNodeDyn>>> =
402 std::mem::take(&mut self.nodes)
403 .into_iter()
404 .map(Some)
405 .collect();
406 for index in order {
407 ordered_nodes.push(
408 original_nodes[index]
409 .take()
410 .expect("topological order should only reference each node once"),
411 );
412 }
413
414 self.nodes = ordered_nodes;
415 self.evaluation_order = (0..self.nodes.len()).collect();
416 self.resolved = true;
417 Ok(())
418 }
419
420 pub fn on_replay_meta(&mut self, meta: &ReplayMeta) -> SubtrActorResult<()> {
421 self.resolve()?;
422 for node in &mut self.nodes {
423 node.on_replay_meta(meta)?;
424 }
425 Ok(())
426 }
427
428 pub fn evaluate(&mut self) -> SubtrActorResult<()> {
429 self.evaluate_with_states(&[])
430 }
431
432 pub fn evaluate_with_state<T: 'static>(&mut self, value: &T) -> SubtrActorResult<()> {
433 self.evaluate_with_states(&[AnalysisStateRef::of(value)])
434 }
435
436 pub fn evaluate_with_states<'a>(
437 &mut self,
438 input_states: &'a [AnalysisStateRef<'a>],
439 ) -> SubtrActorResult<()> {
440 self.resolve()?;
441
442 for (type_id, type_name) in &self.declared_root_states {
443 if !self.root_states.contains_key(type_id) {
444 return Err(analysis_node_graph_error(format!(
445 "Missing root state {type_name} for evaluation"
446 )));
447 }
448 }
449
450 let mut provided_input_types = HashMap::with_capacity(input_states.len());
451 for input_state in input_states {
452 if let Some(existing) =
453 provided_input_types.insert(input_state.type_id(), input_state.type_name())
454 {
455 return Err(analysis_node_graph_error(format!(
456 "Duplicate input states for {}: {} and {}",
457 input_state.type_name(),
458 existing,
459 input_state.type_name(),
460 )));
461 }
462 }
463 for (type_id, type_name) in self.required_input_states() {
464 if !provided_input_types.contains_key(&type_id) {
465 return Err(analysis_node_graph_error(format!(
466 "Missing input state {type_name} for evaluation"
467 )));
468 }
469 }
470
471 for node_index in self.evaluation_order.clone() {
472 let (before, current_and_after) = self.nodes.split_at_mut(node_index);
473 let (current, _) = current_and_after
474 .split_first_mut()
475 .expect("evaluation order should contain valid indexes");
476 let ctx = AnalysisStateContext::from_parts(&self.root_states, input_states, before);
477 current.evaluate(&ctx)?;
478 }
479
480 Ok(())
481 }
482
483 pub fn finish(&mut self) -> SubtrActorResult<()> {
484 self.resolve()?;
485 for node_index in self.evaluation_order.clone() {
486 let (before, current_and_after) = self.nodes.split_at_mut(node_index);
487 let (current, _) = current_and_after
488 .split_first_mut()
489 .expect("evaluation order should contain valid indexes");
490 let ctx = AnalysisStateContext::from_parts(&self.root_states, &[], before);
491 current.finish(&ctx)?;
492 }
493 Ok(())
494 }
495
496 pub fn state<T: 'static>(&self) -> Option<&T> {
497 let target = TypeId::of::<T>();
498 self.root_states
499 .get(&target)
500 .and_then(|state| state.downcast_ref::<T>())
501 .or_else(|| {
502 self.nodes
503 .iter()
504 .find(|node| node.provides_state_type_id() == target)
505 .and_then(|node| node.state_any().downcast_ref::<T>())
506 })
507 }
508
509 pub fn node_names(&self) -> impl Iterator<Item = &'static str> + '_ {
510 self.nodes.iter().map(|node| node.name())
511 }
512
513 pub fn emitted_events(&mut self) -> SubtrActorResult<Vec<EmittedEvent>> {
514 self.resolve()?;
515 Ok(self
516 .nodes
517 .iter()
518 .flat_map(|node| node.emitted_events().iter().copied())
519 .collect())
520 }
521
522 fn provider_index_by_type(&self) -> SubtrActorResult<HashMap<TypeId, usize>> {
523 let mut providers = HashMap::new();
524 for (index, node) in self.nodes.iter().enumerate() {
525 if self
526 .declared_root_states
527 .contains_key(&node.provides_state_type_id())
528 {
529 return SubtrActorError::new_result(SubtrActorErrorVariant::CallbackError(
530 format!(
531 "analysis node graph error: Duplicate providers for root state {}: root and '{}'",
532 node.provides_state_type_name(),
533 node.name(),
534 ),
535 ));
536 }
537 if self
538 .declared_input_states
539 .contains_key(&node.provides_state_type_id())
540 {
541 return SubtrActorError::new_result(SubtrActorErrorVariant::CallbackError(
542 format!(
543 "analysis node graph error: Duplicate providers for input state {}: input and '{}'",
544 node.provides_state_type_name(),
545 node.name(),
546 ),
547 ));
548 }
549 if let Some(existing) = providers.insert(node.provides_state_type_id(), index) {
550 return SubtrActorError::new_result(SubtrActorErrorVariant::CallbackError(
551 format!(
552 "analysis node graph error: Duplicate providers for state {}: '{}' and '{}'",
553 node.provides_state_type_name(),
554 self.nodes[existing].name(),
555 node.name(),
556 ),
557 ));
558 }
559 }
560 Ok(providers)
561 }
562
563 fn required_input_states(&self) -> HashMap<TypeId, &'static str> {
564 let mut required = HashMap::new();
565 for node in &self.nodes {
566 for dependency in node.dependencies() {
567 let type_id = dependency.state_type_id();
568 if self.declared_input_states.contains_key(&type_id)
569 && !self.root_states.contains_key(&type_id)
570 {
571 required.insert(type_id, dependency.state_type_name());
572 }
573 }
574 }
575 required
576 }
577
578 fn visit_node(
579 &self,
580 index: usize,
581 providers: &HashMap<TypeId, usize>,
582 visiting: &mut HashSet<usize>,
583 visited: &mut HashSet<usize>,
584 order: &mut Vec<usize>,
585 stack: &mut Vec<&'static str>,
586 ) -> SubtrActorResult<()> {
587 if visited.contains(&index) {
588 return Ok(());
589 }
590 if !visiting.insert(index) {
591 stack.push(self.nodes[index].name());
592 let cycle = stack.join(" -> ");
593 stack.pop();
594 return Err(analysis_node_graph_error(format!(
595 "Cycle detected in analysis node graph: {cycle}"
596 )));
597 }
598
599 stack.push(self.nodes[index].name());
600 for dependency in self.nodes[index].dependencies() {
601 if self
602 .declared_root_states
603 .contains_key(&dependency.state_type_id())
604 || self
605 .declared_input_states
606 .contains_key(&dependency.state_type_id())
607 {
608 continue;
609 }
610
611 let Some(dependency_index) = providers.get(&dependency.state_type_id()).copied() else {
612 stack.pop();
613 return Err(analysis_node_graph_error(format!(
614 "Node '{}' depends on missing state {}",
615 self.nodes[index].name(),
616 dependency.state_type_name(),
617 )));
618 };
619 self.visit_node(dependency_index, providers, visiting, visited, order, stack)?;
620 }
621 stack.pop();
622
623 visiting.remove(&index);
624 visited.insert(index);
625 order.push(index);
626 Ok(())
627 }
628}
629
630impl AnalysisGraph {
631 pub fn render_ascii_dag(&mut self) -> SubtrActorResult<String> {
632 self.resolve()?;
633
634 let providers = self.provider_index_by_type()?;
635 let mut external_labels = Vec::new();
636 let mut external_node_ids = HashMap::new();
637
638 for node in &self.nodes {
639 for dependency in node.dependencies() {
640 let dependency_type_id = dependency.state_type_id();
641 if providers.contains_key(&dependency_type_id) {
642 continue;
643 }
644
645 let label = if self.declared_root_states.contains_key(&dependency_type_id) {
646 format!("root:{}", short_type_name(dependency.state_type_name()))
647 } else if self.declared_input_states.contains_key(&dependency_type_id) {
648 format!("input:{}", short_type_name(dependency.state_type_name()))
649 } else {
650 return Err(analysis_node_graph_error(format!(
651 "Node '{}' depends on missing state {}",
652 node.name(),
653 dependency.state_type_name(),
654 )));
655 };
656 ensure_external_render_node(
657 &mut external_labels,
658 &mut external_node_ids,
659 dependency_type_id,
660 label,
661 );
662 }
663 }
664
665 if self.nodes.is_empty() && external_labels.is_empty() {
666 return Ok("AnalysisGraph\n\\- (empty)".to_owned());
667 }
668
669 let external_count = external_labels.len();
670 let mut lines = Vec::with_capacity(1 + external_count + self.nodes.len());
671 lines.push("AnalysisGraph".to_owned());
672
673 for (display_id, (_, label)) in external_labels.iter().enumerate() {
674 lines.push(format!("[{display_id}] {label}"));
675 }
676
677 for (index, node) in self.nodes.iter().enumerate() {
678 let display_id = external_count + index;
679 let mut dependency_refs = Vec::new();
680 for dependency in node.dependencies() {
681 let dependency_type_id = dependency.state_type_id();
682 let source_id = if let Some(provider_index) = providers.get(&dependency_type_id) {
683 external_count + *provider_index
684 } else if self.declared_root_states.contains_key(&dependency_type_id) {
685 *external_node_ids
686 .get(&dependency_type_id)
687 .expect("root node should have been prepared")
688 } else if self.declared_input_states.contains_key(&dependency_type_id) {
689 *external_node_ids
690 .get(&dependency_type_id)
691 .expect("input node should have been prepared")
692 } else {
693 return Err(analysis_node_graph_error(format!(
694 "Node '{}' depends on missing state {}",
695 node.name(),
696 dependency.state_type_name(),
697 )));
698 };
699 dependency_refs.push(format!("[{source_id}]"));
700 }
701
702 if dependency_refs.is_empty() {
703 lines.push(format!("[{display_id}] {}", node.name()));
704 } else {
705 lines.push(format!(
706 "[{display_id}] {} <- {}",
707 node.name(),
708 dependency_refs.join(", "),
709 ));
710 }
711 }
712
713 Ok(lines.join("\n"))
714 }
715}
716
717fn ensure_external_render_node(
718 labels: &mut Vec<(TypeId, Box<str>)>,
719 external_node_ids: &mut HashMap<TypeId, usize>,
720 dependency_type_id: TypeId,
721 label: String,
722) -> usize {
723 if let Some(node_id) = external_node_ids.get(&dependency_type_id) {
724 return *node_id;
725 }
726
727 let node_id = labels.len();
728 labels.push((dependency_type_id, label.into_boxed_str()));
729 external_node_ids.insert(dependency_type_id, node_id);
730 node_id
731}
732
733fn short_type_name(type_name: &str) -> String {
734 let mut shortened = String::with_capacity(type_name.len());
735 let mut token = String::new();
736
737 for character in type_name.chars() {
738 if character.is_alphanumeric() || matches!(character, '_' | ':') {
739 token.push(character);
740 continue;
741 }
742
743 if !token.is_empty() {
744 shortened.push_str(token.rsplit("::").next().unwrap_or(&token));
745 token.clear();
746 }
747 shortened.push(character);
748 }
749
750 if !token.is_empty() {
751 shortened.push_str(token.rsplit("::").next().unwrap_or(&token));
752 }
753
754 shortened
755}
756
757fn analysis_node_graph_error(message: String) -> SubtrActorError {
758 SubtrActorError::new(SubtrActorErrorVariant::CallbackError(format!(
759 "analysis node graph error: {message}"
760 )))
761}
762
763#[cfg(test)]
764#[path = "graph_tests.rs"]
765mod tests;