1use std::path::Path;
26use std::sync::{Arc, Mutex};
27
28use rayon::prelude::*;
29
30use crate::tokenizer::{Token, Tokenizer};
31use crate::Result;
32
/// Tokenizes many texts in parallel using a check-out/check-in pool of
/// [`Tokenizer`] instances, avoiding per-call tokenizer construction.
pub struct BatchTokenizer {
    /// Idle tokenizers available for checkout; used as a LIFO stack
    /// (workers `pop` one, tokenize, then `push` it back).
    tokenizer_pool: Arc<Mutex<Vec<Tokenizer>>>,

    /// Number of tokenizers the pool was created with.
    pool_size: usize,
}
44
45impl BatchTokenizer {
46 #[must_use]
48 pub fn default_pool_size() -> usize {
49 rayon::current_num_threads()
50 }
51
52 pub fn new() -> Result<Self> {
60 Self::with_pool_size(Self::default_pool_size())
61 }
62
63 pub fn with_pool_size(pool_size: usize) -> Result<Self> {
73 let mut tokenizers = Vec::with_capacity(pool_size);
74
75 for _ in 0..pool_size {
76 tokenizers.push(Tokenizer::new()?);
77 }
78
79 Ok(Self {
80 tokenizer_pool: Arc::new(Mutex::new(tokenizers)),
81 pool_size,
82 })
83 }
84
85 pub fn with_dict<P: AsRef<Path>>(dict_path: P, pool_size: usize) -> Result<Self> {
96 let mut tokenizers = Vec::with_capacity(pool_size);
97
98 for _ in 0..pool_size {
99 tokenizers.push(Tokenizer::with_dict(dict_path.as_ref())?);
100 }
101
102 Ok(Self {
103 tokenizer_pool: Arc::new(Mutex::new(tokenizers)),
104 pool_size,
105 })
106 }
107
108 #[must_use]
130 pub fn tokenize_batch(&self, texts: &[&str]) -> Vec<Vec<Token>> {
131 texts
132 .par_iter()
133 .map(|text| self.tokenize_single(text))
134 .collect()
135 }
136
137 #[must_use]
147 pub fn tokenize_batch_owned(&self, texts: &[String]) -> Vec<Vec<Token>> {
148 texts
149 .par_iter()
150 .map(|text| self.tokenize_single(text))
151 .collect()
152 }
153
154 fn tokenize_single(&self, text: &str) -> Vec<Token> {
158 let Ok(mut pool) = self.tokenizer_pool.lock() else {
160 return Vec::new(); };
162
163 if let Some(mut tokenizer) = pool.pop() {
164 drop(pool);
166
167 let tokens = tokenizer.tokenize(text);
169
170 if let Ok(mut pool) = self.tokenizer_pool.lock() {
172 pool.push(tokenizer);
173 }
174 tokens
177 } else {
178 drop(pool);
180 Tokenizer::new()
181 .map(|mut tok| tok.tokenize(text))
182 .unwrap_or_default()
183 }
184 }
185
186 pub fn tokenize_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
200 paths
201 .par_iter()
202 .map(|path| {
203 let content = std::fs::read_to_string(path)
204 .map_err(|e| crate::Error::Analysis(format!("Failed to read file: {e}")))?;
205 Ok(self.tokenize_single(&content))
206 })
207 .collect()
208 }
209
210 #[must_use]
223 pub fn tokenize_chunked(&self, text: &str, chunk_size: usize) -> Vec<Token> {
224 let chunks = Self::split_into_chunks(text, chunk_size);
225
226 let results: Vec<Vec<Token>> = chunks
227 .par_iter()
228 .map(|chunk| self.tokenize_single(chunk))
229 .collect();
230
231 results.into_iter().flatten().collect()
233 }
234
235 fn split_into_chunks(text: &str, chunk_size: usize) -> Vec<String> {
239 Self::split_into_chunks_smart(text, chunk_size, &['.', '!', '?', '。', '.', '\n', ' '])
240 }
241
242 fn split_into_chunks_smart(text: &str, chunk_size: usize, delimiters: &[char]) -> Vec<String> {
250 if text.is_empty() {
251 return Vec::new();
252 }
253
254 let mut chunks = Vec::new();
255 let mut current_start = 0;
256 let chars: Vec<(usize, char)> = text.char_indices().collect();
257
258 while current_start < chars.len() {
259 let target_end = (current_start + chunk_size).min(chars.len());
261
262 if target_end >= chars.len() {
263 let byte_start = chars[current_start].0;
265 chunks.push(text[byte_start..].to_string());
266 break;
267 }
268
269 let mut split_pos = target_end;
271 let mut found_delimiter = false;
272
273 let min_pos = current_start + (chunk_size * 3 / 4).max(1);
275 while split_pos > min_pos {
276 if delimiters.contains(&chars[split_pos - 1].1) {
277 found_delimiter = true;
278 break;
279 }
280 split_pos -= 1;
281 }
282
283 if !found_delimiter {
285 split_pos = target_end;
286 let max_pos = (target_end + chunk_size / 4).min(chars.len());
287 while split_pos < max_pos {
288 if delimiters.contains(&chars[split_pos - 1].1) {
289 found_delimiter = true;
290 break;
291 }
292 split_pos += 1;
293 }
294 }
295
296 if !found_delimiter {
298 split_pos = target_end;
299 }
300
301 let byte_start = chars[current_start].0;
303 let byte_end = if split_pos < chars.len() {
304 chars[split_pos].0
305 } else {
306 text.len()
307 };
308
309 let chunk = text[byte_start..byte_end].to_string();
310 if !chunk.is_empty() {
311 chunks.push(chunk);
312 }
313
314 current_start = split_pos;
315 }
316
317 chunks
318 }
319
320 #[must_use]
330 pub fn split_with_overlap(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
331 if text.is_empty() || chunk_size == 0 {
332 return Vec::new();
333 }
334
335 let overlap = overlap.min(chunk_size / 2); let chars: Vec<char> = text.chars().collect();
337 let mut chunks = Vec::new();
338 let mut pos = 0;
339
340 while pos < chars.len() {
341 let end = (pos + chunk_size).min(chars.len());
342 let chunk: String = chars[pos..end].iter().collect();
343 chunks.push(chunk);
344
345 if end >= chars.len() {
346 break;
347 }
348
349 pos = end.saturating_sub(overlap);
350 }
351
352 chunks
353 }
354
355 #[must_use]
357 pub const fn pool_size(&self) -> usize {
358 self.pool_size
359 }
360
361 #[must_use]
363 pub fn available_tokenizers(&self) -> usize {
364 self.tokenizer_pool
365 .lock()
366 .map(|pool| pool.len())
367 .unwrap_or(0)
368 }
369}
370
/// Tokenizes large inputs by splitting them into chunks and processing the
/// chunks in parallel through a [`BatchTokenizer`].
pub struct ParallelStreamProcessor {
    /// Underlying tokenizer pool that performs the actual tokenization.
    batch: BatchTokenizer,

    /// Chunk size, in characters, used when splitting large texts.
    chunk_size: usize,
}
384
385impl ParallelStreamProcessor {
386 pub const DEFAULT_CHUNK_SIZE: usize = 16384;
388
389 pub fn new() -> Result<Self> {
395 Ok(Self {
396 batch: BatchTokenizer::new()?,
397 chunk_size: Self::DEFAULT_CHUNK_SIZE,
398 })
399 }
400
401 #[must_use]
403 pub const fn with_chunk_size(mut self, size: usize) -> Self {
404 self.chunk_size = size;
405 self
406 }
407
408 pub fn process_large_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<Token>> {
422 let content = std::fs::read_to_string(path)
423 .map_err(|e| crate::Error::Analysis(format!("Failed to read file: {e}")))?;
424
425 Ok(self.batch.tokenize_chunked(&content, self.chunk_size))
426 }
427
428 pub fn process_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
442 self.batch.tokenize_files(paths)
443 }
444}
445
/// Streams a large file line-by-line, tokenizing buffered pieces at sentence
/// boundaries and optionally reporting progress after each line.
pub struct LargeFileProcessor {
    /// Tokenizer pool used for each buffered piece of text.
    batch: BatchTokenizer,

    /// Read-buffer size in bytes; also the threshold at which pending text
    /// is flushed to the tokenizer.
    buffer_size: usize,

    /// Optional callback invoked with cumulative progress after each line.
    progress_callback: Option<Box<dyn Fn(LargeFileProgress) + Send + Sync>>,
}
463
/// Progress snapshot passed to [`LargeFileProcessor`] callbacks.
#[derive(Debug, Clone)]
pub struct LargeFileProgress {
    /// Bytes consumed from the input so far.
    pub bytes_processed: usize,
    /// Total size of the input in bytes.
    pub total_bytes: usize,
    /// Number of tokens produced so far.
    pub tokens_generated: usize,
}

impl LargeFileProgress {
    /// Fraction of the input processed, as a percentage.
    ///
    /// An empty input (`total_bytes == 0`) reports `100.0` so callers never
    /// divide by zero.
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn percent(&self) -> f64 {
        match self.total_bytes {
            0 => 100.0,
            total => (self.bytes_processed as f64 / total as f64) * 100.0,
        }
    }
}
487
488impl LargeFileProcessor {
489 pub const DEFAULT_BUFFER_SIZE: usize = 65536;
491
492 pub fn new() -> Result<Self> {
498 Ok(Self {
499 batch: BatchTokenizer::new()?,
500 buffer_size: Self::DEFAULT_BUFFER_SIZE,
501 progress_callback: None,
502 })
503 }
504
505 #[must_use]
507 pub const fn with_buffer_size(mut self, size: usize) -> Self {
508 self.buffer_size = size;
509 self
510 }
511
512 #[must_use]
514 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
515 where
516 F: Fn(LargeFileProgress) + Send + Sync + 'static,
517 {
518 self.progress_callback = Some(Box::new(callback));
519 self
520 }
521
522 pub fn process_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<Token>> {
539 use std::io::{BufRead, BufReader};
540
541 let file = std::fs::File::open(path.as_ref())
542 .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
543
544 let metadata = file
545 .metadata()
546 .map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;
547
548 #[allow(clippy::cast_possible_truncation)]
549 let total_bytes = metadata.len() as usize;
550 let reader = BufReader::with_capacity(self.buffer_size, file);
551
552 let mut all_tokens = Vec::new();
553 let mut bytes_processed = 0;
554 let mut pending_text = String::new();
555 let sentence_delimiters = ['.', '!', '?', '。', '.', '\n'];
556
557 for line in reader.lines() {
558 let line = line
559 .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
560
561 bytes_processed += line.len() + 1; pending_text.push_str(&line);
563 pending_text.push('\n');
564
565 if pending_text.len() >= self.buffer_size {
567 if let Some(pos) = pending_text
569 .char_indices()
570 .rev()
571 .find(|(_, c)| sentence_delimiters.contains(c))
572 .map(|(i, _)| i)
573 {
574 let to_process = pending_text[..=pos].to_string();
575 let remaining = pending_text[pos + 1..].to_string();
576
577 let tokens = self.batch.tokenize_single(&to_process);
578 all_tokens.extend(tokens);
579
580 pending_text = remaining;
581 }
582 }
583
584 if let Some(ref callback) = self.progress_callback {
586 callback(LargeFileProgress {
587 bytes_processed,
588 total_bytes,
589 tokens_generated: all_tokens.len(),
590 });
591 }
592 }
593
594 if !pending_text.is_empty() {
596 let tokens = self.batch.tokenize_single(&pending_text);
597 all_tokens.extend(tokens);
598 }
599
600 if let Some(ref callback) = self.progress_callback {
602 callback(LargeFileProgress {
603 bytes_processed: total_bytes,
604 total_bytes,
605 tokens_generated: all_tokens.len(),
606 });
607 }
608
609 Ok(all_tokens)
610 }
611
612 pub fn process_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
618 paths
619 .par_iter()
620 .map(|path| self.process_file(path))
621 .collect()
622 }
623}
624
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    // --- BatchTokenizer construction and pool accounting ---

    #[test]
    fn test_batch_tokenizer_creation() {
        let batch = BatchTokenizer::new();
        assert!(batch.is_ok());
    }

    #[test]
    fn test_default_pool_size() {
        // Mirrors rayon's worker count, which is always at least 1.
        let size = BatchTokenizer::default_pool_size();
        assert!(size > 0);
    }

    #[test]
    fn test_tokenize_batch() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts = vec!["안녕하세요", "감사합니다"];

        let results = batch.tokenize_batch(&texts);

        // One result per input, in input order; both inputs yield tokens.
        assert_eq!(results.len(), 2);
        assert!(!results[0].is_empty());
        assert!(!results[1].is_empty());
    }

    #[test]
    fn test_tokenize_batch_owned() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts = vec!["안녕하세요".to_string(), "감사합니다".to_string()];

        let results = batch.tokenize_batch_owned(&texts);

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_tokenize_chunked() {
        let batch = BatchTokenizer::new().expect("should create");
        let text = "안녕하세요 감사합니다 좋은 하루 되세요";

        let tokens = batch.tokenize_chunked(text, 10);

        // Smoke test: only checks that chunked tokenization completes.
        let _ = tokens.len();
    }

    #[test]
    fn test_split_into_chunks() {
        let text = "안녕하세요 감사합니다";

        let chunks = BatchTokenizer::split_into_chunks(text, 5);

        assert!(chunks.len() > 1);
    }

    #[test]
    fn test_pool_size() {
        let batch = BatchTokenizer::new().expect("should create");
        assert_eq!(batch.pool_size(), BatchTokenizer::default_pool_size());
    }

    #[test]
    fn test_available_tokenizers() {
        // With no tokenization in flight, every tokenizer is checked in.
        let batch = BatchTokenizer::new().expect("should create");
        let available = batch.available_tokenizers();
        assert_eq!(available, batch.pool_size());
    }

    #[test]
    fn test_with_pool_size() {
        let batch = BatchTokenizer::with_pool_size(4).expect("should create");
        assert_eq!(batch.pool_size(), 4);
    }

    // --- ParallelStreamProcessor ---

    #[test]
    fn test_parallel_stream_processor_creation() {
        let processor = ParallelStreamProcessor::new();
        assert!(processor.is_ok());
    }

    #[test]
    fn test_with_chunk_size() {
        let processor = ParallelStreamProcessor::new()
            .expect("should create")
            .with_chunk_size(8192);

        assert_eq!(processor.chunk_size, 8192);
    }

    // --- Batch edge cases: empty, single, large ---

    #[test]
    fn test_empty_batch() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts: Vec<&str> = vec![];

        let results = batch.tokenize_batch(&texts);

        assert!(results.is_empty());
    }

    #[test]
    fn test_single_item_batch() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts = vec!["안녕하세요"];

        let results = batch.tokenize_batch(&texts);

        assert_eq!(results.len(), 1);
        assert!(!results[0].is_empty());
    }

    #[test]
    fn test_large_batch() {
        // 100 identical inputs exercise pool checkout/check-in under rayon.
        let batch = BatchTokenizer::new().expect("should create");
        let texts: Vec<&str> = (0..100).map(|_| "안녕하세요").collect();

        let results = batch.tokenize_batch(&texts);

        assert_eq!(results.len(), 100);
    }

    // --- Chunk splitting ---

    #[test]
    fn test_smart_chunking_respects_sentence_boundary() {
        let text = "안녕. 감사. 좋아. 행복. 건강.";

        let chunks = BatchTokenizer::split_into_chunks(text, 6);

        assert!(chunks.len() > 1, "Should split into multiple chunks");

        // Every chunk except possibly the last should end at a delimiter.
        let delimiter_ending: Vec<_> = chunks[..chunks.len().saturating_sub(1)]
            .iter()
            .filter(|chunk| {
                let trimmed = chunk.trim();
                trimmed.ends_with('.') || trimmed.ends_with(' ')
            })
            .collect();

        assert!(
            !delimiter_ending.is_empty() || chunks.len() <= 2,
            "At least some chunks should end with delimiters"
        );
    }

    #[test]
    fn test_smart_chunking_with_spaces() {
        let text = "안녕하세요 감사합니다 좋은 하루 되세요";

        let chunks = BatchTokenizer::split_into_chunks_smart(text, 8, &[' ']);

        // Empty chunks are filtered out by the splitter.
        for chunk in &chunks {
            assert!(!chunk.is_empty());
        }
    }

    #[test]
    fn test_split_with_overlap() {
        let text = "안녕하세요감사합니다좋은하루되세요";

        let chunks = BatchTokenizer::split_with_overlap(text, 5, 2);

        assert!(chunks.len() > 1);

        // The last `overlap` chars of a chunk must equal the next chunk's start.
        if chunks.len() >= 2 {
            let first_end: String = chunks[0].chars().rev().take(2).collect::<String>();
            let first_end: String = first_end.chars().rev().collect();
            let second_start: String = chunks[1].chars().take(2).collect();
            assert_eq!(
                first_end, second_start,
                "Overlap should match: {} vs {}",
                first_end, second_start
            );
        }
    }

    #[test]
    fn test_split_with_overlap_empty_text() {
        let chunks = BatchTokenizer::split_with_overlap("", 5, 2);
        assert!(chunks.is_empty());
    }

    #[test]
    fn test_split_with_overlap_large_overlap() {
        // Overlap larger than chunk_size is clamped internally; must not hang.
        let text = "안녕하세요";

        let chunks = BatchTokenizer::split_with_overlap(text, 4, 10);

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_smart_chunking_empty_text() {
        let chunks = BatchTokenizer::split_into_chunks("", 5);
        assert!(chunks.is_empty());
    }

    #[test]
    fn test_smart_chunking_no_delimiter() {
        // No delimiters present: the splitter falls back to hard splits.
        let text = "안녕하세요감사합니다";

        let chunks = BatchTokenizer::split_into_chunks(text, 4);

        assert!(chunks.len() >= 1);
    }

    // --- LargeFileProcessor ---

    #[test]
    fn test_large_file_processor_creation() {
        let processor = LargeFileProcessor::new();
        assert!(processor.is_ok());
    }

    #[test]
    fn test_large_file_processor_with_buffer_size() {
        let processor = LargeFileProcessor::new()
            .expect("should create")
            .with_buffer_size(32768);

        assert_eq!(processor.buffer_size, 32768);
    }

    #[test]
    fn test_large_file_progress_percent() {
        let progress = LargeFileProgress {
            bytes_processed: 50,
            total_bytes: 100,
            tokens_generated: 10,
        };

        assert!((progress.percent() - 50.0).abs() < 0.001);
    }

    #[test]
    fn test_large_file_progress_percent_zero_total() {
        // Zero total bytes is defined to report 100% complete.
        let progress = LargeFileProgress {
            bytes_processed: 50,
            total_bytes: 0,
            tokens_generated: 10,
        };

        assert!((progress.percent() - 100.0).abs() < 0.001);
    }

    #[test]
    fn test_large_file_processor_with_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);

        let _processor = LargeFileProcessor::new()
            .expect("should create")
            .with_progress_callback(move |_progress| {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            });

        // No file processed yet, so the callback must not have fired.
        assert!(callback_count.load(Ordering::SeqCst) == 0);
    }
}
899}