use crate::chunker::SemanticBoundaryFinder;
use crate::config::{BatlessConfig, ChunkStrategy};
use crate::error::{BatlessError, BatlessResult};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
/// Resumable position within a streamed file, serialized to JSON so a later
/// invocation can continue where a previous one stopped.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingCheckpoint {
    /// File the checkpoint refers to; must match the file on resume.
    pub file_path: String,
    /// Count of lines already processed (also the 0-based index of the next line).
    pub line_number: usize,
    /// Raw bytes consumed so far, including line terminators.
    pub bytes_processed: usize,
    /// Chunk index at which this checkpoint was taken.
    pub chunk_number: usize,
    /// Total chunk count when known up front; `None` while still streaming.
    pub total_chunks: Option<usize>,
    /// Output schema version the checkpoint was written under.
    pub schema_version: String,
    /// RFC 3339 creation timestamp (UTC).
    pub timestamp: String,
    /// Hash of the config fields that affect chunking; used by `is_compatible`.
    pub config_hash: String,
}
impl StreamingCheckpoint {
    /// Create a checkpoint for the given position, stamping it with the
    /// current time and a fingerprint of the relevant config fields.
    pub fn new(
        file_path: String,
        line_number: usize,
        bytes_processed: usize,
        chunk_number: usize,
        config: &BatlessConfig,
    ) -> Self {
        let config_hash = Self::compute_config_hash(config);
        Self {
            file_path,
            line_number,
            bytes_processed,
            chunk_number,
            total_chunks: None,
            schema_version: config.schema_version.clone(),
            timestamp: chrono::Utc::now().to_rfc3339(),
            config_hash,
        }
    }

    /// Fingerprint the config fields that influence chunk boundaries.
    ///
    /// NOTE(review): `DefaultHasher`'s algorithm is explicitly not guaranteed
    /// stable across Rust releases, so checkpoints persisted by one toolchain
    /// may be rejected as incompatible after an upgrade — consider a stable
    /// hash if long-lived checkpoints matter.
    fn compute_config_hash(config: &BatlessConfig) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut state = DefaultHasher::new();
        // Field order is part of the fingerprint — keep it fixed.
        config.max_lines.hash(&mut state);
        config.max_bytes.hash(&mut state);
        config.language.hash(&mut state);
        config.include_tokens.hash(&mut state);
        config.summary_level.hash(&mut state);
        config.streaming_chunk_size.hash(&mut state);
        format!("{:x}", state.finish())
    }

    /// Whether this checkpoint was produced under an equivalent configuration
    /// and the same schema version.
    pub fn is_compatible(&self, config: &BatlessConfig) -> bool {
        self.schema_version == config.schema_version
            && self.config_hash == Self::compute_config_hash(config)
    }
}
/// One unit of streaming output: a slice of lines plus the metadata and
/// checkpoint needed to resume after it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingChunk {
    /// Version of the JSON schema this chunk conforms to.
    pub schema_version: String,
    /// Per-chunk and per-file statistics.
    pub metadata: ChunkMetadata,
    /// Content lines, with trailing `\n` (and `\r` for CRLF) stripped.
    pub lines: Vec<String>,
    /// Resume point recorded as of the end of this chunk.
    pub checkpoint: StreamingCheckpoint,
    /// True when this is the last chunk of the stream.
    pub is_final: bool,
}
/// Statistics describing a single chunk and what is known of the whole file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkMetadata {
    /// Path of the source file (`<stdin>` for standard input).
    pub file_path: String,
    /// Detected language, if any (`None` for stdin, which has no filename).
    pub language: Option<String>,
    /// Detected character encoding of the source.
    pub encoding: String,
    /// Total file size in bytes (for stdin: bytes read so far).
    pub total_file_bytes: u64,
    /// Lines seen so far; only a running count unless `total_file_lines_exact`.
    pub total_file_lines: usize,
    /// True once the end of input has been reached, making the count exact.
    pub total_file_lines_exact: bool,
    /// Number of lines in this chunk.
    pub chunk_lines: usize,
    /// Raw bytes in this chunk, including line terminators.
    pub chunk_bytes: usize,
    /// 0-based index of the first line in this chunk.
    pub start_line: usize,
    /// 0-based index of the last line in this chunk (inclusive).
    pub end_line: usize,
}
/// Namespace for streaming entry points: building chunk iterators,
/// saving/loading checkpoints, and exposing the output JSON schema.
pub struct StreamingProcessor;
impl StreamingProcessor {
    /// Build an iterator of chunks over `file_path`, or over stdin when the
    /// path is `"-"`. An optional `checkpoint` resumes a previous run.
    ///
    /// # Errors
    /// Returns a config error when a checkpoint is combined with stdin, is
    /// incompatible with `config`, or names a different file; read errors
    /// surface from the underlying iterator.
    pub fn process_streaming(
        file_path: &str,
        config: &BatlessConfig,
        checkpoint: Option<StreamingCheckpoint>,
    ) -> BatlessResult<impl Iterator<Item = BatlessResult<StreamingChunk>>> {
        if file_path == "-" {
            // Stdin cannot be re-read, so resuming makes no sense there.
            if checkpoint.is_some() {
                return Err(BatlessError::config_error_with_help(
                    "Resume/checkpoint functionality is not supported with stdin input".to_string(),
                    Some(
                        "Stdin is not seekable. Use file input for checkpoint support.".to_string(),
                    ),
                ));
            }
            let processor = StreamingProcessorIterator::new_from_stdin(config)?;
            return Ok(processor);
        }
        // Validate the checkpoint before opening the file so we fail fast
        // with a config error rather than doing wasted I/O.
        if let Some(ref cp) = checkpoint {
            if !cp.is_compatible(config) {
                return Err(BatlessError::config_error_with_help(
                    "Checkpoint is incompatible with current configuration".to_string(),
                    Some("Configuration or schema version has changed. Start fresh without checkpoint.".to_string()),
                ));
            }
            if cp.file_path != file_path {
                return Err(BatlessError::config_error_with_help(
                    "Checkpoint file path doesn't match current file".to_string(),
                    Some("Checkpoint was created for a different file".to_string()),
                ));
            }
        }
        let processor = StreamingProcessorIterator::new(file_path, config, checkpoint)?;
        Ok(processor)
    }

    /// Persist `checkpoint` as pretty-printed JSON at `checkpoint_path`.
    ///
    /// # Errors
    /// Serialization failures and filesystem write failures are surfaced.
    /// NOTE(review): a write failure is reported as `FileReadError` — if a
    /// dedicated write-error variant exists, it would fit better here.
    pub fn save_checkpoint(
        checkpoint: &StreamingCheckpoint,
        checkpoint_path: &Path,
    ) -> BatlessResult<()> {
        let json_data = serde_json::to_string_pretty(checkpoint)
            .map_err(BatlessError::JsonSerializationError)?;
        std::fs::write(checkpoint_path, json_data).map_err(|e| BatlessError::FileReadError {
            path: checkpoint_path.to_string_lossy().to_string(),
            source: e,
        })?;
        Ok(())
    }

    /// Read a checkpoint previously written by [`Self::save_checkpoint`].
    ///
    /// # Errors
    /// Fails when the file cannot be read or does not parse as a checkpoint.
    pub fn load_checkpoint(checkpoint_path: &Path) -> BatlessResult<StreamingCheckpoint> {
        let data =
            std::fs::read_to_string(checkpoint_path).map_err(|e| BatlessError::FileReadError {
                path: checkpoint_path.to_string_lossy().to_string(),
                source: e,
            })?;
        let checkpoint: StreamingCheckpoint =
            serde_json::from_str(&data).map_err(BatlessError::JsonSerializationError)?;
        Ok(checkpoint)
    }

    /// JSON Schema (draft-07) describing each emitted [`StreamingChunk`].
    pub fn get_streaming_schema() -> serde_json::Value {
        json!({
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": "Batless Streaming JSON Output",
            "description": "Schema for streaming JSON chunks from batless",
            "type": "object",
            "required": ["schema_version", "metadata", "lines", "checkpoint", "is_final"],
            "properties": {
                "schema_version": {
                    "type": "string",
                    "description": "Version of the JSON schema used"
                },
                "metadata": {
                    "type": "object",
                    "required": ["file_path", "encoding", "total_file_bytes", "total_file_lines", "total_file_lines_exact", "chunk_lines", "chunk_bytes", "start_line", "end_line"],
                    "properties": {
                        "file_path": { "type": "string" },
                        "language": { "type": ["string", "null"] },
                        "encoding": { "type": "string" },
                        "total_file_bytes": { "type": "integer", "minimum": 0 },
                        "total_file_lines": { "type": "integer", "minimum": 0 },
                        "total_file_lines_exact": { "type": "boolean" },
                        "chunk_lines": { "type": "integer", "minimum": 0 },
                        "chunk_bytes": { "type": "integer", "minimum": 0 },
                        "start_line": { "type": "integer", "minimum": 0 },
                        "end_line": { "type": "integer", "minimum": 0 }
                    }
                },
                "lines": {
                    "type": "array",
                    "items": { "type": "string" },
                    "description": "Content lines for this chunk"
                },
                "checkpoint": {
                    "type": "object",
                    "required": ["file_path", "line_number", "bytes_processed", "chunk_number", "schema_version", "timestamp", "config_hash"],
                    "properties": {
                        "file_path": { "type": "string" },
                        "line_number": { "type": "integer", "minimum": 0 },
                        "bytes_processed": { "type": "integer", "minimum": 0 },
                        "chunk_number": { "type": "integer", "minimum": 0 },
                        "total_chunks": { "type": ["integer", "null"], "minimum": 1 },
                        "schema_version": { "type": "string" },
                        "timestamp": { "type": "string", "format": "date-time" },
                        "config_hash": { "type": "string" }
                    }
                },
                "is_final": {
                    "type": "boolean",
                    "description": "Whether this is the last chunk in the stream"
                }
            }
        })
    }
}
/// Internal iterator state, one variant per input source. Both variants carry
/// the same cursor fields; stdin additionally lacks up-front size metadata
/// and never has semantic boundaries.
enum StreamingProcessorIterator {
    File {
        /// Buffered reader positioned at the next unread line.
        reader: BufReader<File>,
        /// Owned copy of the configuration driving chunk sizes.
        config: BatlessConfig,
        /// Facts gathered once when the file was opened.
        file_metadata: FileMetadata,
        /// 0-based index of the next line to read.
        current_line: usize,
        /// Raw bytes consumed so far, including line terminators.
        bytes_processed: usize,
        /// Index assigned to the next emitted chunk.
        chunk_number: usize,
        /// Set once EOF is reached; `next()` then returns `None`.
        finished: bool,
        /// Line indices at which a chunk may end (empty unless semantic chunking).
        semantic_boundaries: Vec<usize>,
    },
    Stdin {
        /// Buffered reader over standard input.
        reader: BufReader<std::io::Stdin>,
        /// Owned copy of the configuration driving chunk sizes.
        config: BatlessConfig,
        /// Placeholder metadata (`<stdin>`, unknown size, no language).
        stdin_metadata: FileMetadata,
        /// 0-based index of the next line to read.
        current_line: usize,
        /// Raw bytes consumed so far, including line terminators.
        bytes_processed: usize,
        /// Index assigned to the next emitted chunk.
        chunk_number: usize,
        /// Set once EOF is reached; `next()` then returns `None`.
        finished: bool,
    },
}
/// Source-level facts gathered once when the stream is opened.
#[derive(Debug, Clone)]
struct FileMetadata {
    /// Path as given by the caller (`<stdin>` for standard input).
    path: String,
    /// Detected language, if any.
    language: Option<String>,
    /// Detected character encoding.
    encoding: String,
    /// File size from filesystem metadata; 0 for stdin where it is unknown.
    total_bytes: u64,
}
impl StreamingProcessorIterator {
    /// Open `file_path` and position the reader, optionally fast-forwarding
    /// past the lines a previous run already covered per `checkpoint`.
    ///
    /// # Errors
    /// Returns `FileReadError` if the file cannot be opened or read while
    /// skipping to the checkpointed position.
    fn new(
        file_path: &str,
        config: &BatlessConfig,
        checkpoint: Option<StreamingCheckpoint>,
    ) -> BatlessResult<Self> {
        let file = File::open(file_path).map_err(|e| BatlessError::FileReadError {
            path: file_path.to_string(),
            source: e,
        })?;
        let file_metadata = Self::gather_file_metadata(file_path)?;
        let mut reader = BufReader::new(file);
        let (current_line, bytes_processed, chunk_number) = if let Some(cp) = checkpoint {
            // Fast-forward by re-reading already-processed lines (a buffered
            // reader is not seekable by line). One buffer is reused for every
            // skipped line, and the loop stops early at EOF instead of issuing
            // pointless reads if the file shrank since the checkpoint.
            let mut line = String::new();
            let mut skipped = 0;
            while skipped < cp.line_number {
                line.clear();
                let bytes_read =
                    reader
                        .read_line(&mut line)
                        .map_err(|e| BatlessError::FileReadError {
                            path: file_path.to_string(),
                            source: e,
                        })?;
                if bytes_read == 0 {
                    // EOF before the checkpointed line: the file is shorter
                    // than expected; the iterator will simply finish.
                    break;
                }
                skipped += 1;
            }
            (cp.line_number, cp.bytes_processed, cp.chunk_number)
        } else {
            (0, 0, 0)
        };
        let semantic_boundaries = if config.chunk_strategy == ChunkStrategy::Semantic {
            // Best-effort: a read failure here just disables semantic chunking
            // (empty boundary list) rather than aborting the stream.
            let content = std::fs::read_to_string(file_path).unwrap_or_default();
            SemanticBoundaryFinder::find_boundaries(&content, file_metadata.language.as_deref())
        } else {
            Vec::new()
        };
        Ok(StreamingProcessorIterator::File {
            reader,
            config: config.clone(),
            file_metadata,
            current_line,
            bytes_processed,
            chunk_number,
            finished: false,
            semantic_boundaries,
        })
    }

    /// Build an iterator over stdin. No checkpointing: stdin is not seekable.
    fn new_from_stdin(config: &BatlessConfig) -> BatlessResult<Self> {
        use std::io::stdin;
        let reader = BufReader::new(stdin());
        let stdin_metadata = FileMetadata {
            path: "<stdin>".to_string(),
            // Language detection relies on a filename, so stdin gets none.
            language: None,
            encoding: "UTF-8".to_string(),
            // Unknown up front; chunks report bytes processed so far instead.
            total_bytes: 0,
        };
        Ok(StreamingProcessorIterator::Stdin {
            reader,
            config: config.clone(),
            stdin_metadata,
            current_line: 0,
            bytes_processed: 0,
            chunk_number: 0,
            finished: false,
        })
    }

    /// Collect size, encoding, and detected language for `file_path`.
    ///
    /// # Errors
    /// Returns `FileReadError` when the file or its metadata is inaccessible;
    /// encoding detection errors propagate from `FileProcessor`.
    fn gather_file_metadata(file_path: &str) -> BatlessResult<FileMetadata> {
        use crate::language::LanguageDetector;
        use crate::processor::FileProcessor;
        let file = File::open(file_path).map_err(|e| BatlessError::FileReadError {
            path: file_path.to_string(),
            source: e,
        })?;
        let metadata = file.metadata().map_err(|e| BatlessError::FileReadError {
            path: file_path.to_string(),
            source: e,
        })?;
        let encoding = FileProcessor::detect_encoding(file_path)?;
        let language = LanguageDetector::detect_language_with_fallback(file_path);
        Ok(FileMetadata {
            path: file_path.to_string(),
            language,
            encoding,
            total_bytes: metadata.len(),
        })
    }
}
impl Iterator for StreamingProcessorIterator {
    type Item = BatlessResult<StreamingChunk>;

    /// Produce the next chunk, or `None` once the input is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            StreamingProcessorIterator::File {
                reader,
                config,
                file_metadata,
                current_line,
                bytes_processed,
                chunk_number,
                finished,
                semantic_boundaries,
            } => {
                if *finished {
                    return None;
                }
                let mut chunk_lines = Vec::new();
                let mut chunk_bytes = 0;
                // 0-based index of the first line that will land in this chunk.
                let start_line = *current_line;
                // Reads one line, strips the trailing newline (and CR for
                // CRLF), and updates the byte/line counters.
                // Returns None at EOF, Some(Err) on an I/O error.
                let read_one_line = |reader: &mut BufReader<File>,
                                     chunk_lines: &mut Vec<String>,
                                     chunk_bytes: &mut usize,
                                     bytes_processed: &mut usize,
                                     current_line: &mut usize,
                                     path: &str|
                 -> Option<BatlessResult<()>> {
                    let mut line = String::new();
                    match reader.read_line(&mut line) {
                        Ok(0) => None,
                        Ok(bytes_read) => {
                            // Counters track raw bytes read, including the
                            // terminator that is stripped just below.
                            *chunk_bytes += bytes_read;
                            *bytes_processed += bytes_read;
                            if line.ends_with('\n') {
                                line.pop();
                                if line.ends_with('\r') {
                                    line.pop();
                                }
                            }
                            chunk_lines.push(line);
                            *current_line += 1;
                            Some(Ok(()))
                        }
                        Err(e) => Some(Err(BatlessError::FileReadError {
                            path: path.to_string(),
                            source: e,
                        })),
                    }
                };
                // Fixed-size phase: read up to `streaming_chunk_size` lines.
                for _ in 0..config.streaming_chunk_size {
                    match read_one_line(
                        reader,
                        &mut chunk_lines,
                        &mut chunk_bytes,
                        bytes_processed,
                        current_line,
                        &file_metadata.path,
                    ) {
                        None => break,
                        Some(Err(e)) => return Some(Err(e)),
                        Some(Ok(())) => {}
                    }
                }
                // Semantic phase: extend the chunk to the next boundary so it
                // never ends mid-unit. NOTE(review): this can grow a chunk
                // well past `streaming_chunk_size` when boundaries are sparse
                // — presumably intentional, confirm if memory use matters.
                if !semantic_boundaries.is_empty() && !chunk_lines.is_empty() {
                    if let Some(&next_boundary) =
                        semantic_boundaries.iter().find(|&&b| b > *current_line)
                    {
                        while *current_line < next_boundary {
                            match read_one_line(
                                reader,
                                &mut chunk_lines,
                                &mut chunk_bytes,
                                bytes_processed,
                                current_line,
                                &file_metadata.path,
                            ) {
                                None => break,
                                Some(Err(e)) => return Some(Err(e)),
                                Some(Ok(())) => {}
                            }
                        }
                    }
                }
                // Nothing read at all means we were already at EOF.
                if chunk_lines.is_empty() {
                    *finished = true;
                    return None;
                }
                // Safe: chunk_lines is non-empty, so current_line >= 1.
                let end_line = *current_line - 1;
                // Peek ahead to decide finality without consuming input; a
                // peek error is treated as end-of-stream.
                let is_final = match reader.fill_buf() {
                    Ok(buf) => buf.is_empty(),
                    Err(_) => true,
                };
                if is_final {
                    *finished = true;
                }
                let metadata = ChunkMetadata {
                    file_path: file_metadata.path.clone(),
                    language: file_metadata.language.clone(),
                    encoding: file_metadata.encoding.clone(),
                    total_file_bytes: file_metadata.total_bytes,
                    // Running count; exact only when this is the final chunk.
                    total_file_lines: *current_line,
                    total_file_lines_exact: is_final,
                    chunk_lines: chunk_lines.len(),
                    chunk_bytes,
                    start_line,
                    end_line,
                };
                let checkpoint = StreamingCheckpoint::new(
                    file_metadata.path.clone(),
                    *current_line,
                    *bytes_processed,
                    *chunk_number,
                    config,
                );
                let chunk = StreamingChunk {
                    schema_version: config.schema_version.clone(),
                    metadata,
                    lines: chunk_lines,
                    checkpoint,
                    is_final,
                };
                *chunk_number += 1;
                Some(Ok(chunk))
            }
            // Stdin variant: same line-accumulation logic inlined (no closure),
            // no semantic boundaries, and file totals are running counts.
            StreamingProcessorIterator::Stdin {
                reader,
                config,
                stdin_metadata,
                current_line,
                bytes_processed,
                chunk_number,
                finished,
            } => {
                if *finished {
                    return None;
                }
                let mut chunk_lines = Vec::new();
                let mut chunk_bytes = 0;
                let start_line = *current_line;
                for _ in 0..config.streaming_chunk_size {
                    let mut line = String::new();
                    match reader.read_line(&mut line) {
                        Ok(0) => break,
                        Ok(bytes_read) => {
                            chunk_bytes += bytes_read;
                            *bytes_processed += bytes_read;
                            // Strip the newline (and CR for CRLF input).
                            if line.ends_with('\n') {
                                line.pop();
                                if line.ends_with('\r') {
                                    line.pop();
                                }
                            }
                            chunk_lines.push(line);
                            *current_line += 1;
                        }
                        Err(e) => {
                            return Some(Err(BatlessError::FileReadError {
                                path: stdin_metadata.path.clone(),
                                source: e,
                            }));
                        }
                    }
                }
                if chunk_lines.is_empty() {
                    *finished = true;
                    return None;
                }
                // Safe: chunk_lines is non-empty, so current_line >= 1.
                let end_line = *current_line - 1;
                // Peek ahead to decide finality without consuming input.
                let is_final = match reader.fill_buf() {
                    Ok(buf) => buf.is_empty(),
                    Err(_) => true,
                };
                if is_final {
                    *finished = true;
                }
                let metadata = ChunkMetadata {
                    file_path: stdin_metadata.path.clone(),
                    language: stdin_metadata.language.clone(),
                    encoding: stdin_metadata.encoding.clone(),
                    // Stdin size is unknowable up front: report bytes so far.
                    total_file_bytes: *bytes_processed as u64,
                    total_file_lines: *current_line,
                    total_file_lines_exact: is_final,
                    chunk_lines: chunk_lines.len(),
                    chunk_bytes,
                    start_line,
                    end_line,
                };
                let checkpoint = StreamingCheckpoint::new(
                    stdin_metadata.path.clone(),
                    *current_line,
                    *bytes_processed,
                    *chunk_number,
                    config,
                );
                let chunk = StreamingChunk {
                    schema_version: config.schema_version.clone(),
                    metadata,
                    lines: chunk_lines,
                    checkpoint,
                    is_final,
                };
                *chunk_number += 1;
                Some(Ok(chunk))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::BatlessConfig;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Build a temp file containing lines "line 1" through "line 5".
    fn create_test_file() -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        for n in 1..=5 {
            writeln!(file, "line {}", n).unwrap();
        }
        file
    }

    #[test]
    fn test_streaming_checkpoint_creation() {
        let config = BatlessConfig::default().with_streaming_chunk_size(2);
        let cp = StreamingCheckpoint::new("test.txt".to_string(), 10, 500, 2, &config);
        assert_eq!(cp.file_path, "test.txt");
        assert_eq!(
            (cp.line_number, cp.bytes_processed, cp.chunk_number),
            (10, 500, 2)
        );
        assert_eq!(cp.schema_version, config.schema_version);
        assert!(!cp.timestamp.is_empty());
        assert!(!cp.config_hash.is_empty());
    }

    #[test]
    fn test_checkpoint_compatibility() {
        let small_chunks = BatlessConfig::default().with_streaming_chunk_size(1000);
        let large_chunks = BatlessConfig::default().with_streaming_chunk_size(2000);
        let cp = StreamingCheckpoint::new("test.txt".to_string(), 0, 0, 0, &small_chunks);
        assert!(cp.is_compatible(&small_chunks));
        assert!(!cp.is_compatible(&large_chunks));
    }

    #[test]
    fn test_streaming_schema() {
        let schema = StreamingProcessor::get_streaming_schema();
        for key in ["schema_version", "metadata", "lines", "checkpoint", "is_final"] {
            assert!(schema["properties"][key].is_object());
        }
    }

    #[test]
    fn test_streaming_processor_basic() -> BatlessResult<()> {
        let file = create_test_file();
        let config = BatlessConfig::default()
            .with_streaming_json(true)
            .with_streaming_chunk_size(2);
        let collected: Result<Vec<_>, _> =
            StreamingProcessor::process_streaming(file.path().to_str().unwrap(), &config, None)?
                .collect();
        let chunks = collected?;
        // 5 lines at chunk size 2 -> chunks of 2, 2, and 1 lines.
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].lines, vec!["line 1", "line 2"]);
        assert!(!chunks[0].is_final);
        assert_eq!(chunks[2].lines, vec!["line 5"]);
        assert!(chunks[2].is_final);
        Ok(())
    }
}