// mecab_ko_core/streaming.rs

//! # Streaming Tokenizer Module
//!
//! An API for streaming tokenization of large texts.
//!
//! ## Key Features
//!
//! - Chunk-by-chunk tokenization
//! - Sentence-boundary detection and buffering
//! - Memory-efficient processing of large files
//!
//! ## Example
//!
//! ```rust,no_run
//! use mecab_ko_core::streaming::StreamingTokenizer;
//! use mecab_ko_core::tokenizer::Tokenizer;
//!
//! let tokenizer = Tokenizer::new().unwrap();
//! let mut stream = StreamingTokenizer::new(tokenizer);
//!
//! // Process the text chunk by chunk.
//! let text_chunks = vec!["안녕하세요. ", "오늘 날씨가 좋네요."];
//! for chunk in text_chunks {
//!     let tokens = stream.process_chunk(chunk);
//!     for token in tokens {
//!         println!("{}: {}", token.surface, token.pos);
//!     }
//! }
//!
//! // Flush whatever remains in the buffer.
//! let remaining = stream.flush();
//! ```

use std::collections::VecDeque;
use std::io::{self, BufRead, BufReader, Read};

use crate::tokenizer::{Token, Tokenizer};
use crate::Result;

/// Streaming tokenizer.
///
/// Processes large texts chunk by chunk while respecting sentence
/// boundaries, so tokenization stays correct across chunk edges.
pub struct StreamingTokenizer {
    /// Inner tokenizer.
    tokenizer: Tokenizer,

    /// Buffer (keeps the tail of the previous chunk until a sentence boundary arrives).
    buffer: String,

    /// Chunk size in bytes.
    chunk_size: usize,

    /// Sentence delimiters.
    sentence_delimiters: Vec<char>,

    /// Total number of characters processed so far.
    total_chars_processed: usize,

    /// Maximum buffer size in bytes. Exceeding it forces a flush.
    max_buffer_size: usize,
}

impl StreamingTokenizer {
    /// Default chunk size (8 KB).
    pub const DEFAULT_CHUNK_SIZE: usize = 8192;

    /// Default maximum buffer size (16 MB).
    pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;

    /// Creates a new streaming tokenizer.
    ///
    /// # Arguments
    ///
    /// * `tokenizer` - the inner tokenizer
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use mecab_ko_core::tokenizer::Tokenizer;
    /// use mecab_ko_core::streaming::StreamingTokenizer;
    ///
    /// let tokenizer = Tokenizer::new().unwrap();
    /// let stream = StreamingTokenizer::new(tokenizer);
    /// ```
    #[must_use]
    pub fn new(tokenizer: Tokenizer) -> Self {
        Self {
            tokenizer,
            buffer: String::with_capacity(Self::DEFAULT_CHUNK_SIZE),
            chunk_size: Self::DEFAULT_CHUNK_SIZE,
            sentence_delimiters: vec!['.', '!', '?', '。', '．', '\n'],
            total_chars_processed: 0,
            max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
        }
    }

    /// Sets the chunk size.
    ///
    /// # Arguments
    ///
    /// * `size` - chunk size in bytes
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self.buffer = String::with_capacity(size);
        self
    }

    /// Sets the sentence delimiters.
    ///
    /// # Arguments
    ///
    /// * `delimiters` - list of sentence-delimiter characters
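    ///
    /// # Example
    ///
    /// A minimal sketch restricting boundaries to ASCII punctuation (mirrors
    /// `test_custom_delimiters` below):
    ///
    /// ```rust,no_run
    /// use mecab_ko_core::streaming::StreamingTokenizer;
    /// use mecab_ko_core::tokenizer::Tokenizer;
    ///
    /// let tokenizer = Tokenizer::new().unwrap();
    /// let stream = StreamingTokenizer::new(tokenizer)
    ///     .with_sentence_delimiters(vec!['.', '!', '?']);
    /// ```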
    #[must_use]
    pub fn with_sentence_delimiters(mut self, delimiters: Vec<char>) -> Self {
        self.sentence_delimiters = delimiters;
        self
    }

    /// Processes one chunk.
    ///
    /// Appends the input chunk to the buffer and tokenizes any complete
    /// sentences; a trailing partial sentence stays buffered.
    ///
    /// # Arguments
    ///
    /// * `chunk` - the input chunk
    ///
    /// # Returns
    ///
    /// The tokens produced from the completed sentences.
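    ///
    /// # Example
    ///
    /// A minimal sketch of incremental feeding (`no_run`, since it needs a
    /// dictionary at runtime):
    ///
    /// ```rust,no_run
    /// use mecab_ko_core::streaming::StreamingTokenizer;
    /// use mecab_ko_core::tokenizer::Tokenizer;
    ///
    /// let mut stream = StreamingTokenizer::new(Tokenizer::new().unwrap());
    ///
    /// // No delimiter yet: the text is buffered and nothing is emitted.
    /// let _buffered = stream.process_chunk("안녕하");
    /// // The newline completes the sentence, releasing its tokens.
    /// let tokens = stream.process_chunk("세요.\n");
    /// println!("{} tokens", tokens.len());
    /// ```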
    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
        self.buffer.push_str(chunk);

        let split_pos = self.find_last_sentence_boundary();

        if let Some(pos) = split_pos {
            let to_process = self.buffer[..=pos].to_string();
            let remaining = self.buffer[pos + 1..].to_string();

            let mut tokens = self.tokenizer.tokenize(&to_process);

            for token in &mut tokens {
                token.start_pos += self.total_chars_processed;
                token.end_pos += self.total_chars_processed;
            }

            self.total_chars_processed += to_process.chars().count();
            self.buffer = remaining;

            tokens
        } else if self.buffer.len() > self.max_buffer_size {
            self.force_flush_partial()
        } else {
            Vec::new()
        }
    }

    /// Finds the last sentence boundary (scanning backwards for efficiency).
    ///
    /// Returns the byte index of the last byte of the delimiter character,
    /// so that `buffer[..=pos]` includes the full delimiter and
    /// `buffer[pos + 1..]` starts at the next character boundary.
    fn find_last_sentence_boundary(&self) -> Option<usize> {
        let bytes = self.buffer.as_bytes();
        for (i, ch) in self.buffer.char_indices().rev() {
            if self.sentence_delimiters.contains(&ch) {
                // Decimal number exception: digit.digit is not a boundary.
                if ch == '.' && i > 0 && i + ch.len_utf8() < bytes.len() {
                    let prev_byte = bytes[i - 1];
                    let next_byte = bytes[i + ch.len_utf8()];
                    if prev_byte.is_ascii_digit() && next_byte.is_ascii_digit() {
                        continue;
                    }
                }
                return Some(i + ch.len_utf8() - 1);
            }
        }
        None
    }

    /// Finds a safe split point at or before `target_pos` (avoids splitting mid-word).
    fn find_safe_split_point(&self, target_pos: usize) -> usize {
        // Snap target_pos to a valid char boundary first.
        let mut pos = target_pos.min(self.buffer.len());
        while pos > 0 && !self.buffer.is_char_boundary(pos) {
            pos -= 1;
        }

        // Walk backwards through valid char boundaries looking for whitespace
        // or a sentence delimiter.
        for (byte_idx, ch) in self.buffer[..pos].char_indices().rev() {
            if ch.is_whitespace() || self.sentence_delimiters.contains(&ch) {
                return byte_idx + ch.len_utf8();
            }
        }

        // No safe split point found — fall back to the snapped boundary.
        pos
    }

    /// Force-flushes part of the buffer (used when no sentence boundary exists).
    fn force_flush_partial(&mut self) -> Vec<Token> {
        // Split at a safe point to avoid cutting a word in half.
        let target_pos = self.buffer.len() / 2;
        let split_pos = self.find_safe_split_point(target_pos);

        if split_pos == 0 {
            // No split point found: process the entire buffer.
            return self.flush();
        }

        let to_process = self.buffer[..split_pos].to_string();
        let remaining = self.buffer[split_pos..].to_string();

        let mut tokens = self.tokenizer.tokenize(&to_process);

        for token in &mut tokens {
            token.start_pos += self.total_chars_processed;
            token.end_pos += self.total_chars_processed;
        }

        self.total_chars_processed += to_process.chars().count();
        self.buffer = remaining;

        tokens
    }

    /// Processes whatever remains in the buffer.
    ///
    /// Call this after the stream ends to tokenize any text still buffered.
    ///
    /// # Returns
    ///
    /// The remaining tokens.
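    ///
    /// # Example
    ///
    /// A minimal sketch (`no_run`, since it needs a dictionary):
    ///
    /// ```rust,no_run
    /// use mecab_ko_core::streaming::StreamingTokenizer;
    /// use mecab_ko_core::tokenizer::Tokenizer;
    ///
    /// let mut stream = StreamingTokenizer::new(Tokenizer::new().unwrap());
    /// let _ = stream.process_chunk("마지막 문장"); // no delimiter, so it stays buffered
    /// let tokens = stream.flush();                 // tokenizes the buffered text
    /// assert_eq!(stream.buffer_len(), 0);
    /// ```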
    pub fn flush(&mut self) -> Vec<Token> {
        if self.buffer.is_empty() {
            return Vec::new();
        }

        let to_process = std::mem::take(&mut self.buffer);
        let mut tokens = self.tokenizer.tokenize(&to_process);

        for token in &mut tokens {
            token.start_pos += self.total_chars_processed;
            token.end_pos += self.total_chars_processed;
        }

        self.total_chars_processed += to_process.chars().count();

        tokens
    }

    /// Streams tokens from a [`Read`] source.
    ///
    /// # Arguments
    ///
    /// * `reader` - the input reader
    ///
    /// # Returns
    ///
    /// All tokens from the stream.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
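    ///
    /// # Example
    ///
    /// A minimal sketch using an in-memory reader:
    ///
    /// ```rust,no_run
    /// use mecab_ko_core::streaming::StreamingTokenizer;
    /// use mecab_ko_core::tokenizer::Tokenizer;
    /// use std::io::Cursor;
    ///
    /// let mut stream = StreamingTokenizer::new(Tokenizer::new().unwrap());
    /// let tokens = stream.process_reader(Cursor::new("안녕하세요.\n")).unwrap();
    /// ```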
    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
        let mut buf_reader = BufReader::with_capacity(self.chunk_size, reader);
        let mut all_tokens = Vec::new();

        loop {
            let mut line = String::new();
            let bytes_read = buf_reader
                .read_line(&mut line)
                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;

            if bytes_read == 0 {
                break; // EOF
            }

            let tokens = self.process_chunk(&line);
            all_tokens.extend(tokens);
        }

        // Flush whatever remains in the buffer.
        let remaining = self.flush();
        all_tokens.extend(remaining);

        Ok(all_tokens)
    }

    /// Streams tokens from a file.
    ///
    /// # Arguments
    ///
    /// * `path` - the file path
    ///
    /// # Returns
    ///
    /// All tokens from the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
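    ///
    /// # Example
    ///
    /// A minimal sketch; the path is hypothetical:
    ///
    /// ```rust,no_run
    /// use mecab_ko_core::streaming::StreamingTokenizer;
    /// use mecab_ko_core::tokenizer::Tokenizer;
    ///
    /// let mut stream = StreamingTokenizer::new(Tokenizer::new().unwrap());
    /// let tokens = stream.process_file("corpus.txt").unwrap();
    /// println!("{} tokens", tokens.len());
    /// ```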
    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
        let file = std::fs::File::open(path)
            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
        self.process_reader(file)
    }

    /// Returns the current buffer length in bytes.
    #[must_use]
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Returns the number of characters processed so far.
    #[must_use]
    pub const fn total_chars_processed(&self) -> usize {
        self.total_chars_processed
    }

    /// Resets the stream.
    pub fn reset(&mut self) {
        self.buffer.clear();
        self.total_chars_processed = 0;
    }
}

/// Iterator-based streaming tokenizer.
///
/// Consumes an iterator of text chunks and produces tokens.
/// Uses a `VecDeque` to guarantee O(1) dequeue performance.
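///
/// # Example
///
/// A minimal sketch driving the stream from owned chunks (`no_run`, since it
/// needs a dictionary at runtime):
///
/// ```rust,no_run
/// use mecab_ko_core::streaming::TokenStream;
/// use mecab_ko_core::tokenizer::Tokenizer;
///
/// let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
/// let stream = TokenStream::new(chunks.into_iter(), Tokenizer::new().unwrap());
/// for token in stream {
///     println!("{}", token.surface);
/// }
/// ```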
pub struct TokenStream<I>
where
    I: Iterator<Item = String>,
{
    /// Chunk iterator.
    chunks: I,

    /// Streaming tokenizer.
    streaming: StreamingTokenizer,

    /// Buffer of pending tokens (`VecDeque` for O(1) `pop_front`).
    token_buffer: VecDeque<Token>,

    /// Whether the stream has finished.
    finished: bool,

    /// Total number of tokens yielded so far.
    tokens_yielded: usize,
}

impl<I> TokenStream<I>
where
    I: Iterator<Item = String>,
{
    /// Creates a new token stream.
    ///
    /// # Arguments
    ///
    /// * `chunks` - iterator of text chunks
    /// * `tokenizer` - the tokenizer
    #[must_use]
    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
        Self {
            chunks,
            streaming: StreamingTokenizer::new(tokenizer),
            token_buffer: VecDeque::new(),
            finished: false,
            tokens_yielded: 0,
        }
    }

    /// Sets the chunk size.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.streaming = self.streaming.with_chunk_size(size);
        self
    }

    /// Returns the number of tokens yielded so far.
    #[must_use]
    pub const fn tokens_yielded(&self) -> usize {
        self.tokens_yielded
    }
}

impl<I> Iterator for TokenStream<I>
where
    I: Iterator<Item = String>,
{
    type Item = Token;

    fn next(&mut self) -> Option<Self::Item> {
        // Return a buffered token if one is available (O(1) pop_front).
        if let Some(token) = self.token_buffer.pop_front() {
            self.tokens_yielded += 1;
            return Some(token);
        }

        // The stream is exhausted.
        if self.finished {
            return None;
        }

        // Process further chunks until some tokens appear.
        for chunk in self.chunks.by_ref() {
            let tokens = self.streaming.process_chunk(&chunk);

            if !tokens.is_empty() {
                self.token_buffer.extend(tokens);
                if let Some(token) = self.token_buffer.pop_front() {
                    self.tokens_yielded += 1;
                    return Some(token);
                }
            }
        }

        // No more chunks: flush the remainder.
        self.finished = true;
        let remaining = self.streaming.flush();

        if !remaining.is_empty() {
            self.token_buffer.extend(remaining);
            if let Some(token) = self.token_buffer.pop_front() {
                self.tokens_yielded += 1;
                return Some(token);
            }
        }

        None
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // The number of buffered tokens is a guaranteed lower bound.
        (self.token_buffer.len(), None)
    }
}

/// Progress callback type.
pub type ProgressCallback = Box<dyn Fn(StreamingProgress) + Send>;

/// Streaming progress snapshot.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Bytes processed so far.
    pub bytes_processed: usize,
    /// Total bytes, if known.
    pub total_bytes: Option<usize>,
    /// Tokens generated so far.
    pub tokens_generated: usize,
    /// Chunks processed so far.
    pub chunks_processed: usize,
}

impl StreamingProgress {
    /// Computes the progress percentage, if the total size is known.
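    ///
    /// # Example
    ///
    /// A small self-contained check (mirrors `test_progress_percent` below):
    ///
    /// ```rust
    /// use mecab_ko_core::streaming::StreamingProgress;
    ///
    /// let p = StreamingProgress {
    ///     bytes_processed: 50,
    ///     total_bytes: Some(100),
    ///     tokens_generated: 10,
    ///     chunks_processed: 2,
    /// };
    /// assert_eq!(p.percent(), Some(50.0));
    /// ```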
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn percent(&self) -> Option<f64> {
        self.total_bytes
            .map(|total| (self.bytes_processed as f64 / total as f64) * 100.0)
    }
}

/// Progress-tracking streaming tokenizer.
///
/// Reports progress through a callback while processing large files.
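///
/// # Example
///
/// A minimal sketch wiring up a callback; the file path is hypothetical:
///
/// ```rust,no_run
/// use mecab_ko_core::streaming::ProgressStreamingTokenizer;
/// use mecab_ko_core::tokenizer::Tokenizer;
///
/// let mut stream = ProgressStreamingTokenizer::new(Tokenizer::new().unwrap())
///     .with_progress_callback(|p| {
///         if let Some(pct) = p.percent() {
///             eprintln!("{pct:.1}% ({} tokens)", p.tokens_generated);
///         }
///     });
/// let tokens = stream.process_file("corpus.txt").unwrap();
/// ```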
pub struct ProgressStreamingTokenizer {
    /// Inner streaming tokenizer.
    inner: StreamingTokenizer,

    /// Progress callback.
    callback: Option<ProgressCallback>,

    /// Bytes processed so far.
    bytes_processed: usize,

    /// Total bytes, if known.
    total_bytes: Option<usize>,

    /// Tokens generated so far.
    tokens_generated: usize,

    /// Chunks processed so far.
    chunks_processed: usize,

    /// Callback interval in bytes.
    callback_interval: usize,

    /// Bytes processed at the time of the last callback.
    last_callback_bytes: usize,
}

impl ProgressStreamingTokenizer {
    /// Default callback interval (64 KB).
    pub const DEFAULT_CALLBACK_INTERVAL: usize = 65536;

    /// Creates a new progress-tracking tokenizer.
    #[must_use]
    pub fn new(tokenizer: Tokenizer) -> Self {
        Self {
            inner: StreamingTokenizer::new(tokenizer),
            callback: None,
            bytes_processed: 0,
            total_bytes: None,
            tokens_generated: 0,
            chunks_processed: 0,
            callback_interval: Self::DEFAULT_CALLBACK_INTERVAL,
            last_callback_bytes: 0,
        }
    }

    /// Sets the progress callback.
    #[must_use]
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(StreamingProgress) + Send + 'static,
    {
        self.callback = Some(Box::new(callback));
        self
    }

    /// Sets the total byte count (used for percentage calculation).
    #[must_use]
    pub const fn with_total_bytes(mut self, total: usize) -> Self {
        self.total_bytes = Some(total);
        self
    }

    /// Sets the callback interval.
    #[must_use]
    pub const fn with_callback_interval(mut self, interval: usize) -> Self {
        self.callback_interval = interval;
        self
    }

    /// Sets the chunk size.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.inner = self.inner.with_chunk_size(size);
        self
    }

    /// Processes one chunk.
    pub fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
        self.bytes_processed += chunk.len();
        self.chunks_processed += 1;

        let tokens = self.inner.process_chunk(chunk);
        self.tokens_generated += tokens.len();

        // Fire the callback once enough new bytes have been processed.
        if self.bytes_processed - self.last_callback_bytes >= self.callback_interval {
            self.report_progress();
            self.last_callback_bytes = self.bytes_processed;
        }

        tokens
    }

    /// Processes whatever remains in the buffer.
    pub fn flush(&mut self) -> Vec<Token> {
        let tokens = self.inner.flush();
        self.tokens_generated += tokens.len();

        // Report the final progress.
        self.report_progress();

        tokens
    }

    /// Reports the current progress to the callback, if one is set.
    fn report_progress(&self) {
        if let Some(ref callback) = self.callback {
            callback(StreamingProgress {
                bytes_processed: self.bytes_processed,
                total_bytes: self.total_bytes,
                tokens_generated: self.tokens_generated,
                chunks_processed: self.chunks_processed,
            });
        }
    }

    /// Streams tokens from a [`Read`] source, tracking progress.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    pub fn process_reader<R: Read>(&mut self, reader: R) -> Result<Vec<Token>> {
        let mut buf_reader = BufReader::with_capacity(self.inner.chunk_size, reader);
        let mut all_tokens = Vec::new();

        loop {
            let mut line = String::new();
            let bytes_read = buf_reader
                .read_line(&mut line)
                .map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;

            if bytes_read == 0 {
                break;
            }

            let tokens = self.process_chunk(&line);
            all_tokens.extend(tokens);
        }

        let remaining = self.flush();
        all_tokens.extend(remaining);

        Ok(all_tokens)
    }

    /// Streams tokens from a file (its size is detected automatically).
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    #[allow(clippy::cast_possible_truncation)]
    pub fn process_file<P: AsRef<std::path::Path>>(&mut self, path: P) -> Result<Vec<Token>> {
        let metadata = std::fs::metadata(path.as_ref())
            .map_err(|e| crate::Error::Analysis(format!("Failed to read metadata: {e}")))?;

        self.total_bytes = Some(metadata.len() as usize);

        let file = std::fs::File::open(path)
            .map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;

        self.process_reader(file)
    }

    /// Returns a snapshot of the current progress.
    #[must_use]
    pub const fn progress(&self) -> StreamingProgress {
        StreamingProgress {
            bytes_processed: self.bytes_processed,
            total_bytes: self.total_bytes,
            tokens_generated: self.tokens_generated,
            chunks_processed: self.chunks_processed,
        }
    }

    /// Resets all state.
    pub fn reset(&mut self) {
        self.inner.reset();
        self.bytes_processed = 0;
        self.tokens_generated = 0;
        self.chunks_processed = 0;
        self.last_callback_bytes = 0;
    }
}

/// Chunk-wise token iterator.
///
/// Returns tokens chunk by chunk instead of one at a time, for better
/// memory efficiency.
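///
/// # Example
///
/// A minimal sketch consuming one `Vec<Token>` per flushed span (`no_run`,
/// since it needs a dictionary):
///
/// ```rust,no_run
/// use mecab_ko_core::streaming::ChunkedTokenIterator;
/// use mecab_ko_core::tokenizer::Tokenizer;
///
/// let chunks = vec!["안녕하세요.\n".to_string()];
/// let iter = ChunkedTokenIterator::new(chunks.into_iter(), Tokenizer::new().unwrap());
/// for token_chunk in iter {
///     println!("chunk with {} tokens", token_chunk.len());
/// }
/// ```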
pub struct ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    /// Chunk iterator.
    chunks: I,

    /// Streaming tokenizer.
    streaming: StreamingTokenizer,

    /// Whether the stream has finished.
    finished: bool,
}

impl<I> ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    /// Creates a new chunked token iterator.
    #[must_use]
    pub fn new(chunks: I, tokenizer: Tokenizer) -> Self {
        Self {
            chunks,
            streaming: StreamingTokenizer::new(tokenizer),
            finished: false,
        }
    }

    /// Sets the chunk size.
    #[must_use]
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.streaming = self.streaming.with_chunk_size(size);
        self
    }
}

impl<I> Iterator for ChunkedTokenIterator<I>
where
    I: Iterator<Item = String>,
{
    type Item = Vec<Token>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        // Produce tokens from the next chunk(s).
        for chunk in self.chunks.by_ref() {
            let tokens = self.streaming.process_chunk(&chunk);
            if !tokens.is_empty() {
                return Some(tokens);
            }
        }

        // No more chunks: flush the remainder.
        self.finished = true;
        let remaining = self.streaming.flush();

        if remaining.is_empty() {
            None
        } else {
            Some(remaining)
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    fn create_test_tokenizer() -> Tokenizer {
        Tokenizer::new().expect("should create tokenizer")
    }

    #[test]
    fn test_streaming_tokenizer_creation() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer);

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
    }

    #[test]
    fn test_process_chunk_with_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream.process_chunk("안녕\n");
        assert!(!tokens.is_empty() || stream.buffer_len() > 0);

        // Flush and check the remaining tokens.
        let remaining = stream.flush();
        let total_tokens = tokens.len() + remaining.len();
        assert!(total_tokens > 0);
    }

    #[test]
    fn test_process_chunk_without_delimiter() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream.process_chunk("안녕하세요");
        // Without a delimiter the text stays in the buffer.
        assert!(tokens.is_empty() || stream.buffer_len() > 0);
    }

    #[test]
    fn test_flush() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        let tokens = stream.flush();

        assert!(!tokens.is_empty());
        assert_eq!(stream.buffer_len(), 0);
    }

    #[test]
    fn test_multiple_chunks() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let _tokens1 = stream.process_chunk("안녕하세요.\n");
        let _tokens2 = stream.process_chunk("감사합니다.\n");
        let _remaining = stream.flush();

        assert!(stream.total_chars_processed() > 0);
    }

    #[test]
    fn test_reset() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        stream.process_chunk("안녕하세요");
        stream.reset();

        assert_eq!(stream.buffer_len(), 0);
        assert_eq!(stream.total_chars_processed(), 0);
    }

    #[test]
    fn test_custom_chunk_size() {
        let tokenizer = create_test_tokenizer();
        let stream = StreamingTokenizer::new(tokenizer).with_chunk_size(1024);

        assert_eq!(stream.chunk_size, 1024);
    }

    #[test]
    fn test_custom_delimiters() {
        let tokenizer = create_test_tokenizer();
        let stream =
            StreamingTokenizer::new(tokenizer).with_sentence_delimiters(vec!['.', '!', '?']);

        assert_eq!(stream.sentence_delimiters.len(), 3);
    }

    #[test]
    fn test_token_stream_creation() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        assert!(!stream.finished);
    }

    #[test]
    fn test_token_stream_iteration() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕\n".to_string(), "감사\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let tokens: Vec<_> = stream.collect();
        assert!(!tokens.is_empty());
    }

    #[test]
    fn test_token_stream_tokens_yielded() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let mut stream = TokenStream::new(chunks.into_iter(), tokenizer);

        // Consume every token, counting as we go.
        let mut count = 0;
        while stream.next().is_some() {
            count += 1;
        }

        // The yielded count must match what was consumed.
        assert_eq!(stream.tokens_yielded(), count);
    }

    #[test]
    fn test_token_stream_size_hint() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string()];
        let stream = TokenStream::new(chunks.into_iter(), tokenizer);

        let (lower, _upper) = stream.size_hint();
        // The buffer starts empty, so the lower bound is 0.
        assert_eq!(lower, 0);
    }

    #[test]
    fn test_progress_streaming_tokenizer() {
        let tokenizer = create_test_tokenizer();
        let mut stream = ProgressStreamingTokenizer::new(tokenizer);

        let _tokens = stream.process_chunk("안녕하세요.\n");
        let progress = stream.progress();

        assert!(progress.bytes_processed > 0);
        assert!(progress.chunks_processed > 0);
    }

    #[test]
    fn test_progress_callback() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let tokenizer = create_test_tokenizer();
        let callback_count = Arc::new(AtomicUsize::new(0));
        let callback_count_clone = Arc::clone(&callback_count);

        let mut stream = ProgressStreamingTokenizer::new(tokenizer)
            .with_callback_interval(1) // fire the callback on every byte
            .with_progress_callback(move |_progress| {
                callback_count_clone.fetch_add(1, Ordering::SeqCst);
            });

        // Process enough data to trigger at least one callback.
        stream.process_chunk("안녕하세요. 오늘 날씨가 좋네요.\n");
        let _remaining = stream.flush();

        // Verify the callback actually fired.
        assert!(callback_count.load(Ordering::SeqCst) > 0);
    }

    #[test]
    fn test_progress_percent() {
        let progress = StreamingProgress {
            bytes_processed: 50,
            total_bytes: Some(100),
            tokens_generated: 10,
            chunks_processed: 2,
        };

        assert_eq!(progress.percent(), Some(50.0));
    }

    #[test]
    fn test_chunked_token_iterator() {
        let tokenizer = create_test_tokenizer();
        let chunks = vec!["안녕하세요.\n".to_string(), "감사합니다.\n".to_string()];
        let iter = ChunkedTokenIterator::new(chunks.into_iter(), tokenizer);

        let token_chunks: Vec<_> = iter.collect();
        // Collect the chunks (some may be empty).
        let total_tokens: usize = token_chunks.iter().map(std::vec::Vec::len).sum();

        // Verify ChunkedTokenIterator runs to completion. With the mini
        // dictionary the token count may be small, so finishing without a
        // panic counts as success.
        let _ = total_tokens; // suppress the unused-variable warning
    }

    #[test]
    fn test_multibyte_delimiter_no_panic() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer)
            .with_sentence_delimiters(vec!['.', '!', '?', '。', '．', '\n']);

        // 。 is U+3002 (3 bytes). Previously pos+1 would slice mid-char and panic.
        let tokens = stream.process_chunk("テスト。次の文。\n");
        let remaining = stream.flush();
        let total = tokens.len() + remaining.len();
        assert!(total > 0 || stream.buffer_len() == 0);
    }

    #[test]
    fn test_decimal_number_not_split() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);

        let tokens = stream.process_chunk("값은 3.14입니다.\n");
        let remaining = stream.flush();
        let all: Vec<_> = tokens.into_iter().chain(remaining).collect();
        // "3.14" should NOT be split at the decimal point.
        let surfaces: Vec<_> = all.iter().map(|t| t.surface.as_str()).collect();
        let joined = surfaces.join(" ");
        assert!(
            !joined.contains("3 .") && !joined.contains(". 14"),
            "Decimal was incorrectly split: {joined}"
        );
    }

    #[test]
    fn test_buffer_limit_forces_flush() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer);
        // Set a tiny max buffer to trigger forced flush.
        stream.max_buffer_size = 32;

        // No delimiter — would grow unbounded without the limit.
        let tokens = stream.process_chunk(&"가".repeat(100));
        assert!(!tokens.is_empty(), "Buffer limit should force a flush");
    }

    #[test]
    fn test_safe_split_point() {
        let tokenizer = create_test_tokenizer();
        let mut stream = StreamingTokenizer::new(tokenizer)
            .with_sentence_delimiters(vec!['.', '!', '?', '\n', ' ']);

        // The internal buffer is private, so exercise it through process_chunk.
        let _tokens = stream.process_chunk("안녕하세요 감사합니다");

        // The text after the last space has no delimiter, so it stays buffered.
        assert!(stream.buffer_len() > 0);
    }
}

// ============================================================
// SentenceReader — a BufRead-based sentence-level iterator
// ============================================================

/// Reads from a [`BufRead`] source and yields complete sentences one at a time.
///
/// Korean sentence boundaries are detected by:
/// - Newline characters (`\n`) — always a boundary.
/// - Sentence-ending punctuation (`.`, `?`, `!`) followed by whitespace or EOF,
///   **except** when the `.` is between two ASCII digits (decimal numbers such
///   as `3.14`).
///
/// Empty segments (blank lines or whitespace-only spans) are silently skipped.
///
/// Because the Viterbi algorithm requires the full sentence context, this is
/// the minimum granularity for streaming tokenization of large inputs.
///
/// # Examples
///
/// ```rust
/// use mecab_ko_core::streaming::SentenceReader;
/// use std::io::Cursor;
///
/// let input = "첫 번째 문장입니다. 두 번째 문장입니다.\n";
/// let reader = SentenceReader::new(Cursor::new(input));
/// let sentences: Vec<String> = reader.map(|r| r.unwrap()).collect();
/// assert_eq!(sentences.len(), 2);
/// ```
pub struct SentenceReader<R: BufRead> {
    reader: R,
    /// Raw character-level working buffer accumulated from `reader`.
    buffer: String,
    /// Completed sentences waiting to be returned by `next()`.
    queue: VecDeque<String>,
    /// Set to `true` once the underlying reader returns EOF.
    eof: bool,
    /// Maximum buffer size in bytes. Exceeding this triggers a forced flush.
    max_buffer_size: usize,
}

impl<R: BufRead> SentenceReader<R> {
    /// Default maximum buffer size (16 MB).
    pub const DEFAULT_MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;

    /// Creates a new `SentenceReader` wrapping `reader`.
    #[must_use]
    pub const fn new(reader: R) -> Self {
        Self {
            reader,
            buffer: String::new(),
            queue: VecDeque::new(),
            eof: false,
            max_buffer_size: Self::DEFAULT_MAX_BUFFER_SIZE,
        }
    }

    /// Sets the maximum buffer size (bytes). If input accumulates
    /// beyond this limit without a sentence boundary, the buffer is
    /// force-flushed as a single sentence to prevent OOM.
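    ///
    /// # Example
    ///
    /// A minimal sketch capping the buffer at 1 MB (the cap value is
    /// illustrative):
    ///
    /// ```rust
    /// use mecab_ko_core::streaming::SentenceReader;
    /// use std::io::Cursor;
    ///
    /// let reader = SentenceReader::new(Cursor::new("문장입니다.\n"))
    ///     .with_max_buffer_size(1024 * 1024);
    /// let sentences: Vec<String> = reader.map(|r| r.unwrap()).collect();
    /// assert_eq!(sentences, vec!["문장입니다."]);
    /// ```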
    #[must_use]
    pub const fn with_max_buffer_size(mut self, size: usize) -> Self {
        self.max_buffer_size = size;
        self
    }

    /// Drain all complete sentences currently visible in `self.buffer` into
    /// `self.queue`.  A sentence ends at:
    ///   1. A `\n` character (stripped from the yielded sentence).
    ///   2. A `.`, `?`, or `!` that is **not** a decimal point, followed
    ///      (possibly after closing brackets or quotes) by whitespace, or at
    ///      the end of the buffer when `eof` is `true`.
    fn drain_sentences(&mut self) {
        // Collect (byte index, char) pairs once so neighbouring characters
        // can be inspected without re-decoding the buffer.
        let buf = self.buffer.as_str();
        let indices: Vec<(usize, char)> = buf.char_indices().collect();
        let len = indices.len();
        let mut start_char = 0; // char-level index into `indices`

        let mut i = 0;
        while i < len {
            let (_, ch) = indices[i];

            if ch == '\n' {
                let start_byte = indices[start_char].0;
                let end_byte = indices[i].0;
                let trimmed = buf[start_byte..end_byte].trim();
                if !trimmed.is_empty() {
                    self.queue.push_back(trimmed.to_string());
                }
                start_char = i + 1;
                i += 1;
                continue;
            }

            if matches!(ch, '.' | '?' | '!') {
                if ch == '.' {
                    let prev_is_digit = i > 0 && indices[i - 1].1.is_ascii_digit();
                    let next_is_digit = i + 1 < len && indices[i + 1].1.is_ascii_digit();
                    if prev_is_digit && next_is_digit {
                        i += 1;
                        continue;
                    }
                }

                // Skip closing brackets/quotes that belong to this sentence.
                let mut j = i + 1;
                while j < len && matches!(indices[j].1, ')' | ']' | '"' | '\'') {
                    j += 1;
                }

                // End the sentence after the punctuation plus any trailing
                // closing brackets/quotes, so that text is not dropped.
                let sentence_byte_end = indices[j - 1].0 + indices[j - 1].1.len_utf8();

                let followed_by_whitespace = j < len && indices[j].1.is_whitespace();
                let followed_by_eof = j >= len && self.eof;

                if followed_by_whitespace || followed_by_eof {
                    let start_byte = indices[start_char].0;
                    let trimmed = buf[start_byte..sentence_byte_end].trim();
                    if !trimmed.is_empty() {
                        self.queue.push_back(trimmed.to_string());
                    }
                    start_char = j;
                    if j < len && indices[j].1.is_whitespace() && indices[j].1 != '\n' {
                        start_char = j + 1;
                        i = j + 1;
                    } else {
                        i = j;
                    }
                    continue;
                }
            }

            i += 1;
        }

        if self.eof && start_char < len {
            let start_byte = indices[start_char].0;
            let trimmed = buf[start_byte..].trim();
            if !trimmed.is_empty() {
                self.queue.push_back(trimmed.to_string());
            }
            self.buffer.clear();
        } else if start_char > 0 && start_char < len {
            let byte_offset = indices[start_char].0;
            self.buffer.drain(..byte_offset);
        } else if start_char >= len && !self.eof {
            self.buffer.clear();
        }
    }

    /// Reads one more line from the underlying reader.
    ///
    /// Returns `Ok(true)` if bytes were read, `Ok(false)` on EOF, and
    /// `Err(_)` on an I/O error.
    fn fill_buffer(&mut self) -> io::Result<bool> {
        if self.buffer.len() >= self.max_buffer_size {
            // Force-flush the entire buffer as a single sentence to prevent OOM.
            let trimmed = self.buffer.trim().to_string();
            if !trimmed.is_empty() {
                self.queue.push_back(trimmed);
            }
            self.buffer.clear();
        }

        let mut line = String::new();
        let n = self.reader.read_line(&mut line)?;
        if n == 0 {
            self.eof = true;
            Ok(false)
        } else {
            self.buffer.push_str(&line);
            Ok(true)
        }
    }
}

impl<R: BufRead> Iterator for SentenceReader<R> {
    type Item = io::Result<String>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // If we already have a sentence ready, return it immediately.
            if let Some(sentence) = self.queue.pop_front() {
                return Some(Ok(sentence));
            }

            // Nothing in the queue and EOF consumed — we are done.
            if self.eof {
                return None;
            }

            // Try to read more data from the reader.
            if let Err(e) = self.fill_buffer() {
                return Some(Err(e));
            }

            // Parse whatever is now in the buffer.
            self.drain_sentences();
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod sentence_reader_tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_single_sentence() {
        let input = "안녕하세요.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["안녕하세요."]);
    }

    #[test]
    fn test_multiple_sentences() {
        let input = "첫 번째 문장입니다. 두 번째 문장입니다.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences.len(), 2);
        assert_eq!(sentences[0], "첫 번째 문장입니다.");
        assert_eq!(sentences[1], "두 번째 문장입니다.");
    }

    #[test]
    fn test_newline_boundary() {
        let input = "줄 하나\n줄 둘\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["줄 하나", "줄 둘"]);
    }

    #[test]
    fn test_decimal_not_boundary() {
        let input = "값은 3.14입니다.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["값은 3.14입니다."]);
    }

    #[test]
    fn test_question_mark() {
        let input = "이것은 무엇인가요? 네, 맞습니다.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert_eq!(sentences.len(), 2);
    }

    #[test]
    fn test_empty_input() {
        let input = "";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert!(sentences.is_empty());
    }

    #[test]
    fn test_no_trailing_newline() {
        let input = "마지막 문장";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences, vec!["마지막 문장"]);
    }

    #[test]
    fn test_multiple_newlines() {
        let input = "첫째\n\n둘째\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        // Empty lines are skipped.
        assert_eq!(sentences, vec!["첫째", "둘째"]);
    }

    #[test]
    fn test_exclamation() {
        let input = "대단합니다! 정말요?\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert_eq!(sentences.len(), 2);
    }

    #[test]
    fn test_sentence_reader_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<SentenceReader<std::io::Cursor<&[u8]>>>();
    }

    #[test]
    fn test_closing_paren_before_whitespace() {
        // Punctuation followed by closing bracket then space should still split.
        let input = "문장입니다.) 다음 문장.\n";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.collect::<std::result::Result<_, _>>().unwrap();
        assert_eq!(sentences.len(), 2);
    }

    #[test]
    fn test_no_trailing_newline_punctuation() {
        // Final sentence with punctuation but no newline should still be yielded.
        let input = "첫째. 둘째.";
        let reader = SentenceReader::new(Cursor::new(input));
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(sentences.len(), 2);
        assert_eq!(sentences[0], "첫째.");
        assert_eq!(sentences[1], "둘째.");
    }

    #[test]
    fn test_buffer_limit_prevents_oom() {
        // A line with no sentence boundary should eventually be flushed
        // when buffer exceeds max_buffer_size.
        let long_line = "가".repeat(200);
        let reader = SentenceReader::new(Cursor::new(long_line.as_str())).with_max_buffer_size(64);
        let sentences: Vec<_> = reader.map(|r| r.unwrap()).collect();
        // Should produce at least one sentence without hanging or OOM.
        assert!(!sentences.is_empty());
    }
}