use std::collections::{HashMap, HashSet};
use crate::correlate::Trace;
use super::n_plus_one::parse_timestamp_ms;
use super::{Confidence, Finding, FindingType, Pattern, Severity, TraceIndices};
// A child span reduced to the timing data the sequential-chain search needs.
// Borrows its template string from the trace it was built from.
struct TimedSpan<'a> {
// Start time in milliseconds, as returned by `parse_timestamp_ms`.
start_ms: u64,
// `start_ms` plus the duration truncated to whole milliseconds (saturating).
end_ms: u64,
// Template of the originating span; used to tell distinct calls apart.
template: &'a str,
// Duration in microseconds, copied unchanged from the span's event.
duration_us: u64,
// Index of the originating span inside `trace.spans`.
span_idx: usize,
}
/// Scans every sibling group of `trace` for chains of calls that ran one
/// after another and reports each qualifying chain as a finding.
///
/// For each parent in `indices.children_by_parent` that has at least
/// `min_sequential` children, the children whose timestamps parse are turned
/// into [`TimedSpan`]s, ordered by end time, and handed to
/// `longest_non_overlapping`; the resulting chain is then judged by
/// `evaluate_sequence`, which appends to the returned vector when the chain
/// qualifies.
///
/// Children whose timestamp cannot be parsed are ignored. Durations are
/// truncated to whole milliseconds when computing the end time.
#[must_use]
pub fn detect_serialized(
    trace: &Trace,
    indices: &TraceIndices<'_>,
    min_sequential: u32,
) -> Vec<Finding> {
    let threshold = min_sequential as usize;
    let mut findings = Vec::new();

    for (parent_id, children) in &indices.children_by_parent {
        // Cheap pre-filter: too few children can never reach the threshold.
        if children.len() < threshold {
            continue;
        }

        let mut candidates: Vec<TimedSpan<'_>> = children
            .iter()
            .filter_map(|&span_idx| {
                let span = &trace.spans[span_idx];
                let start_ms = parse_timestamp_ms(&span.event.timestamp)?;
                let duration_us = span.event.duration_us;
                Some(TimedSpan {
                    start_ms,
                    end_ms: start_ms.saturating_add(duration_us / 1000),
                    template: span.template.as_ref(),
                    duration_us,
                    span_idx,
                })
            })
            .collect();

        // Re-check after dropping children with unparseable timestamps.
        if candidates.len() < threshold {
            continue;
        }

        // The chain search expects its input ordered by end time.
        candidates.sort_unstable_by_key(|candidate| candidate.end_ms);
        let chain = longest_non_overlapping(&candidates);

        evaluate_sequence(
            &candidates,
            &chain,
            threshold,
            trace,
            &indices.span_index,
            parent_id,
            &mut findings,
        );
    }

    findings
}
/// Returns the indices (into `timed`, ascending) of a largest set of spans
/// in which no two overlap.
///
/// `timed` is expected to be ordered by `end_ms`. The work is delegated to
/// three steps: predecessor lookup, DP table fill, and backtracking.
/// An empty input yields an empty selection.
fn longest_non_overlapping(timed: &[TimedSpan<'_>]) -> Vec<usize> {
    if timed.is_empty() {
        return Vec::new();
    }

    let span_count = timed.len();
    let predecessors = compute_predecessors(timed);
    let (best_len, taken) = fill_dp_table(&predecessors, span_count);

    // The last table entry holds the optimum over the whole input.
    let optimum = best_len[span_count - 1];
    backtrack_selection(&taken, &predecessors, optimum, span_count)
}
/// For every span, finds the latest earlier span that has already finished
/// by the time it starts.
///
/// `timed` must be ordered by `end_ms`. Entry `i` of the result is
/// `Some(p)` where `p < i` is the largest index with
/// `timed[p].end_ms <= timed[i].start_ms`, or `None` when no such span
/// exists. Touching intervals (`end == start`) count as non-overlapping.
///
/// The search is restricted to the prefix `timed[..i]`. Searching the whole
/// slice is wrong for spans whose `end_ms` equals their own `start_ms`
/// (anything shorter than one millisecond): such a span satisfies
/// `end_ms <= start` itself, so the partition point lands at or after `i`
/// and the real predecessor is lost. Limiting the search to the prefix also
/// guarantees `p < i`, so backtracking can never cycle.
fn compute_predecessors(timed: &[TimedSpan<'_>]) -> Vec<Option<usize>> {
    timed
        .iter()
        .enumerate()
        .map(|(i, span)| {
            let start = span.start_ms;
            // Number of earlier spans that end at or before `start`; the last
            // of them (if any) is the predecessor.
            timed[..i]
                .partition_point(|earlier| earlier.end_ms <= start)
                .checked_sub(1)
        })
        .collect()
}
/// Fills the dynamic-programming table for the longest chain of
/// non-overlapping spans.
///
/// `pred[i]` is the predecessor of span `i` (see `compute_predecessors`) and
/// `n` is the number of spans. Returns `(dp, included)` where `dp[i]` is the
/// best chain length using only spans `0..=i`, and `included[i]` records
/// whether that best chain takes span `i` (ties favour taking it).
///
/// Returns two empty vectors when `n == 0` instead of panicking on the
/// base-case write.
///
/// # Panics
///
/// Panics if `pred` has fewer than `n` entries or a predecessor index is out
/// of range.
fn fill_dp_table(pred: &[Option<usize>], n: usize) -> (Vec<usize>, Vec<bool>) {
    let mut dp = vec![0usize; n];
    let mut included = vec![false; n];
    if n == 0 {
        return (dp, included);
    }

    // Base case: the first span on its own is a chain of length one.
    dp[0] = 1;
    included[0] = true;

    for i in 1..n {
        let skip = dp[i - 1];
        let take = pred[i].map_or(1, |p| dp[p] + 1);
        included[i] = take >= skip;
        dp[i] = take.max(skip);
    }

    (dp, included)
}
/// Reconstructs the chosen span indices from the DP bookkeeping.
///
/// Walks backwards from span `n - 1`: a span marked in `included` is taken
/// and the walk jumps to its predecessor; an unmarked span is skipped and
/// the walk moves one step left. The walk ends when a taken span has no
/// (strictly earlier) predecessor or index 0 has been skipped.
///
/// `optimal_len` is only a capacity hint for the result. The returned
/// indices are in ascending order. Returns an empty vector when `n == 0`
/// rather than underflowing on `n - 1`.
///
/// # Panics
///
/// Panics if `included` or `pred` has fewer than `n` entries.
fn backtrack_selection(
    included: &[bool],
    pred: &[Option<usize>],
    optimal_len: usize,
    n: usize,
) -> Vec<usize> {
    if n == 0 {
        return Vec::new();
    }

    let mut selected = Vec::with_capacity(optimal_len);
    let mut cursor = Some(n - 1);

    while let Some(i) = cursor {
        cursor = if included[i] {
            selected.push(i);
            // Only follow links that move strictly left, so a malformed
            // predecessor table cannot make the walk loop forever.
            pred[i].filter(|&p| p < i)
        } else {
            i.checked_sub(1)
        };
    }

    selected.reverse();
    selected
}
/// Turns a chain of non-overlapping sibling spans into a
/// `SerializedCalls` finding, if it qualifies.
///
/// `seq` holds indices into `timed`. Nothing is emitted when the chain is
/// shorter than `min_seq` or when all its spans share one template. Otherwise
/// one `Finding` is appended to `findings`. Its endpoint and service come
/// from the parent span when `parent_id` is present in `span_index`, and
/// from the first span of the chain otherwise.
///
/// The suggestion text reports the summed duration of the chain and the
/// longest single duration, both truncated to whole milliseconds.
fn evaluate_sequence(
    timed: &[TimedSpan<'_>],
    seq: &[usize],
    min_seq: usize,
    trace: &Trace,
    span_index: &HashMap<&str, usize>,
    parent_id: &str,
    findings: &mut Vec<Finding>,
) {
    let count = seq.len();
    if count < min_seq {
        return;
    }

    // Resolve the chain once; every later step works on these references.
    let chain: Vec<&TimedSpan<'_>> = seq.iter().map(|&i| &timed[i]).collect();

    let mut distinct: HashSet<&str> = HashSet::new();
    for link in &chain {
        distinct.insert(link.template);
    }
    // A single repeated template is left to other detectors.
    if distinct.len() <= 1 {
        return;
    }

    let total_sequential_us: u64 = chain.iter().map(|link| link.duration_us).sum();
    let max_duration_us = chain
        .iter()
        .map(|link| link.duration_us)
        .max()
        .unwrap_or(0);
    let total_ms = total_sequential_us / 1000;
    let parallel_ms = max_duration_us / 1000;

    let mut rendered_calls = Vec::with_capacity(chain.len());
    for link in &chain {
        let dur_ms = link.duration_us / 1000;
        rendered_calls.push(format!("{} ({dur_ms}ms)", link.template));
    }
    let calls_str: String = rendered_calls.join(" -> ");

    // Prefer the parent span for attribution; fall back to the first child.
    let parent_span = span_index.get(parent_id).map(|&i| &trace.spans[i]);
    let first_child = &trace.spans[chain[0].span_idx];
    let attributed = parent_span.unwrap_or(first_child);
    let parent_endpoint = attributed.event.source.endpoint.clone();
    let service: String = attributed.event.service.to_string();

    let (window_ms, first_ts, last_ts) = super::n_plus_one::compute_window_and_bounds_iter(
        chain
            .iter()
            .map(|link| trace.spans[link.span_idx].event.timestamp.as_str()),
    );

    let pattern = Pattern {
        template: parent_endpoint.clone(),
        occurrences: count,
        window_ms,
        distinct_params: distinct.len(),
    };
    let suggestion = format!(
        "{count} sequential independent calls could potentially be parallelized: \
         {calls_str}. Total sequential: {total_ms}ms, potential parallel: ~{parallel_ms}ms. \
         If these calls are independent, consider executing them in parallel \
         (e.g., tokio::join!, CompletableFuture.allOf(), Task.WhenAll())"
    );

    findings.push(Finding {
        finding_type: FindingType::SerializedCalls,
        severity: Severity::Info,
        trace_id: trace.trace_id.clone(),
        service,
        source_endpoint: parent_endpoint,
        pattern,
        suggestion,
        first_timestamp: first_ts.to_string(),
        last_timestamp: last_ts.to_string(),
        green_impact: None,
        confidence: Confidence::default(),
        classification_method: None,
        code_location: None,
        instrumentation_scopes: Vec::new(),
        suggested_fix: None,
        signature: String::new(),
    });
}
// Unit tests for `detect_serialized`. Each test builds a small trace from
// hand-written events (one root plus several children) and checks whether a
// serialized-calls finding is produced.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::test_helpers::{
make_http_event_with_duration, make_sql_event_with_duration, make_trace,
};
// URLs on different hosts and paths, used to give sibling calls different
// targets.
const DISTINCT_URLS: &[&str] = &[
"http://user-svc/api/users/42",
"http://inventory-svc/api/inventory/check",
"http://pricing-svc/api/pricing/quote",
"http://notif-svc/api/notifications/send",
"http://shipping-svc/api/shipping/estimate",
"http://billing-svc/api/billing/charge",
];
// Builds a root span named `parent_id` followed by `count` children of it.
// Child `i` starts 100 + 120*i ms into the second and lasts 100_000 us, so
// consecutive children are separated by a 20 ms gap and never overlap.
// URLs cycle through `DISTINCT_URLS`; each child gets its own service name.
// NOTE: the start offset is formatted as a three-digit millisecond field, so
// this only yields well-formed timestamps while 100 + 120*i stays below 1000.
fn make_sequential_children(
trace_id: &str,
parent_id: &str,
count: usize,
) -> Vec<crate::event::SpanEvent> {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
trace_id,
parent_id,
"http://gateway/api/orders/42/submit",
"2025-07-10T14:32:01.000Z",
1_000_000,
);
root.parent_span_id = None;
events.push(root);
for i in 0..count {
let start_ms = 100 + i * 120;
let mut child = make_http_event_with_duration(
trace_id,
&format!("child-{i}"),
DISTINCT_URLS[i % DISTINCT_URLS.len()],
&format!("2025-07-10T14:32:01.{start_ms:03}Z"),
100_000, );
child.parent_span_id = Some(parent_id.to_string());
child.service = Arc::from(format!("svc-{i}"));
events.push(child);
}
events
}
// Four back-to-back children with different URLs and a threshold of 3:
// exactly one Info-level SerializedCalls finding covering all four.
#[test]
fn detects_sequential_siblings() {
let events = make_sequential_children("trace-1", "root", 4);
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].finding_type, FindingType::SerializedCalls);
assert_eq!(findings[0].severity, Severity::Info);
assert_eq!(findings[0].pattern.occurrences, 4);
}
// Only two children while the threshold is 3: nothing is reported.
#[test]
fn no_finding_below_threshold() {
let events = make_sequential_children("trace-1", "root", 2);
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert!(findings.is_empty());
}
// Four children that all start at the same instant and each last 100 ms
// overlap completely, so no chain of length 3 exists.
#[test]
fn overlapping_siblings_no_finding() {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
"trace-1",
"root",
"http://gateway/api/orders",
"2025-07-10T14:32:01.000Z",
500_000,
);
root.parent_span_id = None;
events.push(root);
for (i, url) in DISTINCT_URLS.iter().enumerate().take(4) {
let mut child = make_http_event_with_duration(
"trace-1",
&format!("child-{i}"),
url,
"2025-07-10T14:32:01.100Z",
100_000,
);
child.parent_span_id = Some("root".to_string());
child.service = Arc::from(format!("svc-{i}"));
events.push(child);
}
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert!(findings.is_empty(), "overlapping spans should not trigger");
}
// Five sequential children whose URLs differ only in the trailing id.
// The test expects no finding here.
// NOTE(review): this relies on those URLs sharing one template after
// normalization, which happens outside this file -- confirm.
#[test]
fn same_template_skipped() {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
"trace-1",
"root",
"http://gateway/api/orders",
"2025-07-10T14:32:01.000Z",
500_000,
);
root.parent_span_id = None;
events.push(root);
for i in 0..5 {
let start_ms = 100 + i * 120;
let mut child = make_http_event_with_duration(
"trace-1",
&format!("child-{i}"),
&format!("http://svc/api/users/{}", i + 1),
&format!("2025-07-10T14:32:01.{start_ms:03}Z"),
100_000,
);
child.parent_span_id = Some("root".to_string());
events.push(child);
}
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert!(
findings.is_empty(),
"same template = N+1 territory, should be skipped"
);
}
// Three sequential children (starting at 100, 220 and 340 ms) followed by
// three children that all start at 400 ms. The simultaneous ones overlap
// each other, so the longest non-overlapping chain has three spans.
#[test]
fn mixed_overlap_partial_sequence() {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
"trace-1",
"root",
"http://gateway/api/orders",
"2025-07-10T14:32:01.000Z",
1_000_000,
);
root.parent_span_id = None;
events.push(root);
for (i, url) in DISTINCT_URLS.iter().enumerate().take(3) {
let start_ms = 100 + i * 120;
let mut child = make_http_event_with_duration(
"trace-1",
&format!("child-{i}"),
url,
&format!("2025-07-10T14:32:01.{start_ms:03}Z"),
100_000,
);
child.parent_span_id = Some("root".to_string());
child.service = Arc::from(format!("svc-{i}"));
events.push(child);
}
for (i, url) in DISTINCT_URLS.iter().enumerate().take(6).skip(3) {
let mut child = make_http_event_with_duration(
"trace-1",
&format!("child-{i}"),
url,
"2025-07-10T14:32:01.400Z",
100_000,
);
child.parent_span_id = Some("root".to_string());
child.service = Arc::from(format!("svc-{i}"));
events.push(child);
}
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].pattern.occurrences, 3);
}
// Five sequential spans created without assigning `parent_span_id` here.
// The test expects no finding.
// NOTE(review): the parent linkage these events get is decided by the
// helper in `test_helpers`, which is not visible in this file -- confirm.
#[test]
fn no_parent_span_id_no_finding() {
let events: Vec<_> = (0..5)
.map(|i| {
make_http_event_with_duration(
"trace-1",
&format!("span-{i}"),
&format!("http://svc-{i}/api/{i}"),
&format!("2025-07-10T14:32:01.{:03}Z", i * 120),
100_000,
)
})
.collect();
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert!(findings.is_empty());
}
// Three children of 120 ms, 95 ms and 80 ms, each starting exactly when the
// previous one ends. The suggestion must quote the sum (295 ms) as the
// sequential cost and the longest call (120 ms) as the parallel estimate.
#[test]
fn potential_time_savings_in_suggestion() {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
"trace-1",
"root",
"http://gateway/api/orders",
"2025-07-10T14:32:01.000Z",
500_000,
);
root.parent_span_id = None;
events.push(root);
let mut c0 = make_http_event_with_duration(
"trace-1",
"child-0",
"http://user-svc/api/users/42",
"2025-07-10T14:32:01.100Z",
120_000,
);
c0.parent_span_id = Some("root".to_string());
c0.service = Arc::from("user-svc");
events.push(c0);
let mut c1 = make_http_event_with_duration(
"trace-1",
"child-1",
"http://inventory-svc/api/inventory/check",
"2025-07-10T14:32:01.220Z",
95_000,
);
c1.parent_span_id = Some("root".to_string());
c1.service = Arc::from("inventory-svc");
events.push(c1);
let mut c2 = make_http_event_with_duration(
"trace-1",
"child-2",
"http://pricing-svc/api/pricing/quote",
"2025-07-10T14:32:01.315Z",
80_000,
);
c2.parent_span_id = Some("root".to_string());
c2.service = Arc::from("pricing-svc");
events.push(c2);
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert_eq!(findings.len(), 1);
assert!(findings[0].suggestion.contains("Total sequential: 295ms"));
assert!(
findings[0]
.suggestion
.contains("potential parallel: ~120ms")
);
}
// A SQL query, an HTTP call and another SQL statement run one after
// another under the same parent; the mix of event kinds must still be
// reported as one finding.
#[test]
fn sql_and_http_mixed_siblings() {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
"trace-1",
"root",
"http://gateway/api/orders",
"2025-07-10T14:32:01.000Z",
500_000,
);
root.parent_span_id = None;
events.push(root);
let mut c0 = make_sql_event_with_duration(
"trace-1",
"child-0",
"SELECT * FROM users WHERE id = 42",
"2025-07-10T14:32:01.100Z",
100_000,
);
c0.parent_span_id = Some("root".to_string());
events.push(c0);
let mut c1 = make_http_event_with_duration(
"trace-1",
"child-1",
"http://inventory-svc/api/check",
"2025-07-10T14:32:01.220Z",
100_000,
);
c1.parent_span_id = Some("root".to_string());
c1.service = Arc::from("inventory-svc");
events.push(c1);
let mut c2 = make_sql_event_with_duration(
"trace-1",
"child-2",
"INSERT INTO audit_log VALUES (1, 'order_created')",
"2025-07-10T14:32:01.340Z",
100_000,
);
c2.parent_span_id = Some("root".to_string());
events.push(c2);
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert_eq!(
findings.len(),
1,
"mixed SQL/HTTP siblings should be detected"
);
}
// Timeline (ms within the second): A = 0..200, B = 100..150, C = 160..300,
// D = 310..400. A overlaps both B and C, so a chain that starts with A can
// only continue with D (length 2), whereas B, C, D do not overlap (length 3).
#[test]
fn dp_finds_longer_sequence_than_greedy_would() {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
"trace-1",
"root",
"http://gateway/api/orders",
"2025-07-10T14:32:00.000Z",
2_000_000,
);
root.parent_span_id = None;
events.push(root);
let mut a = make_http_event_with_duration(
"trace-1",
"child-a",
"http://auth-svc/api/auth/validate",
"2025-07-10T14:32:01.000Z",
200_000,
);
a.parent_span_id = Some("root".to_string());
a.service = Arc::from("auth-svc");
events.push(a);
let mut b = make_http_event_with_duration(
"trace-1",
"child-b",
"http://user-svc/api/users/42",
"2025-07-10T14:32:01.100Z",
50_000,
);
b.parent_span_id = Some("root".to_string());
b.service = Arc::from("user-svc");
events.push(b);
let mut c = make_http_event_with_duration(
"trace-1",
"child-c",
"http://inventory-svc/api/inventory/check",
"2025-07-10T14:32:01.160Z",
140_000,
);
c.parent_span_id = Some("root".to_string());
c.service = Arc::from("inventory-svc");
events.push(c);
let mut d = make_http_event_with_duration(
"trace-1",
"child-d",
"http://pricing-svc/api/pricing/quote",
"2025-07-10T14:32:01.310Z",
90_000,
);
d.parent_span_id = Some("root".to_string());
d.service = Arc::from("pricing-svc");
events.push(d);
let trace = make_trace(events);
let findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
assert_eq!(
findings.len(),
1,
"DP should find the optimal B,C,D sequence of length 3"
);
assert_eq!(findings[0].pattern.occurrences, 3);
}
// Five zero-duration children sharing one timestamp. This is a termination
// check only: the result is deliberately discarded and nothing is asserted
// about whether a finding is produced.
#[test]
fn identical_timestamps_does_not_hang() {
let mut events = Vec::new();
let mut root = make_http_event_with_duration(
"trace-1",
"root",
"http://gateway/api/orders",
"2025-07-10T14:32:01.000Z",
1_000_000,
);
root.parent_span_id = None;
events.push(root);
for (i, url) in DISTINCT_URLS.iter().enumerate().take(5) {
let mut child = make_http_event_with_duration(
"trace-1",
&format!("child-{i}"),
url,
"2025-07-10T14:32:01.100Z",
0, );
child.parent_span_id = Some("root".to_string());
child.service = Arc::from(format!("svc-{i}"));
events.push(child);
}
let trace = make_trace(events);
let _findings = detect_serialized(&trace, &TraceIndices::build(&trace), 3);
}
}