surfing 0.1.1

A Rust library for parsing JSON objects from text streams
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
//! Streaming deserializer for parsing JSON objects from text streams.
//!
//! This module provides the `StreamingDeserializer` which can process
//! chunks of text containing mixed content, extracting and deserializing
//! JSON objects as they become available.

use std::io::Cursor;
use std::marker::PhantomData;

use serde::de::DeserializeOwned;

use crate::JSONParser;
use crate::serde::deserializer::DeserializeError;

/// Incremental deserializer that pulls JSON out of a chunked text stream.
///
/// Text is handed over piece by piece through `process_chunk`. The JSON found
/// in each piece is appended to an internal buffer, so an object that is split
/// across several chunks is reassembled before it is deserialized into `T`.
///
/// # Examples
///
/// ```
/// # #[cfg(feature = "serde")]
/// # {
/// use serde::Deserialize;
/// use surfing::serde::StreamingDeserializer;
///
/// #[derive(Debug, Deserialize, PartialEq)]
/// struct User {
///     id: u64,
///     name: String,
/// }
///
/// // One deserializer per target type
/// let mut deserializer = StreamingDeserializer::<User>::new();
///
/// // A single object spread over three chunks, surrounded by plain text
/// let chunks = [
///     "Text before {\"id\":",
///     "42,\"name\":\"Alice",
///     "\"} more text"
/// ];
///
/// // The first two chunks leave the object unfinished
/// for chunk in &chunks[0..2] {
///     let result = deserializer.process_chunk(chunk);
///     assert!(result.is_none());
/// }
///
/// // The last chunk closes the object, so a value comes back
/// let result = deserializer.process_chunk(chunks[2]);
/// assert!(result.is_some());
///
/// let user = result.unwrap();
/// assert_eq!(user.id, 42);
/// assert_eq!(user.name, "Alice");
/// # }
/// ```
pub struct StreamingDeserializer<T: DeserializeOwned> {
    /// Extracts JSON text from each chunk and reports whether a value is still open.
    parser: JSONParser,
    /// JSON text collected so far for the value currently being assembled.
    accumulated_json: String,
    /// Binds the deserializer to its target type without storing a `T`.
    _phantom: PhantomData<T>,
}

impl<T> StreamingDeserializer<T>
where
    T: DeserializeOwned,
{
    /// Builds a deserializer with a fresh parser and an empty accumulation buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use serde::Deserialize;
    /// use surfing::serde::StreamingDeserializer;
    ///
    /// #[derive(Debug, Deserialize)]
    /// struct LogEntry {
    ///     level: String,
    ///     message: String,
    /// }
    ///
    /// let deserializer = StreamingDeserializer::<LogEntry>::new();
    /// # }
    /// ```
    pub fn new() -> Self {
        let parser = JSONParser::new();
        let accumulated_json = String::new();

        Self {
            parser,
            accumulated_json,
            _phantom: PhantomData,
        }
    }

    /// Feeds one chunk of text to the parser and returns a value once a JSON
    /// object has been completed.
    ///
    /// The JSON text the parser extracts from `chunk` is appended to the
    /// internal buffer. When the parser reports that it is no longer inside a
    /// JSON value and the buffer is non-empty, the buffer is emptied and its
    /// contents are deserialized into `T`.
    ///
    /// # Arguments
    ///
    /// * `chunk` - A string slice containing text data, potentially with embedded JSON.
    ///
    /// # Returns
    ///
    /// * `Some(T)` - A complete JSON value was accumulated and deserialized.
    /// * `None` - In every other case:
    ///   - the parser failed to extract JSON from the chunk,
    ///   - the extracted bytes were not valid UTF-8,
    ///   - no JSON has been seen yet, or the current value is still incomplete,
    ///   - the completed text could not be deserialized into `T` (the text is
    ///     discarded in that case).
    ///
    /// If the accumulated text holds more than one top-level JSON value, only
    /// the first one is deserialized and the remainder is discarded.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use serde::Deserialize;
    /// use surfing::serde::StreamingDeserializer;
    ///
    /// #[derive(Debug, Deserialize, PartialEq)]
    /// struct Config {
    ///     debug: bool,
    ///     port: u16,
    /// }
    ///
    /// let mut deserializer = StreamingDeserializer::<Config>::new();
    ///
    /// // Process incomplete JSON
    /// let result = deserializer.process_chunk("{\"debug\":true,");
    /// assert!(result.is_none()); // Still incomplete
    ///
    /// // Complete the JSON
    /// let result = deserializer.process_chunk("\"port\":8080}");
    /// assert!(result.is_some());
    ///
    /// let config = result.unwrap();
    /// assert_eq!(config.debug, true);
    /// assert_eq!(config.port, 8080);
    /// # }
    /// ```
    pub fn process_chunk(&mut self, chunk: &str) -> Option<T> {
        // Let the parser copy the JSON text found in this chunk into a scratch
        // buffer. The inner scope ends the writer's mutable borrow of `buffer`.
        let mut buffer = Vec::new();
        {
            let mut writer = Cursor::new(&mut buffer);
            self.parser
                .extract_json_from_stream(&mut writer, chunk)
                .ok()?;
        }

        // Append this chunk's contribution to what earlier chunks produced.
        let chunk_json = String::from_utf8(buffer).ok()?;
        self.accumulated_json.push_str(&chunk_json);

        // Keep waiting while a value is still open or nothing was captured.
        if self.parser.is_in_json() || self.accumulated_json.is_empty() {
            return None;
        }

        // Move the text out instead of cloning it; this also leaves the buffer
        // empty and ready for the next object.
        let json = std::mem::take(&mut self.accumulated_json);

        // Read only the first top-level value. A strict `from_str` rejects any
        // trailing data, which would throw away a good object whenever the
        // extracted text happens to contain a second value after it.
        let mut values = serde_json::Deserializer::from_str(&json).into_iter::<T>();
        let first = values.next();
        first.and_then(Result::ok)
    }

    /// Returns whether the parser is currently in the middle of processing a JSON object.
    ///
    /// # Returns
    ///
    /// * `true` - If the parser is currently processing an incomplete JSON object.
    /// * `false` - If the parser is not in the middle of a JSON object.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use serde::Deserialize;
    /// use surfing::serde::StreamingDeserializer;
    ///
    /// #[derive(Debug, Deserialize)]
    /// struct Data {
    ///     value: i32,
    /// }
    ///
    /// let mut deserializer = StreamingDeserializer::<Data>::new();
    /// assert!(!deserializer.is_in_json()); // Not processing anything yet
    ///
    /// deserializer.process_chunk("{\"value\":");
    /// assert!(deserializer.is_in_json()); // In the middle of JSON
    ///
    /// deserializer.process_chunk("42}");
    /// assert!(!deserializer.is_in_json()); // JSON complete
    /// # }
    /// ```
    pub fn is_in_json(&self) -> bool {
        self.parser.is_in_json()
    }

    /// Borrows the JSON text collected so far for the value being assembled.
    ///
    /// Handy for logging or debugging a stream that never seems to complete.
    ///
    /// # Returns
    ///
    /// A string slice over the internal accumulation buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use serde::Deserialize;
    /// use surfing::serde::StreamingDeserializer;
    ///
    /// #[derive(Debug, Deserialize)]
    /// struct Data {
    ///     value: i32,
    /// }
    ///
    /// let mut deserializer = StreamingDeserializer::<Data>::new();
    /// deserializer.process_chunk("{\"value\":");
    ///
    /// assert_eq!(deserializer.accumulated_json(), "{\"value\":");
    /// # }
    /// ```
    pub fn accumulated_json(&self) -> &str {
        self.accumulated_json.as_str()
    }

    /// Returns the deserializer to its freshly-constructed state.
    ///
    /// The accumulation buffer is emptied and the parser is replaced with a
    /// new one, so processing can start over on a different stream.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use serde::Deserialize;
    /// use surfing::serde::StreamingDeserializer;
    ///
    /// #[derive(Debug, Deserialize)]
    /// struct Data {
    ///     value: i32,
    /// }
    ///
    /// let mut deserializer = StreamingDeserializer::<Data>::new();
    /// deserializer.process_chunk("{\"value\":");  // Partial JSON
    ///
    /// // Drop the partial state
    /// deserializer.reset();
    /// assert!(!deserializer.is_in_json());
    /// assert!(deserializer.accumulated_json().is_empty());
    /// # }
    /// ```
    pub fn reset(&mut self) {
        // Discard any partially collected text.
        self.accumulated_json.clear();
        // A brand-new parser forgets any half-open value.
        self.parser = JSONParser::new();
    }

    /// Attempts to deserialize whatever JSON text is still accumulated.
    ///
    /// Call this when no more chunks are expected. It covers the case where
    /// the buffered text is already valid JSON even though the parser has not
    /// reported the value as complete.
    ///
    /// Text that `process_chunk` has already turned into a value is no longer
    /// in the buffer, so calling `finalize` right after `process_chunk`
    /// returned `Some` yields `Ok(None)`.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(T))` - The buffered text deserialized successfully; the
    ///   deserializer is reset afterwards.
    /// * `Ok(None)` - The buffer is empty, so there is nothing to deserialize.
    /// * `Err(DeserializeError)` - The buffered text could not be deserialized.
    ///   The buffer and parser state are left untouched in this case.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "serde")]
    /// # {
    /// use serde::Deserialize;
    /// use surfing::serde::StreamingDeserializer;
    ///
    /// #[derive(Debug, Deserialize, PartialEq)]
    /// struct Data {
    ///     value: i32,
    /// }
    ///
    /// let mut deserializer = StreamingDeserializer::<Data>::new();
    ///
    /// // Nothing was ever accumulated
    /// assert!(matches!(deserializer.finalize(), Ok(None)));
    ///
    /// // A complete object is handed out by `process_chunk` itself...
    /// let value = deserializer.process_chunk("{\"value\":42}");
    /// assert_eq!(value, Some(Data { value: 42 }));
    ///
    /// // ...so nothing is left over for `finalize`
    /// assert!(matches!(deserializer.finalize(), Ok(None)));
    ///
    /// // A stream that ends in the middle of an object is an error
    /// deserializer.process_chunk("{\"value\":");
    /// assert!(deserializer.finalize().is_err());
    /// # }
    /// ```
    pub fn finalize(&mut self) -> Result<Option<T>, DeserializeError> {
        if self.accumulated_json.is_empty() {
            return Ok(None);
        }

        // On failure, return early without resetting so the caller can still
        // inspect the buffered text through `accumulated_json`.
        let value = serde_json::from_str::<T>(&self.accumulated_json)
            .map_err(DeserializeError::Deserialization)?;

        self.reset();
        Ok(Some(value))
    }
}

impl<T> Default for StreamingDeserializer<T>
where
    T: DeserializeOwned,
{
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal target type used by every test below.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestData {
        id: u64,
        name: String,
    }

    #[test]
    fn test_complete_json_in_one_chunk() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        let result = deserializer.process_chunk("{\"id\":1,\"name\":\"test\"}");
        assert!(result.is_some());

        let data = result.unwrap();
        assert_eq!(data.id, 1);
        assert_eq!(data.name, "test");
    }

    #[test]
    fn test_partial_json_across_multiple_chunks() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        // First chunk - no complete JSON yet
        let result = deserializer.process_chunk("{\"id\":2,");
        assert!(result.is_none());

        // Second chunk - still incomplete
        let result = deserializer.process_chunk("\"name\":\"");
        assert!(result.is_none());

        // Third chunk - completes the JSON
        let result = deserializer.process_chunk("streaming\"}");
        assert!(result.is_some());

        let data = result.unwrap();
        assert_eq!(data.id, 2);
        assert_eq!(data.name, "streaming");
    }

    #[test]
    fn test_mixed_text_with_json() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        let result =
            deserializer.process_chunk("Log entry: {\"id\":3,\"name\":\"mixed\"} End of log");
        assert!(result.is_some());

        let data = result.unwrap();
        assert_eq!(data.id, 3);
        assert_eq!(data.name, "mixed");
    }

    #[test]
    fn test_reset_deserializer() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        // Start processing some JSON
        deserializer.process_chunk("{\"id\":4,");
        assert!(deserializer.is_in_json());

        // Reset the deserializer
        deserializer.reset();
        assert!(!deserializer.is_in_json());
        assert_eq!(deserializer.accumulated_json(), "");

        // Start fresh
        let result = deserializer.process_chunk("{\"id\":4,\"name\":\"reset\"}");
        assert!(result.is_some());
    }

    #[test]
    fn test_finalize_without_input() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        // Nothing was accumulated, so there is nothing to deserialize
        assert!(matches!(deserializer.finalize(), Ok(None)));
    }

    #[test]
    fn test_finalize_with_complete_json() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        // A complete object is returned by `process_chunk`, which also empties
        // the accumulation buffer.
        let consumed = deserializer.process_chunk("{\"id\":5,\"name\":\"finalize\"}");
        assert_eq!(
            consumed,
            Some(TestData {
                id: 5,
                name: "finalize".to_string(),
            })
        );
        assert_eq!(deserializer.accumulated_json(), "");

        // With the buffer empty, `finalize` has nothing left to hand out.
        assert!(matches!(deserializer.finalize(), Ok(None)));
    }

    #[test]
    fn test_finalize_with_incomplete_json() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();
        deserializer.process_chunk("{\"id\":6,\"name\":");

        let result = deserializer.finalize();
        assert!(result.is_err());
    }

    #[test]
    fn test_no_json_returns_none() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        let result = deserializer.process_chunk("This text contains no JSON objects");
        assert!(result.is_none());
    }

    #[test]
    fn test_multiple_json_objects() {
        let mut deserializer = StreamingDeserializer::<TestData>::new();

        // Process a chunk with two complete JSON objects
        let chunk = "{\"id\":7,\"name\":\"first\"}{\"id\":8,\"name\":\"second\"}";

        // Should get the first object
        let result1 = deserializer.process_chunk(chunk);
        assert!(result1.is_some());
        assert_eq!(result1.unwrap().id, 7);

        // The second object should be ignored (current implementation limitation)
        // A more advanced implementation could handle this by tracking partial objects
    }
}