use std::collections::VecDeque;
use std::io::{self, BufRead, BufReader, Read};

use crate::tokenizer::{Token, Tokenizer};
use crate::Result;

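/// Incremental tokenizer that buffers incoming text and only tokenizes up
/// to the last complete sentence boundary, carrying the remainder over to
/// the next chunk. Token positions are offset so they refer to the whole
/// stream rather than to a single chunk.
///
/// A minimal usage sketch (illustrative only; import paths depend on how
/// this crate re-exports the type):
///
/// ```ignore
/// let mut stream = StreamingTokenizer::new(Tokenizer::new()?);
///
/// let mut tokens = stream.process_chunk("첫 문장입니다. 둘째 ");
/// tokens.extend(stream.process_chunk("문장입니다.\n"));
/// tokens.extend(stream.flush());
/// ```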
pub struct StreamingTokenizer {
    /// Tokenizer applied to each drained segment.
    tokenizer: Tokenizer,

    /// Text accumulated until a sentence boundary is found.
    buffer: String,

    /// Suggested capacity for reads and the internal buffer.
    chunk_size: usize,

    /// Characters treated as sentence boundaries.
    sentence_delimiters: Vec<char>,

    /// Characters tokenized so far, used to offset token positions so
    /// they refer to the whole stream rather than a single chunk.
    total_chars_processed: usize,

    /// Buffer size above which a partial flush is forced.
    max_buffer_size: usize,
}

impl StreamingTokenizer {
    /// Default chunk size for buffered reads (8 KiB).
    pub const DEFAULT_CHUNK_SIZE: usize = 8192;

    /// Default cap on buffered text before a forced partial flush (16 MiB).
    pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;

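    /// Creates a streaming tokenizer with the default chunk size and
    /// buffer cap.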
    #[must_use]
    pub fn new(tokenizer: Tokenizer) -> Self {
        Self {
            tokenizer,
            buffer: String::with_capacity(Self::DEFAULT_CHUNK_SIZE),
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
            sentence_delimiters: vec!['.', '!', '?', '。', '.', '\n'],
            total_chars_processed: 0,
            max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
        }
    }

    /// Sets the chunk size and re-allocates the buffer to match.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self.buffer = String::with_capacity(size);
        self
    }

    /// Replaces the set of sentence-delimiter characters.
    #[must_use]
    pub fn with_sentence_delimiters(mut self, delimiters: Vec<char>) -> Self {
        self.sentence_delimiters = delimiters;
        self
    }

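    /// Feeds a chunk of text into the buffer and returns tokens for every
    /// complete sentence seen so far. Text after the last sentence boundary
    /// stays buffered for the next call; use [`flush`](Self::flush) to
    /// tokenize the remainder. If no boundary is found and the buffer
    /// exceeds `max_buffer_size`, part of the buffer is force-flushed at a
    /// safe split point.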
    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
        self.buffer.push_str(chunk);

        let split_pos = self.find_last_sentence_boundary();

        if let Some(pos) = split_pos {
            // `pos` is the last byte of a delimiter char, so `pos + 1`
            // is always a valid char boundary.
            let to_process = self.buffer[..=pos].to_string();
            let remaining = self.buffer[pos + 1..].to_string();

            let mut tokens = self.tokenizer.tokenize(&to_process);

            // Shift positions from chunk-local to stream-global offsets.
            for token in &mut tokens {
                token.start_pos += self.total_chars_processed;
                token.end_pos += self.total_chars_processed;
            }

            self.total_chars_processed += to_process.chars().count();
            self.buffer = remaining;

            tokens
        } else if self.buffer.len() > self.max_buffer_size {
            self.force_flush_partial()
        } else {
            Vec::new()
        }
    }

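    /// Returns the byte index of the last byte of the right-most sentence
    /// delimiter, skipping a `.` that sits between two ASCII digits
    /// (e.g. `3.14`), or `None` if the buffer holds no delimiter.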
    fn find_last_sentence_boundary(&self) -> Option<usize> {
        let bytes = self.buffer.as_bytes();
        for (i, ch) in self.buffer.char_indices().rev() {
            if self.sentence_delimiters.contains(&ch) {
                // Don't split decimals like "3.14": skip a '.' that sits
                // directly between two ASCII digits.
                if ch == '.' && i > 0 && i + ch.len_utf8() < bytes.len() {
                    let prev_byte = bytes[i - 1];
                    let next_byte = bytes[i + ch.len_utf8()];
                    if prev_byte.is_ascii_digit() && next_byte.is_ascii_digit() {
                        continue;
                    }
                }
                return Some(i + ch.len_utf8() - 1);
            }
        }
        None
    }

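    /// Finds a split position at or before `target_pos` that lies on a
    /// char boundary, preferring the position just after the nearest
    /// whitespace or delimiter so words are not cut in half.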
    fn find_safe_split_point(&self, target_pos: usize) -> usize {
        // Back up to the nearest char boundary at or before the target.
        let mut pos = target_pos.min(self.buffer.len());
        while pos > 0 && !self.buffer.is_char_boundary(pos) {
            pos -= 1;
        }

        // Prefer splitting just after whitespace or a delimiter.
        for (byte_idx, ch) in self.buffer[..pos].char_indices().rev() {
            if ch.is_whitespace() || self.sentence_delimiters.contains(&ch) {
                return byte_idx + ch.len_utf8();
            }
        }

        pos
    }

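    /// Tokenizes roughly the first half of an oversized buffer at a safe
    /// split point, falling back to a full [`flush`](Self::flush) when no
    /// safe split exists.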
    fn force_flush_partial(&mut self) -> Vec<Token> {
        let target_pos = self.buffer.len() / 2;
        let split_pos = self.find_safe_split_point(target_pos);

        if split_pos == 0 {
            // No safe split point exists; tokenize the whole buffer.
            return self.flush();
        }

        let to_process = self.buffer[..split_pos].to_string();
        let remaining = self.buffer[split_pos..].to_string();

        let mut tokens = self.tokenizer.tokenize(&to_process);

        for token in &mut tokens {
            token.start_pos += self.total_chars_processed;
            token.end_pos += self.total_chars_processed;
        }

        self.total_chars_processed += to_process.chars().count();
        self.buffer = remaining;

        tokens
    }

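    /// Tokenizes and drains everything left in the buffer, regardless of
    /// sentence boundaries.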
    pub fn flush(&mut self) -> Vec<Token> {
        if self.buffer.is_empty() {
            return Vec::new();
        }

        let to_process = std::mem::take(&mut self.buffer);
        let mut tokens = self.tokenizer.tokenize(&to_process);

        for token in &mut tokens {
            token.start_pos += self.total_chars_processed;
            token.end_pos += self.total_chars_processed;
        }

        self.total_chars_processed += to_process.chars().count();

        tokens
    }

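    /// Reads the source line by line, streaming each line through
    /// [`process_chunk`](Self::process_chunk) and flushing the remainder
    /// at EOF.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the underlying source fails.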
    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
        let mut buf_reader = BufReader::with_capacity(self.chunk_size, reader);
        let mut all_tokens = Vec::new();

        loop {
            let mut line = String::new();
            let bytes_read = buf_reader
                .read_line(&mut line)
                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;

            if bytes_read == 0 {
                break;
            }

            let tokens = self.process_chunk(&line);
            all_tokens.extend(tokens);
        }

        let remaining = self.flush();
        all_tokens.extend(remaining);

        Ok(all_tokens)
    }

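    /// Opens the file at `path` and tokenizes it via
    /// [`process_reader`](Self::process_reader).
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.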
    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
        let file = std::fs::File::open(path)
            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
        self.process_reader(file)
    }

    /// Number of bytes currently buffered and not yet tokenized.
    #[must_use]
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Total number of characters tokenized so far.
    #[must_use]
    pub const fn total_chars_processed(&self) -> usize {
        self.total_chars_processed
    }

    /// Clears the buffer and resets the character counter.
    pub fn reset(&mut self) {
        self.buffer.clear();
        self.total_chars_processed = 0;
    }
}

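/// Lazily yields individual [`Token`]s from an iterator of text chunks,
/// driving a [`StreamingTokenizer`] internally and buffering its output.
///
/// Sketch (illustrative; assumes the same imports as above):
///
/// ```ignore
/// let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
/// for token in TokenStream::new(chunks.into_iter(), Tokenizer::new()?) {
///     println!("{}", token.surface);
/// }
/// ```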
pub struct TokenStream<I>
where
    I: Iterator<Item = String>,
{
    /// Source iterator of text chunks.
    chunks: I,

    /// Streaming tokenizer that drains complete sentences.
    streaming: StreamingTokenizer,

    /// Tokens produced but not yet yielded.
    token_buffer: VecDeque<Token>,

    /// Set once the chunk iterator and the final flush are exhausted.
    finished: bool,

    /// Number of tokens handed out so far.
    tokens_yielded: usize,
}

impl<I> TokenStream<I>
where
    I: Iterator<Item = String>,
{
    /// Creates a token stream over `chunks`, tokenizing with `tokenizer`.
    #[must_use]
    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
        Self {
            chunks,
            streaming: StreamingTokenizer::new(tokenizer),
            token_buffer: VecDeque::new(),
            finished: false,
            tokens_yielded: 0,
        }
    }

    /// Sets the chunk size of the underlying [`StreamingTokenizer`].
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.streaming = self.streaming.with_chunk_size(size);
        self
    }

    /// Number of tokens yielded so far.
    #[must_use]
    pub const fn tokens_yielded(&self) -> usize {
        self.tokens_yielded
    }
}

impl<I> Iterator for TokenStream<I>
where
    I: Iterator<Item = String>,
{
    type Item = Token;

    fn next(&mut self) -> Option<Self::Item> {
        // Serve buffered tokens first.
        if let Some(token) = self.token_buffer.pop_front() {
            self.tokens_yielded += 1;
            return Some(token);
        }

        if self.finished {
            return None;
        }

        // Pull chunks until one produces tokens.
        for chunk in self.chunks.by_ref() {
            let tokens = self.streaming.process_chunk(&chunk);

            if !tokens.is_empty() {
                self.token_buffer.extend(tokens);
                if let Some(token) = self.token_buffer.pop_front() {
                    self.tokens_yielded += 1;
                    return Some(token);
                }
            }
        }

        // Chunks exhausted: flush whatever is left in the buffer.
        self.finished = true;
        let remaining = self.streaming.flush();

        if !remaining.is_empty() {
            self.token_buffer.extend(remaining);
            if let Some(token) = self.token_buffer.pop_front() {
                self.tokens_yielded += 1;
                return Some(token);
            }
        }

        None
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // At least the buffered tokens remain; the total is unknown.
        (self.token_buffer.len(), None)
    }
}

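/// Callback invoked with a [`StreamingProgress`] snapshot during streaming.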
pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send>;

/// Snapshot of streaming progress.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Bytes fed into the tokenizer so far.
    pub bytes_processed: usize,
    /// Total input size in bytes, if known.
    pub total_bytes: Option<usize>,
    /// Tokens produced so far.
    pub tokens_generated: usize,
    /// Chunks processed so far.
    pub chunks_processed: usize,
}

impl StreamingProgress {
    /// Percentage of input processed, or `None` if the total size is unknown.
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn percent(&self) -> Option<f64> {
        self.total_bytes
            .map(|total| (self.bytes_processed as f64 / total as f64) * 100.0)
    }
}

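/// Wraps [`StreamingTokenizer`] and reports progress through an optional
/// callback, rate-limited so that at least `callback_interval` bytes pass
/// between reports, with one final report on [`flush`](Self::flush).
///
/// Sketch (illustrative; the file path is a placeholder):
///
/// ```ignore
/// let mut stream = ProgressStreamingTokenizer::new(Tokenizer::new()?)
///     .with_callback_interval(64 * 1024)
///     .with_progress_callback(|p| {
///         if let Some(pct) = p.percent() {
///             eprintln!("{pct:.1}% ({} tokens)", p.tokens_generated);
///         }
///     });
/// let tokens = stream.process_file("corpus.txt")?;
/// ```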
pub struct ProgressStreamingTokenizer {
    /// Wrapped streaming tokenizer.
    inner: StreamingTokenizer,

    /// Optional progress callback.
    callback: Option<ProgressCallback>,

    /// Bytes fed in so far.
    bytes_processed: usize,

    /// Total input size, if known.
    total_bytes: Option<usize>,

    /// Tokens produced so far.
    tokens_generated: usize,

    /// Chunks processed so far.
    chunks_processed: usize,

    /// Minimum number of bytes between callback invocations.
    callback_interval: usize,

    /// Byte count at the last callback, used to rate-limit reporting.
    last_callback_bytes: usize,
}

impl ProgressStreamingTokenizer {
    /// Default number of bytes between progress callbacks (64 KiB).
    pub const DEFAULT_CALLBACK_INTERVAL: usize = 65536;

    /// Creates a progress-reporting streaming tokenizer with no callback.
    #[must_use]
    pub fn new(tokenizer: Tokenizer) -> Self {
        Self {
            inner: StreamingTokenizer::new(tokenizer),
            callback: None,
            bytes_processed: 0,
            total_bytes: None,
            tokens_generated: 0,
            chunks_processed: 0,
            callback_interval: Self::DEFAULT_CALLBACK_INTERVAL,
            last_callback_bytes: 0,
        }
    }

    /// Sets the progress callback.
    #[must_use]
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(StreamingProgress) + Send + 'static,
    {
        self.callback = Some(Box::new(callback));
        self
    }

    /// Sets the total input size so [`StreamingProgress::percent`] can work.
    #[must_use]
    pub const fn with_total_bytes(mut self, total: usize) -> Self {
        self.total_bytes = Some(total);
        self
    }

    /// Sets the minimum number of bytes between callback invocations.
    #[must_use]
    pub const fn with_callback_interval(mut self, interval: usize) -> Self {
        self.callback_interval = interval;
        self
    }

    /// Sets the chunk size of the wrapped [`StreamingTokenizer`].
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.inner = self.inner.with_chunk_size(size);
        self
    }

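    /// Forwards the chunk to the wrapped tokenizer, updates the byte,
    /// chunk, and token counters, and fires the callback once at least
    /// `callback_interval` bytes have accumulated since the last report.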
    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
        self.bytes_processed += chunk.len();
        self.chunks_processed += 1;

        let tokens = self.inner.process_chunk(chunk);
        self.tokens_generated += tokens.len();

        if self.bytes_processed - self.last_callback_bytes >= self.callback_interval {
            self.report_progress();
            self.last_callback_bytes = self.bytes_processed;
        }

        tokens
    }

    /// Flushes the wrapped tokenizer and fires a final progress report.
    pub fn flush(&mut self) -> Vec<Token> {
        let tokens = self.inner.flush();
        self.tokens_generated += tokens.len();

        self.report_progress();

        tokens
    }

    fn report_progress(&self) {
        if let Some(ref callback) = self.callback {
            callback(StreamingProgress {
                bytes_processed: self.bytes_processed,
                total_bytes: self.total_bytes,
                tokens_generated: self.tokens_generated,
                chunks_processed: self.chunks_processed,
            });
        }
    }

    /// Reads the source line by line with progress reporting.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the underlying source fails.
    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
        let mut buf_reader = BufReader::with_capacity(self.inner.chunk_size, reader);
        let mut all_tokens = Vec::new();

        loop {
            let mut line = String::new();
            let bytes_read = buf_reader
                .read_line(&mut line)
                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;

            if bytes_read == 0 {
                break;
            }

            let tokens = self.process_chunk(&line);
            all_tokens.extend(tokens);
        }

        let remaining = self.flush();
        all_tokens.extend(remaining);

        Ok(all_tokens)
    }

    /// Tokenizes the file at `path`, using its size on disk as the total
    /// for percentage reporting.
    ///
    /// # Errors
    ///
    /// Returns an error if the file metadata cannot be read or the file
    /// cannot be opened.
    #[allow(clippy::cast_possible_truncation)]
    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
        let metadata = std::fs::metadata(path.as_ref())
            .map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;

        self.total_bytes = Some(metadata.len() as usize);

        let file = std::fs::File::open(path)
            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;

        self.process_reader(file)
    }

    /// Current progress snapshot.
    #[must_use]
    pub const fn progress(&self) -> StreamingProgress {
        StreamingProgress {
            bytes_processed: self.bytes_processed,
            total_bytes: self.total_bytes,
            tokens_generated: self.tokens_generated,
            chunks_processed: self.chunks_processed,
        }
    }

    /// Resets the wrapped tokenizer and all counters.
    pub fn reset(&mut self) {
        self.inner.reset();
        self.bytes_processed = 0;
        self.tokens_generated = 0;
        self.chunks_processed = 0;
        self.last_callback_bytes = 0;
    }
}

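/// Like [`TokenStream`], but yields one `Vec<Token>` per drained batch
/// instead of individual tokens, avoiding per-token buffering.
///
/// Sketch (illustrative):
///
/// ```ignore
/// let chunks = vec!["안녕하세요.\n".to_string()];
/// for batch in ChunkedTokenIterator::new(chunks.into_iter(), Tokenizer::new()?) {
///     println!("batch of {} tokens", batch.len());
/// }
/// ```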
pub struct ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    /// Source iterator of text chunks.
    chunks: I,

    /// Streaming tokenizer that drains complete sentences.
    streaming: StreamingTokenizer,

    /// Set once the chunk iterator and the final flush are exhausted.
    finished: bool,
}

impl<I> ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    /// Creates a chunked iterator over `chunks`.
    #[must_use]
    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
        Self {
            chunks,
            streaming: StreamingTokenizer::new(tokenizer),
            finished: false,
        }
    }

    /// Sets the chunk size of the underlying [`StreamingTokenizer`].
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.streaming = self.streaming.with_chunk_size(size);
        self
    }
}

impl<I> Iterator for ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    type Item = Vec<Token>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        // Pull chunks until one yields a non-empty batch of tokens.
        for chunk in self.chunks.by_ref() {
            let tokens = self.streaming.process_chunk(&chunk);
            if !tokens.is_empty() {
                return Some(tokens);
            }
        }

        // Chunks exhausted: flush whatever is left.
        self.finished = true;
        let remaining = self.streaming.flush();

        if remaining.is_empty() {
            None
        } else {
            Some(remaining)
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    fn create_test_tokenizer() -> Tokenizer {
        Tokenizer::new().expect("should create tokenizer")
    }

    #[test]
    fn test_streaming_tokenizer_creation() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer);

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
    }

    #[test]
    fn test_process_chunk_with_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream.process_chunk("안녕\n");
        assert!(!tokens.is_empty() || stream.buffer_len() > 0);

        let remaining = stream.flush();
        let total_tokens = tokens.len() + remaining.len();
        assert!(total_tokens > 0);
    }

    #[test]
    fn test_process_chunk_without_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream.process_chunk("안녕하세요");
        assert!(tokens.is_empty() || stream.buffer_len() > 0);
    }

    #[test]
    fn test_flush() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        let tokens = stream.flush();

        assert!(!tokens.is_empty());
        assert_eq!(stream.buffer_len(), 0);
    }

    #[test]
    fn test_multiple_chunks() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let _tokens1 = stream.process_chunk("안녕하세요.\n");
        let _tokens2 = stream.process_chunk("감사합니다.\n");
        let _remaining = stream.flush();

        assert!(stream.total_chars_processed() > 0);
    }

    #[test]
    fn test_reset() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        stream.reset();

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
    }

    #[test]
    fn test_custom_chunk_size() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer).with_chunk_size(1024);

        assert_eq!(stream.chunk_size, 1024);
    }

    #[test]
    fn test_custom_delimiters() {
        let tokenizer = create_test_tokenizer();
        let stream =
            StreamingTokenizer::new(tokenizer).with_sentence_delimiters(vec!['.', '!', '?']);

        assert_eq!(stream.sentence_delimiters.len(), 3);
    }

    #[test]
    fn test_token_stream_creation() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        assert!(!stream.finished);
    }

    #[test]
    fn test_token_stream_iteration() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕\n".to_string(), "감사\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let tokens: Vec<_> = stream.collect();
        assert!(!tokens.is_empty());
    }

    #[test]
    fn test_token_stream_tokens_yielded() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let mut stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let mut count = 0;
        while stream.next().is_some() {
            count += 1;
        }

        assert_eq!(stream.tokens_yielded(), count);
    }

    #[test]
    fn test_token_stream_size_hint() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let (lower, _upper) = stream.size_hint();
        assert_eq!(lower, 0);
    }

    #[test]
    fn test_progress_streaming_tokenizer() {
        let tokenizer = create_test_tokenizer();
        let mut stream = ProgressStreamingTokenizer::new(tokenizer);

        let _tokens = stream.process_chunk("안녕하세요.\n");
        let progress = stream.progress();

        assert!(progress.bytes_processed > 0);
        assert!(progress.chunks_processed > 0);
    }

    #[test]
    fn test_progress_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let tokenizer = create_test_tokenizer();
        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);

        let mut stream = ProgressStreamingTokenizer::new(tokenizer)
            .with_callback_interval(1)
            .with_progress_callback(move |_progress| {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            });

        stream.process_chunk("안녕하세요. 오늘 날씨가 좋네요.\n");
        let _remaining = stream.flush();

        assert!(callback_count.load(Ordering::SeqCst) > 0);
    }

    #[test]
    fn test_progress_percent() {
        let progress = StreamingProgress {
            bytes_processed: 50,
            total_bytes: Some(100),
            tokens_generated: 10,
            chunks_processed: 2,
        };

        assert_eq!(progress.percent(), Some(50.0));
    }

    #[test]
    fn test_chunked_token_iterator() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let iter = ChunkedTokenIterator::new(chunks.into_iter(), tokenizer);

        let token_chunks: Vec<_> = iter.collect();
        let total_tokens: usize = token_chunks.iter().map(std::vec::Vec::len).sum();

        // Just exercise the pipeline; no assertion on exact counts.
        let _ = total_tokens;
    }

    #[test]
    fn test_multibyte_delimiter_no_panic() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer)
            .with_sentence_delimiters(vec!['.', '!', '?', '。', '.', '\n']);

        let tokens = stream.process_chunk("テスト。次の文。\n");
        let remaining = stream.flush();
        let total = tokens.len() + remaining.len();
        assert!(total > 0 || stream.buffer_len() == 0);
    }

    #[test]
    fn test_decimal_number_not_split() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream.process_chunk("값은 3.14입니다.\n");
        let remaining = stream.flush();
        let all: Vec<_> = tokens.into_iter().chain(remaining).collect();
        let surfaces: Vec<_> = all.iter().map(|t| t.surface.as_str()).collect();
        let joined = surfaces.join(" ");
        assert!(
            !joined.contains("3 .") && !joined.contains(". 14"),
            "Decimal was incorrectly split: {joined}"
        );
    }

    #[test]
    fn test_buffer_limit_forces_flush() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);
        stream.max_buffer_size = 32;

        let tokens = stream.process_chunk(&"가".repeat(100));
        assert!(!tokens.is_empty(), "Buffer limit should force a flush");
    }

    #[test]
    fn test_safe_split_point() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer)
            .with_sentence_delimiters(vec!['.', '!', '?', '\n', ' ']);

        let _tokens = stream.process_chunk("안녕하세요 감사합니다");

        assert!(stream.buffer_len() > 0);
    }
}

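/// Reads from any [`BufRead`] source and yields one trimmed sentence at a
/// time. Sentences end at `.`, `?`, or `!` when followed by whitespace
/// (optionally after closing quotes or brackets) or EOF, and at newlines;
/// a `.` between two ASCII digits (e.g. `3.14`) is not a boundary.
///
/// Sketch (illustrative):
///
/// ```ignore
/// use std::io::Cursor;
///
/// let reader = SentenceReader::new(Cursor::new("첫 문장입니다. 둘째 문장입니다.\n"));
/// for sentence in reader {
///     println!("{}", sentence?);
/// }
/// ```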
pub struct SentenceReader<R: BufRead> {
    /// Underlying buffered reader.
    reader: R,
    /// Text read but not yet split into sentences.
    buffer: String,
    /// Sentences split off and waiting to be yielded.
    queue: std::collections::VecDeque<String>,
    /// Set once the reader returns EOF.
    eof: bool,
    /// Buffer size above which the buffer is flushed as a single sentence.
    max_buffer_size: usize,
}

impl<R: BufRead> SentenceReader<R> {
    /// Default cap on buffered text (16 MiB).
    pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;

    /// Creates a sentence reader with the default buffer cap.
    #[must_use]
    pub const fn new(reader: R) -> Self {
        Self {
            reader,
            buffer: String::new(),
            queue: std::collections::VecDeque::new(),
            eof: false,
            max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
        }
    }

    /// Sets the maximum buffer size before a forced flush.
    #[must_use]
    pub const fn with_max_buffer_size(mut self, size: usize) -> Self {
        self.max_buffer_size = size;
        self
    }

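    /// Scans the buffer for sentence boundaries, pushing each complete,
    /// trimmed sentence onto the queue. A `.`, `?`, or `!` only ends a
    /// sentence when followed (possibly after closing quotes or brackets)
    /// by whitespace, or by EOF; a `.` between two ASCII digits is kept as
    /// part of a decimal. Any unterminated tail stays in the buffer unless
    /// EOF has been reached.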
    fn drain_sentences(&mut self) {
        let buf = self.buffer.as_str();
        let indices: Vec<(usize, char)> = buf.char_indices().collect();
        let len = indices.len();

        // `start_char` marks the first char of the current sentence.
        let mut start_char = 0;
        let mut i = 0;
        while i < len {
            let (_, ch) = indices[i];

            if ch == '\n' {
                let start_byte = indices[start_char].0;
                let end_byte = indices[i].0;
                let trimmed = buf[start_byte..end_byte].trim();
                if !trimmed.is_empty() {
                    self.queue.push_back(trimmed.to_string());
                }
                start_char = i + 1;
                i += 1;
                continue;
            }

            if matches!(ch, '.' | '?' | '!') {
                if ch == '.' {
                    // Keep decimals like "3.14" together.
                    let prev_is_digit = i > 0 && indices[i - 1].1.is_ascii_digit();
                    let next_is_digit = i + 1 < len && indices[i + 1].1.is_ascii_digit();
                    if prev_is_digit && next_is_digit {
                        i += 1;
                        continue;
                    }
                }

                let punct_byte_end = indices[i].0 + ch.len_utf8();

                // Allow closing quotes/brackets between the punctuation
                // and the whitespace that confirms the boundary.
                let mut j = i + 1;
                while j < len && matches!(indices[j].1, ')' | ']' | '"' | '\'') {
                    j += 1;
                }

                let followed_by_whitespace = j < len && indices[j].1.is_whitespace();
                let followed_by_eof = j >= len && self.eof;

                if followed_by_whitespace || followed_by_eof {
                    let start_byte = indices[start_char].0;
                    let trimmed = buf[start_byte..punct_byte_end].trim();
                    if !trimmed.is_empty() {
                        self.queue.push_back(trimmed.to_string());
                    }
                    start_char = j;
                    if j < len && indices[j].1.is_whitespace() && indices[j].1 != '\n' {
                        // Skip the separating space; leave '\n' to be
                        // handled as its own boundary above.
                        start_char = j + 1;
                        i = j + 1;
                    } else {
                        i = j;
                    }
                    continue;
                }
            }

            i += 1;
        }

        // Retain or discard the unterminated tail.
        if self.eof && start_char < len {
            let start_byte = indices[start_char].0;
            let trimmed = buf[start_byte..].trim();
            if !trimmed.is_empty() {
                self.queue.push_back(trimmed.to_string());
            }
            self.buffer.clear();
        } else if start_char > 0 && start_char < len {
            // Drop the already-drained prefix, keep the partial tail.
            let byte_offset = indices[start_char].0;
            self.buffer.drain(..byte_offset);
        } else if start_char >= len && !self.eof {
            // Everything was consumed.
            self.buffer.clear();
        }
    }

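    /// Reads one more line into the buffer. If the buffer has already hit
    /// `max_buffer_size` without a boundary, it is first flushed wholesale
    /// as a single sentence to bound memory use. Returns `Ok(false)` at EOF.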
    fn fill_buffer(&mut self) -> io::Result<bool> {
        if self.buffer.len() >= self.max_buffer_size {
            let trimmed = self.buffer.trim().to_string();
            if !trimmed.is_empty() {
                self.queue.push_back(trimmed);
            }
            self.buffer.clear();
        }

        let mut line = String::new();
        let n = self.reader.read_line(&mut line)?;
        if n == 0 {
            self.eof = true;
            Ok(false)
        } else {
            self.buffer.push_str(&line);
            Ok(true)
        }
    }
}

impl<R: BufRead> Iterator for SentenceReader<R> {
    type Item = io::Result<String>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Yield queued sentences first.
            if let Some(sentence) = self.queue.pop_front() {
                return Some(Ok(sentence));
            }

            // Nothing queued and nothing more to read.
            if self.eof {
                return None;
            }

            if let Err(e) = self.fill_buffer() {
                return Some(Err(e));
            }

            self.drain_sentences();
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod sentence_reader_tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_single_sentence() {
        let input = "안녕하세요.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["안녕하세요."]);
    }

    #[test]
    fn test_multiple_sentences() {
        let input = "첫 번째 문장입니다. 두 번째 문장입니다.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences.len(), 2);
        assert_eq!(sentences[0], "첫 번째 문장입니다.");
        assert_eq!(sentences[1], "두 번째 문장입니다.");
    }

    #[test]
    fn test_newline_boundary() {
        let input = "줄 하나\n줄 둘\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["줄 하나", "줄 둘"]);
    }

    #[test]
    fn test_decimal_not_boundary() {
        let input = "값은 3.14입니다.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["값은 3.14입니다."]);
    }

    #[test]
    fn test_question_mark() {
        let input = "이것은 무엇인가요? 네, 맞습니다.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert_eq!(sentences.len(), 2);
    }

    #[test]
    fn test_empty_input() {
        let input = "";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert!(sentences.is_empty());
    }

    #[test]
    fn test_no_trailing_newline() {
        let input = "마지막 문장";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["마지막 문장"]);
    }

    #[test]
    fn test_multiple_newlines() {
        let input = "첫째\n\n둘째\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["첫째", "둘째"]);
    }

    #[test]
    fn test_exclamation() {
        let input = "대단합니다! 정말요?\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert_eq!(sentences.len(), 2);
    }

    #[test]
    fn test_sentence_reader_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<SentenceReader<std::io::Cursor<&[u8]>>>();
    }

    #[test]
    fn test_closing_paren_before_whitespace() {
        let input = "문장입니다.) 다음 문장.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert_eq!(sentences.len(), 2);
    }

    #[test]
    fn test_no_trailing_newline_punctuation() {
        let input = "첫째. 둘째.";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences.len(), 2);
        assert_eq!(sentences[0], "첫째.");
        assert_eq!(sentences[1], "둘째.");
    }

    #[test]
    fn test_buffer_limit_prevents_oom() {
        let long_line = "가".repeat(200);
        let reader = SentenceReader::new(Cursor::new(long_line.as_str())).with_max_buffer_size(64);
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert!(!sentences.is_empty());
    }
}