use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::sync::Arc;
use crate::{ActorId, Event, EventId, Label, Topic};
use super::{ActorTrace, EventEntry, EventMatcher, EventRecords, EventTrace};
/// A tree of recorded events that starts at one root event and follows
/// parent links downwards.
///
/// The chain keeps the record list it was built from and indexes the subset
/// of events reachable from `root_id`.
#[derive(Debug)]
pub struct EventChain<E: Event, T: Topic<E>> {
/// Id of the event the chain starts from.
root_id: EventId,
/// Records the chain was built from; `new` stores them unfiltered, so they
/// may include entries that are not part of the chain.
records: EventRecords<E, T>,
/// Ids of the events that belong to the chain (the root and its descendants).
chain_ids: HashSet<EventId>,
/// Direct children of every chain event that has at least one child.
children_map: HashMap<EventId, Vec<EventId>>,
}
impl<E: Event, T: Topic<E>> EventChain<E, T> {
/// Builds the chain of events in `records` that descend from `root_id`.
///
/// An event belongs to the chain when it is the root itself or when its
/// parent belongs to the chain. `root_id` is always registered as a chain
/// member, even when no record carries that id.
///
/// The children of every event are ordered by timestamp; children with equal
/// timestamps keep the order in which they first appear in `records`, so the
/// resulting chain is deterministic for a given record list.
pub(crate) fn new(records: EventRecords<E, T>, root_id: EventId) -> Self {
    // Index the records in a single pass. One event may be recorded several
    // times (once per receiver), so only the first entry of each id is used.
    let mut seen: HashSet<EventId> = HashSet::new();
    let mut timestamps: HashMap<EventId, u64> = HashMap::new();
    let mut recorded_children: HashMap<EventId, Vec<EventId>> = HashMap::new();
    for entry in records.iter() {
        let id = entry.id();
        if !seen.insert(id) {
            continue;
        }
        timestamps.insert(id, entry.meta().timestamp());
        if let Some(parent) = entry.meta().parent_id() {
            recorded_children.entry(parent).or_default().push(id);
        }
    }

    // Walk down from the root, claiming each event at most once.
    let mut chain_ids = HashSet::new();
    chain_ids.insert(root_id);
    let mut children_map: HashMap<EventId, Vec<EventId>> = HashMap::new();
    let mut pending = vec![root_id];
    while let Some(current_id) = pending.pop() {
        if let Some(candidates) = recorded_children.remove(&current_id) {
            // `insert` returns false for ids already in the chain, which keeps
            // cyclic parent links from looping forever.
            let mut children: Vec<EventId> = candidates
                .into_iter()
                .filter(|id| chain_ids.insert(*id))
                .collect();
            if children.is_empty() {
                continue;
            }
            // Stable sort: equal timestamps keep record order.
            children.sort_by_key(|id| timestamps.get(id).copied().unwrap_or(0));
            pending.extend(children.iter().copied());
            children_map.insert(current_id, children);
        }
    }

    Self {
        root_id,
        records,
        chain_ids,
        children_map,
    }
}
/// Returns an `ActorTrace` view that borrows this chain.
pub fn actors(&self) -> ActorTrace<'_, E, T> {
ActorTrace { chain: self }
}
/// Returns an `EventTrace` view that borrows this chain.
pub fn events(&self) -> EventTrace<'_, E, T> {
EventTrace { chain: self }
}
/// Reports whether the chain forks after an event accepted by `matcher`.
///
/// Chain entries are scanned in record order. Matching entries whose event
/// has no children are skipped; the first matching entry that does have
/// children decides the answer, which is `true` when it has more than one
/// child. Returns `false` when no matching entry has children.
///
/// Note that this differs from `branches_after`, which stops at the first
/// matching entry even when that event is a leaf.
pub fn diverges_after(&self, matcher: impl Into<EventMatcher<E, T>>) -> bool
where
    E: Label,
{
    let matcher = matcher.into();
    let first_with_children = self.chain_entries().find_map(|entry| {
        if matcher.matches(entry) {
            self.children_map.get(&entry.id())
        } else {
            None
        }
    });
    match first_with_children {
        Some(children) => children.len() > 1,
        None => false,
    }
}
/// Counts the direct children of the first chain entry accepted by `matcher`.
///
/// Entries are scanned in record order. Returns `0` when nothing matches or
/// when the first matching event has no children.
pub fn branches_after(&self, matcher: impl Into<EventMatcher<E, T>>) -> usize
where
    E: Label,
{
    let matcher = matcher.into();
    let first_match = self.chain_entries().find(|&entry| matcher.matches(entry));
    match first_match {
        Some(entry) => self.children_map.get(&entry.id()).map_or(0, Vec::len),
        None => 0,
    }
}
/// Extracts the part of the chain that leads to `actor`.
///
/// The result contains every chain event that has an entry received by
/// `actor`, plus all ancestors of those events. Records are kept by event
/// id, so every recorded delivery of a kept event is retained, including
/// deliveries to other actors. When `actor` receives nothing in this chain
/// the result has no records, no chain ids and no children; `root_id` is
/// carried over unchanged in every case.
pub fn path_to(&self, actor: &ActorId) -> EventChain<E, T> {
    // Invert the children index so we can climb from an event to its parent.
    let parent_of: HashMap<EventId, EventId> = self
        .children_map
        .iter()
        .flat_map(|(parent, children)| children.iter().map(move |child| (*child, *parent)))
        .collect();

    // Climb from every event delivered to `actor`, stopping as soon as we
    // reach an event that an earlier climb already covered.
    let mut path_ids: HashSet<EventId> = HashSet::new();
    for entry in self.chain_entries() {
        if entry.receiver() == actor {
            let mut cursor = Some(entry.id());
            while let Some(id) = cursor {
                if !path_ids.insert(id) {
                    break;
                }
                cursor = parent_of.get(&id).copied();
            }
        }
    }

    let path_records: Vec<_> = self
        .records
        .iter()
        .filter(|entry| path_ids.contains(&entry.id()))
        .cloned()
        .collect();

    // Keep only edges whose both ends lie on a path; drop parents left childless.
    let mut path_children: HashMap<EventId, Vec<EventId>> = HashMap::new();
    for (parent, children) in &self.children_map {
        if !path_ids.contains(parent) {
            continue;
        }
        let kept: Vec<EventId> = children
            .iter()
            .copied()
            .filter(|child| path_ids.contains(child))
            .collect();
        if !kept.is_empty() {
            path_children.insert(*parent, kept);
        }
    }

    EventChain {
        root_id: self.root_id,
        records: Arc::new(path_records),
        chain_ids: path_ids,
        children_map: path_children,
    }
}
/// Returns the id of the event this chain starts from.
pub(super) fn root_id(&self) -> EventId {
self.root_id
}
/// Returns the actor recorded as sender of the root event, or `None` when
/// the root event has no entry in the records.
pub(super) fn root_sender(&self) -> Option<&ActorId> {
    self.sender_of(self.root_id)
}
/// Returns the actor recorded as sender of `event_id`, taken from the first
/// record with that id, or `None` when no record has it.
pub(super) fn sender_of(&self, event_id: EventId) -> Option<&ActorId> {
    for entry in self.records.iter() {
        if entry.id() == event_id {
            return Some(entry.meta().actor_id());
        }
    }
    None
}
/// Returns a copy of the direct children of `event_id`; the list is empty
/// when the event has no children or is not indexed.
pub(super) fn children_of(&self, event_id: EventId) -> Vec<EventId> {
    match self.children_map.get(&event_id) {
        Some(children) => children.clone(),
        None => Vec::new(),
    }
}
/// Lists every root-to-leaf path through the chain.
///
/// Each path starts with the first record of the root event. The result is
/// empty when the root event has no record.
pub(super) fn event_paths(&self) -> Vec<Vec<&EventEntry<E, T>>> {
    let root_entry = self
        .records
        .iter()
        .find(|entry| entry.id() == self.root_id);
    match root_entry {
        Some(root) => {
            let mut paths = Vec::new();
            self.build_event_paths_dfs(self.root_id, vec![root], &mut paths);
            paths
        }
        None => Vec::new(),
    }
}
/// Depth-first helper for `event_paths`.
///
/// `current_path` is the path from the root down to `event_id` (inclusive).
/// When `event_id` has no children the path is complete and is pushed onto
/// `paths`; otherwise the walk continues into each child, using the first
/// record found for that child. Children without any record are skipped.
fn build_event_paths_dfs<'a>(
    &'a self,
    event_id: EventId,
    current_path: Vec<&'a EventEntry<E, T>>,
    paths: &mut Vec<Vec<&'a EventEntry<E, T>>>,
) {
    let children = self.children_of(event_id);
    if children.is_empty() {
        paths.push(current_path);
        return;
    }
    for child_id in children {
        let found = self.records.iter().find(|entry| entry.id() == child_id);
        if let Some(child_entry) = found {
            // Every branch gets its own copy of the path walked so far.
            let mut extended = Vec::with_capacity(current_path.len() + 1);
            extended.extend(current_path.iter().copied());
            extended.push(child_entry);
            self.build_event_paths_dfs(child_id, extended, paths);
        }
    }
}
/// Iterates, in record order, over the records whose event belongs to the chain.
pub(super) fn chain_entries(&self) -> impl Iterator<Item = &EventEntry<E, T>> {
    let members = &self.chain_ids;
    self.records
        .iter()
        .filter(move |entry| members.contains(&entry.id()))
}
/// Returns the chain's records in breadth-first order starting at the root.
///
/// All records of one event (one per receiver) are emitted together, in
/// record order, before the walk moves on; children are visited in the order
/// stored in the children index. Each event is emitted at most once.
pub(super) fn ordered_entries(&self) -> Vec<&EventEntry<E, T>> {
    // Group the chain's records by event id, preserving record order per id.
    let mut deliveries: HashMap<EventId, Vec<&EventEntry<E, T>>> = HashMap::new();
    for entry in self.chain_entries() {
        deliveries.entry(entry.id()).or_default().push(entry);
    }

    let mut ordered = Vec::new();
    let mut visited = HashSet::new();
    let mut frontier = VecDeque::new();
    frontier.push_back(self.root_id);
    while let Some(id) = frontier.pop_front() {
        if !visited.insert(id) {
            continue;
        }
        if let Some(entries) = deliveries.get(&id) {
            ordered.extend_from_slice(entries);
        }
        if let Some(children) = self.children_map.get(&id) {
            frontier.extend(children.iter().copied());
        }
    }
    ordered
}
}
impl<E: Event + Label, T: Topic<E>> EventChain<E, T> {
/// Renders the chain as text: a header line naming the root id, followed by
/// one line per event, or by an `(empty)` marker when the root event has no
/// record.
pub fn to_string_tree(&self) -> String {
    let mut output = format!("EventChain (root: {})\n", self.root_id);
    let root_recorded = self.records.iter().any(|entry| entry.id() == self.root_id);
    if root_recorded {
        self.format_tree_node(&mut output, self.root_id, "", true);
    } else {
        output.push_str(" (empty)\n");
    }
    output
}
fn format_tree_node(&self, output: &mut String, id: EventId, prefix: &str, is_last: bool) {
let entries: Vec<_> = self.records.iter().filter(|e| e.id() == id).collect();
if let Some(first) = entries.first() {
let connector = if prefix.is_empty() {
""
} else if is_last {
"└─ "
} else {
"├─ "
};
let label = first.payload().label();
let sender = first.sender();
let mut seen = HashSet::new();
let mut receivers: Vec<&str> = entries
.iter()
.map(|e| e.receiver().as_str())
.filter(|name| seen.insert(*name))
.collect();
receivers.sort();
let receivers_str = receivers.join(", ");
output.push_str(&format!(
"{}{}{} [{} -> {}]\n",
prefix, connector, label, sender, receivers_str
));
if let Some(children) = self.children_map.get(&id) {
let child_prefix = if prefix.is_empty() {
"".to_string()
} else if is_last {
format!("{} ", prefix)
} else {
format!("{}│ ", prefix)
};
for (i, child_id) in children.iter().enumerate() {
let is_last_child = i == children.len() - 1;
self.format_tree_node(output, *child_id, &child_prefix, is_last_child);
}
}
}
}
/// Renders the chain as Mermaid `sequenceDiagram` source.
///
/// After the header, one `sender->>receiver:label` line is written per
/// record, in the order produced by `ordered_entries`. Actor names are passed
/// through `sanitize_mermaid_id`; labels are written as they are. A chain
/// with no ids yields just the header.
pub fn to_mermaid(&self) -> String {
    let mut diagram = String::from("sequenceDiagram\n");
    if self.chain_ids.is_empty() {
        return diagram;
    }
    for entry in self.ordered_entries() {
        let label = entry.payload().label();
        let from = sanitize_mermaid_id(entry.sender().as_str());
        let to = sanitize_mermaid_id(entry.receiver().as_str());
        let line = format!(" {}->>{}:{}\n", from, to, label);
        diagram.push_str(&line);
    }
    diagram
}
}
/// Turns `s` into an identifier made only of alphanumeric characters and
/// underscores by replacing every other character with `_`.
fn sanitize_mermaid_id(s: &str) -> String {
    let mut id = String::with_capacity(s.len());
    for c in s.chars() {
        let keep = c == '_' || c.is_alphanumeric();
        id.push(if keep { c } else { '_' });
    }
    id
}
/// Displays the chain using the text produced by `to_string_tree`.
impl<E: Event + Label, T: Topic<E>> fmt::Display for EventChain<E, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.to_string_tree())
    }
}
// Unit tests for `EventChain` and the actor/event trace views built on it.
#[cfg(test)]
mod tests {
use super::*;
use crate::{DefaultTopic, Envelope, Label};
use std::borrow::Cow;
use std::sync::Arc;
// Minimal event type; each variant's label equals its name.
#[derive(Clone, Debug)]
enum TestEvent {
Start,
Process,
Complete,
Branch,
}
impl Event for TestEvent {}
impl Label for TestEvent {
fn label(&self) -> Cow<'static, str> {
Cow::Borrowed(match self {
TestEvent::Start => "Start",
TestEvent::Process => "Process",
TestEvent::Complete => "Complete",
TestEvent::Branch => "Branch",
})
}
}
// Shared topic handle used for every test entry.
fn topic() -> Arc<DefaultTopic> {
Arc::new(DefaultTopic)
}
// Shorthand for building an actor id from a name.
fn actor(name: &str) -> ActorId {
ActorId::new(name)
}
// Linear fixture: alice -Start-> bob -Process-> charlie -Complete-> alice.
// Returns the records together with the id of the Start event.
fn build_linear_chain() -> (EventRecords<TestEvent, DefaultTopic>, EventId) {
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
let t = topic();
let start = Arc::new(Envelope::new(TestEvent::Start, alice.clone()));
let start_id = start.id();
let start_entry = EventEntry::new(start, t.clone(), bob.clone());
let process =
Arc::new(Envelope::new(TestEvent::Process, bob.clone()).with_parent_id(start_id));
let process_id = process.id();
let process_entry = EventEntry::new(process, t.clone(), charlie.clone());
let complete =
Arc::new(Envelope::new(TestEvent::Complete, charlie).with_parent_id(process_id));
let complete_entry = EventEntry::new(complete, t, alice);
(
Arc::new(vec![start_entry, process_entry, complete_entry]),
start_id,
)
}
// Branching fixture: Start (alice -> bob) has two children, both sent by
// bob: Process (to charlie) and Branch (to alice).
fn build_branching_chain() -> (EventRecords<TestEvent, DefaultTopic>, EventId) {
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
let t = topic();
let start = Arc::new(Envelope::new(TestEvent::Start, alice.clone()));
let start_id = start.id();
let start_entry = EventEntry::new(start, t.clone(), bob.clone());
let process =
Arc::new(Envelope::new(TestEvent::Process, bob.clone()).with_parent_id(start_id));
let process_entry = EventEntry::new(process, t.clone(), charlie.clone());
let branch = Arc::new(Envelope::new(TestEvent::Branch, bob).with_parent_id(start_id));
let branch_entry = EventEntry::new(branch, t, alice);
(
Arc::new(vec![start_entry, process_entry, branch_entry]),
start_id,
)
}
// --- ActorTrace ---
// All three actors of the linear fixture are reported exactly once.
#[test]
fn actor_trace_all_includes_sender_and_receivers() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
let actors = chain.actors();
let all = actors.all();
assert_eq!(all.len(), 3);
assert!(all.iter().any(|a| **a == alice));
assert!(all.iter().any(|a| **a == bob));
assert!(all.iter().any(|a| **a == charlie));
}
// The single linear path lists the root sender followed by each receiver.
#[test]
fn actor_trace_paths_returns_all_paths() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let paths = chain.actors().paths();
assert_eq!(paths.len(), 1);
assert_eq!(paths[0].len(), 4);
assert_eq!(paths[0][0].as_str(), "alice");
assert_eq!(paths[0][1].as_str(), "bob");
assert_eq!(paths[0][2].as_str(), "charlie");
assert_eq!(paths[0][3].as_str(), "alice"); }
#[test]
fn actor_trace_path_count_for_linear() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert_eq!(chain.actors().path_count(), 1);
}
#[test]
fn actor_trace_path_count_for_branching() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
assert_eq!(chain.actors().path_count(), 2);
}
#[test]
fn actor_trace_visited_returns_true_when_all_present() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
assert!(chain.actors().visited(&[&alice, &bob, &charlie]));
}
// "dave" never appears in the fixture, so the check must fail.
#[test]
fn actor_trace_visited_returns_false_when_missing() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let bob = actor("bob");
let dave = actor("dave");
assert!(!chain.actors().visited(&[&bob, &dave]));
}
#[test]
fn actor_trace_exact_returns_true_for_valid_path() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
assert!(chain.actors().exact(&[&alice, &bob, &charlie, &alice]));
}
// alice ... charlie matches even though bob sits between them.
#[test]
fn actor_trace_passes_through_allows_gaps() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let alice = actor("alice");
let charlie = actor("charlie");
assert!(chain.actors().passes_through(&[&alice, &charlie]));
}
// bob, charlie are adjacent on the path; alice, charlie are not.
#[test]
fn actor_trace_segment_requires_contiguous() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
assert!(chain.actors().segment(&[&bob, &charlie]));
assert!(!chain.actors().segment(&[&alice, &charlie]));
}
#[test]
fn actor_trace_exact_returns_false_for_nonexistent_path() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let dave = actor("dave");
let bob = actor("bob");
assert!(!chain.actors().exact(&[&dave, &bob]));
}
// Each branch of the branching fixture is matched as its own path.
#[test]
fn actor_trace_branching_paths_are_distinct() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
assert!(chain.actors().exact(&[&alice, &bob, &charlie])); assert!(chain.actors().exact(&[&alice, &bob, &alice])); }
// --- EventTrace ---
#[test]
fn event_trace_contains_finds_event_by_label() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.events().contains("Start"));
assert!(chain.events().contains("Process"));
assert!(chain.events().contains("Complete"));
assert!(!chain.events().contains("Branch"));
}
#[test]
fn event_trace_passes_through_matches_order_with_gaps() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.events().passes_through(&["Start", "Complete"]));
assert!(
chain
.events()
.passes_through(&["Start", "Process", "Complete"])
);
}
#[test]
fn event_trace_passes_through_returns_false_for_wrong_order() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert!(!chain.events().passes_through(&["Complete", "Start"]));
}
// Consecutive runs match anywhere on the path; an empty segment also matches.
#[test]
fn event_trace_segment_requires_consecutive() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.events().segment(&["Start", "Process", "Complete"]));
assert!(chain.events().segment(&["Start", "Process"]));
assert!(chain.events().segment(&["Process", "Complete"]));
let empty: &[&str] = &[];
assert!(chain.events().segment(empty));
}
// `exact` must cover the whole path, in order.
#[test]
fn event_trace_exact_matches_full_sequence() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.events().exact(&["Start", "Process", "Complete"]));
assert!(!chain.events().exact(&["Start", "Process"]));
assert!(!chain.events().exact(&["Process", "Start", "Complete"]));
}
// Sibling events never appear on the same path.
#[test]
fn event_trace_exact_matches_individual_branches() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.events().exact(&["Start", "Process"]));
assert!(chain.events().exact(&["Start", "Branch"]));
assert!(!chain.events().exact(&["Start", "Process", "Branch"]));
}
#[test]
fn event_trace_segment_works_per_branch() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.events().segment(&["Start", "Process"]));
assert!(chain.events().segment(&["Start", "Branch"]));
assert!(!chain.events().segment(&["Process", "Branch"]));
}
#[test]
fn event_trace_passes_through_works_per_branch() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.events().passes_through(&["Start", "Process"]));
assert!(chain.events().passes_through(&["Start", "Branch"]));
assert!(!chain.events().passes_through(&["Process", "Branch"]));
}
#[test]
fn event_trace_path_count() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert_eq!(chain.events().path_count(), 1);
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
assert_eq!(chain.events().path_count(), 2);
}
// --- Branch queries on the chain itself ---
// Only Start has more than one child in the branching fixture.
#[test]
fn diverges_after_detects_branching() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
assert!(chain.diverges_after("Start"));
assert!(!chain.diverges_after("Process"));
assert!(!chain.diverges_after("Branch"));
}
#[test]
fn diverges_after_returns_false_for_linear() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
assert!(!chain.diverges_after("Start"));
assert!(!chain.diverges_after("Process"));
}
// Leaves and unknown labels both report zero branches.
#[test]
fn branches_after_counts_children() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
assert_eq!(chain.branches_after("Start"), 2);
assert_eq!(chain.branches_after("Process"), 0);
assert_eq!(chain.branches_after("NonExistent"), 0);
}
// --- path_to ---
// charlie receives Process, so the sub-chain holds Process and its ancestor Start.
#[test]
fn path_to_extracts_subchain() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let charlie = actor("charlie");
let path = chain.path_to(&charlie);
assert!(path.events().contains("Start"));
assert!(path.events().contains("Process"));
}
#[test]
fn path_to_returns_empty_for_unknown_actor() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let unknown = actor("unknown");
let path = chain.path_to(&unknown);
assert!(!path.events().contains("Start"));
assert!(!path.events().contains("Process"));
}
// --- Edge cases ---
// A chain built from no records must answer every query without panicking.
#[test]
fn empty_chain_handles_gracefully() {
let chain: EventChain<TestEvent, DefaultTopic> =
EventChain::new(Arc::new(vec![]), EventId::from(0));
assert!(!chain.diverges_after("Anything"));
assert_eq!(chain.branches_after("Anything"), 0);
assert!(chain.actors().visited(&[]));
assert!(chain.events().passes_through(&[] as &[&str]));
}
// Empty expectations are accepted by the trace matchers used here.
#[test]
fn empty_matchers_returns_true() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let empty_actors: &[&ActorId] = &[];
let empty_events: &[&str] = &[];
assert!(chain.actors().exact(empty_actors));
assert!(chain.events().passes_through(empty_events));
}
// --- Text rendering ---
#[test]
fn to_string_tree_shows_chain_structure() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let tree = chain.to_string_tree();
assert!(tree.contains("EventChain"));
assert!(tree.contains("Start"));
assert!(tree.contains("Process"));
assert!(tree.contains("Complete"));
assert!(tree.contains("alice"));
assert!(tree.contains("bob"));
assert!(tree.contains("charlie"));
}
// One Start envelope recorded for two receivers is rendered as a single
// line listing both receivers.
#[test]
fn to_string_tree_shows_fan_out_receivers() {
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
let t = topic();
let start = Arc::new(Envelope::new(TestEvent::Start, alice.clone()));
let start_id = start.id();
let entry1 = EventEntry::new(start.clone(), t.clone(), bob);
let entry2 = EventEntry::new(start, t, charlie);
let records = Arc::new(vec![entry1, entry2]);
let chain = EventChain::new(records, start_id);
let tree = chain.to_string_tree();
assert!(tree.contains("bob, charlie"));
}
#[test]
fn to_string_tree_handles_empty_chain() {
let chain: EventChain<TestEvent, DefaultTopic> =
EventChain::new(Arc::new(vec![]), EventId::from(0));
let tree = chain.to_string_tree();
assert!(tree.contains("(empty)"));
}
// --- Mermaid rendering ---
#[test]
fn to_mermaid_generates_sequence_diagram() {
let (records, root_id) = build_linear_chain();
let chain = EventChain::new(records, root_id);
let mermaid = chain.to_mermaid();
assert!(mermaid.starts_with("sequenceDiagram\n"));
assert!(mermaid.contains("alice->>bob:Start"));
assert!(mermaid.contains("bob->>charlie:Process"));
assert!(mermaid.contains("charlie->>alice:Complete"));
}
#[test]
fn to_mermaid_handles_branching() {
let (records, root_id) = build_branching_chain();
let chain = EventChain::new(records, root_id);
let mermaid = chain.to_mermaid();
assert!(mermaid.contains("alice->>bob:Start"));
assert!(mermaid.contains("bob->>charlie:Process"));
assert!(mermaid.contains("bob->>alice:Branch"));
}
// Every recorded delivery of a fanned-out event gets its own arrow.
#[test]
fn to_mermaid_shows_fan_out() {
let alice = actor("alice");
let bob = actor("bob");
let charlie = actor("charlie");
let t = topic();
let start = Arc::new(Envelope::new(TestEvent::Start, alice.clone()));
let start_id = start.id();
let entry1 = EventEntry::new(start.clone(), t.clone(), bob.clone());
let entry2 = EventEntry::new(start, t.clone(), charlie.clone());
let process =
Arc::new(Envelope::new(TestEvent::Process, bob.clone()).with_parent_id(start_id));
let process_entry = EventEntry::new(process, t, charlie);
let records = Arc::new(vec![entry1, entry2, process_entry]);
let chain = EventChain::new(records, start_id);
let mermaid = chain.to_mermaid();
assert!(mermaid.contains("alice->>bob:Start"));
assert!(mermaid.contains("alice->>charlie:Start"));
assert!(mermaid.contains("bob->>charlie:Process"));
}
// With no records only the diagram header is produced.
#[test]
fn to_mermaid_handles_empty_chain() {
let chain: EventChain<TestEvent, DefaultTopic> =
EventChain::new(Arc::new(vec![]), EventId::from(0));
let mermaid = chain.to_mermaid();
assert_eq!(mermaid, "sequenceDiagram\n");
}
}