use std::path::{Path, PathBuf};
use super::vectorized_scan::{ScanError, ScanRequest, ScanResult};
/// Scan `partition_dirs` for `request`, fanning the work out across threads
/// where the target supports it.
///
/// * No partitions: returns an empty result whose variant matches the shape
///   `request` asks for: `Raw` when `bucket_interval_ms == 0 && limit > 0`,
///   `Aggregate` when `bucket_interval_ms == 0`, and `Bucketed` otherwise.
///   This is the same dispatch used when merging multi-partition results.
/// * Exactly one partition: delegates directly to `scan_partition`.
/// * Several partitions: native targets split the partitions across scoped
///   worker threads. On `wasm32` (no threads) all partitions are scanned on
///   the calling thread via `scan_partitions`.
///
/// # Errors
///
/// Propagates any [`ScanError`] returned by the underlying scan.
pub fn parallel_scan(
    partition_dirs: &[PathBuf],
    request: &ScanRequest,
) -> Result<ScanResult, ScanError> {
    if partition_dirs.is_empty() {
        // Answer in the shape the caller asked for, so code that matches on the
        // expected variant does not fall into its "unexpected variant" branch.
        let empty = match request.bucket_interval_ms {
            0 if request.limit > 0 => ScanResult::Raw(Vec::new()),
            0 => ScanResult::Aggregate(super::columnar_agg::AggResult::default()),
            _ => ScanResult::Bucketed(Default::default()),
        };
        return Ok(empty);
    }
    if let [only] = partition_dirs {
        // A single partition gains nothing from thread fan-out.
        return super::vectorized_scan::scan_partition(only, request);
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        parallel_scan_native(partition_dirs, request)
    }
    #[cfg(target_arch = "wasm32")]
    {
        let dir_refs: Vec<&Path> = partition_dirs.iter().map(PathBuf::as_path).collect();
        super::vectorized_scan::scan_partitions(&dir_refs, request)
    }
}
/// Scan `partition_dirs` on up to `min(available cores, partitions, 8)` scoped
/// worker threads, then merge the per-thread results.
///
/// Partitions are split into contiguous chunks (in input order) of
/// `ceil(len / thread_count)` entries. Each chunk is scanned with
/// `scan_partitions` on its own thread. The per-chunk results are collected in
/// chunk order and handed to `merge_scan_results`.
///
/// If only one thread would be used, the whole slice is scanned on the calling
/// thread instead.
///
/// # Errors
///
/// A worker that panics is reported as `ScanError::Io("thread panicked")`.
/// Per-chunk errors are passed to `merge_scan_results`.
#[cfg(not(target_arch = "wasm32"))]
fn parallel_scan_native(
    partition_dirs: &[PathBuf],
    request: &ScanRequest,
) -> Result<ScanResult, ScanError> {
    // Hard cap on worker threads, regardless of how many cores are reported.
    const MAX_SCAN_THREADS: usize = 8;

    let thread_count = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(partition_dirs.len())
        .min(MAX_SCAN_THREADS);
    if thread_count <= 1 {
        let dir_refs: Vec<&Path> = partition_dirs.iter().map(PathBuf::as_path).collect();
        return super::vectorized_scan::scan_partitions(&dir_refs, request);
    }

    let chunk_size = partition_dirs.len().div_ceil(thread_count);
    let results: Vec<Result<ScanResult, ScanError>> = std::thread::scope(|s| {
        // Spawn every worker before joining any, so the chunks run concurrently.
        let handles: Vec<_> = partition_dirs
            .chunks(chunk_size)
            .map(|chunk| {
                let req = request.clone();
                s.spawn(move || {
                    let dir_refs: Vec<&Path> = chunk.iter().map(PathBuf::as_path).collect();
                    super::vectorized_scan::scan_partitions(&dir_refs, &req)
                })
            })
            .collect();
        // Join in spawn order so results line up with chunk (input) order.
        handles
            .into_iter()
            .map(|h| {
                h.join()
                    .unwrap_or_else(|_| Err(ScanError::Io("thread panicked".into())))
            })
            .collect()
    });
    merge_scan_results(results, request)
}
/// Combine per-chunk scan results into one [`ScanResult`] of the shape
/// requested by `request`.
///
/// The request is dispatched the same way the scanner shapes its output:
/// * `bucket_interval_ms == 0 && limit > 0`: concatenate `Raw` rows, sort them
///   by timestamp, and keep the first `limit`.
/// * `bucket_interval_ms == 0`: fold all `Aggregate`s into one.
/// * otherwise: fold `Bucketed` aggregates per bucket timestamp, returned in
///   ascending bucket order.
///
/// `first`/`last` are taken from the earliest/latest non-empty input in
/// `results` order. This assumes `results` is in time order.
/// NOTE(review): confirm callers always pass partitions sorted by time.
///
/// Results of a variant other than the one expected for the request are
/// ignored.
///
/// # Errors
///
/// Returns the first `Err` in `results` (including the stand-in error for a
/// panicked worker). Dropping it would silently produce a partial answer.
fn merge_scan_results(
    results: Vec<Result<ScanResult, ScanError>>,
    request: &ScanRequest,
) -> Result<ScanResult, ScanError> {
    // Fail fast: any missing chunk would make every merged value below wrong.
    let results: Vec<ScanResult> = results.into_iter().collect::<Result<_, _>>()?;

    match request.bucket_interval_ms {
        0 if request.limit > 0 => {
            let mut all_raw = Vec::new();
            for r in results {
                if let ScanResult::Raw(rows) = r {
                    all_raw.extend(rows);
                }
            }
            // Stable sort: rows with equal timestamps keep chunk order.
            all_raw.sort_by_key(|&(ts, _)| ts);
            all_raw.truncate(request.limit);
            Ok(ScanResult::Raw(all_raw))
        }
        0 => {
            let mut merged = empty_agg_result();
            for r in results {
                if let ScanResult::Aggregate(agg) = r {
                    merge_agg_into(&mut merged, &agg);
                }
            }
            Ok(ScanResult::Aggregate(merged))
        }
        _ => {
            let mut bucket_map: std::collections::BTreeMap<i64, super::columnar_agg::AggResult> =
                std::collections::BTreeMap::new();
            for r in results {
                if let ScanResult::Bucketed(buckets) = r {
                    for (bucket_ts, agg) in buckets {
                        let entry = bucket_map.entry(bucket_ts).or_insert_with(empty_agg_result);
                        merge_agg_into(entry, &agg);
                    }
                }
            }
            Ok(ScanResult::Bucketed(bucket_map.into_iter().collect()))
        }
    }
}

/// Identity value for [`merge_agg_into`].
///
/// Zero count and sum, `+inf`/`-inf` bounds so any real min/max replaces them,
/// and NaN `first`/`last` until data arrives.
fn empty_agg_result() -> super::columnar_agg::AggResult {
    super::columnar_agg::AggResult {
        count: 0,
        sum: 0.0,
        min: f64::INFINITY,
        max: f64::NEG_INFINITY,
        first: f64::NAN,
        last: f64::NAN,
    }
}

/// Fold `src` into `dst`, treating `src` as covering data later than
/// everything already in `dst`. Empty (`count == 0`) inputs are ignored.
fn merge_agg_into(
    dst: &mut super::columnar_agg::AggResult,
    src: &super::columnar_agg::AggResult,
) {
    if src.count == 0 {
        return;
    }
    // Key "first" off the count rather than a NaN sentinel, so a genuinely-NaN
    // first value is not overwritten by a later input.
    if dst.count == 0 {
        dst.first = src.first;
    }
    dst.count += src.count;
    dst.sum += src.sum;
    if src.min < dst.min {
        dst.min = src.min;
    }
    if src.max > dst.max {
        dst.max = src.max;
    }
    dst.last = src.last;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::timeseries::columnar_memtable::{ColumnarMemtable, ColumnarMemtableConfig};
    use crate::engine::timeseries::columnar_segment::ColumnarSegmentWriter;
    use nodedb_types::timeseries::MetricSample;
    use tempfile::TempDir;

    /// Memtable limits shared by every fixture partition.
    fn test_config() -> ColumnarMemtableConfig {
        ColumnarMemtableConfig {
            max_memory_bytes: 10 * 1024 * 1024,
            hard_memory_limit: 20 * 1024 * 1024,
            max_tag_cardinality: 1000,
        }
    }

    /// Write a metric partition called `name` under `dir`.
    ///
    /// The partition holds `count` samples spaced 1000 ms apart starting at
    /// `start_ts`, with values cycling through 0..=99.
    fn create_partition(dir: &Path, name: &str, count: usize, start_ts: i64) {
        let mut memtable = ColumnarMemtable::new_metric(test_config());
        for step in 0..count {
            let sample = MetricSample {
                timestamp_ms: start_ts + step as i64 * 1000,
                value: (step % 100) as f64,
            };
            memtable.ingest_metric(1, sample);
        }
        let drained = memtable.drain();
        ColumnarSegmentWriter::new(dir)
            .write_partition(name, &drained, 0, 0)
            .unwrap();
    }

    /// A single partition takes the direct (non-threaded) path and still
    /// counts every sample.
    #[test]
    fn parallel_scan_single() {
        let workdir = TempDir::new().unwrap();
        create_partition(workdir.path(), "p1", 1000, 1_700_000_000_000);
        let partitions = vec![workdir.path().join("p1")];

        let request = ScanRequest {
            start_ms: 1_700_000_000_000,
            end_ms: 1_700_000_999_000,
            value_column: "value".into(),
            bucket_interval_ms: 0,
            predicates: vec![],
            limit: 0,
        };

        let ScanResult::Aggregate(agg) = parallel_scan(&partitions, &request).unwrap() else {
            panic!("expected Aggregate");
        };
        assert_eq!(agg.count, 1000);
    }

    /// Several partitions are scanned and their aggregates merged.
    #[test]
    fn parallel_scan_multiple() {
        let workdir = TempDir::new().unwrap();
        let fixtures = [
            ("p1", 1_700_000_000_000),
            ("p2", 1_700_000_500_000),
            ("p3", 1_700_001_000_000),
        ];
        for &(name, start_ts) in &fixtures {
            create_partition(workdir.path(), name, 500, start_ts);
        }
        let partitions: Vec<PathBuf> = fixtures
            .iter()
            .map(|&(name, _)| workdir.path().join(name))
            .collect();

        let request = ScanRequest {
            start_ms: 1_700_000_000_000,
            end_ms: 1_700_001_500_000,
            value_column: "value".into(),
            bucket_interval_ms: 0,
            predicates: vec![],
            limit: 0,
        };

        let ScanResult::Aggregate(agg) = parallel_scan(&partitions, &request).unwrap() else {
            panic!("expected Aggregate");
        };
        assert_eq!(agg.count, 1500);
    }
}