use crate::encode::encoders;
use crate::encode::normalize::normalize_json_value;
use crate::encode::replacer::apply_replacer;
use crate::error::Result;
use crate::options::{EncodeOptions, ResolvedEncodeOptions, resolve_encode_options};
use crate::shared::validation::is_valid_unquoted_key;
use crate::{JsonStreamEvent, JsonValue};
use asupersync::stream::{Stream, StreamExt, iter};
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct AsyncEncodeStream {
lines: Vec<String>,
index: usize,
}
impl AsyncEncodeStream {
#[must_use]
pub fn new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Self {
Self::try_new(input, options).expect("AsyncEncodeStream::new failed")
}
pub fn try_new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Result<Self> {
let resolved = resolve_encode_options(options);
let normalized = normalize_json_value(input.into());
let replaced = if let Some(replacer) = &resolved.replacer {
apply_replacer(&normalized, replacer)
} else {
normalized
};
let lines = encoders::encode_json_value(&replaced, &resolved);
Ok(Self { lines, index: 0 })
}
#[must_use]
pub const fn len(&self) -> usize {
self.lines.len()
}
#[must_use]
pub const fn is_empty(&self) -> bool {
self.lines.is_empty()
}
}
impl Stream for AsyncEncodeStream {
type Item = String;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.index < self.lines.len() {
let line = self.lines[self.index].clone();
self.index += 1;
Poll::Ready(Some(line))
} else {
Poll::Ready(None)
}
}
}
pub struct AsyncEncodeEventStream {
stack: Vec<EncodeStackFrame>,
#[allow(dead_code)]
options: ResolvedEncodeOptions,
started: bool,
root: Option<JsonValue>,
}
enum EncodeStackFrame {
Object {
entries: std::vec::IntoIter<(String, JsonValue)>,
pending_value: Option<JsonValue>,
},
Array {
items: std::vec::IntoIter<JsonValue>,
length: usize,
emitted_start: bool,
},
}
impl AsyncEncodeEventStream {
#[must_use]
pub fn new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Self {
Self::try_new(input, options).expect("AsyncEncodeEventStream::new failed")
}
pub fn try_new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Result<Self> {
let resolved = resolve_encode_options(options);
let normalized = normalize_json_value(input.into());
let replaced = if let Some(replacer) = &resolved.replacer {
apply_replacer(&normalized, replacer)
} else {
normalized
};
Ok(Self {
stack: Vec::new(),
options: resolved,
started: false,
root: Some(replaced),
})
}
fn next_event(&mut self) -> Option<JsonStreamEvent> {
if !self.started {
self.started = true;
if let Some(root) = self.root.take() {
return self.start_value(root);
}
}
let frame = self.stack.last_mut()?;
match frame {
EncodeStackFrame::Object {
entries,
pending_value,
} => {
if let Some(value) = pending_value.take() {
return self.start_value(value);
}
if let Some((key, value)) = entries.next() {
*pending_value = Some(value);
let was_quoted = !is_valid_unquoted_key(&key);
return Some(JsonStreamEvent::Key { key, was_quoted });
}
self.stack.pop();
Some(JsonStreamEvent::EndObject)
}
EncodeStackFrame::Array {
items,
length,
emitted_start,
} => {
if !*emitted_start {
*emitted_start = true;
return Some(JsonStreamEvent::StartArray { length: *length });
}
if let Some(item) = items.next() {
return self.start_value(item);
}
self.stack.pop();
Some(JsonStreamEvent::EndArray)
}
}
}
fn start_value(&mut self, value: JsonValue) -> Option<JsonStreamEvent> {
match value {
JsonValue::Primitive(p) => Some(JsonStreamEvent::Primitive { value: p }),
JsonValue::Array(arr) => {
let length = arr.len();
self.stack.push(EncodeStackFrame::Array {
items: arr.into_iter(),
length,
emitted_start: false,
});
self.next_event()
}
JsonValue::Object(obj) => {
self.stack.push(EncodeStackFrame::Object {
entries: obj.into_iter(),
pending_value: None,
});
Some(JsonStreamEvent::StartObject)
}
}
}
}
impl Stream for AsyncEncodeEventStream {
type Item = JsonStreamEvent;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.next_event())
}
}
pub async fn encode_lines_async(
input: impl Into<JsonValue>,
options: Option<EncodeOptions>,
) -> Vec<String> {
try_encode_lines_async(input, options)
.await
.expect("encode_lines_async failed")
}
pub async fn try_encode_lines_async(
input: impl Into<JsonValue>,
options: Option<EncodeOptions>,
) -> Result<Vec<String>> {
let input = input.into();
let resolved = resolve_encode_options(options);
let normalized = normalize_json_value(input);
let replaced = if let Some(replacer) = &resolved.replacer {
apply_replacer(&normalized, replacer)
} else {
normalized
};
let lines = encoders::encode_json_value(&replaced, &resolved);
let line_stream = iter(lines.clone());
let _count = line_stream.count().await;
Ok(lines)
}
pub async fn encode_async(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> String {
try_encode_async(input, options)
.await
.expect("encode_async failed")
}
pub async fn try_encode_async(
input: impl Into<JsonValue>,
options: Option<EncodeOptions>,
) -> Result<String> {
let lines = try_encode_lines_async(input, options).await?;
Ok(lines.join("\n"))
}
pub async fn encode_events_async(
input: impl Into<JsonValue>,
options: Option<EncodeOptions>,
) -> Vec<JsonStreamEvent> {
try_encode_events_async(input, options)
.await
.expect("encode_events_async failed")
}
#[allow(clippy::unused_async)]
pub async fn try_encode_events_async(
input: impl Into<JsonValue>,
options: Option<EncodeOptions>,
) -> Result<Vec<JsonStreamEvent>> {
let input = input.into();
let mut stream = AsyncEncodeEventStream::try_new(input, options)?;
let mut events = Vec::new();
while let Some(event) = stream.next_event() {
events.push(event);
}
Ok(events)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::StringOrNumberOrBoolOrNull;
#[test]
fn test_async_encode_stream_creation() {
let value = JsonValue::Object(vec![
(
"name".to_string(),
JsonValue::Primitive(StringOrNumberOrBoolOrNull::String("Alice".to_string())),
),
(
"age".to_string(),
JsonValue::Primitive(StringOrNumberOrBoolOrNull::Number(30.0)),
),
]);
let stream = AsyncEncodeStream::new(value, None);
assert_eq!(stream.index, 0);
assert!(!stream.is_empty());
}
#[test]
fn test_async_encode_event_stream() {
let value = JsonValue::Object(vec![(
"key".to_string(),
JsonValue::Primitive(StringOrNumberOrBoolOrNull::String("value".to_string())),
)]);
let mut stream = AsyncEncodeEventStream::new(value, None);
let events: Vec<_> = std::iter::from_fn(|| stream.next_event()).collect();
assert!(events.len() >= 3); assert!(matches!(events[0], JsonStreamEvent::StartObject));
}
#[test]
fn test_encode_events_match() {
let value = JsonValue::Object(vec![
(
"name".to_string(),
JsonValue::Primitive(StringOrNumberOrBoolOrNull::String("Alice".to_string())),
),
(
"items".to_string(),
JsonValue::Array(vec![
JsonValue::Primitive(StringOrNumberOrBoolOrNull::Number(1.0)),
JsonValue::Primitive(StringOrNumberOrBoolOrNull::Number(2.0)),
]),
),
]);
let sync_events = crate::encode::encode_stream_events(value.clone(), None);
let mut stream = AsyncEncodeEventStream::new(value, None);
let async_events: Vec<_> = std::iter::from_fn(|| stream.next_event()).collect();
assert_eq!(sync_events.len(), async_events.len());
for (sync_ev, async_ev) in sync_events.iter().zip(async_events.iter()) {
assert_eq!(sync_ev, async_ev);
}
}
}