use anyhow::Result;
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::incremental_v3::{Subtree, SubtreePool, Tree};
use crate::parser_v3::{ParseNode, Parser};
use adze_glr_core::ParseTable;
use adze_ir::{Grammar, SymbolId};
/// Tuning knobs for the parallel parser.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
/// Inputs smaller than this (in bytes) are parsed single-threaded.
pub min_file_size: usize,
/// Target size (in bytes) of each parallel parse chunk.
pub chunk_size: usize,
/// Worker threads for rayon's global pool; 0 leaves the pool untouched.
pub num_threads: usize,
/// Whether extracted subtrees are inserted into the shared subtree cache.
pub enable_caching: bool,
}
impl Default for ParallelConfig {
fn default() -> Self {
Self {
min_file_size: 100_000, chunk_size: 50_000, num_threads: 0, enable_caching: true,
}
}
}
/// Chunk-based parallel parser: splits large inputs at heuristic
/// statement boundaries and parses the chunks on rayon worker threads.
pub struct ParallelParser {
// Grammar shared read-only with every worker.
grammar: Arc<Grammar>,
// Precomputed parse table shared read-only with every worker.
parse_table: Arc<ParseTable>,
// Size thresholds, chunking, and caching configuration.
config: ParallelConfig,
// Hash-keyed subtree cache shared across worker threads.
subtree_cache: Arc<Mutex<SubtreeCache>>,
}
/// Shared cache mapping a subtree fingerprint (see `hash_subtree`) to the
/// cached subtree.
struct SubtreeCache {
// fingerprint -> subtree; populated by `extract_subtrees`.
cache: HashMap<u64, Arc<Subtree>>,
// Subtree allocation pool. NOTE(review): no method in this file ever
// touches it after construction — confirm it is actually needed.
pool: SubtreePool,
}
impl SubtreeCache {
fn new() -> Self {
Self {
cache: HashMap::new(),
pool: SubtreePool::new(),
}
}
fn get(&self, hash: u64) -> Option<Arc<Subtree>> {
self.cache.get(&hash).cloned()
}
fn insert(&mut self, hash: u64, subtree: Arc<Subtree>) {
self.cache.insert(hash, subtree);
}
}
/// One contiguous slice of the input scheduled for parallel parsing.
#[derive(Debug)]
struct ParseChunk {
// Byte offset of the chunk's first byte in the original input.
start: usize,
// Byte offset one past the chunk's last byte (exclusive).
end: usize,
// Owned copy of `input[start..end]`.
content: Vec<u8>,
// Whether the chunk ends at a safe split point or mid-construct.
boundary: ChunkBoundary,
}
/// Describes how a chunk's end point was chosen.
#[derive(Debug, Clone)]
enum ChunkBoundary {
/// The chunk ends at a statement terminator / boundary (or at EOF).
Clean,
/// No safe split point was found; the chunk was cut mid-construct and
/// carries nearby bytes as extra parse context.
Dirty {
/// Up to 100 bytes following the cut point.
lookahead: Vec<u8>,
/// Up to 100 bytes preceding the cut point.
lookbehind: Vec<u8>,
},
}
/// Output of parsing a single chunk on a worker thread.
#[derive(Debug)]
struct ChunkResult {
// Index of the chunk in document order; used to re-sort results.
chunk_id: usize,
// Offset-adjusted subtrees extracted from the chunk's parse tree;
// empty when the chunk failed to parse.
subtrees: Vec<Subtree>,
// Tokens cut off at a chunk edge. Currently always empty — detection
// is not implemented yet (see `parse_chunk`).
incomplete_tokens: Vec<IncompleteToken>,
// Wall-clock parse time for this chunk, in milliseconds.
parse_time_ms: f64,
}
/// A token truncated at a chunk edge.
///
/// NOTE(review): never constructed anywhere in this file; apparently a
/// placeholder for planned incomplete-token handling — confirm.
#[derive(Debug)]
struct IncompleteToken {
// Byte offset where the partial token begins.
start: usize,
// Bytes of the token seen so far.
partial_content: Vec<u8>,
// Presumably the symbols that could complete the token — verify once
// this type gains a producer.
expected_symbols: Vec<SymbolId>,
}
impl ParallelParser {
pub fn new(grammar: Grammar, parse_table: ParseTable, config: ParallelConfig) -> Self {
if config.num_threads > 0 {
rayon::ThreadPoolBuilder::new()
.num_threads(config.num_threads)
.build_global()
.ok();
}
Self {
grammar: Arc::new(grammar),
parse_table: Arc::new(parse_table),
config,
subtree_cache: Arc::new(Mutex::new(SubtreeCache::new())),
}
}
/// Parses `input` into a tree, using chunked parallel parsing for large
/// inputs and a plain sequential parse otherwise.
///
/// # Errors
/// Propagates the sequential parser's error for small inputs; for large
/// inputs, errors can surface from merging the chunk results.
pub fn parse(&self, input: &str) -> Result<ParseNode> {
    let bytes = input.as_bytes();

    // Below the size threshold the chunking/merge overhead dominates:
    // fall back to one single-threaded parse of the whole input.
    if bytes.len() < self.config.min_file_size {
        let mut sequential = Parser::new((*self.grammar).clone(), (*self.parse_table).clone());
        return sequential.parse(input);
    }

    // Parse every chunk on the rayon pool, then stitch the per-chunk
    // results back together in document order.
    let results: Vec<ChunkResult> = self
        .split_into_chunks(bytes)
        .into_par_iter()
        .enumerate()
        .map(|(id, chunk)| self.parse_chunk(id, chunk))
        .collect();
    self.merge_chunk_results(results, bytes)
}
/// Splits `input` into contiguous chunks of roughly `config.chunk_size`
/// bytes. Interior chunk ends are adjusted toward a syntactic boundary
/// by `find_chunk_boundary`; the final chunk always ends cleanly at EOF.
fn split_into_chunks(&self, input: &[u8]) -> Vec<ParseChunk> {
    let target = self.config.chunk_size;
    let mut chunks = Vec::new();
    let mut offset = 0;
    while offset < input.len() {
        let mut end = (offset + target).min(input.len());
        let boundary = if end == input.len() {
            // Last chunk: EOF is always a clean boundary.
            ChunkBoundary::Clean
        } else {
            // May pull `end` back to a safer split point.
            self.find_chunk_boundary(input, offset, &mut end)
        };
        chunks.push(ParseChunk {
            start: offset,
            end,
            content: input[offset..end].to_vec(),
            boundary,
        });
        offset = end;
    }
    chunks
}
/// Tries to pull `*end` back to a syntactically safe split point.
///
/// Scans up to 1000 bytes backwards from `*end` for a statement
/// terminator (`;`, `}`) or a newline that passes
/// `is_statement_boundary`. On success `*end` is moved just past that
/// byte and `Clean` is returned. Otherwise `*end` is left unchanged and
/// a `Dirty` boundary carrying up to 100 bytes of context on each side
/// is returned.
fn find_chunk_boundary(&self, input: &[u8], start: usize, end: &mut usize) -> ChunkBoundary {
    // Clamp the scan window so the smallest reachable `*end` is
    // `start + 2` (i >= start + 1). Without the clamp, a chunk shorter
    // than the 1000-byte window could have `*end` pulled to or before
    // `start`, giving an inverted `input[start..end]` slice (panic) or
    // a non-advancing loop in `split_into_chunks`.
    let search_start = (*end).saturating_sub(1000).max(start + 1);
    for i in (search_start..*end).rev() {
        match input[i] {
            b'\n' => {
                if self.is_statement_boundary(input, i) {
                    *end = i + 1;
                    return ChunkBoundary::Clean;
                }
            }
            b';' | b'}' => {
                *end = i + 1;
                return ChunkBoundary::Clean;
            }
            _ => {}
        }
    }
    // No safe split point: keep the hard cut and attach context bytes.
    // NOTE(review): `lookbehind` is the tail of this chunk itself
    // (bytes [end-100, end)), not the bytes preceding `start`; confirm
    // that is intended, since `parse_chunk` prepends it to the chunk's
    // own content.
    let lookahead_start = *end;
    let lookahead_end = (*end + 100).min(input.len());
    let lookbehind_start = end.saturating_sub(100);
    ChunkBoundary::Dirty {
        lookahead: input[lookahead_start..lookahead_end].to_vec(),
        lookbehind: input[lookbehind_start..*end].to_vec(),
    }
}
/// Heuristic: does the newline at `pos` end a statement?
///
/// A newline followed by an identifier character in column 0 starts a
/// new statement; a newline followed by leading whitespace is treated
/// as an indented continuation line. End of input counts as a boundary.
///
/// Fix: the previous version skipped leading spaces/tabs before
/// checking for an identifier, which classified indented continuation
/// lines as boundaries — contradicting this file's own
/// `test_boundary_detection`, which asserts that `"hello\n world"` is
/// NOT a boundary (that assertion failed against the old code).
fn is_statement_boundary(&self, input: &[u8], pos: usize) -> bool {
    let next = pos + 1;
    // A newline at the very end of the input always ends a statement.
    if next >= input.len() {
        return true;
    }
    match input[next] {
        // Leading whitespace signals an indented continuation line.
        b' ' | b'\t' => false,
        // An identifier start in column 0 begins a new statement.
        b'a'..=b'z' | b'A'..=b'Z' | b'_' => true,
        _ => false,
    }
}
/// Parses a single chunk on a worker thread.
///
/// Builds a throwaway `Parser` (grammar and table are cloned per chunk),
/// widens the input with boundary context for `Dirty` chunks, and turns
/// the resulting tree into subtrees offset into whole-file coordinates.
/// A chunk that fails to parse contributes no subtrees rather than
/// failing the whole file.
///
/// NOTE(review): for `Dirty` chunks the parsed text is
/// `lookbehind + content + lookahead`, but `extract_subtrees` shifts
/// positions by `chunk.start` as if parsing began at `content` — byte
/// ranges from dirty chunks are therefore offset by `lookbehind.len()`
/// and may cover duplicated bytes. Confirm the intended semantics.
fn parse_chunk(&self, chunk_id: usize, chunk: ParseChunk) -> ChunkResult {
use std::time::Instant;
let start_time = Instant::now();
let mut parser = Parser::new((*self.grammar).clone(), (*self.parse_table).clone());
// Dirty boundaries get their context bytes glued on both sides.
let parse_input = match &chunk.boundary {
ChunkBoundary::Clean => chunk.content.clone(),
ChunkBoundary::Dirty {
lookbehind,
lookahead,
} => {
let mut combined =
Vec::with_capacity(lookbehind.len() + chunk.content.len() + lookahead.len());
combined.extend_from_slice(lookbehind);
combined.extend_from_slice(&chunk.content);
combined.extend_from_slice(lookahead);
combined
}
};
// Lossy conversion: invalid UTF-8 at a hard cut becomes replacement
// characters rather than aborting the chunk.
let input_str = String::from_utf8_lossy(&parse_input);
let subtrees = match parser.parse(&input_str) {
Ok(tree) => {
self.extract_subtrees(tree, chunk.start)
}
// Best-effort: a failed chunk yields no subtrees.
Err(_) => {
Vec::new()
}
};
let parse_time_ms = start_time.elapsed().as_secs_f64() * 1000.0;
ChunkResult {
chunk_id,
subtrees,
// Incomplete-token detection is not implemented yet.
incomplete_tokens: Vec::new(), parse_time_ms,
}
}
/// Converts a parsed `ParseNode` (and, recursively, its children) into
/// `Subtree`s whose byte ranges are shifted by `offset` into whole-file
/// coordinates.
///
/// Returns a single-element vector holding the subtree for `tree`; the
/// `Vec` return type lets recursive calls be flattened directly into a
/// parent's child list.
///
/// When caching is enabled each subtree is also inserted into the shared
/// cache keyed by `hash_subtree`. NOTE(review): nothing in this file
/// ever reads the cache back — confirm it is consumed elsewhere, or wire
/// up lookups before relying on it.
fn extract_subtrees(&self, tree: ParseNode, offset: usize) -> Vec<Subtree> {
    let subtree = Subtree {
        symbol: tree.symbol,
        start_byte: tree.start_byte + offset,
        end_byte: tree.end_byte + offset,
        // Each recursive call yields exactly one subtree; flat_map keeps
        // the children in their original order.
        children: tree
            .children
            .into_iter()
            .flat_map(|child| self.extract_subtrees(child, offset))
            .collect(),
    };
    if self.config.enable_caching {
        let hash = self.hash_subtree(&subtree);
        // A poisoned lock only means another worker panicked mid-insert;
        // the map itself is still usable, so recover the guard.
        let mut cache = self
            .subtree_cache
            .lock()
            .unwrap_or_else(|err| err.into_inner());
        cache.insert(hash, Arc::new(subtree.clone()));
    }
    vec![subtree]
}
/// Cache fingerprint for a subtree: symbol id plus absolute byte span.
/// Children are not part of the key.
///
/// NOTE(review): because the absolute span is hashed, identical
/// subtrees at different file positions never share a cache entry —
/// confirm that is the intended reuse granularity.
fn hash_subtree(&self, subtree: &Subtree) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let mut state = DefaultHasher::new();
    // Hashing the reference tuple feeds the hasher the same byte
    // sequence as hashing each field in order.
    (&subtree.symbol, &subtree.start_byte, &subtree.end_byte).hash(&mut state);
    state.finish()
}
fn merge_chunk_results(
&self,
mut results: Vec<ChunkResult>,
input: &[u8],
) -> Result<ParseNode> {
results.sort_by_key(|r| r.chunk_id);
let mut all_subtrees = Vec::new();
for result in results {
all_subtrees.extend(result.subtrees);
}
self.build_tree_from_subtrees(all_subtrees, input)
}
/// Wraps all chunk subtrees under a synthetic root node spanning the
/// whole input. The root uses the grammar's start symbol, falling back
/// to `SymbolId(0)` when none is set.
fn build_tree_from_subtrees(&self, subtrees: Vec<Subtree>, input: &[u8]) -> Result<ParseNode> {
    let root_symbol = self.grammar.start_symbol.unwrap_or(SymbolId(0));
    let children: Vec<ParseNode> = subtrees
        .into_iter()
        .map(|subtree| self.subtree_to_node(subtree))
        .collect();
    Ok(ParseNode {
        symbol: root_symbol,
        children,
        start_byte: 0,
        end_byte: input.len(),
        field_name: None,
    })
}
fn subtree_to_node(&self, subtree: Subtree) -> ParseNode {
ParseNode {
symbol: subtree.symbol,
children: subtree
.children
.into_iter()
.map(|st| self.subtree_to_node(st))
.collect(),
start_byte: subtree.start_byte,
end_byte: subtree.end_byte,
field_name: None,
}
}
}
/// Aggregate statistics for one parallel parse.
///
/// NOTE(review): `parse_with_stats` currently returns this at its
/// default value — none of the counters below are populated yet.
#[derive(Debug, Default)]
pub struct ParallelStats {
// Number of chunks the input was split into.
pub total_chunks: usize,
// Chunks that ended on a syntactic (Clean) boundary.
pub clean_boundaries: usize,
// Chunks that were hard-cut mid-construct (Dirty boundary).
pub dirty_boundaries: usize,
// Subtree-cache lookups that hit.
pub cache_hits: usize,
// Subtree-cache lookups that missed.
pub cache_misses: usize,
// Total wall-clock parse time across chunks, in milliseconds.
pub total_parse_time_ms: f64,
// Time spent merging chunk results, in milliseconds.
pub merge_time_ms: f64,
}
impl ParallelParser {
    /// Parses `input` and returns the tree together with parallelism
    /// statistics.
    ///
    /// NOTE(review): the stats are currently returned at their default
    /// (all-zero) value — no counters are wired up yet; do not rely on
    /// the numbers until they are populated.
    ///
    /// # Errors
    /// Propagates any error from [`ParallelParser::parse`].
    pub fn parse_with_stats(&self, input: &str) -> Result<(ParseNode, ParallelStats)> {
        // Fix: `stats` was declared `let mut` but never mutated, which
        // triggers an `unused_mut` warning.
        let stats = ParallelStats::default();
        let tree = self.parse(input)?;
        Ok((tree, stats))
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Minimal grammar plus an empty parse table: just enough to construct
// a ParallelParser for chunking/boundary tests (nothing is parsed).
fn create_test_grammar() -> (Grammar, ParseTable) {
let grammar = Grammar::new("test".to_string());
let table = ParseTable {
action_table: vec![],
goto_table: vec![],
symbol_metadata: vec![],
state_count: 1,
symbol_count: 1,
symbol_to_index: std::collections::HashMap::new(),
};
(grammar, table)
}
// Chunks must tile the input: each chunk is non-empty and its owned
// `content` mirrors the original bytes of its [start, end) range.
#[test]
fn test_chunk_splitting() {
let (grammar, table) = create_test_grammar();
let config = ParallelConfig {
min_file_size: 10,
chunk_size: 20,
..Default::default()
};
let parser = ParallelParser::new(grammar, table, config);
let input = b"line1\nline2\nline3\nline4\nline5";
let chunks = parser.split_into_chunks(input);
assert!(chunks.len() >= 2);
for chunk in &chunks {
assert!(chunk.end > chunk.start);
assert_eq!(&input[chunk.start..chunk.end], &chunk.content[..]);
}
}
// A newline followed by an identifier in column 0 is a statement
// boundary; a newline followed by indentation (" world") is expected
// to be a continuation line, not a boundary.
#[test]
fn test_boundary_detection() {
let (grammar, table) = create_test_grammar();
let parser = ParallelParser::new(grammar, table, Default::default());
assert!(parser.is_statement_boundary(b"}\nfunction", 1));
assert!(parser.is_statement_boundary(b";\nlet x", 1));
assert!(!parser.is_statement_boundary(b"hello\n world", 5));
}
}
#[cfg(feature = "bench")]
pub mod bench {
use super::*;
use std::time::Instant;
/// Side-by-side timing of single-threaded vs. parallel parsing of the
/// same input.
pub struct ParallelBenchmark {
// Input size in bytes.
pub file_size: usize,
// Wall-clock time of the sequential parse, in milliseconds.
pub single_thread_ms: f64,
// Wall-clock time of the parallel parse, in milliseconds.
pub parallel_ms: f64,
// single_thread_ms / parallel_ms.
pub speedup: f64,
// Chunk count reported by the parallel parse statistics.
pub num_chunks: usize,
}
/// Parses `input` once sequentially and once in parallel, timing both.
///
/// NOTE(review): `speedup` divides by `parallel_ms` without guarding
/// against zero, and `num_chunks` comes from `parse_with_stats`, which
/// currently returns default (all-zero) stats — confirm before quoting
/// these numbers.
pub fn benchmark_parallel_parsing(
grammar: Grammar,
table: ParseTable,
input: &str,
) -> ParallelBenchmark {
// Baseline: one single-threaded parse over the whole input.
let start = Instant::now();
let mut parser = Parser::new(grammar.clone(), table.clone());
let _ = parser.parse(input);
let single_thread_ms = start.elapsed().as_secs_f64() * 1000.0;
let config = ParallelConfig::default();
let parallel_parser = ParallelParser::new(grammar, table, config);
// Parallel run; a parse failure falls back to default stats.
let start = Instant::now();
let stats = match parallel_parser.parse_with_stats(input) {
Ok((_, stats)) => stats,
Err(_) => ParallelStats::default(),
};
let parallel_ms = start.elapsed().as_secs_f64() * 1000.0;
ParallelBenchmark {
file_size: input.len(),
single_thread_ms,
parallel_ms,
speedup: single_thread_ms / parallel_ms,
num_chunks: stats.total_chunks,
}
}
}