use crate::cli::CliContext;
use anyhow::{anyhow, Result};
use colored::Colorize;
use indicatif::{ProgressBar, ProgressStyle};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BindingRow {
pub bindings: HashMap<String, String>,
}
impl BindingRow {
pub fn new(bindings: HashMap<String, String>) -> Self {
Self { bindings }
}
pub fn get(&self, var: &str) -> Option<&str> {
self.bindings.get(var).map(|s| s.as_str())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultChunk {
pub chunk_index: usize,
pub variables: Vec<String>,
pub rows: Vec<BindingRow>,
pub is_final: bool,
pub total_received: usize,
}
impl ResultChunk {
pub fn new(
chunk_index: usize,
variables: Vec<String>,
rows: Vec<BindingRow>,
is_final: bool,
total_received: usize,
) -> Self {
Self {
chunk_index,
variables,
rows,
is_final,
total_received,
}
}
pub fn len(&self) -> usize {
self.rows.len()
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
}
#[derive(Debug, Clone)]
pub struct StreamConfig {
pub chunk_size: usize,
pub format: StreamFormat,
pub show_progress: bool,
pub max_rows: Option<usize>,
pub chunk_delay_ms: u64,
pub chunk_timeout_secs: u64,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
chunk_size: 1000,
format: StreamFormat::Table,
show_progress: true,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
}
}
}
impl StreamConfig {
pub fn from_args(
chunk_size: usize,
format: &str,
max_rows: Option<usize>,
no_progress: bool,
) -> Result<Self> {
let stream_format = StreamFormat::parse(format)?;
Ok(Self {
chunk_size,
format: stream_format,
show_progress: !no_progress,
max_rows,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamFormat {
Json,
Csv,
Tsv,
Table,
}
impl StreamFormat {
pub fn parse(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"json" | "ndjson" => Ok(Self::Json),
"csv" => Ok(Self::Csv),
"tsv" => Ok(Self::Tsv),
"table" | "text" => Ok(Self::Table),
other => Err(anyhow!(
"Unknown stream format '{}'. Valid: json, csv, tsv, table",
other
)),
}
}
pub fn extension(&self) -> &'static str {
match self {
Self::Json => "ndjson",
Self::Csv => "csv",
Self::Tsv => "tsv",
Self::Table => "txt",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StreamStats {
pub total_rows: usize,
pub total_chunks: usize,
pub total_bytes: usize,
pub elapsed_ms: u64,
pub rows_per_second: f64,
pub peak_chunk_ms: u64,
pub avg_chunk_ms: f64,
}
impl StreamStats {
pub fn finalize(&mut self, elapsed: Duration) {
self.elapsed_ms = elapsed.as_millis() as u64;
if self.elapsed_ms > 0 {
self.rows_per_second = self.total_rows as f64 / (self.elapsed_ms as f64 / 1000.0);
}
if self.total_chunks > 0 {
self.avg_chunk_ms = self.elapsed_ms as f64 / self.total_chunks as f64;
}
}
}
pub struct StreamingQueryExecutor {
config: StreamConfig,
}
impl StreamingQueryExecutor {
pub fn new(config: StreamConfig) -> Self {
Self { config }
}
pub fn stream_results(
&self,
variables: Vec<String>,
all_rows: Vec<BindingRow>,
writer: &mut dyn std::io::Write,
) -> Result<StreamStats> {
let start = Instant::now();
let mut stats = StreamStats::default();
let total = match self.config.max_rows {
Some(max) => all_rows.len().min(max),
None => all_rows.len(),
};
let pb = if self.config.show_progress {
let pb = ProgressBar::new(total as u64);
pb.set_style(
ProgressStyle::default_bar()
.template(
"{spinner:.green} [{bar:50.cyan/blue}] {pos}/{len} rows ({per_sec:.1} rows/s) [{elapsed_precise}]"
)
.unwrap_or_else(|_| ProgressStyle::default_bar())
.progress_chars("=>-"),
);
pb.enable_steady_tick(Duration::from_millis(100));
Some(pb)
} else {
None
};
match self.config.format {
StreamFormat::Csv | StreamFormat::Tsv => {
let sep = if self.config.format == StreamFormat::Csv {
","
} else {
"\t"
};
let header = variables.join(sep);
let bytes = header.len() + 1;
writeln!(writer, "{}", header)?;
stats.total_bytes += bytes;
}
StreamFormat::Table => {
let header = self.format_table_header(&variables);
let bytes = header.len();
write!(writer, "{}", header)?;
stats.total_bytes += bytes;
}
StreamFormat::Json => {
}
}
let chunk_size = self.config.chunk_size;
let mut total_received = 0;
for (chunk_index, chunk_rows) in all_rows[..total].chunks(chunk_size).enumerate() {
let chunk_start = Instant::now();
let is_final = total_received + chunk_rows.len() >= total;
let chunk = ResultChunk::new(
chunk_index,
variables.clone(),
chunk_rows.to_vec(),
is_final,
total_received + chunk_rows.len(),
);
let bytes = self.write_chunk(&chunk, writer)?;
let chunk_elapsed = chunk_start.elapsed().as_millis() as u64;
stats.total_rows += chunk.len();
stats.total_chunks += 1;
stats.total_bytes += bytes;
if chunk_elapsed > stats.peak_chunk_ms {
stats.peak_chunk_ms = chunk_elapsed;
}
total_received += chunk.len();
if let Some(ref pb) = pb {
pb.set_position(total_received as u64);
}
if self.config.chunk_delay_ms > 0 {
std::thread::sleep(Duration::from_millis(self.config.chunk_delay_ms));
}
}
if let Some(pb) = pb {
pb.finish_with_message(format!("Streamed {} rows", stats.total_rows));
}
if self.config.format == StreamFormat::Table {
let footer = self.format_table_footer(&variables, stats.total_rows);
let bytes = footer.len();
write!(writer, "{}", footer)?;
stats.total_bytes += bytes;
}
stats.finalize(start.elapsed());
Ok(stats)
}
fn write_chunk(&self, chunk: &ResultChunk, writer: &mut dyn std::io::Write) -> Result<usize> {
let mut total_bytes = 0usize;
match self.config.format {
StreamFormat::Json => {
for row in &chunk.rows {
let json = serde_json::to_string(&row.bindings)?;
let bytes = json.len() + 1;
writeln!(writer, "{}", json)?;
total_bytes += bytes;
}
}
StreamFormat::Csv => {
for row in &chunk.rows {
let line = chunk
.variables
.iter()
.map(|v| {
let val = row.get(v).unwrap_or("");
if val.contains(',') || val.contains('"') || val.contains('\n') {
format!("\"{}\"", val.replace('"', "\"\""))
} else {
val.to_string()
}
})
.collect::<Vec<_>>()
.join(",");
let bytes = line.len() + 1;
writeln!(writer, "{}", line)?;
total_bytes += bytes;
}
}
StreamFormat::Tsv => {
for row in &chunk.rows {
let line = chunk
.variables
.iter()
.map(|v| row.get(v).unwrap_or("").replace('\t', " "))
.collect::<Vec<_>>()
.join("\t");
let bytes = line.len() + 1;
writeln!(writer, "{}", line)?;
total_bytes += bytes;
}
}
StreamFormat::Table => {
let col_widths: Vec<usize> = chunk
.variables
.iter()
.map(|v| {
let max_val = chunk
.rows
.iter()
.map(|r| r.get(v).unwrap_or("").len())
.max()
.unwrap_or(0);
max_val.max(v.len()).min(80)
})
.collect();
for row in &chunk.rows {
let line = chunk
.variables
.iter()
.zip(col_widths.iter())
.map(|(v, &w)| {
let val = row.get(v).unwrap_or("");
let truncated = if val.len() > w {
format!("{}...", &val[..w.saturating_sub(3)])
} else {
val.to_string()
};
format!("{:<width$}", truncated, width = w)
})
.collect::<Vec<_>>()
.join(" | ");
let bytes = line.len() + 1;
writeln!(writer, "| {} |", line)?;
total_bytes += bytes;
}
}
}
Ok(total_bytes)
}
fn format_table_header(&self, variables: &[String]) -> String {
let col_widths: Vec<usize> = variables.iter().map(|v| v.len().clamp(10, 80)).collect();
let header_row = variables
.iter()
.zip(col_widths.iter())
.map(|(v, &w)| format!("{:<width$}", v, width = w))
.collect::<Vec<_>>()
.join(" | ");
let separator = col_widths
.iter()
.map(|&w| "-".repeat(w))
.collect::<Vec<_>>()
.join("-+-");
format!("| {} |\n+-{}-+\n", header_row, separator)
}
fn format_table_footer(&self, variables: &[String], total: usize) -> String {
let col_widths: Vec<usize> = variables.iter().map(|v| v.len().clamp(10, 80)).collect();
let separator = col_widths
.iter()
.map(|&w| "-".repeat(w))
.collect::<Vec<_>>()
.join("-+-");
format!("+-{}-+\n{} rows total\n", separator, total)
}
}
#[allow(clippy::too_many_arguments)]
pub async fn run_stream_command(
dataset: String,
query: String,
is_file: bool,
chunk_size: usize,
format: String,
max_rows: Option<usize>,
no_progress: bool,
output: Option<std::path::PathBuf>,
) -> Result<()> {
let ctx = CliContext::new();
ctx.info(&format!("Streaming SPARQL query on dataset '{}'", dataset));
let sparql_query = if is_file {
std::fs::read_to_string(&query)
.map_err(|e| anyhow!("Failed to read query file '{}': {}", query, e))?
} else {
query
};
let stream_config = StreamConfig::from_args(chunk_size, &format, max_rows, no_progress)?;
let dataset_path = std::path::PathBuf::from(&dataset);
let data_file = dataset_path.join("data.nq");
let (variables, rows) = if data_file.exists() {
load_and_execute_query(&data_file, &sparql_query)?
} else {
let vars = extract_select_variables(&sparql_query);
(vars, Vec::new())
};
ctx.info(&format!(
"Executing query, streaming in chunks of {} rows...",
stream_config.chunk_size
));
let executor = StreamingQueryExecutor::new(stream_config);
let stats = if let Some(output_path) = output {
let mut file = std::fs::File::create(&output_path).map_err(|e| {
anyhow!(
"Failed to create output file '{}': {}",
output_path.display(),
e
)
})?;
executor.stream_results(variables, rows, &mut file)?
} else {
let stdout = std::io::stdout();
let mut out = stdout.lock();
executor.stream_results(variables, rows, &mut out)?
};
println!();
println!("{}", "Streaming complete:".green().bold());
println!(" Total rows: {}", stats.total_rows.to_string().cyan());
println!(" Chunks: {}", stats.total_chunks.to_string().cyan());
println!(
" Bytes written: {}",
format_bytes(stats.total_bytes).cyan()
);
println!(" Elapsed: {:.2}s", stats.elapsed_ms as f64 / 1000.0);
println!(" Throughput: {:.1} rows/s", stats.rows_per_second);
Ok(())
}
fn load_and_execute_query(
data_file: &std::path::Path,
_query: &str,
) -> Result<(Vec<String>, Vec<BindingRow>)> {
use std::io::{BufRead, BufReader};
let file =
std::fs::File::open(data_file).map_err(|e| anyhow!("Failed to open data file: {}", e))?;
let reader = BufReader::new(file);
let variables = vec!["s".to_string(), "p".to_string(), "o".to_string()];
let mut rows = Vec::new();
for line in reader.lines() {
let line = line.map_err(|e| anyhow!("IO error reading data: {}", e))?;
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
continue;
}
let parts: Vec<&str> = trimmed.splitn(5, ' ').collect();
if parts.len() >= 3 {
let mut bindings = HashMap::new();
bindings.insert("s".to_string(), parts[0].to_string());
bindings.insert("p".to_string(), parts[1].to_string());
bindings.insert("o".to_string(), parts[2].to_string());
rows.push(BindingRow::new(bindings));
}
}
Ok((variables, rows))
}
fn extract_select_variables(query: &str) -> Vec<String> {
let upper = query.to_uppercase();
if let Some(select_pos) = upper.find("SELECT") {
if let Some(where_pos) = upper.find("WHERE") {
let projection = &query[select_pos + 6..where_pos].trim().to_string();
if projection.contains('*') {
return vec!["s".to_string(), "p".to_string(), "o".to_string()];
}
return projection
.split_whitespace()
.filter(|s| s.starts_with('?'))
.map(|s| s[1..].to_string())
.collect();
}
}
vec!["s".to_string(), "p".to_string(), "o".to_string()]
}
fn format_bytes(bytes: usize) -> String {
const KB: usize = 1024;
const MB: usize = KB * 1024;
const GB: usize = MB * 1024;
if bytes >= GB {
format!("{:.2} GB", bytes as f64 / GB as f64)
} else if bytes >= MB {
format!("{:.2} MB", bytes as f64 / MB as f64)
} else if bytes >= KB {
format!("{:.2} KB", bytes as f64 / KB as f64)
} else {
format!("{} B", bytes)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
fn make_rows(n: usize) -> (Vec<String>, Vec<BindingRow>) {
let vars = vec![
"subject".to_string(),
"predicate".to_string(),
"object".to_string(),
];
let rows = (0..n)
.map(|i| {
let mut m = HashMap::new();
m.insert("subject".to_string(), format!("<http://ex.org/s{}>", i));
m.insert("predicate".to_string(), "<http://ex.org/p>".to_string());
m.insert("object".to_string(), format!("\"value{}\"", i));
BindingRow::new(m)
})
.collect();
(vars, rows)
}
#[test]
fn test_stream_format_parse() {
assert_eq!(StreamFormat::parse("json").unwrap(), StreamFormat::Json);
assert_eq!(StreamFormat::parse("csv").unwrap(), StreamFormat::Csv);
assert_eq!(StreamFormat::parse("tsv").unwrap(), StreamFormat::Tsv);
assert_eq!(StreamFormat::parse("table").unwrap(), StreamFormat::Table);
assert_eq!(StreamFormat::parse("ndjson").unwrap(), StreamFormat::Json);
assert!(StreamFormat::parse("xml").is_err());
}
#[test]
fn test_stream_format_extension() {
assert_eq!(StreamFormat::Json.extension(), "ndjson");
assert_eq!(StreamFormat::Csv.extension(), "csv");
assert_eq!(StreamFormat::Tsv.extension(), "tsv");
assert_eq!(StreamFormat::Table.extension(), "txt");
}
#[test]
fn test_binding_row_get() {
let mut map = HashMap::new();
map.insert("x".to_string(), "hello".to_string());
let row = BindingRow::new(map);
assert_eq!(row.get("x"), Some("hello"));
assert_eq!(row.get("y"), None);
}
#[test]
fn test_result_chunk_len() {
let (vars, rows) = make_rows(5);
let chunk = ResultChunk::new(0, vars, rows, true, 5);
assert_eq!(chunk.len(), 5);
assert!(!chunk.is_empty());
}
#[test]
fn test_result_chunk_empty() {
let vars = vec!["s".to_string()];
let chunk = ResultChunk::new(0, vars, vec![], true, 0);
assert!(chunk.is_empty());
}
#[test]
fn test_stream_config_default() {
let config = StreamConfig::default();
assert_eq!(config.chunk_size, 1000);
assert_eq!(config.format, StreamFormat::Table);
assert!(config.show_progress);
assert!(config.max_rows.is_none());
}
#[test]
fn test_stream_config_from_args() {
let config = StreamConfig::from_args(500, "json", Some(10000), true).unwrap();
assert_eq!(config.chunk_size, 500);
assert_eq!(config.format, StreamFormat::Json);
assert_eq!(config.max_rows, Some(10000));
assert!(!config.show_progress);
}
#[test]
fn test_stream_json_output() {
let (vars, rows) = make_rows(3);
let config = StreamConfig {
chunk_size: 10,
format: StreamFormat::Json,
show_progress: false,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
let stats = executor.stream_results(vars, rows, &mut buf).unwrap();
assert_eq!(stats.total_rows, 3);
assert_eq!(stats.total_chunks, 1);
let output = String::from_utf8(buf.into_inner()).unwrap();
let lines: Vec<&str> = output.lines().collect();
assert_eq!(lines.len(), 3);
for line in lines {
let parsed: serde_json::Value = serde_json::from_str(line).unwrap();
assert!(parsed.is_object());
}
}
#[test]
fn test_stream_csv_output() {
let (vars, rows) = make_rows(5);
let config = StreamConfig {
chunk_size: 2,
format: StreamFormat::Csv,
show_progress: false,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
let stats = executor.stream_results(vars, rows, &mut buf).unwrap();
assert_eq!(stats.total_rows, 5);
assert_eq!(stats.total_chunks, 3);
let output = String::from_utf8(buf.into_inner()).unwrap();
let lines: Vec<&str> = output.lines().collect();
assert_eq!(lines.len(), 6);
assert_eq!(lines[0], "subject,predicate,object");
}
#[test]
fn test_stream_tsv_output() {
let (vars, rows) = make_rows(2);
let config = StreamConfig {
chunk_size: 100,
format: StreamFormat::Tsv,
show_progress: false,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
let stats = executor.stream_results(vars, rows, &mut buf).unwrap();
assert_eq!(stats.total_rows, 2);
let output = String::from_utf8(buf.into_inner()).unwrap();
let lines: Vec<&str> = output.lines().collect();
assert_eq!(lines.len(), 3); assert!(lines[0].contains('\t'));
}
#[test]
fn test_stream_table_output() {
let (vars, rows) = make_rows(4);
let config = StreamConfig {
chunk_size: 100,
format: StreamFormat::Table,
show_progress: false,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
let stats = executor.stream_results(vars, rows, &mut buf).unwrap();
assert_eq!(stats.total_rows, 4);
let output = String::from_utf8(buf.into_inner()).unwrap();
assert!(output.contains("subject"));
assert!(output.contains("predicate"));
assert!(output.contains("4 rows total"));
}
#[test]
fn test_stream_max_rows_limit() {
let (vars, rows) = make_rows(100);
let config = StreamConfig {
chunk_size: 10,
format: StreamFormat::Json,
show_progress: false,
max_rows: Some(25),
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
let stats = executor.stream_results(vars, rows, &mut buf).unwrap();
assert_eq!(stats.total_rows, 25);
}
#[test]
fn test_stream_empty_results() {
let vars = vec!["s".to_string(), "p".to_string(), "o".to_string()];
let rows: Vec<BindingRow> = vec![];
let config = StreamConfig {
chunk_size: 10,
format: StreamFormat::Csv,
show_progress: false,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
let stats = executor.stream_results(vars, rows, &mut buf).unwrap();
assert_eq!(stats.total_rows, 0);
assert_eq!(stats.total_chunks, 0);
}
#[test]
fn test_stream_stats_finalize() {
let mut stats = StreamStats {
total_rows: 1000,
total_chunks: 10,
..Default::default()
};
stats.finalize(Duration::from_secs(2));
assert_eq!(stats.elapsed_ms, 2000);
assert!((stats.rows_per_second - 500.0).abs() < 1.0);
assert!((stats.avg_chunk_ms - 200.0).abs() < 1.0);
}
#[test]
fn test_format_bytes() {
assert_eq!(format_bytes(512), "512 B");
assert_eq!(format_bytes(1024), "1.00 KB");
assert_eq!(format_bytes(1_048_576), "1.00 MB");
assert_eq!(format_bytes(1_073_741_824), "1.00 GB");
}
#[test]
fn test_extract_select_variables() {
let q = "SELECT ?name ?age WHERE { ?s ?p ?o }";
let vars = extract_select_variables(q);
assert!(vars.contains(&"name".to_string()));
assert!(vars.contains(&"age".to_string()));
}
#[test]
fn test_extract_select_variables_star() {
let q = "SELECT * WHERE { ?s ?p ?o }";
let vars = extract_select_variables(q);
assert_eq!(vars, vec!["s", "p", "o"]);
}
#[test]
fn test_csv_escape_special_chars() {
let vars = vec!["value".to_string()];
let mut map = HashMap::new();
map.insert("value".to_string(), "hello, world".to_string());
let rows = vec![BindingRow::new(map)];
let config = StreamConfig {
chunk_size: 10,
format: StreamFormat::Csv,
show_progress: false,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
executor.stream_results(vars, rows, &mut buf).unwrap();
let output = String::from_utf8(buf.into_inner()).unwrap();
assert!(output.contains("\"hello, world\""));
}
#[test]
fn test_chunking_large_result_set() {
let (vars, rows) = make_rows(1000);
let config = StreamConfig {
chunk_size: 100,
format: StreamFormat::Json,
show_progress: false,
max_rows: None,
chunk_delay_ms: 0,
chunk_timeout_secs: 30,
};
let executor = StreamingQueryExecutor::new(config);
let mut buf = Cursor::new(Vec::new());
let stats = executor.stream_results(vars, rows, &mut buf).unwrap();
assert_eq!(stats.total_rows, 1000);
assert_eq!(stats.total_chunks, 10);
}
#[test]
fn test_stream_config_invalid_format() {
assert!(StreamConfig::from_args(100, "xml", None, false).is_err());
assert!(StreamConfig::from_args(100, "rdf", None, false).is_err());
}
#[test]
fn test_stream_stats_zero_elapsed() {
let mut stats = StreamStats {
total_rows: 0,
total_chunks: 0,
..Default::default()
};
stats.finalize(Duration::from_millis(0));
assert_eq!(stats.rows_per_second, 0.0);
assert_eq!(stats.avg_chunk_ms, 0.0);
}
#[test]
fn test_binding_row_new() {
let mut m = HashMap::new();
m.insert("foo".to_string(), "bar".to_string());
let row = BindingRow::new(m.clone());
assert_eq!(row.bindings, m);
}
}