use std::path::Path;
use std::sync::{Arc, Mutex};
use rayon::prelude::*;
use crate::tokenizer::{Token, Tokenizer};
use crate::Result;
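/// Tokenizes many texts in parallel with Rayon, reusing a fixed pool of
/// [`Tokenizer`] instances behind a mutex.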
pub struct BatchTokenizer {
tokenizer_pool: Arc<Mutex<Vec<Tokenizer>>>,
pool_size: usize,
}
impl BatchTokenizer {
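/// Default pool size: one tokenizer per Rayon worker thread.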
#[must_use]
pub fn default_pool_size() -> usize {
rayon::current_num_threads()
}
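/// Creates a batch tokenizer with the default pool size.
///
/// # Errors
///
/// Returns an error if any underlying [`Tokenizer`] fails to initialize.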
pub fn new() -> Result<Self> {
Self::with_pool_size(Self::default_pool_size())
}
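/// Creates a batch tokenizer backed by `pool_size` tokenizer instances.
///
/// # Errors
///
/// Returns an error if any underlying [`Tokenizer`] fails to initialize.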
pub fn with_pool_size(pool_size: usize) -> Result<Self> {
let mut tokenizers = Vec::with_capacity(pool_size);
for _ in 0..pool_size {
tokenizers.push(Tokenizer::new()?);
}
Ok(Self {
tokenizer_pool: Arc::new(Mutex::new(tokenizers)),
pool_size,
})
}
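/// Creates a batch tokenizer whose instances all load the user dictionary at
/// `dict_path`.
///
/// # Errors
///
/// Returns an error if a tokenizer cannot be built from the dictionary.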
pub fn with_dict<P: AsRef<Path>>(dict_path: P, pool_size: usize) -> Result<Self> {
let mut tokenizers = Vec::with_capacity(pool_size);
for _ in 0..pool_size {
tokenizers.push(Tokenizer::with_dict(dict_path.as_ref())?);
}
Ok(Self {
tokenizer_pool: Arc::new(Mutex::new(tokenizers)),
pool_size,
})
}
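/// Tokenizes each text in parallel, returning one token list per input in the
/// original order.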
#[must_use]
pub fn tokenize_batch(&self, texts: &[&str]) -> Vec<Vec<Token>> {
texts
.par_iter()
.map(|text| self.tokenize_single(text))
.collect()
}
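/// Like [`Self::tokenize_batch`], but accepts owned `String`s.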
#[must_use]
pub fn tokenize_batch_owned(&self, texts: &[String]) -> Vec<Vec<Token>> {
texts
.par_iter()
.map(|text| self.tokenize_single(text))
.collect()
}
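// Tokenizes one text with a tokenizer borrowed from the pool, falling back to
// a temporary tokenizer when the pool is empty.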
fn tokenize_single(&self, text: &str) -> Vec<Token> {
// A poisoned lock means another worker panicked; degrade to an empty result.
let Ok(mut pool) = self.tokenizer_pool.lock() else {
return Vec::new();
};
// Check a tokenizer out of the pool and release the lock while tokenizing.
if let Some(mut tokenizer) = pool.pop() {
drop(pool);
let tokens = tokenizer.tokenize(text);
if let Ok(mut pool) = self.tokenizer_pool.lock() {
pool.push(tokenizer);
}
tokens
} else {
drop(pool);
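// Pool exhausted: build a throwaway tokenizer for this call instead of blocking.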
Tokenizer::new()
.map(|mut tok| tok.tokenize(text))
.unwrap_or_default()
}
}
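/// Reads each file and tokenizes its contents in parallel.
///
/// # Errors
///
/// Returns an error if any file cannot be read.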
pub fn tokenize_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
paths
.par_iter()
.map(|path| {
let content = std::fs::read_to_string(path)
.map_err(|e| crate::Error::Analysis(format!("Failed to read file: {e}")))?;
Ok(self.tokenize_single(&content))
})
.collect()
}
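/// Splits `text` into chunks of roughly `chunk_size` characters at sentence-like
/// boundaries, tokenizes the chunks in parallel, and concatenates the results.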
#[must_use]
pub fn tokenize_chunked(&self, text: &str, chunk_size: usize) -> Vec<Token> {
let chunks = Self::split_into_chunks(text, chunk_size);
let results: Vec<Vec<Token>> = chunks
.par_iter()
.map(|chunk| self.tokenize_single(chunk))
.collect();
results.into_iter().flatten().collect()
}
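// Chunks on sentence punctuation, newlines, and spaces.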
fn split_into_chunks(text: &str, chunk_size: usize) -> Vec<String> {
Self::split_into_chunks_smart(text, chunk_size, &['.', '!', '?', '。', '\n', ' '])
}
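// Splits `text` into chunks of roughly `chunk_size` characters, preferring to
// cut just after one of `delimiters` near the target length.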
fn split_into_chunks_smart(text: &str, chunk_size: usize, delimiters: &[char]) -> Vec<String> {
// A zero chunk size would never make progress below, so treat it like empty
// input (matching `split_with_overlap`).
if text.is_empty() || chunk_size == 0 {
return Vec::new();
}
let mut chunks = Vec::new();
let mut current_start = 0;
let chars: Vec<(usize, char)> = text.char_indices().collect();
while current_start < chars.len() {
let target_end = (current_start + chunk_size).min(chars.len());
if target_end >= chars.len() {
let byte_start = chars[current_start].0;
chunks.push(text[byte_start..].to_string());
break;
}
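// Prefer to break at a delimiter: first scan backwards from the target, but no
// further back than about 3/4 of a chunk, so chunks stay reasonably full.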
let mut split_pos = target_end;
let mut found_delimiter = false;
let min_pos = current_start + (chunk_size * 3 / 4).max(1);
while split_pos > min_pos {
if delimiters.contains(&chars[split_pos - 1].1) {
found_delimiter = true;
break;
}
split_pos -= 1;
}
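// No delimiter behind the target: scan forwards up to a quarter of a chunk
// past it before giving up.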
if !found_delimiter {
split_pos = target_end;
let max_pos = (target_end + chunk_size / 4).min(chars.len());
while split_pos < max_pos {
if delimiters.contains(&chars[split_pos - 1].1) {
found_delimiter = true;
break;
}
split_pos += 1;
}
}
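// Still no delimiter: hard-cut at the target length.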
if !found_delimiter {
split_pos = target_end;
}
let byte_start = chars[current_start].0;
let byte_end = if split_pos < chars.len() {
chars[split_pos].0
} else {
text.len()
};
let chunk = text[byte_start..byte_end].to_string();
if !chunk.is_empty() {
chunks.push(chunk);
}
current_start = split_pos;
}
chunks
}
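/// Splits `text` into chunks of at most `chunk_size` characters, where each
/// chunk after the first repeats the last `overlap` characters of the previous
/// chunk (the overlap is capped at half the chunk size).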
#[must_use]
pub fn split_with_overlap(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
if text.is_empty() || chunk_size == 0 {
return Vec::new();
}
let overlap = overlap.min(chunk_size / 2);
let chars: Vec<char> = text.chars().collect();
let mut chunks = Vec::new();
let mut pos = 0;
while pos < chars.len() {
let end = (pos + chunk_size).min(chars.len());
let chunk: String = chars[pos..end].iter().collect();
chunks.push(chunk);
if end >= chars.len() {
break;
}
pos = end.saturating_sub(overlap);
}
chunks
}
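/// Returns the configured pool size.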
#[must_use]
pub const fn pool_size(&self) -> usize {
self.pool_size
}
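/// Returns the number of tokenizers currently idle in the pool.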
#[must_use]
pub fn available_tokenizers(&self) -> usize {
self.tokenizer_pool.lock().map_or(0, |pool| pool.len())
}
}
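/// Tokenizes large inputs by splitting them into chunks and handing the chunks
/// to a [`BatchTokenizer`].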
pub struct ParallelStreamProcessor {
batch: BatchTokenizer,
chunk_size: usize,
}
impl ParallelStreamProcessor {
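/// Default chunk size, in characters.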
pub const DEFAULT_CHUNK_SIZE: usize = 16384;
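/// Creates a processor with the default chunk size.
///
/// # Errors
///
/// Returns an error if the underlying [`BatchTokenizer`] cannot be created.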
pub fn new() -> Result<Self> {
Ok(Self {
batch: BatchTokenizer::new()?,
chunk_size: Self::DEFAULT_CHUNK_SIZE,
})
}
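/// Sets the chunk size (in characters) used when splitting input.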
#[must_use]
pub const fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size;
self
}
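/// Reads an entire file into memory and tokenizes it chunk by chunk in parallel.
///
/// # Errors
///
/// Returns an error if the file cannot be read.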
pub fn process_large_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<Token>> {
let content = std::fs::read_to_string(path)
.map_err(|e| crate::Error::Analysis(format!("Failed to read file: {e}")))?;
Ok(self.batch.tokenize_chunked(&content, self.chunk_size))
}
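/// Reads and tokenizes several files in parallel.
///
/// # Errors
///
/// Returns an error if any file cannot be read.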
pub fn process_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
self.batch.tokenize_files(paths)
}
}
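/// Streams a file line by line, tokenizing buffered text at sentence boundaries
/// and optionally reporting progress through a callback.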
pub struct LargeFileProcessor {
batch: BatchTokenizer,
buffer_size: usize,
progress_callback: Option<Box<dyn Fn(LargeFileProgress) + Send + Sync>>,
}
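/// Progress snapshot passed to the [`LargeFileProcessor`] progress callback.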
#[derive(Debug, Clone)]
pub struct LargeFileProgress {
pub bytes_processed: usize,
pub total_bytes: usize,
pub tokens_generated: usize,
}
impl LargeFileProgress {
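/// Percentage of bytes processed; reports 100% when the total size is zero.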
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn percent(&self) -> f64 {
if self.total_bytes == 0 {
100.0
} else {
(self.bytes_processed as f64 / self.total_bytes as f64) * 100.0
}
}
}
impl LargeFileProcessor {
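/// Default buffer size, in bytes (64 KiB).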
pub const DEFAULT_BUFFER_SIZE: usize = 65536;
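/// Creates a processor with the default buffer size and no progress callback.
///
/// # Errors
///
/// Returns an error if the underlying [`BatchTokenizer`] cannot be created.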
pub fn new() -> Result<Self> {
Ok(Self {
batch: BatchTokenizer::new()?,
buffer_size: Self::DEFAULT_BUFFER_SIZE,
progress_callback: None,
})
}
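/// Sets the read/flush buffer size, in bytes.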
#[must_use]
pub const fn with_buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
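/// Registers a callback that is invoked after every line with the current progress.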
#[must_use]
pub fn with_progress_callback<F>(mut self, callback: F) -> Self
where
F: Fn(LargeFileProgress) + Send + Sync + 'static,
{
self.progress_callback = Some(Box::new(callback));
self
}
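/// Tokenizes a file incrementally: lines are buffered up to the buffer size and
/// then tokenized up to the last sentence boundary.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read.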
pub fn process_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<Token>> {
use std::io::{BufRead, BufReader};
let file = std::fs::File::open(path.as_ref())
.map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
let metadata = file
.metadata()
.map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;
#[allow(clippy::cast_possible_truncation)]
let total_bytes = metadata.len() as usize;
let reader = BufReader::with_capacity(self.buffer_size, file);
let mut all_tokens = Vec::new();
let mut bytes_processed = 0;
let mut pending_text = String::new();
let sentence_delimiters = ['.', '!', '?', '。', '\n'];
for line in reader.lines() {
let line =
line.map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
// `lines()` strips the terminator, so add one byte back per line; this is an
// approximation for files with CRLF line endings.
bytes_processed += line.len() + 1;
pending_text.push_str(&line);
pending_text.push('\n');
if pending_text.len() >= self.buffer_size {
// Flush everything up to and including the last sentence delimiter and keep
// the tail for the next iteration. Split after the delimiter's full UTF-8
// width so that multi-byte delimiters such as '。' stay on a char boundary.
if let Some((pos, c)) = pending_text
.char_indices()
.rev()
.find(|(_, c)| sentence_delimiters.contains(c))
{
let split_at = pos + c.len_utf8();
let to_process = pending_text[..split_at].to_string();
let remaining = pending_text[split_at..].to_string();
let tokens = self.batch.tokenize_single(&to_process);
all_tokens.extend(tokens);
pending_text = remaining;
}
}
if let Some(ref callback) = self.progress_callback {
callback(LargeFileProgress {
bytes_processed,
total_bytes,
tokens_generated: all_tokens.len(),
});
}
}
if !pending_text.is_empty() {
let tokens = self.batch.tokenize_single(&pending_text);
all_tokens.extend(tokens);
}
if let Some(ref callback) = self.progress_callback {
callback(LargeFileProgress {
bytes_processed: total_bytes,
total_bytes,
tokens_generated: all_tokens.len(),
});
}
Ok(all_tokens)
}
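/// Processes several files in parallel, calling [`Self::process_file`] for each.
///
/// # Errors
///
/// Returns an error if any file cannot be opened or read.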
pub fn process_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
paths
.par_iter()
.map(|path| self.process_file(path))
.collect()
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use super::*;
#[test]
fn test_batch_tokenizer_creation() {
let batch = BatchTokenizer::new();
assert!(batch.is_ok());
}
#[test]
fn test_default_pool_size() {
let size = BatchTokenizer::default_pool_size();
assert!(size > 0);
}
#[test]
fn test_tokenize_batch() {
let batch = BatchTokenizer::new().expect("should create");
let texts = vec!["안녕하세요", "감사합니다"];
let results = batch.tokenize_batch(&texts);
assert_eq!(results.len(), 2);
assert!(!results[0].is_empty());
assert!(!results[1].is_empty());
}
#[test]
fn test_tokenize_batch_owned() {
let batch = BatchTokenizer::new().expect("should create");
let texts = vec!["안녕하세요".to_string(), "감사합니다".to_string()];
let results = batch.tokenize_batch_owned(&texts);
assert_eq!(results.len(), 2);
}
#[test]
fn test_tokenize_chunked() {
let batch = BatchTokenizer::new().expect("should create");
let text = "안녕하세요 감사합니다 좋은 하루 되세요";
let tokens = batch.tokenize_chunked(text, 10);
let _ = tokens.len();
}
#[test]
fn test_split_into_chunks() {
let text = "안녕하세요 감사합니다";
let chunks = BatchTokenizer::split_into_chunks(text, 5);
assert!(chunks.len() > 1);
}
#[test]
fn test_pool_size() {
let batch = BatchTokenizer::new().expect("should create");
assert_eq!(batch.pool_size(), BatchTokenizer::default_pool_size());
}
#[test]
fn test_available_tokenizers() {
let batch = BatchTokenizer::new().expect("should create");
let available = batch.available_tokenizers();
assert_eq!(available, batch.pool_size());
}
#[test]
fn test_with_pool_size() {
let batch = BatchTokenizer::with_pool_size(4).expect("should create");
assert_eq!(batch.pool_size(), 4);
}
#[test]
fn test_parallel_stream_processor_creation() {
let processor = ParallelStreamProcessor::new();
assert!(processor.is_ok());
}
#[test]
fn test_with_chunk_size() {
let processor = ParallelStreamProcessor::new()
.expect("should create")
.with_chunk_size(8192);
assert_eq!(processor.chunk_size, 8192);
}
#[test]
fn test_empty_batch() {
let batch = BatchTokenizer::new().expect("should create");
let texts: Vec<&str> = vec![];
let results = batch.tokenize_batch(&texts);
assert!(results.is_empty());
}
#[test]
fn test_single_item_batch() {
let batch = BatchTokenizer::new().expect("should create");
let texts = vec!["안녕하세요"];
let results = batch.tokenize_batch(&texts);
assert_eq!(results.len(), 1);
assert!(!results[0].is_empty());
}
#[test]
fn test_large_batch() {
let batch = BatchTokenizer::new().expect("should create");
let texts: Vec<&str> = (0..100).map(|_| "안녕하세요").collect();
let results = batch.tokenize_batch(&texts);
assert_eq!(results.len(), 100);
}
#[test]
fn test_smart_chunking_respects_sentence_boundary() {
let text = "안녕. 감사. 좋아. 행복. 건강.";
let chunks = BatchTokenizer::split_into_chunks(text, 6);
assert!(chunks.len() > 1, "Should split into multiple chunks");
let has_delimiter_ending = chunks[..chunks.len().saturating_sub(1)]
.iter()
.any(|chunk| {
let trimmed = chunk.trim();
trimmed.ends_with('.') || trimmed.ends_with(' ')
});
assert!(
has_delimiter_ending || chunks.len() <= 2,
"At least some chunks should end with delimiters"
);
}
#[test]
fn test_smart_chunking_with_spaces() {
let text = "안녕하세요 감사합니다 좋은 하루 되세요";
let chunks = BatchTokenizer::split_into_chunks_smart(text, 8, &[' ']);
for chunk in &chunks {
assert!(!chunk.is_empty());
}
}
#[test]
fn test_split_with_overlap() {
let text = "안녕하세요감사합니다좋은하루되세요";
let chunks = BatchTokenizer::split_with_overlap(text, 5, 2);
assert!(chunks.len() > 1);
if chunks.len() >= 2 {
let first_end: String = chunks[0].chars().rev().take(2).collect::<String>();
let first_end: String = first_end.chars().rev().collect();
let second_start: String = chunks[1].chars().take(2).collect();
assert_eq!(
first_end, second_start,
"Overlap should match: {first_end} vs {second_start}"
);
}
}
#[test]
fn test_split_with_overlap_empty_text() {
let chunks = BatchTokenizer::split_with_overlap("", 5, 2);
assert!(chunks.is_empty());
}
#[test]
fn test_split_with_overlap_large_overlap() {
let text = "안녕하세요";
let chunks = BatchTokenizer::split_with_overlap(text, 4, 10);
assert!(!chunks.is_empty());
}
#[test]
fn test_smart_chunking_empty_text() {
let chunks = BatchTokenizer::split_into_chunks("", 5);
assert!(chunks.is_empty());
}
#[test]
fn test_smart_chunking_no_delimiter() {
let text = "안녕하세요감사합니다";
let chunks = BatchTokenizer::split_into_chunks(text, 4);
assert!(!chunks.is_empty());
}
#[test]
fn test_large_file_processor_creation() {
let processor = LargeFileProcessor::new();
assert!(processor.is_ok());
}
#[test]
fn test_large_file_processor_with_buffer_size() {
let processor = LargeFileProcessor::new()
.expect("should create")
.with_buffer_size(32768);
assert_eq!(processor.buffer_size, 32768);
}
#[test]
fn test_large_file_progress_percent() {
let progress = LargeFileProgress {
bytes_processed: 50,
total_bytes: 100,
tokens_generated: 10,
};
assert!((progress.percent() - 50.0).abs() < 0.001);
}
#[test]
fn test_large_file_progress_percent_zero_total() {
let progress = LargeFileProgress {
bytes_processed: 50,
total_bytes: 0,
tokens_generated: 10,
};
assert!((progress.percent() - 100.0).abs() < 0.001);
}
#[test]
fn test_large_file_processor_with_callback() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
let callback_count = Arc::new(AtomicUsize::new(0));
let callback_count_clone = Arc::clone(&callback_count);
let _processor = LargeFileProcessor::new()
.expect("should create")
.with_progress_callback(move |_progress| {
callback_count_clone.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(callback_count.load(Ordering::SeqCst), 0);
}
}