use crate::Message;
use crate::error::ChatpackError;
use super::StreamingResult;
/// An iterator of streamed [`Message`] results that can also report how much
/// input it has consumed.
///
/// Each item is a `StreamingResult<Message>`. The `Send` supertrait means
/// implementors can be moved to another thread.
pub trait MessageIterator: Iterator<Item = StreamingResult<Message>> + Send {
    /// Returns the current progress, if the implementor can determine it.
    ///
    /// The provided implementation always returns `None`; override it to
    /// report a value.
    // NOTE(review): the scale of the returned value (fraction vs. percentage)
    // is not fixed by this definition — confirm against implementors.
    fn progress(&self) -> Option<f64> {
        None
    }

    /// Returns the number of bytes processed so far, as reported by the
    /// implementor. This is the only required method of the trait.
    fn bytes_processed(&self) -> u64;

    /// Returns the total number of bytes in the input, if known.
    ///
    /// The provided implementation always returns `None`.
    fn total_bytes(&self) -> Option<u64> {
        None
    }
}
/// A parser that yields messages through a [`MessageIterator`] rather than
/// returning them all at once.
///
/// The `Send + Sync` supertraits mean a parser can be shared across threads.
pub trait StreamingParser: Send + Sync {
    /// Returns a static name identifying this parser.
    fn name(&self) -> &'static str;

    /// Starts streaming the input identified by `file_path`.
    ///
    /// # Errors
    ///
    /// Returns a [`ChatpackError`] when the implementor cannot produce an
    /// iterator; the exact failure conditions are implementation-defined.
    fn stream(&self, file_path: &str) -> Result<Box<dyn MessageIterator>, ChatpackError>;

    /// Returns the buffer size this parser recommends.
    ///
    /// The provided implementation returns `64 * 1024`.
    fn recommended_buffer_size(&self) -> usize {
        const KIB: usize = 1024;
        64 * KIB
    }

    /// Reports whether this parser supports progress reporting.
    ///
    /// The provided implementation returns `true`.
    fn supports_progress(&self) -> bool {
        true
    }
}
/// Tunable settings for streaming parsers.
///
/// All fields are plain `Copy` values, so the struct is cheap to pass by
/// value. `PartialEq`/`Eq` are derived so whole configurations can be
/// compared directly (for example in tests or change detection).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamingConfig {
    /// Size of the read buffer (presumably in bytes — confirm against the
    /// code that consumes this field).
    pub buffer_size: usize,
    /// Upper bound on the size of a single message (presumably in bytes —
    /// confirm against the code that consumes this field).
    pub max_message_size: usize,
    /// Whether invalid input should be skipped; the exact handling is decided
    /// by the code that consumes this flag.
    pub skip_invalid: bool,
    /// Interval between progress reports; the unit is defined by the code
    /// that consumes this field.
    pub progress_interval: usize,
}
impl Default for StreamingConfig {
    /// Builds the baseline configuration:
    ///
    /// * `buffer_size`: `64 * 1024`
    /// * `max_message_size`: `10 * 1024 * 1024`
    /// * `skip_invalid`: `true`
    /// * `progress_interval`: `10_000`
    fn default() -> Self {
        // Named factors keep the size literals readable.
        const KIB: usize = 1024;
        const MIB: usize = KIB * KIB;

        Self {
            buffer_size: 64 * KIB,
            max_message_size: 10 * MIB,
            skip_invalid: true,
            progress_interval: 10_000,
        }
    }
}
impl StreamingConfig {
    /// Creates a configuration holding the values from [`Default::default`].
    ///
    /// Marked `#[must_use]` like the `with_*` builders below: the call has no
    /// side effects, so discarding the result is always a mistake.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the configuration with `buffer_size` replaced by `size`.
    #[must_use]
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Returns the configuration with `max_message_size` replaced by `size`.
    #[must_use]
    pub fn with_max_message_size(mut self, size: usize) -> Self {
        self.max_message_size = size;
        self
    }

    /// Returns the configuration with `skip_invalid` replaced by `skip`.
    #[must_use]
    pub fn with_skip_invalid(mut self, skip: bool) -> Self {
        self.skip_invalid = skip;
        self
    }

    /// Returns the configuration with `progress_interval` replaced by
    /// `interval`.
    #[must_use]
    pub fn with_progress_interval(mut self, interval: usize) -> Self {
        self.progress_interval = interval;
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` yields the baseline values for every field.
    #[test]
    fn test_streaming_config_default() {
        let config = StreamingConfig::default();
        assert_eq!(config.buffer_size, 64 * 1024);
        assert_eq!(config.max_message_size, 10 * 1024 * 1024);
        assert!(config.skip_invalid);
        assert_eq!(config.progress_interval, 10_000);
    }

    /// `new()` must agree with `Default` on every field, including
    /// `progress_interval`.
    #[test]
    fn test_streaming_config_new() {
        let config = StreamingConfig::new();
        assert_eq!(config.buffer_size, 64 * 1024);
        assert_eq!(config.max_message_size, 10 * 1024 * 1024);
        assert!(config.skip_invalid);
        assert_eq!(config.progress_interval, 10_000);
    }

    /// Each builder overrides its own field.
    #[test]
    fn test_streaming_config_builder() {
        let config = StreamingConfig::new()
            .with_buffer_size(128 * 1024)
            .with_max_message_size(1024)
            .with_skip_invalid(false);
        assert_eq!(config.buffer_size, 128 * 1024);
        assert_eq!(config.max_message_size, 1024);
        assert!(!config.skip_invalid);
    }

    #[test]
    fn test_streaming_config_with_progress_interval() {
        let config = StreamingConfig::new().with_progress_interval(5000);
        assert_eq!(config.progress_interval, 5000);
    }

    /// All four builders can be chained in one expression.
    #[test]
    fn test_streaming_config_builder_chain() {
        let config = StreamingConfig::new()
            .with_buffer_size(256 * 1024)
            .with_max_message_size(20 * 1024 * 1024)
            .with_skip_invalid(false)
            .with_progress_interval(1000);
        assert_eq!(config.buffer_size, 256 * 1024);
        assert_eq!(config.max_message_size, 20 * 1024 * 1024);
        assert!(!config.skip_invalid);
        assert_eq!(config.progress_interval, 1000);
    }

    /// The type is `Copy`: the source stays usable after a plain assignment.
    #[test]
    fn test_streaming_config_copy() {
        let config = StreamingConfig::new();
        let copied = config;
        assert_eq!(config.buffer_size, copied.buffer_size);
        assert_eq!(config.max_message_size, copied.max_message_size);
        assert_eq!(config.skip_invalid, copied.skip_invalid);
    }

    /// Exercises the `Clone` impl explicitly, so this test is distinct from
    /// the `Copy` test above.
    #[test]
    #[allow(clippy::clone_on_copy)] // the explicit `clone()` call is the behavior under test
    fn test_streaming_config_clone() {
        let config = StreamingConfig::new().with_buffer_size(512 * 1024);
        let cloned = config.clone();
        assert_eq!(config.buffer_size, cloned.buffer_size);
        assert_eq!(config.max_message_size, cloned.max_message_size);
        assert_eq!(config.skip_invalid, cloned.skip_invalid);
        assert_eq!(config.progress_interval, cloned.progress_interval);
    }

    /// The derived `Debug` output names the type and its fields.
    #[test]
    fn test_streaming_config_debug() {
        let config = StreamingConfig::new();
        let debug = format!("{:?}", config);
        assert!(debug.contains("StreamingConfig"));
        assert!(debug.contains("buffer_size"));
    }
}