tru 0.2.3

TOON reference implementation in Rust (JSON <-> TOON)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
//! Async streaming encode using the asupersync runtime.
//!
//! This module provides true async streaming for TOON encoding with:
//! - Yield points between event processing for cooperative scheduling
//! - Cancellation support via asupersync's capability context
//! - Stream-based API for encoding large JSON values
//!
//! # Example
//!
//! ```ignore
//! use toon::encode::async_encode::{encode_async, AsyncEncodeStream};
//! use asupersync::stream::StreamExt;
//!
//! async fn encode_large_value(value: JsonValue) {
//!     let mut stream = AsyncEncodeStream::new(value, None);
//!     while let Some(line) = stream.next().await {
//!         println!("{}", line);
//!     }
//! }
//! ```

use crate::encode::encoders;
use crate::encode::normalize::normalize_json_value;
use crate::encode::replacer::apply_replacer;
use crate::error::Result;
use crate::options::{EncodeOptions, ResolvedEncodeOptions, resolve_encode_options};
use crate::shared::validation::is_valid_unquoted_key;
use crate::{JsonStreamEvent, JsonValue};
use asupersync::stream::{Stream, StreamExt, iter};
use std::pin::Pin;
use std::task::{Context, Poll};

/// Async stream that yields TOON output lines from a JSON value.
///
/// This stream processes JSON input and yields TOON lines as they are
/// encoded. It supports cancellation and cooperative scheduling through
/// asupersync's stream primitives.
pub struct AsyncEncodeStream {
    /// Pre-computed lines to emit
    lines: Vec<String>,
    /// Current index into lines
    index: usize,
}

impl AsyncEncodeStream {
    /// Create a new async encode stream from a JSON value.
    ///
    /// # Panics
    ///
    /// Panics if the underlying encoding pipeline fails. In practice encoding a
    /// normalized `JsonValue` is infallible today; callers that need to surface
    /// errors should use [`Self::try_new`] instead.
    #[must_use]
    pub fn new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Self {
        Self::try_new(input, options).expect("AsyncEncodeStream::new failed")
    }

    /// Fallible counterpart to [`Self::new`].
    ///
    /// # Errors
    ///
    /// Returns an error when the replacer or encoder pipeline surfaces one.
    pub fn try_new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Result<Self> {
        let resolved = resolve_encode_options(options);
        let normalized = normalize_json_value(input.into());
        let replaced = if let Some(replacer) = &resolved.replacer {
            apply_replacer(&normalized, replacer)
        } else {
            normalized
        };
        let lines = encoders::encode_json_value(&replaced, &resolved);

        Ok(Self { lines, index: 0 })
    }

    /// Get the total number of lines.
    #[must_use]
    pub const fn len(&self) -> usize {
        self.lines.len()
    }

    /// Check if the stream is empty.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.lines.is_empty()
    }
}

impl Stream for AsyncEncodeStream {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.index < self.lines.len() {
            let line = self.lines[self.index].clone();
            self.index += 1;
            Poll::Ready(Some(line))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Async stream that yields `JsonStreamEvent` items from a JSON value.
///
/// This stream traverses the JSON structure and yields events representing
/// the structure (start/end object, start/end array, keys, primitives).
pub struct AsyncEncodeEventStream {
    /// Stack of iterators for nested structures
    stack: Vec<EncodeStackFrame>,
    /// The resolved encoding options
    #[allow(dead_code)]
    options: ResolvedEncodeOptions,
    /// Whether we've started
    started: bool,
    /// Root value (only used once at start)
    root: Option<JsonValue>,
}

enum EncodeStackFrame {
    Object {
        entries: std::vec::IntoIter<(String, JsonValue)>,
        pending_value: Option<JsonValue>,
    },
    Array {
        items: std::vec::IntoIter<JsonValue>,
        length: usize,
        emitted_start: bool,
    },
}

impl AsyncEncodeEventStream {
    /// Create a new async event stream from a JSON value.
    ///
    /// # Panics
    ///
    /// Panics if the underlying pipeline fails. Use [`Self::try_new`] to
    /// propagate errors instead.
    #[must_use]
    pub fn new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Self {
        Self::try_new(input, options).expect("AsyncEncodeEventStream::new failed")
    }

    /// Fallible counterpart to [`Self::new`].
    ///
    /// # Errors
    ///
    /// Returns an error when the replacer or encoder pipeline surfaces one.
    pub fn try_new(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> Result<Self> {
        let resolved = resolve_encode_options(options);
        let normalized = normalize_json_value(input.into());
        let replaced = if let Some(replacer) = &resolved.replacer {
            apply_replacer(&normalized, replacer)
        } else {
            normalized
        };

        Ok(Self {
            stack: Vec::new(),
            options: resolved,
            started: false,
            root: Some(replaced),
        })
    }

    fn next_event(&mut self) -> Option<JsonStreamEvent> {
        // Handle root value on first call
        if !self.started {
            self.started = true;
            if let Some(root) = self.root.take() {
                return self.start_value(root);
            }
        }

        // Process the current frame on the stack
        let frame = self.stack.last_mut()?;

        match frame {
            EncodeStackFrame::Object {
                entries,
                pending_value,
            } => {
                // If we have a pending value, process it
                if let Some(value) = pending_value.take() {
                    return self.start_value(value);
                }

                // Get next entry
                if let Some((key, value)) = entries.next() {
                    *pending_value = Some(value);
                    let was_quoted = !is_valid_unquoted_key(&key);
                    return Some(JsonStreamEvent::Key { key, was_quoted });
                }

                // Object exhausted - emit end
                self.stack.pop();
                Some(JsonStreamEvent::EndObject)
            }
            EncodeStackFrame::Array {
                items,
                length,
                emitted_start,
            } => {
                // Emit start array if not done
                if !*emitted_start {
                    *emitted_start = true;
                    return Some(JsonStreamEvent::StartArray { length: *length });
                }

                // Get next item
                if let Some(item) = items.next() {
                    return self.start_value(item);
                }

                // Array exhausted - emit end
                self.stack.pop();
                Some(JsonStreamEvent::EndArray)
            }
        }
    }

    fn start_value(&mut self, value: JsonValue) -> Option<JsonStreamEvent> {
        match value {
            JsonValue::Primitive(p) => Some(JsonStreamEvent::Primitive { value: p }),
            JsonValue::Array(arr) => {
                let length = arr.len();
                self.stack.push(EncodeStackFrame::Array {
                    items: arr.into_iter(),
                    length,
                    emitted_start: false,
                });
                // Recursively get the start array event
                self.next_event()
            }
            JsonValue::Object(obj) => {
                self.stack.push(EncodeStackFrame::Object {
                    entries: obj.into_iter(),
                    pending_value: None,
                });
                Some(JsonStreamEvent::StartObject)
            }
        }
    }
}

impl Stream for AsyncEncodeEventStream {
    type Item = JsonStreamEvent;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(self.next_event())
    }
}

/// Encode a JSON value to TOON lines asynchronously.
///
/// This function creates an async stream and collects all lines. The async
/// wrapper provides yield points for cooperative scheduling.
///
/// # Panics
///
/// Panics if the underlying encoder pipeline fails. Use
/// [`try_encode_lines_async`] to surface errors as a `Result`.
pub async fn encode_lines_async(
    input: impl Into<JsonValue>,
    options: Option<EncodeOptions>,
) -> Vec<String> {
    try_encode_lines_async(input, options)
        .await
        .expect("encode_lines_async failed")
}

/// Fallible counterpart to [`encode_lines_async`].
///
/// # Errors
///
/// Returns an error when the replacer or encoder pipeline surfaces one.
pub async fn try_encode_lines_async(
    input: impl Into<JsonValue>,
    options: Option<EncodeOptions>,
) -> Result<Vec<String>> {
    let input = input.into();

    // Use asupersync's iter() to create a yielding stream from the lines
    let resolved = resolve_encode_options(options);
    let normalized = normalize_json_value(input);
    let replaced = if let Some(replacer) = &resolved.replacer {
        apply_replacer(&normalized, replacer)
    } else {
        normalized
    };
    let lines = encoders::encode_json_value(&replaced, &resolved);

    // Wrap lines in an async stream for yield points
    let line_stream = iter(lines.clone());

    // Count forces iteration with yield points
    let _count = line_stream.count().await;

    Ok(lines)
}

/// Encode a JSON value to a TOON string asynchronously.
///
/// # Panics
///
/// Panics if the underlying encoder pipeline fails. Use [`try_encode_async`]
/// to surface errors as a `Result`.
pub async fn encode_async(input: impl Into<JsonValue>, options: Option<EncodeOptions>) -> String {
    try_encode_async(input, options)
        .await
        .expect("encode_async failed")
}

/// Fallible counterpart to [`encode_async`].
///
/// # Errors
///
/// Returns an error when the replacer or encoder pipeline surfaces one.
pub async fn try_encode_async(
    input: impl Into<JsonValue>,
    options: Option<EncodeOptions>,
) -> Result<String> {
    let lines = try_encode_lines_async(input, options).await?;
    Ok(lines.join("\n"))
}

/// Encode a JSON value to events asynchronously.
///
/// Returns a vector of `JsonStreamEvent` items representing the structure.
///
/// # Panics
///
/// Panics if the underlying pipeline fails. Use [`try_encode_events_async`]
/// to surface errors as a `Result`.
pub async fn encode_events_async(
    input: impl Into<JsonValue>,
    options: Option<EncodeOptions>,
) -> Vec<JsonStreamEvent> {
    try_encode_events_async(input, options)
        .await
        .expect("encode_events_async failed")
}

/// Fallible counterpart to [`encode_events_async`].
///
/// # Errors
///
/// Returns an error when the replacer or encoder pipeline surfaces one.
#[allow(clippy::unused_async)]
pub async fn try_encode_events_async(
    input: impl Into<JsonValue>,
    options: Option<EncodeOptions>,
) -> Result<Vec<JsonStreamEvent>> {
    let input = input.into();
    let mut stream = AsyncEncodeEventStream::try_new(input, options)?;

    // Collect all events
    let mut events = Vec::new();
    while let Some(event) = stream.next_event() {
        events.push(event);
    }

    Ok(events)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::StringOrNumberOrBoolOrNull;

    #[test]
    fn test_async_encode_stream_creation() {
        let value = JsonValue::Object(vec![
            (
                "name".to_string(),
                JsonValue::Primitive(StringOrNumberOrBoolOrNull::String("Alice".to_string())),
            ),
            (
                "age".to_string(),
                JsonValue::Primitive(StringOrNumberOrBoolOrNull::Number(30.0)),
            ),
        ]);
        let stream = AsyncEncodeStream::new(value, None);
        assert_eq!(stream.index, 0);
        assert!(!stream.is_empty());
    }

    #[test]
    fn test_async_encode_event_stream() {
        let value = JsonValue::Object(vec![(
            "key".to_string(),
            JsonValue::Primitive(StringOrNumberOrBoolOrNull::String("value".to_string())),
        )]);
        let mut stream = AsyncEncodeEventStream::new(value, None);

        // Manually poll the stream
        let events: Vec<_> = std::iter::from_fn(|| stream.next_event()).collect();

        assert!(events.len() >= 3); // StartObject, Key, Primitive, EndObject
        assert!(matches!(events[0], JsonStreamEvent::StartObject));
    }

    #[test]
    fn test_encode_events_match() {
        // Test that event stream produces same events as sync version
        let value = JsonValue::Object(vec![
            (
                "name".to_string(),
                JsonValue::Primitive(StringOrNumberOrBoolOrNull::String("Alice".to_string())),
            ),
            (
                "items".to_string(),
                JsonValue::Array(vec![
                    JsonValue::Primitive(StringOrNumberOrBoolOrNull::Number(1.0)),
                    JsonValue::Primitive(StringOrNumberOrBoolOrNull::Number(2.0)),
                ]),
            ),
        ]);

        // Get events from sync version
        let sync_events = crate::encode::encode_stream_events(value.clone(), None);

        // Get events from async stream
        let mut stream = AsyncEncodeEventStream::new(value, None);
        let async_events: Vec<_> = std::iter::from_fn(|| stream.next_event()).collect();

        assert_eq!(sync_events.len(), async_events.len());
        for (sync_ev, async_ev) in sync_events.iter().zip(async_events.iter()) {
            assert_eq!(sync_ev, async_ev);
        }
    }
}