use crate::span_record::SpanRecord;
use anyhow::Result;
use serde::{Deserialize, Serialize};
/// Summary record that stands in for a run of consecutive, matching spans.
///
/// The field descriptions below describe the values written by
/// `build_segment` for a run located by `find_run`. All fields are public, so
/// a segment constructed by hand may hold any values.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RleSegment {
/// `span_name` of the first span in the run; run members share the same name.
pub syscall_name: String,
/// Number of spans folded into this segment.
pub count: usize,
/// `logical_clock` of the first span in the run.
pub start_logical_clock: u64,
/// `logical_clock` of the last span in the run.
pub end_logical_clock: u64,
/// Sum of `duration_nanos` over every span in the run.
pub total_duration: u64,
/// `total_duration / count` using integer division (the remainder is dropped).
pub avg_duration: u64,
/// Smallest `duration_nanos` seen in the run.
pub min_duration: u64,
/// Largest `duration_nanos` seen in the run.
pub max_duration: u64,
/// `attributes_json` of the first span; run members have an equal
/// `attributes_json`.
pub common_attributes: String,
/// Process id shared by the spans in the run.
pub process_id: u32,
/// Thread id shared by the spans in the run.
pub thread_id: u64,
/// Trace id copied from the first span of the run.
pub trace_id: [u8; 16],
}
impl RleSegment {
    /// Number of spans represented per stored record for this segment.
    ///
    /// A single segment replaces `count` spans, so the ratio is `count`
    /// expressed as a float.
    pub fn compression_ratio(&self) -> f64 {
        let spans_represented = self.count;
        spans_represented as f64
    }

    /// Spread between the slowest and the fastest span of the run.
    ///
    /// Despite its name this is the range `max_duration - min_duration`, not a
    /// statistical variance. The subtraction saturates, so the result is `0`
    /// when `min_duration` is larger than `max_duration`.
    pub fn duration_variance(&self) -> u64 {
        let Self {
            min_duration,
            max_duration,
            ..
        } = self;
        max_duration.saturating_sub(*min_duration)
    }

    /// Reports whether the run repeats strictly more than 1000 times.
    pub fn is_tight_loop(&self) -> bool {
        // Exclusive lower bound: a run of exactly 1000 spans does not qualify.
        const TIGHT_LOOP_EXCLUSIVE_MIN: usize = 1000;
        self.count > TIGHT_LOOP_EXCLUSIVE_MIN
    }
}
/// Result of run-length compressing a list of spans.
///
/// The field descriptions reflect what `compress_spans` stores; the fields are
/// public and may be populated differently by other code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedTrace {
/// One summary per run whose length reached the minimum run length.
pub segments: Vec<RleSegment>,
/// Clones of the spans from runs that were too short to compress, in
/// logical-clock order.
pub uncompressed: Vec<SpanRecord>,
/// Number of spans in the input that was compressed.
pub original_count: usize,
}
impl CompressedTrace {
    /// Ratio of original span count to stored record count.
    ///
    /// Stored records are the segments plus the uncompressed spans. When
    /// nothing is stored the ratio is defined as `1.0`.
    pub fn compression_ratio(&self) -> f64 {
        match self.segments.len() + self.uncompressed.len() {
            0 => 1.0,
            stored_records => self.original_count as f64 / stored_records as f64,
        }
    }

    /// Percentage of records saved relative to the original span count.
    ///
    /// Returns `0.0` when the compression ratio is `1.0` or less; otherwise
    /// `(ratio - 1) / ratio * 100`.
    pub fn storage_savings_percent(&self) -> f64 {
        let ratio = self.compression_ratio();
        if ratio <= 1.0 {
            0.0
        } else {
            ((ratio - 1.0) / ratio) * 100.0
        }
    }

    /// Number of spans this trace represents: the sum of every segment's
    /// `count` plus the number of uncompressed spans.
    pub fn total_span_count(&self) -> usize {
        let spans_in_segments = self
            .segments
            .iter()
            .fold(0usize, |running_total, segment| running_total + segment.count);
        spans_in_segments + self.uncompressed.len()
    }
}
/// Statistics gathered by `find_run` for one run of matching spans.
struct RunStats {
/// Number of spans in the run, including the span the run starts at.
run_length: usize,
/// Sum of `duration_nanos` over the spans in the run.
total_duration: u64,
/// Smallest `duration_nanos` in the run.
min_duration: u64,
/// Largest `duration_nanos` in the run.
max_duration: u64,
}
/// Measures the run of matching spans that begins at `start_idx`.
///
/// A span continues the run when it has the same `span_name`, `process_id`,
/// `thread_id` and `trace_id` as the span at `start_idx` and
/// `spans_have_similar_attributes` accepts the pair. Scanning stops at the
/// first span that does not match, or at the end of the slice.
///
/// `trace_id` is part of the comparison because a segment built from a run
/// records a single trace id: if runs could cross trace boundaries, the spans
/// of the later traces would be attributed to the first span's trace.
///
/// The returned statistics always include the starting span, so
/// `run_length >= 1`.
///
/// # Panics
///
/// Panics if `start_idx` is out of bounds for `sorted_spans`.
fn find_run(sorted_spans: &[SpanRecord], start_idx: usize) -> RunStats {
    let current_span = &sorted_spans[start_idx];

    let continues_run = |candidate: &SpanRecord| {
        candidate.span_name == current_span.span_name
            && candidate.process_id == current_span.process_id
            && candidate.thread_id == current_span.thread_id
            && candidate.trace_id == current_span.trace_id
            && spans_have_similar_attributes(current_span, candidate)
    };

    let mut stats = RunStats {
        run_length: 1,
        total_duration: current_span.duration_nanos,
        min_duration: current_span.duration_nanos,
        max_duration: current_span.duration_nanos,
    };

    // `start_idx` was just used as a valid index, so `start_idx + 1` is at most
    // `len` and the slice below is in range (possibly empty).
    let followers = sorted_spans[start_idx + 1..]
        .iter()
        .take_while(|&candidate| continues_run(candidate));

    for next_span in followers {
        // Saturate instead of overflowing: a pathological sum must not panic
        // (debug builds) or silently wrap (release builds).
        stats.total_duration = stats.total_duration.saturating_add(next_span.duration_nanos);
        stats.min_duration = stats.min_duration.min(next_span.duration_nanos);
        stats.max_duration = stats.max_duration.max(next_span.duration_nanos);
        stats.run_length += 1;
    }

    stats
}
fn build_segment(sorted_spans: &[SpanRecord], start_idx: usize, stats: &RunStats) -> RleSegment {
let current_span = &sorted_spans[start_idx];
let last_span = &sorted_spans[start_idx + stats.run_length - 1];
RleSegment {
syscall_name: current_span.span_name.clone(),
count: stats.run_length,
start_logical_clock: current_span.logical_clock,
end_logical_clock: last_span.logical_clock,
total_duration: stats.total_duration,
avg_duration: stats.total_duration / stats.run_length as u64,
min_duration: stats.min_duration,
max_duration: stats.max_duration,
common_attributes: current_span.attributes_json.clone(),
process_id: current_span.process_id,
thread_id: current_span.thread_id,
trace_id: current_span.trace_id,
}
}
/// Run-length compresses `spans`.
///
/// The spans are cloned and stably sorted by `logical_clock`, then walked run
/// by run (as delimited by `find_run`). A run with at least `min_run_length`
/// spans becomes one [`RleSegment`]; the spans of a shorter run are copied to
/// `uncompressed` unchanged. `original_count` records the input length.
///
/// An empty input yields an empty trace with `original_count == 0`.
///
/// # Errors
///
/// The current implementation has no failure path and always returns `Ok`.
pub fn compress_spans(spans: &[SpanRecord], min_run_length: usize) -> Result<CompressedTrace> {
    let mut ordered = spans.to_vec();
    ordered.sort_by_key(|span| span.logical_clock);

    let mut segments = Vec::new();
    let mut uncompressed = Vec::new();

    // For empty input the loop body never runs and both vectors stay empty.
    let mut cursor = 0;
    while cursor < ordered.len() {
        let run = find_run(&ordered, cursor);
        let run_end = cursor + run.run_length;

        if run.run_length < min_run_length {
            uncompressed.extend_from_slice(&ordered[cursor..run_end]);
        } else {
            segments.push(build_segment(&ordered, cursor, &run));
        }

        cursor = run_end;
    }

    Ok(CompressedTrace {
        segments,
        uncompressed,
        original_count: spans.len(),
    })
}
/// Decides whether two spans' attributes allow them to share a run.
///
/// Although the name says "similar", the check is exact equality of the two
/// `attributes_json` values.
fn spans_have_similar_attributes(span1: &SpanRecord, span2: &SpanRecord) -> bool {
    let left = &span1.attributes_json;
    let right = &span2.attributes_json;
    left == right
}
/// Expands a segment back into `segment.count` synthetic spans.
///
/// The expansion is approximate: the original per-span values are not stored
/// in the segment. For the span at position `i` of the result:
///
/// * `logical_clock` is `start_logical_clock + i`;
/// * `start_time_nanos` is `logical_clock * 1000` and both `duration_nanos`
///   and the start-to-end distance equal `avg_duration`;
/// * `span_id` is eight copies of `i` truncated to one byte;
/// * name, attributes, process/thread/trace ids are taken from the segment;
/// * kind is `Internal`, status is `Ok` with an empty message, there is no
///   parent, and `resource_json` is `"{}"`.
pub fn decompress_segment(segment: &RleSegment) -> Vec<SpanRecord> {
    (0..segment.count)
        .map(|position| {
            let logical_clock = segment.start_logical_clock + position as u64;
            let start_time_nanos = logical_clock * 1000;

            SpanRecord {
                trace_id: segment.trace_id,
                // NOTE(review): the cast wraps every 256 positions, so longer
                // segments yield repeated span ids — confirm this is acceptable
                // to consumers before relying on id uniqueness.
                span_id: [position as u8; 8],
                parent_span_id: None,
                span_name: segment.syscall_name.clone(),
                span_kind: crate::span_record::SpanKind::Internal,
                start_time_nanos,
                end_time_nanos: start_time_nanos + segment.avg_duration,
                duration_nanos: segment.avg_duration,
                logical_clock,
                status_code: crate::span_record::StatusCode::Ok,
                status_message: String::new(),
                attributes_json: segment.common_attributes.clone(),
                resource_json: "{}".to_string(),
                process_id: segment.process_id,
                thread_id: segment.thread_id,
            }
        })
        .collect()
}
/// Compresses `spans` with the default minimum run length of 10.
///
/// This is a convenience wrapper around `compress_spans`; the result and any
/// error are whatever that call returns.
pub fn compress_trace(spans: &[SpanRecord]) -> Result<CompressedTrace> {
    // Runs shorter than this many spans are left uncompressed.
    const DEFAULT_MIN_RUN_LENGTH: usize = 10;
    compress_spans(spans, DEFAULT_MIN_RUN_LENGTH)
}
// Compile-time checks: the build fails if either public type stops being
// `Send + Sync`, i.e. can no longer be moved to or shared between threads.
static_assertions::assert_impl_all!(RleSegment: Send, Sync);
static_assertions::assert_impl_all!(CompressedTrace: Send, Sync);
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span_record::{SpanKind, StatusCode};
    use std::collections::HashMap;
    use std::ops::Range;

    /// Builds a test span on a fixed trace/process/thread whose start time is
    /// `logical_clock * 1000` and which lasts `duration`.
    fn create_span(
        span_id: u8,
        logical_clock: u64,
        syscall_name: &str,
        duration: u64,
    ) -> SpanRecord {
        let start_nanos = logical_clock * 1000;
        let end_nanos = start_nanos + duration;
        SpanRecord::new(
            [1; 16],
            [span_id; 8],
            None,
            syscall_name.to_string(),
            SpanKind::Internal,
            start_nanos,
            end_nanos,
            logical_clock,
            StatusCode::Ok,
            String::new(),
            HashMap::new(),
            HashMap::new(),
            1234,
            5678,
        )
    }

    /// One span per clock value in `clocks`, all named `syscall_name`, each
    /// lasting 100 and using the clock's low byte as its span id.
    fn run_of(clocks: Range<u64>, syscall_name: &str) -> Vec<SpanRecord> {
        clocks
            .map(|clock| create_span(clock as u8, clock, syscall_name, 100))
            .collect()
    }

    /// Compresses with the minimum run length of 10 used throughout the tests.
    fn compress(spans: &[SpanRecord]) -> CompressedTrace {
        compress_spans(spans, 10).expect("test")
    }

    #[test]
    fn test_no_compression_needed() {
        let spans: Vec<SpanRecord> = ["read", "write", "open"]
            .iter()
            .enumerate()
            .map(|(idx, &name)| create_span(idx as u8 + 1, idx as u64, name, 100))
            .collect();

        let compressed = compress(&spans);

        assert_eq!(compressed.segments.len(), 0);
        assert_eq!(compressed.uncompressed.len(), 3);
        assert_eq!(compressed.compression_ratio(), 1.0);
    }

    #[test]
    fn test_simple_compression() {
        let compressed = compress(&run_of(0..20, "read"));

        assert_eq!(compressed.segments.len(), 1);
        let only = &compressed.segments[0];
        assert_eq!(only.count, 20);
        assert_eq!(only.syscall_name, "read");
        assert_eq!(compressed.compression_ratio(), 20.0);
    }

    #[test]
    fn test_multiple_segments() {
        let mut spans = run_of(0..15, "read");
        spans.extend(run_of(15..27, "write"));

        let compressed = compress(&spans);

        assert_eq!(compressed.segments.len(), 2);
        let (reads, writes) = (&compressed.segments[0], &compressed.segments[1]);
        assert_eq!(reads.count, 15);
        assert_eq!(reads.syscall_name, "read");
        assert_eq!(writes.count, 12);
        assert_eq!(writes.syscall_name, "write");
    }

    #[test]
    fn test_tight_loop_compression() {
        // Span ids repeat every 256 spans; only the logical clock is unique.
        let compressed = compress(&run_of(0..100_000, "read"));

        assert_eq!(compressed.segments.len(), 1);
        assert_eq!(compressed.segments[0].count, 100_000);
        assert!(compressed.segments[0].is_tight_loop());
        assert!(compressed.compression_ratio() > 99_000.0);
        assert!(compressed.storage_savings_percent() > 99.0);
    }

    #[test]
    fn test_duration_statistics() {
        // Durations cycle through 100, 110, ..., 190 twice.
        let spans: Vec<SpanRecord> = (0..20u64)
            .map(|i| create_span(i as u8, i, "read", 100 + (i % 10) * 10))
            .collect();

        let compressed = compress(&spans);

        assert_eq!(compressed.segments.len(), 1);
        let segment = &compressed.segments[0];
        assert_eq!(segment.min_duration, 100);
        assert_eq!(segment.max_duration, 190);
        assert_eq!(segment.avg_duration, 145);
        assert_eq!(segment.duration_variance(), 90);
    }

    #[test]
    fn test_decompress_segment() {
        let compressed = compress(&run_of(0..50, "read"));
        assert_eq!(compressed.segments.len(), 1);

        let decompressed = decompress_segment(&compressed.segments[0]);

        assert_eq!(decompressed.len(), 50);
        assert_eq!(decompressed[0].span_name, "read");
        assert_eq!(decompressed[0].logical_clock, 0);
        assert_eq!(decompressed[49].logical_clock, 49);
    }

    #[test]
    fn test_compression_ratio_calculation() {
        let count = 262_144;
        let segment = RleSegment {
            syscall_name: "read".to_string(),
            count,
            start_logical_clock: 0,
            end_logical_clock: 262_143,
            total_duration: 1_000_000,
            avg_duration: 100,
            min_duration: 90,
            max_duration: 110,
            common_attributes: "{}".to_string(),
            process_id: 1234,
            thread_id: 5678,
            trace_id: [1; 16],
        };

        assert_eq!(segment.compression_ratio(), 262_144.0);
        assert!(segment.is_tight_loop());
    }

    #[test]
    fn test_mixed_compression() {
        // Three singletons, a 20-span read run, then a 2-span write run.
        let mut spans = vec![
            create_span(0, 0, "open", 100),
            create_span(1, 1, "stat", 100),
            create_span(2, 2, "close", 100),
        ];
        spans.extend(run_of(3..23, "read"));
        spans.extend(run_of(23..25, "write"));

        let compressed = compress(&spans);

        assert_eq!(compressed.segments.len(), 1);
        assert_eq!(compressed.uncompressed.len(), 5);
        assert_eq!(compressed.segments[0].count, 20);
    }

    #[test]
    fn test_empty_trace() {
        let spans = Vec::<SpanRecord>::new();

        let compressed = compress(&spans);

        assert_eq!(compressed.segments.len(), 0);
        assert_eq!(compressed.uncompressed.len(), 0);
        assert_eq!(compressed.compression_ratio(), 1.0);
    }

    #[test]
    fn test_total_span_count() {
        let mut spans = run_of(0..50, "read");
        spans.push(create_span(50, 50, "write", 100));

        let compressed = compress(&spans);

        assert_eq!(compressed.total_span_count(), 51);
        assert_eq!(compressed.original_count, 51);
    }
}