use std::path::Path;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::sync::{Mutex, Semaphore};
use crate::tokenizer::{Token, Tokenizer};
use crate::Result;
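/// Async wrapper around [`Tokenizer`] that runs tokenization on Tokio's
/// blocking thread pool and bounds in-flight work with a [`Semaphore`].
///
/// A minimal usage sketch (illustrative; assumes a running Tokio runtime):
///
/// ```ignore
/// let tokenizer = AsyncTokenizer::new().await?;
/// let tokens = tokenizer.tokenize_async("안녕하세요").await;
/// ```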
pub struct AsyncTokenizer {
tokenizer: Arc<Mutex<Tokenizer>>,
semaphore: Arc<Semaphore>,
max_concurrent: usize,
}
impl AsyncTokenizer {
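/// Default upper bound on concurrently running tokenization tasks.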
pub const DEFAULT_MAX_CONCURRENT: usize = 4;
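/// Creates an `AsyncTokenizer` with the default dictionary, constructing
/// the inner [`Tokenizer`] on the blocking thread pool so dictionary
/// loading does not stall the async executor.
///
/// # Errors
///
/// Returns [`crate::Error::Init`] if the blocking task fails to join or
/// the tokenizer cannot be constructed.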
pub async fn new() -> Result<Self> {
let tokenizer = tokio::task::spawn_blocking(Tokenizer::new)
.await
.map_err(|e| crate::Error::Init(format!("Failed to spawn task: {e}")))?
.map_err(|e| crate::Error::Init(format!("Failed to create tokenizer: {e}")))?;
Ok(Self {
tokenizer: Arc::new(Mutex::new(tokenizer)),
semaphore: Arc::new(Semaphore::new(Self::DEFAULT_MAX_CONCURRENT)),
max_concurrent: Self::DEFAULT_MAX_CONCURRENT,
})
}
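/// Creates an `AsyncTokenizer` backed by the dictionary at `dict_path`.
///
/// # Errors
///
/// Returns [`crate::Error::Init`] if the blocking task fails to join or
/// the dictionary cannot be loaded.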
pub async fn with_dict<P: AsRef<Path> + Send + 'static>(dict_path: P) -> Result<Self> {
let tokenizer = tokio::task::spawn_blocking(move || Tokenizer::with_dict(dict_path))
.await
.map_err(|e| crate::Error::Init(format!("Failed to spawn task: {e}")))?
.map_err(|e| crate::Error::Init(format!("Failed to create tokenizer: {e}")))?;
Ok(Self {
tokenizer: Arc::new(Mutex::new(tokenizer)),
semaphore: Arc::new(Semaphore::new(Self::DEFAULT_MAX_CONCURRENT)),
max_concurrent: Self::DEFAULT_MAX_CONCURRENT,
})
}
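/// Sets the maximum number of concurrently running tokenization tasks by
/// replacing the internal semaphore. Note that `max = 0` leaves the
/// semaphore without permits and stalls all subsequent tokenization.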
#[must_use]
pub fn with_max_concurrent(mut self, max: usize) -> Self {
self.max_concurrent = max;
self.semaphore = Arc::new(Semaphore::new(max));
self
}
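/// Tokenizes `text` without blocking the async executor.
///
/// The call first waits for a semaphore permit, then runs the CPU-bound
/// tokenization via `spawn_blocking`. A closed semaphore or a panicked
/// blocking task yields an empty `Vec`, keeping the signature infallible.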
pub async fn tokenize_async(&self, text: &str) -> Vec<Token> {
let _permit = match self.semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => return Vec::new(),
};
let text_owned = text.to_string();
let tokenizer = Arc::clone(&self.tokenizer);
tokio::task::spawn_blocking(move || {
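// This closure runs on the blocking thread pool, outside the async
// executor, so `blocking_lock` is permitted here (it would panic if
// called from within an async context).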
let mut tok = tokenizer.blocking_lock();
tok.tokenize(&text_owned)
})
.await
.unwrap_or_default()
}
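/// Opens the file at `path` and tokenizes it line by line.
///
/// # Errors
///
/// Returns [`crate::Error::Analysis`] if the file cannot be opened or read.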
pub async fn tokenize_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<Token>> {
let file = File::open(path)
.await
.map_err(|e| crate::Error::Analysis(format!("Failed to open file: {e}")))?;
self.tokenize_reader(file).await
}
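/// Tokenizes an [`AsyncRead`] source line by line, concatenating the
/// tokens of every line into a single `Vec`.
///
/// # Errors
///
/// Returns [`crate::Error::Analysis`] if reading from the source fails.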
pub async fn tokenize_reader<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Vec<Token>> {
let mut buf_reader = BufReader::new(reader);
let mut all_tokens = Vec::new();
loop {
let mut line = String::new();
let bytes_read = buf_reader
.read_line(&mut line)
.await
.map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
if bytes_read == 0 {
break;
}
let tokens = self.tokenize_async(&line).await;
all_tokens.extend(tokens);
}
Ok(all_tokens)
}
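/// Tokenizes each text on its own task, bounded by the semaphore, and
/// returns the results in input order. A failed task contributes an
/// empty `Vec` instead of aborting the whole batch.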
pub async fn tokenize_batch(&self, texts: Vec<String>) -> Vec<Vec<Token>> {
let mut handles = Vec::new();
for text in texts {
let tokenizer = Arc::clone(&self.tokenizer);
let semaphore = Arc::clone(&self.semaphore);
let handle = tokio::spawn(async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => return Vec::new(),
};
tokio::task::spawn_blocking(move || {
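// As in `tokenize_async`, `blocking_lock` is safe on the blocking
// thread pool.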
let mut tok = tokenizer.blocking_lock();
tok.tokenize(&text)
})
.await
.unwrap_or_default()
});
handles.push(handle);
}
let mut results = Vec::new();
for handle in handles {
if let Ok(tokens) = handle.await {
results.push(tokens);
} else {
results.push(Vec::new());
}
}
results
}
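/// Convenience wrapper over [`Self::tokenize_batch`]: the iterator is
/// collected eagerly, so despite the name the items are not processed as
/// a lazy stream.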
pub async fn tokenize_stream<I>(&self, texts: I) -> Vec<Vec<Token>>
where
I: IntoIterator<Item = String>,
{
let texts_vec: Vec<_> = texts.into_iter().collect();
self.tokenize_batch(texts_vec).await
}
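/// Locks and returns the inner [`Tokenizer`] for direct synchronous use.
/// While the guard is held, all concurrent tokenization on this instance
/// is blocked.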
pub async fn get_tokenizer(&self) -> tokio::sync::MutexGuard<'_, Tokenizer> {
self.tokenizer.lock().await
}
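/// Returns the configured maximum number of concurrent tokenization tasks.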
#[must_use]
pub fn max_concurrent(&self) -> usize {
self.max_concurrent
}
}
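/// Incremental tokenizer that buffers incoming chunks and tokenizes only
/// up to the last complete sentence boundary, holding the remainder for
/// the next chunk or a final [`Self::flush`].
///
/// A minimal usage sketch (illustrative):
///
/// ```ignore
/// let mut stream = AsyncStreamingTokenizer::new(AsyncTokenizer::new().await?);
/// let tokens = stream.process_chunk("안녕하세요. 다음 문").await; // tokenizes "안녕하세요."
/// let rest = stream.flush().await; // tokenizes the buffered " 다음 문"
/// ```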
pub struct AsyncStreamingTokenizer {
tokenizer: AsyncTokenizer,
buffer: String,
sentence_delimiters: Vec<char>,
}
impl AsyncStreamingTokenizer {
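/// Wraps an [`AsyncTokenizer`] with an empty buffer and the default
/// sentence delimiters (`.`, `!`, `?`, `。`, and newline).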
#[must_use]
pub fn new(tokenizer: AsyncTokenizer) -> Self {
Self {
tokenizer,
buffer: String::new(),
sentence_delimiters: vec!['.', '!', '?', '。', '\n'],
}
}
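/// Appends `chunk` to the buffer and tokenizes everything up to and
/// including the last sentence delimiter seen so far. Text after that
/// delimiter stays buffered; if no delimiter is present, nothing is
/// tokenized and an empty `Vec` is returned.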
pub async fn process_chunk(&mut self, chunk: &str) -> Vec<Token> {
self.buffer.push_str(chunk);
let split_pos = self.find_last_sentence_boundary();
if let Some(pos) = split_pos {
// `pos` is the byte offset just past the last delimiter, so both slices
// land on valid char boundaries even for multibyte delimiters like '。'.
let to_process = self.buffer[..pos].to_string();
let remaining = self.buffer[pos..].to_string();
let tokens = self.tokenizer.tokenize_async(&to_process).await;
self.buffer = remaining;
tokens
} else {
Vec::new()
}
}
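/// Returns the byte offset just past the last sentence delimiter in the
/// buffer, or `None` if the buffer contains no delimiter.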
fn find_last_sentence_boundary(&self) -> Option<usize> {
let mut last_pos = None;
for (i, ch) in self.buffer.char_indices() {
if self.sentence_delimiters.contains(&ch) {
// Record the offset *after* the delimiter; using `i` directly would
// split inside a multibyte delimiter and panic when slicing the buffer.
last_pos = Some(i + ch.len_utf8());
}
}
last_pos
}
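/// Tokenizes and drains whatever remains in the buffer, regardless of
/// sentence boundaries.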
pub async fn flush(&mut self) -> Vec<Token> {
if self.buffer.is_empty() {
return Vec::new();
}
let to_process = std::mem::take(&mut self.buffer);
self.tokenizer.tokenize_async(&to_process).await
}
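/// Feeds an [`AsyncRead`] source through [`Self::process_chunk`] line by
/// line, then flushes so no trailing text is left buffered.
///
/// # Errors
///
/// Returns [`crate::Error::Analysis`] if reading from the source fails.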
pub async fn process_reader<R: AsyncRead + Unpin>(&mut self, reader: R) -> Result<Vec<Token>> {
let mut buf_reader = BufReader::new(reader);
let mut all_tokens = Vec::new();
loop {
let mut line = String::new();
let bytes_read = buf_reader
.read_line(&mut line)
.await
.map_err(|e| crate::Error::Analysis(format!("Failed to read line: {e}")))?;
if bytes_read == 0 {
break;
}
let tokens = self.process_chunk(&line).await;
all_tokens.extend(tokens);
}
let remaining = self.flush().await;
all_tokens.extend(remaining);
Ok(all_tokens)
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use super::*;
#[tokio::test]
async fn test_async_tokenizer_creation() {
let result = AsyncTokenizer::new().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_default_max_concurrent_value() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
assert_eq!(tokenizer.max_concurrent(), AsyncTokenizer::DEFAULT_MAX_CONCURRENT);
}
#[tokio::test]
async fn test_max_concurrent() {
let tokenizer = AsyncTokenizer::new()
.await
.expect("should create")
.with_max_concurrent(8);
assert_eq!(tokenizer.max_concurrent(), 8);
}
#[tokio::test]
async fn test_max_concurrent_one() {
let tokenizer = AsyncTokenizer::new()
.await
.expect("should create")
.with_max_concurrent(1);
assert_eq!(tokenizer.max_concurrent(), 1);
}
#[tokio::test]
async fn test_tokenize_async_empty_string() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let tokens = tokenizer.tokenize_async("").await;
assert!(tokens.is_empty(), "expected no tokens for empty input, got {}", tokens.len());
}
#[tokio::test]
async fn test_tokenize_async_single_ascii_char() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let tokens = tokenizer.tokenize_async("a").await;
assert!(tokens.iter().all(|t| !t.surface.is_empty()));
}
#[tokio::test]
async fn test_tokenize_async_korean_text() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let tokens = tokenizer.tokenize_async("안녕하세요").await;
assert!(tokens.iter().all(|t| !t.surface.is_empty()));
}
#[tokio::test]
async fn test_tokenize_async_multibyte_korean() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let tokens = tokenizer.tokenize_async("오늘 날씨가 좋네요.").await;
assert!(tokens.iter().all(|t| !t.surface.is_empty()));
}
#[tokio::test]
async fn test_tokenize_async_reuse() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let t1 = tokenizer.tokenize_async("안녕").await;
let t2 = tokenizer.tokenize_async("안녕").await;
assert_eq!(t1.len(), t2.len(), "repeated calls should return same token count");
}
#[tokio::test]
async fn test_tokenize_batch_length() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let texts = vec!["안녕하세요".to_string(), "감사합니다".to_string()];
let results = tokenizer.tokenize_batch(texts).await;
assert_eq!(results.len(), 2, "batch result count must match input count");
}
#[tokio::test]
async fn test_tokenize_batch_empty_input() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let results = tokenizer.tokenize_batch(Vec::new()).await;
assert!(results.is_empty(), "empty batch must produce empty results");
}
#[tokio::test]
async fn test_tokenize_batch_single_item() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let results = tokenizer.tokenize_batch(vec!["안녕".to_string()]).await;
assert_eq!(results.len(), 1);
}
#[tokio::test]
async fn test_tokenize_batch_with_empty_strings() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let texts = vec!["".to_string(), "".to_string(), "".to_string()];
let results = tokenizer.tokenize_batch(texts).await;
assert_eq!(results.len(), 3);
for result in &results {
assert!(result.is_empty());
}
}
#[tokio::test]
async fn test_tokenize_stream_length() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let texts = vec!["안녕하세요".to_string(), "감사합니다".to_string()];
let results = tokenizer.tokenize_stream(texts).await;
assert_eq!(results.len(), 2);
}
#[tokio::test]
async fn test_tokenize_stream_empty() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let results = tokenizer.tokenize_stream(std::iter::empty::<String>()).await;
assert!(results.is_empty());
}
#[tokio::test]
async fn test_tokenize_reader_empty() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let cursor = std::io::Cursor::new(b"" as &[u8]);
let result = tokenizer.tokenize_reader(cursor).await;
assert!(result.is_ok(), "tokenize_reader should succeed on empty input");
assert!(result.unwrap().is_empty());
}
#[tokio::test]
async fn test_tokenize_reader_single_line() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let data = "안녕하세요.\n";
let cursor = std::io::Cursor::new(data.as_bytes());
let result = tokenizer.tokenize_reader(cursor).await;
assert!(result.is_ok(), "tokenize_reader should succeed");
}
#[tokio::test]
async fn test_tokenize_reader_multiple_lines() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let data = "첫 번째 줄.\n두 번째 줄.\n";
let cursor = std::io::Cursor::new(data.as_bytes());
let result = tokenizer.tokenize_reader(cursor).await;
assert!(result.is_ok(), "tokenize_reader should succeed on multiple lines");
}
#[tokio::test]
async fn test_tokenize_file_nonexistent() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let result = tokenizer.tokenize_file("/nonexistent/path/that/does/not/exist.txt").await;
assert!(result.is_err(), "tokenize_file on missing path must return Err");
}
#[tokio::test]
async fn test_get_tokenizer_sync_call() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut guard = tokenizer.get_tokenizer().await;
let tokens = guard.tokenize("안녕");
assert!(tokens.iter().all(|t| !t.surface.is_empty()));
}
#[tokio::test]
async fn test_async_streaming_tokenizer_default_delimiters() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let stream = AsyncStreamingTokenizer::new(tokenizer);
assert!(stream.sentence_delimiters.contains(&'.'));
assert!(stream.sentence_delimiters.contains(&'\n'));
assert!(stream.sentence_delimiters.contains(&'?'));
assert!(stream.sentence_delimiters.contains(&'!'));
}
#[tokio::test]
async fn test_async_streaming_tokenizer_initial_buffer_empty() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let stream = AsyncStreamingTokenizer::new(tokenizer);
assert!(stream.buffer.is_empty(), "buffer must be empty on construction");
}
#[tokio::test]
async fn test_async_streaming_flush_empty_buffer() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let tokens = stream.flush().await;
assert!(tokens.is_empty(), "flush on empty buffer must produce no tokens");
assert!(stream.buffer.is_empty(), "buffer must remain empty after flushing empty buffer");
}
#[tokio::test]
async fn test_async_streaming_flush_clears_buffer() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let tokens = stream.process_chunk("버퍼에 남을 텍스트").await;
assert!(tokens.iter().all(|t| !t.surface.is_empty()));
assert!(!stream.buffer.is_empty(), "buffer should hold unprocessed text");
let flushed = stream.flush().await;
assert!(flushed.iter().all(|t| !t.surface.is_empty()));
assert!(stream.buffer.is_empty(), "flush must clear the buffer");
}
#[tokio::test]
async fn test_async_streaming_chunk_with_newline_delimiter() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let tokens = stream.process_chunk("안녕하세요.\n").await;
let remaining = stream.flush().await;
let total = tokens.len() + remaining.len();
assert!(
tokens.iter().chain(remaining.iter()).all(|t| !t.surface.is_empty()),
"all tokens must have non-empty surface (total: {total})"
);
}
#[tokio::test]
async fn test_async_streaming_chunk_without_delimiter_stays_buffered() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let tokens = stream.process_chunk("구분자없음").await;
assert!(tokens.is_empty(), "text without delimiter must not produce tokens immediately");
assert!(!stream.buffer.is_empty(), "text without delimiter must be held in the buffer");
}
#[tokio::test]
async fn test_async_streaming_process_reader_empty() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let cursor = std::io::Cursor::new(b"" as &[u8]);
let result = stream.process_reader(cursor).await;
assert!(result.is_ok(), "process_reader on empty input must succeed");
assert!(result.unwrap().is_empty());
}
#[tokio::test]
async fn test_async_streaming_process_reader_multiline() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let data = "첫째 줄.\n둘째 줄.\n";
let cursor = std::io::Cursor::new(data.as_bytes());
let result = stream.process_reader(cursor).await;
assert!(result.is_ok(), "process_reader must succeed on multi-line input");
assert!(stream.buffer.is_empty(), "process_reader must flush the buffer at the end");
}
#[tokio::test]
async fn test_async_streaming_multiple_delimiters_in_chunk() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let tokens = stream.process_chunk("안녕하세요. 괜찮으세요?").await;
assert!(tokens.iter().all(|t| !t.surface.is_empty()));
let flushed = stream.flush().await;
assert!(flushed.iter().all(|t| !t.surface.is_empty()));
}
#[tokio::test]
async fn test_async_streaming_multibyte_delimiter_no_panic() {
let tokenizer = AsyncTokenizer::new().await.expect("should create");
let mut stream = AsyncStreamingTokenizer::new(tokenizer);
let tokens = stream.process_chunk("テスト。次の文。\n").await;
assert!(tokens.iter().all(|t| !t.surface.is_empty()));
let flushed = stream.flush().await;
assert!(flushed.iter().all(|t| !t.surface.is_empty()));
}
}