use std::path::Path;
use std::sync::{Arc, Mutex};

use rayon::prelude::*;

use crate::tokenizer::{Token, Tokenizer};
use crate::Result;

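/// Tokenizes many texts in parallel by drawing `Tokenizer` instances from a shared pool.
///
/// A minimal usage sketch (illustrative only; the import path for `BatchTokenizer`
/// depends on this crate's module layout, and a default dictionary must be available):
///
/// ```ignore
/// let batch = BatchTokenizer::new().expect("tokenizer pool");
/// let results = batch.tokenize_batch(&["안녕하세요", "감사합니다"]);
/// assert_eq!(results.len(), 2);
/// ```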
pub struct BatchTokenizer {
    tokenizer_pool: Arc<Mutex<Vec<Tokenizer>>>,

    pool_size: usize,
}

impl BatchTokenizer {
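    /// Returns the default pool size, which matches rayon's current thread count.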
    #[must_use]
    pub fn default_pool_size() -> usize {
        rayon::current_num_threads()
    }

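    /// Creates a `BatchTokenizer` with the default pool size.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying `Tokenizer` fails to initialize.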
    pub fn new() -> Result<Self> {
        Self::with_pool_size(Self::default_pool_size())
    }

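    /// Creates a `BatchTokenizer` backed by `pool_size` tokenizer instances.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying `Tokenizer` fails to initialize.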
    pub fn with_pool_size(pool_size: usize) -> Result<Self> {
        let mut tokenizers = Vec::with_capacity(pool_size);

        for _ in 0..pool_size {
            tokenizers.push(Tokenizer::new()?);
        }

        Ok(Self {
            tokenizer_pool: Arc::new(Mutex::new(tokenizers)),
            pool_size,
        })
    }

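    /// Creates a `BatchTokenizer` whose tokenizers all use the dictionary at `dict_path`.
    ///
    /// # Errors
    ///
    /// Returns an error if any tokenizer fails to load the dictionary.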
    pub fn with_dict<P: AsRef<Path>>(dict_path: P, pool_size: usize) -> Result<Self> {
        let mut tokenizers = Vec::with_capacity(pool_size);

        for _ in 0..pool_size {
            tokenizers.push(Tokenizer::with_dict(dict_path.as_ref())?);
        }

        Ok(Self {
            tokenizer_pool: Arc::new(Mutex::new(tokenizers)),
            pool_size,
        })
    }

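    /// Tokenizes each text in parallel, returning one token list per input in the original order.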
    #[must_use]
    pub fn tokenize_batch(&self, texts: &[&str]) -> Vec<Vec<Token>> {
        texts
            .par_iter()
            .map(|text| self.tokenize_single(text))
            .collect()
    }

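    /// Tokenizes each owned `String` in parallel, returning one token list per input in the original order.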
    #[must_use]
    pub fn tokenize_batch_owned(&self, texts: &[String]) -> Vec<Vec<Token>> {
        texts
            .par_iter()
            .map(|text| self.tokenize_single(text))
            .collect()
    }

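    /// Checks a tokenizer out of the pool, tokenizes `text`, and returns the tokenizer
    /// to the pool. Falls back to a temporary tokenizer if the pool is empty, and
    /// returns no tokens if the pool lock is poisoned.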
    fn tokenize_single(&self, text: &str) -> Vec<Token> {
        let Ok(mut pool) = self.tokenizer_pool.lock() else {
            return Vec::new();
        };

        if let Some(mut tokenizer) = pool.pop() {
            drop(pool);

            let tokens = tokenizer.tokenize(text);

            if let Ok(mut pool) = self.tokenizer_pool.lock() {
                pool.push(tokenizer);
            }
            tokens
        } else {
            drop(pool);
            Tokenizer::new()
                .map(|mut tok| tok.tokenize(text))
                .unwrap_or_default()
        }
    }

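    /// Reads and tokenizes each file in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if any file cannot be read.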
    pub fn tokenize_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
        paths
            .par_iter()
            .map(|path| {
                let content = std::fs::read_to_string(path)
                    .map_err(|e| crate::Error::Analysis(format!("Failed to read file: {e}")))?;
                Ok(self.tokenize_single(&content))
            })
            .collect()
    }

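    /// Splits `text` into chunks of roughly `chunk_size` characters, tokenizes the
    /// chunks in parallel, and concatenates the results in order.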
    #[must_use]
    pub fn tokenize_chunked(&self, text: &str, chunk_size: usize) -> Vec<Token> {
        let chunks = Self::split_into_chunks(text, chunk_size);

        let results: Vec<Vec<Token>> = chunks
            .par_iter()
            .map(|chunk| self.tokenize_single(chunk))
            .collect();

        results.into_iter().flatten().collect()
    }

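    /// Splits `text` into chunks using a default set of sentence and whitespace delimiters.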
    fn split_into_chunks(text: &str, chunk_size: usize) -> Vec<String> {
        Self::split_into_chunks_smart(text, chunk_size, &['.', '!', '?', '。', '.', '\n', ' '])
    }

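    /// Splits `text` into chunks of roughly `chunk_size` characters, preferring to cut
    /// just after a delimiter. The split point is searched backward down to 3/4 of the
    /// chunk, then forward by up to a quarter chunk, before falling back to a hard cut.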
    fn split_into_chunks_smart(text: &str, chunk_size: usize, delimiters: &[char]) -> Vec<String> {
        // A zero chunk size could never make progress below, so treat it like empty input.
        if text.is_empty() || chunk_size == 0 {
            return Vec::new();
        }

        let mut chunks = Vec::new();
        let mut current_start = 0;
        let chars: Vec<(usize, char)> = text.char_indices().collect();

        while current_start < chars.len() {
            let target_end = (current_start + chunk_size).min(chars.len());

            if target_end >= chars.len() {
                let byte_start = chars[current_start].0;
                chunks.push(text[byte_start..].to_string());
                break;
            }

            let mut split_pos = target_end;
            let mut found_delimiter = false;

            let min_pos = current_start + (chunk_size * 3 / 4).max(1);
            while split_pos > min_pos {
                if delimiters.contains(&chars[split_pos - 1].1) {
                    found_delimiter = true;
                    break;
                }
                split_pos -= 1;
            }

            if !found_delimiter {
                split_pos = target_end;
                let max_pos = (target_end + chunk_size / 4).min(chars.len());
                while split_pos < max_pos {
                    if delimiters.contains(&chars[split_pos - 1].1) {
                        found_delimiter = true;
                        break;
                    }
                    split_pos += 1;
                }
            }

            if !found_delimiter {
                split_pos = target_end;
            }

            let byte_start = chars[current_start].0;
            let byte_end = if split_pos < chars.len() {
                chars[split_pos].0
            } else {
                text.len()
            };

            let chunk = text[byte_start..byte_end].to_string();
            if !chunk.is_empty() {
                chunks.push(chunk);
            }

            current_start = split_pos;
        }

        chunks
    }

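    /// Splits `text` into fixed-size character chunks where consecutive chunks share
    /// `overlap` characters (capped at half the chunk size).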
    #[must_use]
    pub fn split_with_overlap(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
        if text.is_empty() || chunk_size == 0 {
            return Vec::new();
        }

        let overlap = overlap.min(chunk_size / 2);
        let chars: Vec<char> = text.chars().collect();
        let mut chunks = Vec::new();
        let mut pos = 0;

        while pos < chars.len() {
            let end = (pos + chunk_size).min(chars.len());
            let chunk: String = chars[pos..end].iter().collect();
            chunks.push(chunk);

            if end >= chars.len() {
                break;
            }

            pos = end.saturating_sub(overlap);
        }

        chunks
    }

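    /// Returns the number of tokenizers the pool was created with.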
    #[must_use]
    pub const fn pool_size(&self) -> usize {
        self.pool_size
    }

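    /// Returns the number of tokenizers currently available in the pool (0 if the lock is poisoned).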
    #[must_use]
    pub fn available_tokenizers(&self) -> usize {
        self.tokenizer_pool.lock().map_or(0, |pool| pool.len())
    }
}

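/// Tokenizes large inputs by splitting them into chunks and processing the chunks in parallel.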
pub struct ParallelStreamProcessor {
    batch: BatchTokenizer,

    chunk_size: usize,
}

impl ParallelStreamProcessor {
    pub const DEFAULT_CHUNK_SIZE: usize = 16384;

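    /// Creates a processor with the default chunk size.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying `BatchTokenizer` cannot be created.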
    pub fn new() -> Result<Self> {
        Ok(Self {
            batch: BatchTokenizer::new()?,
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
        })
    }

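    /// Sets the chunk size (in characters) used when splitting large inputs.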
    #[must_use]
    pub const fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

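    /// Reads the whole file into memory and tokenizes it in parallel chunks.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read.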
    pub fn process_large_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<Token>> {
        let content = std::fs::read_to_string(path)
            .map_err(|e| crate::Error::Analysis(format!("Failed to read file: {e}")))?;

        Ok(self.batch.tokenize_chunked(&content, self.chunk_size))
    }

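    /// Tokenizes each file in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if any file cannot be read.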
    pub fn process_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
        self.batch.tokenize_files(paths)
    }
}

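/// Streams a file line by line, tokenizing buffered text at sentence boundaries and
/// optionally reporting progress through a callback.
///
/// A minimal usage sketch (illustrative only; the file name is a placeholder and the
/// import path depends on this crate's module layout):
///
/// ```ignore
/// let processor = LargeFileProcessor::new()
///     .expect("processor")
///     .with_progress_callback(|p| println!("{:.1}%", p.percent()));
/// let tokens = processor.process_file("large_corpus.txt").expect("file should tokenize");
/// ```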
pub struct LargeFileProcessor {
    batch: BatchTokenizer,

    buffer_size: usize,

    progress_callback: Option<Box<dyn Fn(LargeFileProgress) + Send + Sync>>,
}

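/// Progress snapshot passed to the callback registered on `LargeFileProcessor`:
/// bytes read so far, the total file size in bytes, and the number of tokens produced.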
#[derive(Debug, Clone)]
pub struct LargeFileProgress {
    pub bytes_processed: usize,
    pub total_bytes: usize,
    pub tokens_generated: usize,
}

impl LargeFileProgress {
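    /// Returns the progress as a percentage in `0.0..=100.0` (100.0 when `total_bytes` is 0).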
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn percent(&self) -> f64 {
        if self.total_bytes == 0 {
            100.0
        } else {
            (self.bytes_processed as f64 / self.total_bytes as f64) * 100.0
        }
    }
}

impl LargeFileProcessor {
    pub const DEFAULT_BUFFER_SIZE: usize = 65536;

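    /// Creates a processor with the default buffer size and no progress callback.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying `BatchTokenizer` cannot be created.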
    pub fn new() -> Result<Self> {
        Ok(Self {
            batch: BatchTokenizer::new()?,
            buffer_size: Self::DEFAULT_BUFFER_SIZE,
            progress_callback: None,
        })
    }

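    /// Sets the read-buffer size in bytes; buffered text is tokenized once it reaches this size.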
    #[must_use]
    pub const fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

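    /// Registers a callback that receives a `LargeFileProgress` snapshot as the file is processed.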
    #[must_use]
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(LargeFileProgress) + Send + Sync + 'static,
    {
        self.progress_callback = Some(Box::new(callback));
        self
    }

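    /// Reads `path` line by line, tokenizing buffered text whenever the buffer fills,
    /// and invokes the progress callback (if any) after each line and once at the end.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.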
    pub fn process_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<Token>> {
        use std::io::{BufRead, BufReader};

        let file = std::fs::File::open(path.as_ref())
            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;

        let metadata = file
            .metadata()
            .map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;

        #[allow(clippy::cast_possible_truncation)]
        let total_bytes = metadata.len() as usize;
        let reader = BufReader::with_capacity(self.buffer_size, file);

        let mut all_tokens = Vec::new();
        let mut bytes_processed = 0;
        let mut pending_text = String::new();
        let sentence_delimiters = ['.', '!', '?', '。', '.', '\n'];

        for line in reader.lines() {
            let line =
                line.map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;

            bytes_processed += line.len() + 1;
            pending_text.push_str(&line);
            pending_text.push('\n');

            if pending_text.len() >= self.buffer_size {
                if let Some((pos, delim)) = pending_text
                    .char_indices()
                    .rev()
                    .find(|(_, c)| sentence_delimiters.contains(c))
                {
                    // Split just after the delimiter, using its UTF-8 length so the cut
                    // stays on a char boundary even for multi-byte delimiters like '。'.
                    let split = pos + delim.len_utf8();
                    let to_process = pending_text[..split].to_string();
                    let remaining = pending_text[split..].to_string();

                    let tokens = self.batch.tokenize_single(&to_process);
                    all_tokens.extend(tokens);

                    pending_text = remaining;
                }
            }

            if let Some(ref callback) = self.progress_callback {
                callback(LargeFileProgress {
                    bytes_processed,
                    total_bytes,
                    tokens_generated: all_tokens.len(),
                });
            }
        }

        if !pending_text.is_empty() {
            let tokens = self.batch.tokenize_single(&pending_text);
            all_tokens.extend(tokens);
        }

        if let Some(ref callback) = self.progress_callback {
            callback(LargeFileProgress {
                bytes_processed: total_bytes,
                total_bytes,
                tokens_generated: all_tokens.len(),
            });
        }

        Ok(all_tokens)
    }

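    /// Processes each file in parallel with the same buffer size and progress callback settings.
    ///
    /// # Errors
    ///
    /// Returns an error if any file cannot be opened or read.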
    pub fn process_files<P: AsRef<Path> + Sync>(&self, paths: &[P]) -> Result<Vec<Vec<Token>>> {
        paths
            .par_iter()
            .map(|path| self.process_file(path))
            .collect()
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn test_batch_tokenizer_creation() {
        let batch = BatchTokenizer::new();
        assert!(batch.is_ok());
    }

    #[test]
    fn test_default_pool_size() {
        let size = BatchTokenizer::default_pool_size();
        assert!(size > 0);
    }

    #[test]
    fn test_tokenize_batch() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts = vec!["안녕하세요", "감사합니다"];

        let results = batch.tokenize_batch(&texts);

        assert_eq!(results.len(), 2);
        assert!(!results[0].is_empty());
        assert!(!results[1].is_empty());
    }

    #[test]
    fn test_tokenize_batch_owned() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts = vec!["안녕하세요".to_string(), "감사합니다".to_string()];

        let results = batch.tokenize_batch_owned(&texts);

        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_tokenize_chunked() {
        let batch = BatchTokenizer::new().expect("should create");
        let text = "안녕하세요 감사합니다 좋은 하루 되세요";

        let tokens = batch.tokenize_chunked(text, 10);

        let _ = tokens.len();
    }

    #[test]
    fn test_split_into_chunks() {
        let text = "안녕하세요 감사합니다";

        let chunks = BatchTokenizer::split_into_chunks(text, 5);

        assert!(chunks.len() > 1);
    }

    #[test]
    fn test_pool_size() {
        let batch = BatchTokenizer::new().expect("should create");
        assert_eq!(batch.pool_size(), BatchTokenizer::default_pool_size());
    }

    #[test]
    fn test_available_tokenizers() {
        let batch = BatchTokenizer::new().expect("should create");
        let available = batch.available_tokenizers();
        assert_eq!(available, batch.pool_size());
    }

    #[test]
    fn test_with_pool_size() {
        let batch = BatchTokenizer::with_pool_size(4).expect("should create");
        assert_eq!(batch.pool_size(), 4);
    }

    #[test]
    fn test_parallel_stream_processor_creation() {
        let processor = ParallelStreamProcessor::new();
        assert!(processor.is_ok());
    }

    #[test]
    fn test_with_chunk_size() {
        let processor = ParallelStreamProcessor::new()
            .expect("should create")
            .with_chunk_size(8192);

        assert_eq!(processor.chunk_size, 8192);
    }

    #[test]
    fn test_empty_batch() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts: Vec<&str> = vec![];

        let results = batch.tokenize_batch(&texts);

        assert!(results.is_empty());
    }

    #[test]
    fn test_single_item_batch() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts = vec!["안녕하세요"];

        let results = batch.tokenize_batch(&texts);

        assert_eq!(results.len(), 1);
        assert!(!results[0].is_empty());
    }

    #[test]
    fn test_large_batch() {
        let batch = BatchTokenizer::new().expect("should create");
        let texts: Vec<&str> = (0..100).map(|_| "안녕하세요").collect();

        let results = batch.tokenize_batch(&texts);

        assert_eq!(results.len(), 100);
    }

    #[test]
    fn test_smart_chunking_respects_sentence_boundary() {
        let text = "안녕. 감사. 좋아. 행복. 건강.";

        let chunks = BatchTokenizer::split_into_chunks(text, 6);

        assert!(chunks.len() > 1, "Should split into multiple chunks");

        let has_delimiter_ending = chunks[..chunks.len().saturating_sub(1)]
            .iter()
            .any(|chunk| {
                let trimmed = chunk.trim();
                trimmed.ends_with('.') || trimmed.ends_with(' ')
            });

        assert!(
            has_delimiter_ending || chunks.len() <= 2,
            "At least some chunks should end with delimiters"
        );
    }

    #[test]
    fn test_smart_chunking_with_spaces() {
        let text = "안녕하세요 감사합니다 좋은 하루 되세요";

        let chunks = BatchTokenizer::split_into_chunks_smart(text, 8, &[' ']);

        for chunk in &chunks {
            assert!(!chunk.is_empty());
        }
    }

    #[test]
    fn test_split_with_overlap() {
        let text = "안녕하세요감사합니다좋은하루되세요";

        let chunks = BatchTokenizer::split_with_overlap(text, 5, 2);

        assert!(chunks.len() > 1);

        if chunks.len() >= 2 {
            let first_end: String = chunks[0].chars().rev().take(2).collect::<String>();
            let first_end: String = first_end.chars().rev().collect();
            let second_start: String = chunks[1].chars().take(2).collect();
            assert_eq!(
                first_end, second_start,
                "Overlap should match: {first_end} vs {second_start}"
            );
        }
    }

    #[test]
    fn test_split_with_overlap_empty_text() {
        let chunks = BatchTokenizer::split_with_overlap("", 5, 2);
        assert!(chunks.is_empty());
    }

    #[test]
    fn test_split_with_overlap_large_overlap() {
        let text = "안녕하세요";

        let chunks = BatchTokenizer::split_with_overlap(text, 4, 10);

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_smart_chunking_empty_text() {
        let chunks = BatchTokenizer::split_into_chunks("", 5);
        assert!(chunks.is_empty());
    }

    #[test]
    fn test_smart_chunking_no_delimiter() {
        let text = "안녕하세요감사합니다";

        let chunks = BatchTokenizer::split_into_chunks(text, 4);

        assert!(!chunks.is_empty());
    }

    #[test]
    fn test_large_file_processor_creation() {
        let processor = LargeFileProcessor::new();
        assert!(processor.is_ok());
    }

    #[test]
    fn test_large_file_processor_with_buffer_size() {
        let processor = LargeFileProcessor::new()
            .expect("should create")
            .with_buffer_size(32768);

        assert_eq!(processor.buffer_size, 32768);
    }

    #[test]
    fn test_large_file_progress_percent() {
        let progress = LargeFileProgress {
            bytes_processed: 50,
            total_bytes: 100,
            tokens_generated: 10,
        };

        assert!((progress.percent() - 50.0).abs() < 0.001);
    }

    #[test]
    fn test_large_file_progress_percent_zero_total() {
        let progress = LargeFileProgress {
            bytes_processed: 50,
            total_bytes: 0,
            tokens_generated: 10,
        };

        assert!((progress.percent() - 100.0).abs() < 0.001);
    }

    #[test]
    fn test_large_file_processor_with_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);

        let _processor = LargeFileProcessor::new()
            .expect("should create")
            .with_progress_callback(move |_progress| {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            });

        assert_eq!(callback_count.load(Ordering::SeqCst), 0);
    }
}