use std::collections::VecDeque;
use std::io::{BufRead, BufReader, Read};

use crate::tokenizer::{Token, Tokenizer};
use crate::Result;

/// Incremental tokenizer that buffers input and emits tokens once a
/// sentence boundary has been seen.
pub struct StreamingTokenizer {
    /// Underlying tokenizer used for each flushed span.
    tokenizer: Tokenizer,
    /// Text received but not yet tokenized.
    buffer: String,
    /// Target chunk size in bytes; also bounds buffer growth.
    chunk_size: usize,
    /// Characters treated as sentence boundaries.
    sentence_delimiters: Vec<char>,
    /// Characters tokenized so far, used to offset token positions.
    total_chars_processed: usize,
}

impl StreamingTokenizer {
    /// Default chunk size in bytes.
    pub const DEFAULT_CHUNK_SIZE: usize = 8192;

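    /// Creates a streaming tokenizer with the default chunk size and
    /// delimiters.
    ///
    /// # Examples
    ///
    /// A minimal sketch of the intended call pattern (marked `ignore`
    /// because the crate path and error type are assumed rather than
    /// verified here):
    ///
    /// ```ignore
    /// let mut stream = StreamingTokenizer::new(Tokenizer::new()?);
    /// let tokens = stream.process_chunk("첫 문장입니다. 둘째");
    /// let rest = stream.flush();
    /// ```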
    #[must_use]
    pub fn new(tokenizer: Tokenizer) -> Self {
        Self {
            tokenizer,
            buffer: String::with_capacity(Self::DEFAULT_CHUNK_SIZE),
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
            sentence_delimiters: vec!['.', '!', '?', '。', '．', '\n'],
            total_chars_processed: 0,
        }
    }

    /// Sets the chunk size and reallocates the internal buffer.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self.buffer = String::with_capacity(size);
        self
    }

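    /// Replaces the set of sentence delimiter characters.
    ///
    /// # Examples
    ///
    /// A hedged builder-style sketch (`ignore`d since the crate path is
    /// assumed):
    ///
    /// ```ignore
    /// let stream = StreamingTokenizer::new(tokenizer)
    ///     .with_chunk_size(4096)
    ///     .with_sentence_delimiters(vec!['.', '!', '?']);
    /// ```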
    #[must_use]
    pub fn with_sentence_delimiters(mut self, delimiters: Vec<char>) -> Self {
        self.sentence_delimiters = delimiters;
        self
    }

    /// Feeds a chunk of text, returning tokens for any complete
    /// sentences; incomplete trailing text stays buffered.
    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
        self.buffer.push_str(chunk);

        let split_pos = self.find_last_sentence_boundary();

        if let Some(pos) = split_pos {
            // `pos` is the byte index just past the last delimiter,
            // so both slices land on char boundaries.
            let to_process = self.buffer[..pos].to_string();
            let remaining = self.buffer[pos..].to_string();

            let mut tokens = self.tokenizer.tokenize(&to_process);

            // Shift token positions from chunk-local to stream-global.
            for token in &mut tokens {
                token.start_pos += self.total_chars_processed;
                token.end_pos += self.total_chars_processed;
            }

            self.total_chars_processed += to_process.chars().count();
            self.buffer = remaining;

            tokens
        } else if self.buffer.len() > self.chunk_size * 2 {
            // No boundary found and the buffer has grown too large:
            // flush part of it to bound memory usage.
            self.force_flush_partial()
        } else {
            Vec::new()
        }
    }

    /// Returns the byte index just past the last sentence delimiter in
    /// the buffer, if any.
    fn find_last_sentence_boundary(&self) -> Option<usize> {
        for (i, ch) in self.buffer.char_indices().rev() {
            if self.sentence_delimiters.contains(&ch) {
                // Return the end of the delimiter so that multi-byte
                // delimiters such as '。' split on a char boundary.
                return Some(i + ch.len_utf8());
            }
        }
        None
    }

    /// Finds a split position at or before `target_pos` that falls on a
    /// char boundary, preferring whitespace or delimiter boundaries.
    fn find_safe_split_point(&self, target_pos: usize) -> usize {
        let mut pos = target_pos.min(self.buffer.len());

        while pos > 0 {
            // Only slice at char boundaries; indexing mid-character
            // would panic.
            if self.buffer.is_char_boundary(pos) {
                if let Some(ch) = self.buffer[..pos].chars().last() {
                    if ch.is_whitespace() || self.sentence_delimiters.contains(&ch) {
                        return pos;
                    }
                }
            }
            pos -= 1;
        }

        // Fall back to the nearest char boundary at or before the target.
        let mut safe_pos = target_pos.min(self.buffer.len());
        while safe_pos > 0 && !self.buffer.is_char_boundary(safe_pos) {
            safe_pos -= 1;
        }
        safe_pos
    }

    /// Tokenizes roughly the first half of the buffer when no sentence
    /// boundary has appeared, to keep memory bounded.
    fn force_flush_partial(&mut self) -> Vec<Token> {
        let target_pos = self.buffer.len() / 2;
        let split_pos = self.find_safe_split_point(target_pos);

        if split_pos == 0 {
            return self.flush();
        }

        let to_process = self.buffer[..split_pos].to_string();
        let remaining = self.buffer[split_pos..].to_string();

        let mut tokens = self.tokenizer.tokenize(&to_process);

        for token in &mut tokens {
            token.start_pos += self.total_chars_processed;
            token.end_pos += self.total_chars_processed;
        }

        self.total_chars_processed += to_process.chars().count();
        self.buffer = remaining;

        tokens
    }

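    /// Tokenizes and returns whatever remains in the buffer; call this
    /// once the input is exhausted so trailing text is not lost.
    ///
    /// # Examples
    ///
    /// A sketch of the no-delimiter case (`ignore`d; crate path assumed):
    ///
    /// ```ignore
    /// // "안녕하세요" contains no delimiter, so tokens only appear on flush.
    /// let buffered = stream.process_chunk("안녕하세요");
    /// assert!(buffered.is_empty());
    /// let tokens = stream.flush();
    /// ```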
    pub fn flush(&mut self) -> Vec<Token> {
        if self.buffer.is_empty() {
            return Vec::new();
        }

        let to_process = std::mem::take(&mut self.buffer);
        let mut tokens = self.tokenizer.tokenize(&to_process);

        for token in &mut tokens {
            token.start_pos += self.total_chars_processed;
            token.end_pos += self.total_chars_processed;
        }

        self.total_chars_processed += to_process.chars().count();

        tokens
    }

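    /// Tokenizes everything from `reader`, streaming line by line.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the reader fails.
    ///
    /// # Examples
    ///
    /// Any `Read` source works; a sketch with an in-memory cursor
    /// (`ignore`d since the crate path is assumed):
    ///
    /// ```ignore
    /// use std::io::Cursor;
    /// let tokens = stream.process_reader(Cursor::new("안녕하세요.\n"))?;
    /// ```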
    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
        let mut buf_reader = BufReader::with_capacity(self.chunk_size, reader);
        let mut all_tokens = Vec::new();

        loop {
            let mut line = String::new();
            let bytes_read = buf_reader
                .read_line(&mut line)
                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;

            if bytes_read == 0 {
                break;
            }

            let tokens = self.process_chunk(&line);
            all_tokens.extend(tokens);
        }

        let remaining = self.flush();
        all_tokens.extend(remaining);

        Ok(all_tokens)
    }

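    /// Tokenizes an entire file by path.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Sketch only; "input.txt" is a hypothetical path.
    /// let tokens = stream.process_file("input.txt")?;
    /// ```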
    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
        let file = std::fs::File::open(path)
            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
        self.process_reader(file)
    }

    /// Number of bytes currently buffered.
    #[must_use]
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Total number of characters tokenized so far.
    #[must_use]
    pub const fn total_chars_processed(&self) -> usize {
        self.total_chars_processed
    }

    /// Clears the buffer and resets position tracking.
    pub fn reset(&mut self) {
        self.buffer.clear();
        self.total_chars_processed = 0;
    }
}

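/// Iterator adapter that turns an iterator of text chunks into an
/// iterator of individual tokens.
///
/// # Examples
///
/// A hedged sketch (`ignore`d since the crate path is assumed):
///
/// ```ignore
/// let chunks = vec!["안녕하세요.\n".to_string()];
/// for token in TokenStream::new(chunks.into_iter(), tokenizer) {
///     println!("{token:?}");
/// }
/// ```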
pub struct TokenStream<I>
where
    I: Iterator<Item = String>,
{
    /// Source of text chunks.
    chunks: I,
    /// Streaming tokenizer that handles buffering.
    streaming: StreamingTokenizer,
    /// Tokens ready to be yielded.
    token_buffer: VecDeque<Token>,
    /// Whether the chunk source is exhausted and flushed.
    finished: bool,
    /// Number of tokens yielded so far.
    tokens_yielded: usize,
}

impl<I> TokenStream<I>
where
    I: Iterator<Item = String>,
{
    /// Creates a token stream over `chunks` using `tokenizer`.
    #[must_use]
    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
        Self {
            chunks,
            streaming: StreamingTokenizer::new(tokenizer),
            token_buffer: VecDeque::new(),
            finished: false,
            tokens_yielded: 0,
        }
    }

    /// Sets the chunk size of the underlying streaming tokenizer.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.streaming = self.streaming.with_chunk_size(size);
        self
    }

    /// Number of tokens yielded so far.
    #[must_use]
    pub const fn tokens_yielded(&self) -> usize {
        self.tokens_yielded
    }
}

impl<I> Iterator for TokenStream<I>
where
    I: Iterator<Item = String>,
{
    type Item = Token;

    fn next(&mut self) -> Option<Self::Item> {
        // Drain tokens that are already buffered.
        if let Some(token) = self.token_buffer.pop_front() {
            self.tokens_yielded += 1;
            return Some(token);
        }

        if self.finished {
            return None;
        }

        // Pull chunks until at least one token is produced.
        for chunk in self.chunks.by_ref() {
            let tokens = self.streaming.process_chunk(&chunk);

            if !tokens.is_empty() {
                self.token_buffer.extend(tokens);
                if let Some(token) = self.token_buffer.pop_front() {
                    self.tokens_yielded += 1;
                    return Some(token);
                }
            }
        }

        // Chunk source exhausted: flush whatever remains buffered.
        self.finished = true;
        let remaining = self.streaming.flush();

        if !remaining.is_empty() {
            self.token_buffer.extend(remaining);
            if let Some(token) = self.token_buffer.pop_front() {
                self.tokens_yielded += 1;
                return Some(token);
            }
        }

        None
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // At least the buffered tokens remain; the total is unknown.
        (self.token_buffer.len(), None)
    }
}

/// Callback invoked with streaming progress updates.
pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send>;

/// Progress snapshot for a streaming tokenization run.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Bytes consumed so far.
    pub bytes_processed: usize,
    /// Total input size in bytes, if known.
    pub total_bytes: Option<usize>,
    /// Tokens produced so far.
    pub tokens_generated: usize,
    /// Chunks fed to the tokenizer so far.
    pub chunks_processed: usize,
}

impl StreamingProgress {
    /// Completion percentage, if the total size is known.
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn percent(&self) -> Option<f64> {
        self.total_bytes
            .map(|total| (self.bytes_processed as f64 / total as f64) * 100.0)
    }
}

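/// Streaming tokenizer that reports progress through an optional
/// callback.
///
/// # Examples
///
/// A hedged sketch (`ignore`d since the crate path is assumed):
///
/// ```ignore
/// let mut stream = ProgressStreamingTokenizer::new(tokenizer)
///     .with_progress_callback(|p| {
///         if let Some(pct) = p.percent() {
///             eprintln!("{pct:.1}% done");
///         }
///     });
/// ```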
pub struct ProgressStreamingTokenizer {
    /// Underlying streaming tokenizer.
    inner: StreamingTokenizer,
    /// Optional progress callback.
    callback: Option<ProgressCallback>,
    /// Bytes consumed so far.
    bytes_processed: usize,
    /// Total input size in bytes, if known.
    total_bytes: Option<usize>,
    /// Tokens produced so far.
    tokens_generated: usize,
    /// Chunks fed to the tokenizer so far.
    chunks_processed: usize,
    /// Minimum number of bytes between progress callbacks.
    callback_interval: usize,
    /// Byte count at the last callback invocation.
    last_callback_bytes: usize,
}

impl ProgressStreamingTokenizer {
    /// Default number of bytes between progress callbacks.
    pub const DEFAULT_CALLBACK_INTERVAL: usize = 65536;

    /// Creates a progress-reporting streaming tokenizer.
    #[must_use]
    pub fn new(tokenizer: Tokenizer) -> Self {
        Self {
            inner: StreamingTokenizer::new(tokenizer),
            callback: None,
            bytes_processed: 0,
            total_bytes: None,
            tokens_generated: 0,
            chunks_processed: 0,
            callback_interval: Self::DEFAULT_CALLBACK_INTERVAL,
            last_callback_bytes: 0,
        }
    }

    /// Sets the progress callback.
    #[must_use]
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(StreamingProgress) + Send + 'static,
    {
        self.callback = Some(Box::new(callback));
        self
    }

    /// Sets the total input size so progress percentages can be computed.
    #[must_use]
    pub const fn with_total_bytes(mut self, total: usize) -> Self {
        self.total_bytes = Some(total);
        self
    }

    /// Sets the number of bytes between progress callbacks.
    #[must_use]
    pub const fn with_callback_interval(mut self, interval: usize) -> Self {
        self.callback_interval = interval;
        self
    }

    /// Sets the chunk size of the underlying streaming tokenizer.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.inner = self.inner.with_chunk_size(size);
        self
    }

    /// Feeds a chunk of text, updating progress counters and invoking
    /// the callback at the configured interval.
    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
        self.bytes_processed += chunk.len();
        self.chunks_processed += 1;

        let tokens = self.inner.process_chunk(chunk);
        self.tokens_generated += tokens.len();

        if self.bytes_processed - self.last_callback_bytes >= self.callback_interval {
            self.report_progress();
            self.last_callback_bytes = self.bytes_processed;
        }

        tokens
    }

    /// Flushes buffered text and reports final progress.
    pub fn flush(&mut self) -> Vec<Token> {
        let tokens = self.inner.flush();
        self.tokens_generated += tokens.len();

        self.report_progress();

        tokens
    }

    fn report_progress(&self) {
        if let Some(ref callback) = self.callback {
            callback(StreamingProgress {
                bytes_processed: self.bytes_processed,
                total_bytes: self.total_bytes,
                tokens_generated: self.tokens_generated,
                chunks_processed: self.chunks_processed,
            });
        }
    }

    /// Tokenizes everything from `reader`, reporting progress as it goes.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the reader fails.
    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
        let mut buf_reader = BufReader::with_capacity(self.inner.chunk_size, reader);
        let mut all_tokens = Vec::new();

        loop {
            let mut line = String::new();
            let bytes_read = buf_reader
                .read_line(&mut line)
                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;

            if bytes_read == 0 {
                break;
            }

            let tokens = self.process_chunk(&line);
            all_tokens.extend(tokens);
        }

        let remaining = self.flush();
        all_tokens.extend(remaining);

        Ok(all_tokens)
    }

    /// Tokenizes a file, using its size on disk for progress percentages.
    ///
    /// # Errors
    ///
    /// Returns an error if file metadata cannot be read or the file
    /// cannot be opened.
    #[allow(clippy::cast_possible_truncation)]
    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
        let metadata = std::fs::metadata(path.as_ref())
            .map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;

        self.total_bytes = Some(metadata.len() as usize);

        let file = std::fs::File::open(path)
            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;

        self.process_reader(file)
    }

    /// Returns a snapshot of the current progress.
    #[must_use]
    pub const fn progress(&self) -> StreamingProgress {
        StreamingProgress {
            bytes_processed: self.bytes_processed,
            total_bytes: self.total_bytes,
            tokens_generated: self.tokens_generated,
            chunks_processed: self.chunks_processed,
        }
    }

    /// Resets all progress counters and the inner tokenizer.
    pub fn reset(&mut self) {
        self.inner.reset();
        self.bytes_processed = 0;
        self.tokens_generated = 0;
        self.chunks_processed = 0;
        self.last_callback_bytes = 0;
    }
}

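/// Iterator that yields one `Vec<Token>` per group of chunks that
/// produced output, rather than individual tokens.
///
/// # Examples
///
/// A hedged sketch (`ignore`d since the crate path is assumed):
///
/// ```ignore
/// let iter = ChunkedTokenIterator::new(chunks.into_iter(), tokenizer);
/// for batch in iter {
///     println!("{} tokens in batch", batch.len());
/// }
/// ```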
pub struct ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    /// Source of text chunks.
    chunks: I,
    /// Streaming tokenizer that handles buffering.
    streaming: StreamingTokenizer,
    /// Whether the chunk source is exhausted and flushed.
    finished: bool,
}

impl<I> ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    /// Creates a chunked token iterator over `chunks` using `tokenizer`.
    #[must_use]
    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
        Self {
            chunks,
            streaming: StreamingTokenizer::new(tokenizer),
            finished: false,
        }
    }

    /// Sets the chunk size of the underlying streaming tokenizer.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.streaming = self.streaming.with_chunk_size(size);
        self
    }
}

impl<I> Iterator for ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    type Item = Vec<Token>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        // Pull chunks until one of them yields tokens.
        for chunk in self.chunks.by_ref() {
            let tokens = self.streaming.process_chunk(&chunk);
            if !tokens.is_empty() {
                return Some(tokens);
            }
        }

        // Chunk source exhausted: emit any buffered remainder as a
        // final batch.
        self.finished = true;
        let remaining = self.streaming.flush();

        if remaining.is_empty() {
            None
        } else {
            Some(remaining)
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    fn create_test_tokenizer() -> Tokenizer {
        Tokenizer::new().expect("should create tokenizer")
    }

    #[test]
    fn test_streaming_tokenizer_creation() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer);

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
    }

    #[test]
    fn test_process_chunk_with_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream.process_chunk("안녕\n");
        assert!(!tokens.is_empty() || stream.buffer_len() > 0);

        let remaining = stream.flush();
        let total_tokens = tokens.len() + remaining.len();
        assert!(total_tokens > 0);
    }

    #[test]
    fn test_process_chunk_without_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        // No delimiter yet, so the text stays buffered.
        let tokens = stream.process_chunk("안녕하세요");
        assert!(tokens.is_empty());
        assert!(stream.buffer_len() > 0);
    }

    #[test]
    fn test_flush() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        let tokens = stream.flush();

        assert!(!tokens.is_empty());
        assert_eq!(stream.buffer_len(), 0);
    }

    #[test]
    fn test_multiple_chunks() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let _tokens1 = stream.process_chunk("안녕하세요.\n");
        let _tokens2 = stream.process_chunk("감사합니다.\n");
        let _remaining = stream.flush();

        assert!(stream.total_chars_processed() > 0);
    }

    #[test]
    fn test_reset() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        stream.reset();

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
    }

    #[test]
    fn test_custom_chunk_size() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer).with_chunk_size(1024);

        assert_eq!(stream.chunk_size, 1024);
    }

    #[test]
    fn test_custom_delimiters() {
        let tokenizer = create_test_tokenizer();
        let stream =
            StreamingTokenizer::new(tokenizer).with_sentence_delimiters(vec!['.', '!', '?']);

        assert_eq!(stream.sentence_delimiters.len(), 3);
    }

    #[test]
    fn test_token_stream_creation() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        assert!(!stream.finished);
    }

    #[test]
    fn test_token_stream_iteration() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕\n".to_string(), "감사\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let tokens: Vec<_> = stream.collect();
        assert!(!tokens.is_empty());
    }

    #[test]
    fn test_token_stream_tokens_yielded() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let mut stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let mut count = 0;
        while stream.next().is_some() {
            count += 1;
        }

        assert_eq!(stream.tokens_yielded(), count);
    }

    #[test]
    fn test_token_stream_size_hint() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let (lower, _upper) = stream.size_hint();
        assert_eq!(lower, 0);
    }

    #[test]
    fn test_progress_streaming_tokenizer() {
        let tokenizer = create_test_tokenizer();
        let mut stream = ProgressStreamingTokenizer::new(tokenizer);

        let _tokens = stream.process_chunk("안녕하세요.\n");
        let progress = stream.progress();

        assert!(progress.bytes_processed > 0);
        assert!(progress.chunks_processed > 0);
    }

    #[test]
    fn test_progress_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let tokenizer = create_test_tokenizer();
        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);

        let mut stream = ProgressStreamingTokenizer::new(tokenizer)
            .with_callback_interval(1)
            .with_progress_callback(move |_progress| {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            });

        stream.process_chunk("안녕하세요. 오늘 날씨가 좋네요.\n");
        let _remaining = stream.flush();

        assert!(callback_count.load(Ordering::SeqCst) > 0);
    }

    #[test]
    fn test_progress_percent() {
        let progress = StreamingProgress {
            bytes_processed: 50,
            total_bytes: Some(100),
            tokens_generated: 10,
            chunks_processed: 2,
        };

        assert_eq!(progress.percent(), Some(50.0));
    }

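    // Added counterpart to test_progress_percent: without a known total,
    // percent() has nothing to divide by.
    #[test]
    fn test_progress_percent_unknown_total() {
        let progress = StreamingProgress {
            bytes_processed: 50,
            total_bytes: None,
            tokens_generated: 10,
            chunks_processed: 2,
        };

        assert_eq!(progress.percent(), None);
    }
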
    #[test]
    fn test_chunked_token_iterator() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let iter = ChunkedTokenIterator::new(chunks.into_iter(), tokenizer);

        let token_chunks: Vec<_> = iter.collect();
        let total_tokens: usize = token_chunks.iter().map(|c| c.len()).sum();

        // Token counts depend on the tokenizer backend; just ensure
        // collection completes.
        let _ = total_tokens;
    }

    #[test]
    fn test_safe_split_point() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer)
            .with_sentence_delimiters(vec!['.', '!', '?', '\n', ' ']);

        let _tokens = stream.process_chunk("안녕하세요 감사합니다");

        // Text after the last space stays buffered.
        assert!(stream.buffer_len() > 0);
    }
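
    // The tests below are additions sketching the multi-byte delimiter and
    // in-memory reader paths; they use only APIs defined in this file.
    #[test]
    fn test_multibyte_delimiter_boundary() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        // '。' is a three-byte delimiter; splitting just past it must
        // stay on a char boundary.
        let tokens = stream.process_chunk("안녕하세요。감사");
        let remaining = stream.flush();

        assert!(tokens.len() + remaining.len() > 0);
    }

    #[test]
    fn test_process_reader_from_cursor() {
        use std::io::Cursor;

        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream
            .process_reader(Cursor::new("안녕하세요.\n감사합니다.\n"))
            .expect("should process reader");

        assert!(!tokens.is_empty());
        assert_eq!(stream.buffer_len(), 0);
    }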
}