use std::io::Cursor;
use std::marker::PhantomData;
use serde::de::DeserializeOwned;
use crate::JSONParser;
use crate::serde::deserializer::DeserializeError;
pub struct StreamingDeserializer<T>
where
T: DeserializeOwned,
{
parser: JSONParser,
accumulated_json: String,
_phantom: PhantomData<T>,
}
impl<T> StreamingDeserializer<T>
where
    T: DeserializeOwned,
{
    /// Creates a deserializer with a fresh [`JSONParser`] and an empty buffer.
    pub fn new() -> Self {
        Self {
            parser: JSONParser::new(),
            accumulated_json: String::new(),
            _phantom: PhantomData,
        }
    }

    /// Feeds one chunk of text through the JSON extractor and tries to produce a value.
    ///
    /// The JSON text extracted from `chunk` is appended to the internal buffer. Once the
    /// parser reports it is no longer inside a JSON value and the buffer is non-empty, the
    /// buffer is deserialized as a `T` and then cleared.
    ///
    /// Returns `Some(value)` on successful deserialization, and `None` when:
    /// - `extract_json_from_stream` returns an error;
    /// - the extracted bytes are not valid UTF-8;
    /// - the parser is still inside a JSON value (more chunks are needed);
    /// - nothing has been extracted yet;
    /// - the buffered text fails to deserialize as `T` (the buffered text is discarded).
    pub fn process_chunk(&mut self, chunk: &str) -> Option<T> {
        let mut buffer = Vec::new();
        {
            // Scoped so the writer's mutable borrow of `buffer` ends before we consume it.
            let mut writer = Cursor::new(&mut buffer);
            if self.parser.extract_json_from_stream(&mut writer, chunk).is_err() {
                return None;
            }
        }

        match String::from_utf8(buffer) {
            Ok(chunk_json) => self.accumulated_json.push_str(&chunk_json),
            Err(_) => return None,
        }

        if self.parser.is_in_json() || self.accumulated_json.is_empty() {
            return None;
        }

        // `T: DeserializeOwned` means the result never borrows from the input, so we can
        // parse the buffer in place and clear it afterwards instead of cloning it first.
        // Clearing (rather than replacing) also keeps the buffer's capacity for reuse.
        // NOTE(review): `serde_json::from_str` rejects trailing non-whitespace, so if the
        // extractor buffers two back-to-back values before `is_in_json()` turns false, this
        // yields `None` — confirm against `JSONParser`'s output format.
        let parsed = serde_json::from_str::<T>(&self.accumulated_json).ok();
        self.accumulated_json.clear();
        parsed
    }

    /// Returns whether the underlying parser is currently inside a JSON value.
    pub fn is_in_json(&self) -> bool {
        self.parser.is_in_json()
    }

    /// Returns the JSON text buffered so far that has not yet been deserialized.
    pub fn accumulated_json(&self) -> &str {
        &self.accumulated_json
    }

    /// Discards all state: replaces the parser with a fresh one and empties the buffer.
    pub fn reset(&mut self) {
        self.parser = JSONParser::new();
        self.accumulated_json.clear();
    }

    /// Attempts to deserialize whatever JSON text is still buffered.
    ///
    /// Returns `Ok(None)` if nothing is buffered. On success the deserializer is
    /// [`reset`](Self::reset) and `Ok(Some(value))` is returned.
    ///
    /// # Errors
    ///
    /// Returns [`DeserializeError::Deserialization`] if the buffered text is not a valid `T`.
    /// In that case the buffer and parser state are left untouched, so the caller can inspect
    /// [`accumulated_json`](Self::accumulated_json) or call [`reset`](Self::reset).
    pub fn finalize(&mut self) -> Result<Option<T>, DeserializeError> {
        if self.accumulated_json.is_empty() {
            return Ok(None);
        }
        let value = serde_json::from_str::<T>(&self.accumulated_json)
            .map_err(DeserializeError::Deserialization)?;
        self.reset();
        Ok(Some(value))
    }
}
impl<T> Default for StreamingDeserializer<T>
where
T: DeserializeOwned,
{
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal payload type used by every test below.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestData {
        id: u64,
        name: String,
    }

    /// Builds an expected `TestData` so whole values can be compared with `assert_eq!`.
    fn data(id: u64, name: &str) -> TestData {
        TestData {
            id,
            name: name.to_string(),
        }
    }

    #[test]
    fn test_complete_json_in_one_chunk() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        let result = deserializer.process_chunk("{\"id\":1,\"name\":\"test\"}");
        assert_eq!(result, Some(data(1, "test")));
    }

    #[test]
    fn test_partial_json_across_multiple_chunks() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        assert!(deserializer.process_chunk("{\"id\":2,").is_none());
        assert!(deserializer.process_chunk("\"name\":\"").is_none());
        let result = deserializer.process_chunk("streaming\"}");
        assert_eq!(result, Some(data(2, "streaming")));
    }

    #[test]
    fn test_mixed_text_with_json() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        let result =
            deserializer.process_chunk("Log entry: {\"id\":3,\"name\":\"mixed\"} End of log");
        assert_eq!(result, Some(data(3, "mixed")));
    }

    #[test]
    fn test_reset_deserializer() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        deserializer.process_chunk("{\"id\":4,");
        assert!(deserializer.is_in_json());
        deserializer.reset();
        assert!(!deserializer.is_in_json());
        assert_eq!(deserializer.accumulated_json(), "");
        let result = deserializer.process_chunk("{\"id\":4,\"name\":\"reset\"}");
        assert_eq!(result, Some(data(4, "reset")));
    }

    #[test]
    fn test_finalize_after_complete_json_returns_none() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        // A complete object is deserialized (and the buffer drained) by `process_chunk`
        // itself, so there is nothing left for `finalize` to flush.
        let result = deserializer.process_chunk("{\"id\":5,\"name\":\"finalize\"}");
        assert_eq!(result, Some(data(5, "finalize")));
        assert!(matches!(deserializer.finalize(), Ok(None)));
    }

    #[test]
    fn test_finalize_with_incomplete_json() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        assert!(deserializer.process_chunk("{\"id\":6,\"name\":").is_none());
        assert!(deserializer.finalize().is_err());
        // A failed finalize leaves the buffered text in place for inspection.
        assert!(!deserializer.accumulated_json().is_empty());
    }

    #[test]
    fn test_no_json_returns_none() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        let result = deserializer.process_chunk("This text contains no JSON objects");
        assert!(result.is_none());
    }

    #[test]
    fn test_multiple_json_objects() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        let chunk = "{\"id\":7,\"name\":\"first\"}{\"id\":8,\"name\":\"second\"}";
        // Only the first object is checked here.
        let result1 = deserializer.process_chunk(chunk);
        assert_eq!(result1.map(|d| d.id), Some(7));
    }
}