use anyhow::Result;
use rayon::prelude::*;
use std::sync::Arc;
use crate::parser_v4::{ParseNode, Parser};
use adze_glr_core::ParseTable;
use adze_ir::Grammar;
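/// Tuning knobs for chunked parsing: `min_file_size` is the byte length
/// below which input is parsed sequentially, `chunk_size` the target chunk
/// length in bytes, and `num_threads == 0` keeps rayon's default pool size.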
#[derive(Debug, Clone)]
pub struct ParallelConfig {
pub min_file_size: usize,
pub chunk_size: usize,
pub num_threads: usize,
}
impl Default for ParallelConfig {
fn default() -> Self {
Self {
            min_file_size: 100_000,
            chunk_size: 50_000,
            // 0 means "leave rayon's default thread count alone".
            num_threads: 0,
        }
}
}
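/// A chunk-parallel front end over the sequential [`Parser`].
///
/// Inputs shorter than `min_file_size` bytes are parsed directly; longer
/// inputs are split at newline boundaries and parsed chunk-by-chunk on the
/// rayon thread pool.
///
/// Sketch of intended use; the `load_grammar` helper is hypothetical, any
/// source of a `Grammar`/`ParseTable` pair works:
///
/// ```ignore
/// let (grammar, table) = load_grammar("my_lang")?;
/// let parser = ParallelParser::new(grammar, table, ParallelConfig::default());
/// let source = std::fs::read_to_string("big_file.src")?;
/// let tree = parser.parse(&source)?;
/// ```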
pub struct ParallelParser {
grammar: Arc<Grammar>,
parse_table: Arc<ParseTable>,
config: ParallelConfig,
}
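/// A contiguous byte range of the input, preferably ending at a newline.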
#[derive(Debug)]
struct ParseChunk {
id: usize,
#[allow(dead_code)]
start: usize,
#[allow(dead_code)]
end: usize,
content: String,
}
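/// Result of parsing one chunk; `tree` is `None` if the chunk failed to parse.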
#[derive(Debug)]
struct ChunkResult {
chunk_id: usize,
tree: Option<ParseNode>,
#[allow(dead_code)]
parse_time_ms: f64,
}
impl ParallelParser {
pub fn new(grammar: Grammar, parse_table: ParseTable, config: ParallelConfig) -> Self {
        // `num_threads == 0` leaves rayon's default pool size in place.
        if config.num_threads > 0 {
            // `build_global` errors if a global pool already exists; keep
            // the existing pool in that case rather than failing.
            rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build_global()
                .ok();
        }
Self {
grammar: Arc::new(grammar),
parse_table: Arc::new(parse_table),
config,
}
}
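    /// Parse `input`, using a plain sequential parse for inputs shorter than
    /// `min_file_size` (chunking overhead isn't worth it there) and the
    /// chunked parallel path otherwise.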
pub fn parse(&self, input: &str) -> Result<ParseNode> {
if input.len() < self.config.min_file_size {
let mut parser = Parser::new((*self.grammar).clone(), (*self.parse_table).clone());
return parser.parse(input);
}
let chunks = self.split_into_chunks(input);
let chunk_results: Vec<ChunkResult> = chunks
.into_par_iter()
.map(|chunk| self.parse_chunk(chunk))
.collect();
self.merge_chunk_results(chunk_results, input)
}
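    /// Split `input` into roughly `chunk_size`-byte pieces, pulling each
    /// boundary back (up to 1000 bytes) to the previous newline so chunks
    /// stay line-aligned.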
fn split_into_chunks(&self, input: &str) -> Vec<ParseChunk> {
let mut chunks = Vec::new();
let chunk_size = self.config.chunk_size;
let mut start = 0;
let mut id = 0;
        while start < input.len() {
            let mut end = (start + chunk_size).min(input.len());
            // Snap forward to a char boundary so byte slicing cannot panic.
            while !input.is_char_boundary(end) {
                end += 1;
            }
            if end < input.len() {
                // Prefer a newline boundary: scan back at most 1000 bytes,
                // but never past `start` (that would stall the loop).
                let mut search_start = end.saturating_sub(1000).max(start);
                while !input.is_char_boundary(search_start) {
                    search_start += 1;
                }
                let search_bytes = input[search_start..end].as_bytes();
                if let Some(pos) = search_bytes.iter().rposition(|&b| b == b'\n') {
                    end = search_start + pos + 1;
                }
            }
chunks.push(ParseChunk {
id,
start,
end,
content: input[start..end].to_string(),
});
start = end;
id += 1;
}
chunks
}
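    /// Parse a single chunk on the calling rayon worker, recording wall time.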
fn parse_chunk(&self, chunk: ParseChunk) -> ChunkResult {
use std::time::Instant;
let start_time = Instant::now();
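        // Each rayon worker builds its own Parser; Grammar and ParseTable
        // are cloned out of the shared Arcs because Parser::new takes them
        // by value.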
let mut parser = Parser::new((*self.grammar).clone(), (*self.parse_table).clone());
let tree = parser.parse(&chunk.content).ok();
let parse_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
ChunkResult {
chunk_id: chunk.id,
tree,
parse_time_ms,
}
}
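    /// Combine per-chunk results into one tree. Currently a placeholder; see
    /// the inline notes.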
    fn merge_chunk_results(&self, mut results: Vec<ChunkResult>, input: &str) -> Result<ParseNode> {
        results.sort_by_key(|r| r.chunk_id);
        // Placeholder merge: return the first chunk that parsed successfully.
        // Its tree covers only that chunk, not the whole input; splicing
        // trees across chunk boundaries is not implemented yet.
        for result in results {
            if let Some(tree) = result.tree {
                return Ok(tree);
            }
        }
        // No chunk parsed cleanly: fall back to one sequential full parse.
        let mut parser = Parser::new((*self.grammar).clone(), (*self.parse_table).clone());
        parser.parse(input)
}
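/// Counters reported by [`ParallelParser::parse_with_stats`].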
#[derive(Debug, Default)]
pub struct ParallelStats {
pub total_chunks: usize,
pub successful_chunks: usize,
pub total_parse_time_ms: f64,
pub speedup: f64,
}
impl ParallelParser {
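    /// Parse `input` and report a speedup estimate against a sequential
    /// baseline. The baseline runs first, so this method does strictly more
    /// work than [`parse`](Self::parse); intended for benchmarking only.
    ///
    /// ```ignore
    /// let (tree, stats) = parser.parse_with_stats(&source)?;
    /// println!("{:.2}x over {} chunks", stats.speedup, stats.total_chunks);
    /// ```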
pub fn parse_with_stats(&self, input: &str) -> Result<(ParseNode, ParallelStats)> {
use std::time::Instant;
let baseline_start = Instant::now();
let mut baseline_parser = Parser::new((*self.grammar).clone(), (*self.parse_table).clone());
let _ = baseline_parser.parse(input);
let baseline_time = baseline_start.elapsed().as_secs_f64() * 1000.0;
let parallel_start = Instant::now();
let tree = self.parse(input)?;
let parallel_time = parallel_start.elapsed().as_secs_f64() * 1000.0;
        let stats = ParallelStats {
            // Ceiling division: chunks needed to cover the input.
            total_chunks: (input.len() + self.config.chunk_size - 1) / self.config.chunk_size,
            // Placeholder until per-chunk outcomes are threaded through.
            successful_chunks: 1,
            total_parse_time_ms: parallel_time,
            speedup: baseline_time / parallel_time,
        };
Ok((tree, stats))
}
}
#[cfg(test)]
mod tests {
use super::*;
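    // Minimal grammar/table pair: enough to exercise chunk splitting, but
    // not to produce real parse trees.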
fn create_test_grammar() -> (Grammar, ParseTable) {
let grammar = Grammar::new("test".to_string());
let table = ParseTable {
action_table: vec![],
goto_table: vec![],
symbol_metadata: vec![],
state_count: 1,
symbol_count: 1,
symbol_to_index: std::collections::BTreeMap::new(),
external_scanner_states: vec![],
};
(grammar, table)
}
#[test]
fn test_chunk_splitting() {
let (grammar, table) = create_test_grammar();
let config = ParallelConfig {
min_file_size: 10,
chunk_size: 20,
..Default::default()
};
let parser = ParallelParser::new(grammar, table, config);
let input = "line1\nline2\nline3\nline4\nline5\n";
let chunks = parser.split_into_chunks(input);
assert!(chunks.len() >= 2);
for chunk in &chunks {
assert!(chunk.end > chunk.start);
assert_eq!(&input[chunk.start..chunk.end], &chunk.content);
}
}
#[test]
fn test_small_file_handling() {
let (grammar, table) = create_test_grammar();
let config = ParallelConfig {
min_file_size: 1000,
..Default::default()
};
let parser = ParallelParser::new(grammar, table, config);
let input = "small input";
let _ = parser.parse(input);
}
}
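/// Benchmark helpers, compiled only for release-mode test builds
/// (`cargo test --release`).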
#[cfg(all(test, not(debug_assertions)))]
pub mod bench {
use super::*;
use std::time::Instant;
pub struct ParallelBenchmark {
pub file_size: usize,
pub single_thread_ms: f64,
pub parallel_ms: f64,
pub speedup: f64,
pub num_chunks: usize,
}
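    /// Time one sequential parse and one chunked parse of `input` and report
    /// the ratio. Both timed regions include parser construction (grammar
    /// and table clones), so the speedup is an end-to-end figure.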
pub fn benchmark_parallel_parsing(
grammar: Grammar,
table: ParseTable,
input: &str,
) -> ParallelBenchmark {
let start = Instant::now();
let mut parser = Parser::new(grammar.clone(), table.clone());
let _ = parser.parse(input);
let single_thread_ms = start.elapsed().as_secs_f64() * 1000.0;
let config = ParallelConfig::default();
let parallel_parser = ParallelParser::new(grammar, table, config.clone());
let start = Instant::now();
let _ = parallel_parser.parse(input);
let parallel_ms = start.elapsed().as_secs_f64() * 1000.0;
ParallelBenchmark {
file_size: input.len(),
single_thread_ms,
parallel_ms,
speedup: single_thread_ms / parallel_ms,
num_chunks: (input.len() + config.chunk_size - 1) / config.chunk_size,
}
}
}