//! Streaming record operators over `serde_json::Value` rows: filter,
//! projection, rename, deduplication, external merge sort, and single-pass
//! group-by aggregation, plus a JSONL file-sorting helper.

use std::collections::{HashMap, HashSet};
use std::fs;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};

use serde_json::Value;

use crate::error::{IoError, Result};
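/// A stage in a streaming pipeline: `process` consumes a batch of records and
/// yields the transformed batch; `reset` clears any cross-batch state and is a
/// no-op by default.
///
/// A sketch of a custom op (`ignore`d doctest; the `Uppercase` type is
/// hypothetical and the items here are assumed in scope):
///
/// ```ignore
/// struct Uppercase {
///     field: String,
/// }
/// impl StreamingOp for Uppercase {
///     fn process(&mut self, records: Vec<Value>) -> Vec<Value> {
///         records
///             .into_iter()
///             .map(|mut r| {
///                 // Uppercase the chosen string field in place, if present.
///                 if let Some(Value::String(s)) = r.get_mut(self.field.as_str()) {
///                     *s = s.to_uppercase();
///                 }
///                 r
///             })
///             .collect()
///     }
/// }
/// ```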
pub trait StreamingOp {
fn process(&mut self, records: Vec<Value>) -> Vec<Value>;
fn reset(&mut self) {}
}
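/// Keeps only the records for which the boxed predicate returns `true`.
///
/// A minimal usage sketch (`ignore`d doctest; `batch` is any `Vec<Value>` and
/// the items here are assumed in scope):
///
/// ```ignore
/// let mut active_only = StreamingFilter::new(|v: &Value| {
///     v.get("active").and_then(Value::as_bool).unwrap_or(false)
/// });
/// let kept = active_only.process(batch);
/// ```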
pub struct StreamingFilter {
predicate: Box<dyn Fn(&Value) -> bool + Send>,
}
impl StreamingFilter {
pub fn new<F>(predicate: F) -> Self
where
F: Fn(&Value) -> bool + Send + 'static,
{
Self {
predicate: Box::new(predicate),
}
}
}
impl StreamingOp for StreamingFilter {
fn process(&mut self, records: Vec<Value>) -> Vec<Value> {
records
.into_iter()
.filter(|r| (self.predicate)(r))
.collect()
}
}
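/// Narrows object records to a fixed list of fields. Fields missing from a
/// record are simply omitted, and non-object records pass through unchanged.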
pub struct StreamingProject {
fields: Vec<String>,
}
impl StreamingProject {
pub fn new(fields: Vec<String>) -> Self {
Self { fields }
}
}
impl StreamingOp for StreamingProject {
fn process(&mut self, records: Vec<Value>) -> Vec<Value> {
records
.into_iter()
.map(|r| {
if let Value::Object(obj) = r {
let kept: serde_json::Map<String, Value> = self
.fields
.iter()
.filter_map(|f| obj.get(f).map(|v| (f.clone(), v.clone())))
.collect();
Value::Object(kept)
} else {
r
}
})
.collect()
}
}
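/// Renames top-level keys of object records via an `old_name -> new_name`
/// map; keys without a mapping are kept as-is, and non-object records pass
/// through unchanged.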
pub struct StreamingRename {
renames: HashMap<String, String>,
}
impl StreamingRename {
pub fn new(renames: HashMap<String, String>) -> Self {
Self { renames }
}
}
impl StreamingOp for StreamingRename {
fn process(&mut self, records: Vec<Value>) -> Vec<Value> {
records
.into_iter()
.map(|r| {
if let Value::Object(obj) = r {
let renamed: serde_json::Map<String, Value> = obj
.into_iter()
.map(|(k, v)| {
let new_k = self.renames.get(&k).cloned().unwrap_or(k);
(new_k, v)
})
.collect();
Value::Object(renamed)
} else {
r
}
})
.collect()
}
}
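/// Drops records whose key (as computed by `key_fn`) has already been seen,
/// keeping the first occurrence. The seen-set persists across `process`
/// calls until `reset`, and it grows with the number of distinct keys, so
/// memory is unbounded for high-cardinality streams.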
pub struct StreamingDeduplicate {
seen: HashSet<String>,
key_fn: Box<dyn Fn(&Value) -> String + Send>,
}
impl StreamingDeduplicate {
pub fn new<F>(key_fn: F) -> Self
where
F: Fn(&Value) -> String + Send + 'static,
{
Self {
seen: HashSet::new(),
key_fn: Box::new(key_fn),
}
}
}
impl StreamingOp for StreamingDeduplicate {
    fn process(&mut self, records: Vec<Value>) -> Vec<Value> {
        records
            .into_iter()
            // `HashSet::insert` returns true only for a first-seen key, so
            // this keeps the first occurrence and drops later duplicates.
            .filter(|r| {
                let key = (self.key_fn)(r);
                self.seen.insert(key)
            })
            .collect()
    }
fn reset(&mut self) {
self.seen.clear();
}
}
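/// Sorts records by a string key. Inputs of at most `buffer_size` records are
/// sorted in memory; larger inputs use an external merge sort: sorted runs of
/// `buffer_size` records are spilled as JSONL files to the system temp
/// directory, then k-way merged. A `buffer_size` of 0 is clamped to 1.
///
/// A minimal usage sketch (`ignore`d doctest; `batch` is any `Vec<Value>` and
/// the items here are assumed in scope):
///
/// ```ignore
/// let mut sorter = StreamingSort::new(
///     10_000,
///     |v: &Value| v["name"].as_str().unwrap_or("").to_string(),
///     true, // ascending
/// );
/// let sorted = sorter.sort_all(batch)?;
/// ```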
pub struct StreamingSort {
buffer_size: usize,
key_fn: Box<dyn Fn(&Value) -> String + Send>,
ascending: bool,
}
impl StreamingSort {
pub fn new<F>(buffer_size: usize, key_fn: F, ascending: bool) -> Self
where
F: Fn(&Value) -> String + Send + 'static,
{
Self {
buffer_size: buffer_size.max(1),
key_fn: Box::new(key_fn),
ascending,
}
}
    pub fn sort_all(&mut self, records: Vec<Value>) -> Result<Vec<Value>> {
        // Small inputs are sorted directly; anything larger than one buffer
        // is split into sorted runs on disk and k-way merged back together.
        if records.len() <= self.buffer_size {
            return Ok(self.sort_in_memory(records));
        }
        let run_files = self.produce_runs(&records)?;
        let merged = self.kway_merge(run_files)?;
        Ok(merged)
    }
fn sort_in_memory(&self, mut records: Vec<Value>) -> Vec<Value> {
records.sort_by(|a, b| {
let ka = (self.key_fn)(a);
let kb = (self.key_fn)(b);
if self.ascending {
ka.cmp(&kb)
} else {
kb.cmp(&ka)
}
});
records
}
    fn produce_runs(&self, records: &[Value]) -> Result<Vec<PathBuf>> {
        // Spill each sorted run to a JSONL file in the system temp directory.
        // File names embed the run index and process id, which keeps separate
        // processes apart but not two concurrent sorters in the same process.
        let mut paths = Vec::new();
        let temp_dir = std::env::temp_dir();
for (run_idx, chunk) in records.chunks(self.buffer_size).enumerate() {
let mut run: Vec<Value> = chunk.to_vec();
run.sort_by(|a, b| {
let ka = (self.key_fn)(a);
let kb = (self.key_fn)(b);
if self.ascending {
ka.cmp(&kb)
} else {
kb.cmp(&ka)
}
});
let path = temp_dir.join(format!(
"scirs2_sort_run_{run_idx}_{}.jsonl",
std::process::id()
));
let f = fs::File::create(&path).map_err(IoError::Io)?;
let mut w = BufWriter::new(f);
for record in &run {
let line = serde_json::to_string(record)
.map_err(|e| IoError::SerializationError(e.to_string()))?;
writeln!(w, "{line}").map_err(IoError::Io)?;
}
w.flush().map_err(IoError::Io)?;
paths.push(path);
}
Ok(paths)
}
    fn kway_merge(&self, run_files: Vec<PathBuf>) -> Result<Vec<Value>> {
        // Open every run and prime one (key, value) "head" per reader. Each
        // iteration picks the best head by a linear scan (O(k) per record,
        // fine for the modest run counts produced here) and then refills it.
        let mut readers: Vec<BufReader<fs::File>> = run_files
            .iter()
            .map(|p| fs::File::open(p).map(BufReader::new).map_err(IoError::Io))
            .collect::<Result<Vec<_>>>()?;
        let mut heads: Vec<Option<(String, Value)>> = readers
            .iter_mut()
            .map(|r| peek_next_json_with_key(r, &self.key_fn))
            .collect::<Result<Vec<_>>>()?;
        let mut merged = Vec::new();
loop {
let best_idx = heads
.iter()
.enumerate()
.filter_map(|(i, h)| h.as_ref().map(|(k, _)| (i, k.clone())))
.min_by(|(_, ka), (_, kb)| {
if self.ascending {
ka.cmp(kb)
} else {
kb.cmp(ka)
}
})
.map(|(i, _)| i);
match best_idx {
None => break,
Some(i) => {
if let Some((_, value)) = heads[i].take() {
merged.push(value);
}
heads[i] = peek_next_json_with_key(&mut readers[i], &self.key_fn)?;
}
}
}
        // Best-effort cleanup of the spilled run files.
        for path in &run_files {
            let _ = fs::remove_file(path);
        }
Ok(merged)
}
}
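/// Reads the next non-empty JSONL line from `reader`, returning the record
/// paired with its sort key, or `None` at end of input. A blank line is also
/// treated as end-of-run; that is safe here because `produce_runs` never
/// writes interior blank lines.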
fn peek_next_json_with_key<R: BufRead>(
reader: &mut R,
key_fn: &dyn Fn(&Value) -> String,
) -> Result<Option<(String, Value)>> {
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(0) => Ok(None),
Ok(_) => {
let trimmed = line.trim_end();
if trimmed.is_empty() {
return Ok(None);
}
let val: Value = serde_json::from_str(trimmed)
.map_err(|e| IoError::DeserializationError(e.to_string()))?;
let key = key_fn(&val);
Ok(Some((key, val)))
}
Err(e) => Err(IoError::Io(e)),
}
}
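/// An incremental aggregate folded over one group at a time: `update` absorbs
/// a record, `result` reports the current value, `reset_agg` clears state
/// between groups, and `name` is the field the result is emitted under.
///
/// A sketch of a custom aggregate (`ignore`d doctest; `MeanAggregate` is
/// hypothetical and the items here are assumed in scope):
///
/// ```ignore
/// struct MeanAggregate {
///     field: String,
///     sum: f64,
///     n: u64,
/// }
/// impl StreamingAggregate for MeanAggregate {
///     fn update(&mut self, record: &Value) {
///         if let Some(x) = record.get(&self.field).and_then(Value::as_f64) {
///             self.sum += x;
///             self.n += 1;
///         }
///     }
///     fn result(&self) -> Value {
///         if self.n == 0 {
///             Value::Null
///         } else {
///             serde_json::json!(self.sum / self.n as f64)
///         }
///     }
///     fn reset_agg(&mut self) {
///         self.sum = 0.0;
///         self.n = 0;
///     }
///     fn name(&self) -> &str {
///         "mean"
///     }
/// }
/// ```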
pub trait StreamingAggregate: Send {
fn update(&mut self, record: &Value);
fn result(&self) -> Value;
fn reset_agg(&mut self);
fn name(&self) -> &str;
}
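/// Counts every record in the group; `field_name` is the name the count is
/// emitted under.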
pub struct CountAggregate {
count: u64,
field_name: String,
}
impl CountAggregate {
pub fn new(field_name: impl Into<String>) -> Self {
Self {
count: 0,
field_name: field_name.into(),
}
}
}
impl StreamingAggregate for CountAggregate {
fn update(&mut self, _record: &Value) {
self.count += 1;
}
fn result(&self) -> Value {
Value::Number(self.count.into())
}
fn reset_agg(&mut self) {
self.count = 0;
}
fn name(&self) -> &str {
&self.field_name
}
}
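/// Sums the numeric values of `field`, skipping records where the field is
/// missing or non-numeric.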
pub struct SumAggregate {
field: String,
sum: f64,
output_name: String,
}
impl SumAggregate {
pub fn new(field: impl Into<String>, output_name: impl Into<String>) -> Self {
Self {
field: field.into(),
sum: 0.0,
output_name: output_name.into(),
}
}
}
impl StreamingAggregate for SumAggregate {
fn update(&mut self, record: &Value) {
if let Some(n) = record.get(&self.field).and_then(|v| v.as_f64()) {
self.sum += n;
}
}
fn result(&self) -> Value {
serde_json::json!(self.sum)
}
fn reset_agg(&mut self) {
self.sum = 0.0;
}
fn name(&self) -> &str {
&self.output_name
}
}
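/// Tracks the minimum numeric value of `field`; `result` is `Null` until a
/// numeric value has been seen.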
pub struct MinAggregate {
field: String,
min: f64,
output_name: String,
has_value: bool,
}
impl MinAggregate {
pub fn new(field: impl Into<String>, output_name: impl Into<String>) -> Self {
Self {
field: field.into(),
min: f64::MAX,
output_name: output_name.into(),
has_value: false,
}
}
}
impl StreamingAggregate for MinAggregate {
fn update(&mut self, record: &Value) {
if let Some(n) = record.get(&self.field).and_then(|v| v.as_f64()) {
if !self.has_value || n < self.min {
self.min = n;
self.has_value = true;
}
}
}
fn result(&self) -> Value {
if self.has_value {
serde_json::json!(self.min)
} else {
Value::Null
}
}
fn reset_agg(&mut self) {
self.min = f64::MAX;
self.has_value = false;
}
fn name(&self) -> &str {
&self.output_name
}
}
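/// Tracks the maximum numeric value of `field`; `result` is `Null` until a
/// numeric value has been seen.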
pub struct MaxAggregate {
field: String,
max: f64,
output_name: String,
has_value: bool,
}
impl MaxAggregate {
pub fn new(field: impl Into<String>, output_name: impl Into<String>) -> Self {
Self {
field: field.into(),
max: f64::MIN,
output_name: output_name.into(),
has_value: false,
}
}
}
impl StreamingAggregate for MaxAggregate {
fn update(&mut self, record: &Value) {
if let Some(n) = record.get(&self.field).and_then(|v| v.as_f64()) {
if !self.has_value || n > self.max {
self.max = n;
self.has_value = true;
}
}
}
fn result(&self) -> Value {
if self.has_value {
serde_json::json!(self.max)
} else {
Value::Null
}
}
fn reset_agg(&mut self) {
self.max = f64::MIN;
self.has_value = false;
}
fn name(&self) -> &str {
&self.output_name
}
}
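/// Groups records by a string key and folds a set of aggregates over each
/// group, emitting one object per group with the key stored under `"_key"`.
/// Records are sorted by key first so every group is a contiguous range and
/// can be aggregated in a single pass.
///
/// A minimal usage sketch (`ignore`d doctest; `batch` is any `Vec<Value>` and
/// the items here are assumed in scope):
///
/// ```ignore
/// let aggs: Vec<Box<dyn StreamingAggregate>> = vec![
///     Box::new(CountAggregate::new("count")),
///     Box::new(SumAggregate::new("salary", "total")),
/// ];
/// let mut by_dept = StreamingGroupBy::new(
///     |v: &Value| v["dept"].as_str().unwrap_or("").to_string(),
///     aggs,
///     64,
/// );
/// let groups = by_dept.run(batch)?;
/// ```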
pub struct StreamingGroupBy {
    key_fn: Box<dyn Fn(&Value) -> String + Send>,
    aggregates: Vec<Box<dyn StreamingAggregate>>,
    /// Currently unused: `run` sorts and aggregates entirely in memory.
    #[allow(dead_code)]
    buffer_size: usize,
}
impl StreamingGroupBy {
pub fn new<F>(
key_fn: F,
aggregates: Vec<Box<dyn StreamingAggregate>>,
buffer_size: usize,
) -> Self
where
F: Fn(&Value) -> String + Send + 'static,
{
Self {
key_fn: Box::new(key_fn),
aggregates,
buffer_size,
}
}
    pub fn run(&mut self, records: Vec<Value>) -> Result<Vec<Value>> {
        // Sort by key so that each group occupies a contiguous range and can
        // be folded in a single pass.
        let key_fn = &self.key_fn;
        let mut sorted = records;
        sorted.sort_by(|a, b| {
            let ka = (key_fn)(a);
            let kb = (key_fn)(b);
            ka.cmp(&kb)
        });
        if sorted.is_empty() {
            return Ok(Vec::new());
        }
        let mut output = Vec::new();
        let mut current_key = (self.key_fn)(&sorted[0]);
        for agg in &mut self.aggregates {
            agg.reset_agg();
        }
        for record in sorted {
            let key = (self.key_fn)(&record);
            if key != current_key {
                // Key changed: flush the finished group and start a new one.
                output.push(self.emit_group(&current_key));
                current_key = key;
                for agg in &mut self.aggregates {
                    agg.reset_agg();
                }
            }
            for agg in &mut self.aggregates {
                agg.update(&record);
            }
        }
        // Flush the final group.
        output.push(self.emit_group(&current_key));
        Ok(output)
    }
fn emit_group(&self, key: &str) -> Value {
let mut obj = serde_json::Map::new();
obj.insert("_key".to_string(), Value::String(key.to_owned()));
for agg in &self.aggregates {
obj.insert(agg.name().to_string(), agg.result());
}
Value::Object(obj)
}
}
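/// Chains `StreamingOp`s; `run` passes each batch through the ops in the
/// order they were added.
///
/// A minimal usage sketch (`ignore`d doctest; `batch` is any `Vec<Value>` and
/// the items here are assumed in scope):
///
/// ```ignore
/// let mut pipeline = StreamingPipeline::new();
/// pipeline.add(StreamingFilter::new(|v: &Value| {
///     v["active"].as_bool().unwrap_or(false)
/// }));
/// pipeline.add(StreamingProject::new(vec!["id".into(), "name".into()]));
/// let out = pipeline.run(batch);
/// ```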
pub struct StreamingPipeline {
ops: Vec<Box<dyn StreamingOp>>,
}
impl StreamingPipeline {
pub fn new() -> Self {
Self { ops: Vec::new() }
}
pub fn add<O: StreamingOp + 'static>(&mut self, op: O) {
self.ops.push(Box::new(op));
}
pub fn run(&mut self, records: Vec<Value>) -> Vec<Value> {
let mut current = records;
for op in &mut self.ops {
current = op.process(current);
}
current
}
}
impl Default for StreamingPipeline {
fn default() -> Self {
Self::new()
}
}
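/// Sorts a JSONL file by key, writing the result to `output` and returning
/// the number of records written. Note that the whole input is currently read
/// into memory before sorting; `buffer_size` bounds the size of spilled runs,
/// not peak memory.
///
/// A minimal usage sketch (`ignore`d doctest; the paths are illustrative):
///
/// ```ignore
/// let n = sort_jsonl_file(
///     Path::new("in.jsonl"),
///     Path::new("out.jsonl"),
///     |v: &Value| format!("{:010}", v["id"].as_i64().unwrap_or(0)),
///     true,   // ascending
///     10_000, // buffer_size
/// )?;
/// ```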
pub fn sort_jsonl_file<F>(
input: &Path,
output: &Path,
key_fn: F,
ascending: bool,
buffer_size: usize,
) -> Result<usize>
where
F: Fn(&Value) -> String + Send + 'static,
{
let f = fs::File::open(input).map_err(IoError::Io)?;
let reader = BufReader::new(f);
let mut records = Vec::new();
for line in reader.lines() {
let line = line.map_err(IoError::Io)?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let val: Value = serde_json::from_str(trimmed)
.map_err(|e| IoError::DeserializationError(e.to_string()))?;
records.push(val);
}
let total = records.len();
let mut sorter = StreamingSort::new(buffer_size, key_fn, ascending);
let sorted = sorter.sort_all(records)?;
let out_f = fs::File::create(output).map_err(IoError::Io)?;
let mut w = BufWriter::new(out_f);
for record in &sorted {
let line = serde_json::to_string(record)
.map_err(|e| IoError::SerializationError(e.to_string()))?;
writeln!(w, "{line}").map_err(IoError::Io)?;
}
w.flush().map_err(IoError::Io)?;
Ok(total)
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::env::temp_dir;
fn records() -> Vec<Value> {
vec![
json!({"id": 1, "dept": "eng", "salary": 100.0, "name": "Alice", "active": true}),
json!({"id": 2, "dept": "hr", "salary": 80.0, "name": "Bob", "active": false}),
json!({"id": 3, "dept": "eng", "salary": 120.0, "name": "Charlie","active": true}),
json!({"id": 4, "dept": "hr", "salary": 90.0, "name": "Dave", "active": true}),
json!({"id": 5, "dept": "eng", "salary": 110.0, "name": "Eve", "active": false}),
]
}
#[test]
fn test_filter() {
let mut f = StreamingFilter::new(|v| v["active"].as_bool().unwrap_or(false));
let out = f.process(records());
assert_eq!(out.len(), 3);
}
#[test]
fn test_project() {
let mut p = StreamingProject::new(vec!["id".into(), "name".into()]);
let out = p.process(records());
assert_eq!(out.len(), 5);
let obj = out[0].as_object().expect("object");
assert!(obj.contains_key("id"));
assert!(obj.contains_key("name"));
assert!(!obj.contains_key("salary"));
assert!(!obj.contains_key("dept"));
}
#[test]
fn test_rename() {
let mut renames = HashMap::new();
renames.insert("salary".into(), "pay".into());
let mut op = StreamingRename::new(renames);
let out = op.process(records());
let obj = out[0].as_object().expect("object");
assert!(obj.contains_key("pay"));
assert!(!obj.contains_key("salary"));
}
#[test]
fn test_deduplicate() {
let input = vec![
json!({"id": 1, "v": "a"}),
json!({"id": 2, "v": "b"}),
json!({"id": 1, "v": "c"}), json!({"id": 3, "v": "d"}),
];
let mut dedup = StreamingDeduplicate::new(|v| {
v["id"].as_i64().map(|i| i.to_string()).unwrap_or_default()
});
let out = dedup.process(input);
assert_eq!(out.len(), 3);
assert_eq!(out[0]["v"], "a"); }
#[test]
fn test_sort_ascending() {
let input = vec![
json!({"name": "Charlie"}),
json!({"name": "Alice"}),
json!({"name": "Bob"}),
];
let mut sorter =
StreamingSort::new(16, |v| v["name"].as_str().unwrap_or("").to_string(), true);
let out = sorter.sort_all(input).expect("sort");
assert_eq!(out[0]["name"], "Alice");
assert_eq!(out[1]["name"], "Bob");
assert_eq!(out[2]["name"], "Charlie");
}
#[test]
fn test_sort_descending() {
let input = vec![
json!({"name": "Alice"}),
json!({"name": "Charlie"}),
json!({"name": "Bob"}),
];
let mut sorter =
StreamingSort::new(16, |v| v["name"].as_str().unwrap_or("").to_string(), false);
let out = sorter.sort_all(input).expect("sort");
assert_eq!(out[0]["name"], "Charlie");
assert_eq!(out[1]["name"], "Bob");
assert_eq!(out[2]["name"], "Alice");
}
#[test]
fn test_sort_external_merge() {
let input: Vec<Value> = (0..20_i64).rev().map(|i| json!({"v": i})).collect();
        // buffer_size = 3 with 20 records forces the spill-to-disk merge path.
        let mut sorter = StreamingSort::new(
            3,
            |v| {
                let n = v["v"].as_i64().unwrap_or(0);
                // Zero-padded keys make lexicographic order match numeric order.
                format!("{n:010}")
            },
            true,
        );
let out = sorter.sort_all(input).expect("sort");
assert_eq!(out.len(), 20);
assert_eq!(out[0]["v"], 0);
assert_eq!(out[19]["v"], 19);
}
#[test]
fn test_group_by_count_sum() {
let recs = records();
let aggs: Vec<Box<dyn StreamingAggregate>> = vec![
Box::new(CountAggregate::new("count")),
Box::new(SumAggregate::new("salary", "total")),
];
let mut gb =
StreamingGroupBy::new(|v| v["dept"].as_str().unwrap_or("").to_string(), aggs, 64);
let out = gb.run(recs).expect("group by");
assert_eq!(out.len(), 2);
let eng = out.iter().find(|v| v["_key"] == "eng").expect("eng group");
assert_eq!(eng["count"], 3);
assert!((eng["total"].as_f64().unwrap() - 330.0).abs() < 1e-9);
}
#[test]
fn test_group_by_min_max() {
let recs = records();
let aggs: Vec<Box<dyn StreamingAggregate>> = vec![
Box::new(MinAggregate::new("salary", "min_sal")),
Box::new(MaxAggregate::new("salary", "max_sal")),
];
let mut gb =
StreamingGroupBy::new(|v| v["dept"].as_str().unwrap_or("").to_string(), aggs, 64);
let out = gb.run(recs).expect("group by");
let eng = out.iter().find(|v| v["_key"] == "eng").expect("eng group");
assert!((eng["min_sal"].as_f64().unwrap() - 100.0).abs() < 1e-9);
assert!((eng["max_sal"].as_f64().unwrap() - 120.0).abs() < 1e-9);
}
#[test]
fn test_pipeline() {
let mut pipeline = StreamingPipeline::new();
pipeline.add(StreamingFilter::new(|v| {
v["active"].as_bool().unwrap_or(false)
}));
pipeline.add(StreamingProject::new(vec!["id".into(), "name".into()]));
let out = pipeline.run(records());
assert_eq!(out.len(), 3);
let obj = out[0].as_object().expect("object");
assert!(obj.contains_key("name"));
assert!(!obj.contains_key("salary"));
}
#[test]
fn test_sort_jsonl_file() {
let temp = temp_dir();
let input_path = temp.join("sort_test_input.jsonl");
let output_path = temp.join("sort_test_output.jsonl");
let lines = [r#"{"v": 3}"#, r#"{"v": 1}"#, r#"{"v": 2}"#];
fs::write(&input_path, lines.join("\n")).expect("write");
let count = sort_jsonl_file(
&input_path,
&output_path,
|v| {
let n = v["v"].as_i64().unwrap_or(0);
format!("{n:010}")
},
true,
100,
)
.expect("sort file");
assert_eq!(count, 3);
let output = fs::read_to_string(&output_path).expect("read");
let vals: Vec<i64> = output
.lines()
.map(|l| {
serde_json::from_str::<Value>(l).expect("json")["v"]
.as_i64()
.unwrap()
})
.collect();
assert_eq!(vals, vec![1, 2, 3]);
}
#[test]
fn test_deduplicate_reset() {
let mut dedup = StreamingDeduplicate::new(|v| {
v["id"].as_i64().map(|i| i.to_string()).unwrap_or_default()
});
let first = dedup.process(vec![json!({"id": 1}), json!({"id": 1})]);
assert_eq!(first.len(), 1);
        dedup.reset();
        let second = dedup.process(vec![json!({"id": 1}), json!({"id": 1})]);
assert_eq!(second.len(), 1);
}
}