use super::TimedEvent;
use crate::core::types::Bar;
pub fn merge_sorted(streams: Vec<Vec<TimedEvent>>) -> Vec<TimedEvent> {
let total = streams.iter().map(|s| s.len()).sum();
let mut all: Vec<TimedEvent> = Vec::with_capacity(total);
for s in streams {
all.extend(s);
}
all.sort_by_key(|e| e.timestamp_ms());
all
}
/// Splits `events` into one contiguous index range per bar.
///
/// The returned vector has exactly `bars.len()` entries. Entry `i` is the
/// half-open range of indices into `events` assigned to `bars[i]`: starting
/// where the previous window ended, it takes every consecutive event whose
/// `timestamp_ms()` is strictly less than the `time` of `bars[i + 1]`.
///
/// Consequences of that rule:
/// * Only the upper bound is checked, so events stamped before the first
///   bar's `time` are placed in the first window.
/// * An event stamped exactly at a bar's `time` belongs to that bar, not the
///   previous one.
/// * The last bar has no successor and receives every remaining event,
///   whatever its timestamp.
///
/// The scan is a single forward pass over both slices, so the windows are
/// only meaningful when `bars` and `events` are each in ascending time order;
/// this is not checked here.
pub fn bar_boundaries(bars: &[Bar], events: &[TimedEvent]) -> Vec<std::ops::Range<usize>> {
    let mut windows = Vec::with_capacity(bars.len());
    let mut cursor = 0usize;

    // `next_index` addresses the bar *after* the one whose window is being
    // built; for the final bar it is out of range and `get` yields `None`.
    for next_index in 1..=bars.len() {
        let start = cursor;
        cursor = match bars.get(next_index) {
            Some(next_bar) => {
                let taken = events[start..]
                    .iter()
                    .take_while(|event| event.timestamp_ms() < next_bar.time)
                    .count();
                start + taken
            }
            // No upper bound for the last bar. Taking the rest of the slice
            // (rather than comparing against an `i64::MAX` sentinel) keeps an
            // event stamped `i64::MAX` from being left out of every window.
            None => events.len(),
        };
        windows.push(start..cursor);
    }

    windows
}
/// Reduces the events belonging to each bar to a single value.
///
/// The windows come from [`bar_boundaries`]; `sampler` is then called once
/// per window, in window order, with the matching sub-slice of `events`
/// (which may be empty). The results are returned in the same order, one per
/// window.
pub fn align_to_bars<F, T>(bars: &[Bar], events: &[TimedEvent], mut sampler: F) -> Vec<T>
where
    F: FnMut(&[TimedEvent]) -> T,
{
    let windows = bar_boundaries(bars, events);
    let mut samples = Vec::with_capacity(windows.len());
    // Consuming the ranges by value lets each one index `events` directly.
    for window in windows {
        samples.push(sampler(&events[window]));
    }
    samples
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::types::{Bar, FundingRate};
    use crate::data_loader::TimedEvent;

    /// Builds a bar timestamped `t`; the other constructor arguments are
    /// arbitrary fixed values that these tests never inspect.
    fn make_bar(t: i64) -> Bar {
        Bar::new(t, 1.0, 2.0, 0.5, 1.5, 100.0)
    }

    /// Builds a funding event timestamped `ts` with a fixed rate.
    fn make_funding(ts: i64) -> TimedEvent {
        let funding = FundingRate {
            rate: 0.0001,
            next_funding_time: None,
            timestamp: ts,
        };
        TimedEvent::Funding(funding)
    }

    /// Collects the timestamps of `events` in order.
    fn timestamps_of(events: &[TimedEvent]) -> Vec<i64> {
        events.iter().map(|event| event.timestamp_ms()).collect()
    }

    /// Merging no streams at all produces no events.
    #[test]
    fn empty_merge() {
        assert!(merge_sorted(Vec::new()).is_empty());
    }

    /// A single, already ordered stream comes back with the same ends.
    #[test]
    fn single_stream_passthrough() {
        let stream: Vec<TimedEvent> = [1000, 2000, 3000]
            .iter()
            .map(|&t| TimedEvent::Bar(make_bar(t)))
            .collect();

        let merged = merge_sorted(vec![stream]);

        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].timestamp_ms(), 1000);
        assert_eq!(merged[2].timestamp_ms(), 3000);
    }

    /// Events from different streams are interleaved by timestamp.
    #[test]
    fn multi_stream_merge() {
        let bar_events: Vec<TimedEvent> = [1000, 3000]
            .iter()
            .map(|&t| TimedEvent::Bar(make_bar(t)))
            .collect();
        let funding_events = vec![make_funding(500), make_funding(2000), make_funding(4000)];

        let merged = merge_sorted(vec![bar_events, funding_events]);

        assert_eq!(timestamps_of(&merged), vec![500, 1000, 2000, 3000, 4000]);
    }

    /// On equal timestamps the event from the earlier stream stays first.
    #[test]
    fn stable_ordering_equal_timestamps() {
        let first = vec![TimedEvent::Bar(make_bar(1000))];
        let second = vec![make_funding(1000)];

        let merged = merge_sorted(vec![first, second]);

        assert_eq!(merged.len(), 2);
        assert!(matches!(merged[0], TimedEvent::Bar(_)));
        assert!(matches!(merged[1], TimedEvent::Funding(_)));
    }

    /// An event stamped exactly at a bar's time belongs to that bar, and the
    /// last bar takes whatever is left.
    #[test]
    fn bar_boundaries_basic() {
        let bars = vec![make_bar(0), make_bar(1000), make_bar(2000)];
        let events: Vec<TimedEvent> = [100, 500, 1000, 1500, 2500]
            .iter()
            .map(|&ts| make_funding(ts))
            .collect();

        let ranges = bar_boundaries(&bars, &events);

        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges, vec![0..2, 2..4, 4..5]);
    }

    /// Without bars there are no windows, even if events exist.
    #[test]
    fn bar_boundaries_empty_bars() {
        let no_bars: Vec<Bar> = Vec::new();
        let events = vec![make_funding(100)];

        assert!(bar_boundaries(&no_bars, &events).is_empty());
    }

    /// The sampler sees exactly the events of each bar's window.
    #[test]
    fn align_to_bars_count_per_window() {
        let bars = vec![make_bar(0), make_bar(1000)];
        let events = vec![make_funding(100), make_funding(200), make_funding(1500)];

        let counts: Vec<usize> = align_to_bars(&bars, &events, |window| window.len());

        assert_eq!(counts, vec![2, 1]);
    }
}