async_jsonl/
async_jsonl.rs

1use futures::Stream;
2use serde::Deserialize;
3use serde_json::Value;
4use tokio::io::{BufReader, Lines};
5
6/// Iterator to read JSONL file as raw JSON strings
7pub struct Jsonl<R> {
8    pub(crate) lines: Lines<BufReader<R>>,
9}
10
11/// Main trait for reading JSONL (JSON Lines) files with async capabilities.
12///
13/// This trait provides methods to read and process JSONL files asynchronously.
14/// It combines streaming capabilities with deserialization and line selection methods.
15/// The trait is implemented by `Jsonl<R>` where `R` implements `AsyncRead + AsyncSeek`.
16///
17/// # Examples
18///
19/// ## Reading from a file and getting first n lines
20///
21/// ```ignore
22/// use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize};
23/// use futures::StreamExt;
24/// use serde::Deserialize;
25///
26/// #[derive(Deserialize, Debug)]
27/// struct Person {
28///     name: String,
29///     age: u32,
30/// }
31///
32/// #[tokio::main]
33/// async fn main() -> anyhow::Result<()> {
34///     let reader = Jsonl::from_path("people.jsonl").await?;
35///     
36///     // Get first 5 lines and deserialize directly
37///     let first_five = reader.first_n(5).await?;
38///     let mut stream = first_five.deserialize::<Person>();
39///     
40///     while let Some(result) = stream.next().await {
41///         match result {
42///             Ok(person) => println!("Found person: {:?}", person),
43///             Err(e) => eprintln!("Error parsing line: {}", e),
44///         }
45///     }
46///     
47///     Ok(())
48/// }
49/// ```
50///
51/// ## Reading last n lines (tail-like functionality)
52///
53/// ```ignore
54/// use async_jsonl::{Jsonl, JsonlReader};
55/// use futures::StreamExt;
56///
57/// #[tokio::main]
58/// async fn main() -> anyhow::Result<()> {
59///     let reader = Jsonl::from_path("log.jsonl").await?;
60///     
61///     // Get last 10 lines (like tail)
62///     let last_ten = reader.last_n(10).await?;
63///     
64///     let lines: Vec<String> = last_ten
65///         .collect::<Vec<_>>()
66///         .await
67///         .into_iter()
68///         .collect::<Result<Vec<_>, _>>()?;
69///     
70///     for line in lines {
71///         println!("{}", line);
72///     }
73///     
74///     Ok(())
75/// }
76/// ```
77#[async_trait::async_trait]
78pub trait JsonlReader: JsonlDeserialize + JsonlValueDeserialize + Stream + Send + Sync {
79    /// Stream type for the first n lines
80    type NLines: Stream<Item = anyhow::Result<String>>;
81    /// Stream type for the last n lines (in reverse order)
82    type NLinesRev: Stream<Item = anyhow::Result<String>>;
83
84    /// Get the first `n` lines from the JSONL stream.
85    ///
86    /// # Arguments
87    ///
88    /// * `n` - The number of lines to retrieve from the beginning
89    ///
90    /// # Returns
91    ///
92    /// Returns a stream of the first `n` lines as `String`s, or an error if reading fails.
93    ///
94    /// # Examples
95    ///
96    /// ```ignore
97    /// use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize};
98    /// use futures::StreamExt;
99    /// use serde::Deserialize;
100    ///
101    /// #[derive(Deserialize, Debug)]
102    /// struct LogEntry {
103    ///     timestamp: String,
104    ///     level: String,
105    ///     message: String,
106    /// }
107    ///
108    /// #[tokio::main]
109    /// async fn main() -> anyhow::Result<()> {
110    ///     let reader = Jsonl::from_path("data.jsonl").await?;
111    ///     
112    ///     // Get first 3 lines and deserialize them
113    ///     let first_three = reader.first_n(3).await?;
114    ///     let entries: Vec<LogEntry> = first_three
115    ///         .deserialize::<LogEntry>()
116    ///         .collect::<Vec<_>>()
117    ///         .await
118    ///         .into_iter()
119    ///         .collect::<Result<Vec<_>, _>>()?;
120    ///     
121    ///     println!("First 3 log entries: {:?}", entries);
122    ///     Ok(())
123    /// }
124    /// ```
125    async fn first_n(self, n: usize) -> anyhow::Result<Self::NLines>;
126
127    /// Get the last `n` lines from the JSONL stream.
128    ///
129    /// # Arguments
130    ///
131    /// * `n` - The number of lines to retrieve from the end
132    ///
133    /// # Returns
134    ///
135    /// Returns a stream of the last `n` lines as `String`s in reverse order,
136    /// or an error if reading fails.
137    ///
138    /// # Examples
139    ///
140    /// ```ignore
141    /// use async_jsonl::{Jsonl, JsonlReader};
142    /// use futures::StreamExt;
143    ///
144    /// #[tokio::main]
145    /// async fn main() -> anyhow::Result<()> {
146    ///     let reader = Jsonl::from_path("data.jsonl").await?;
147    ///     
148    ///     let last_two = reader.last_n(2).await?;
149    ///     let mut stream = last_two;
150    ///     
151    ///     while let Some(result) = stream.next().await {
152    ///         match result {
153    ///             Ok(line) => println!("Line: {}", line),
154    ///             Err(e) => eprintln!("Error: {}", e),
155    ///         }
156    ///     }
157    ///     
158    ///     Ok(())
159    /// }
160    /// ```
161    async fn last_n(self, n: usize) -> anyhow::Result<Self::NLinesRev>;
162
163    /// Count the total number of lines in the JSONL stream.
164    async fn count(self) -> usize;
165}
166
167/// Extension trait to add deserialization capabilities to JSONL readers.
168///
169/// This trait provides methods to deserialize JSON lines into strongly-typed Rust structures.
170/// It works with any type that implements `serde::Deserialize` and processes each line
171/// of a JSONL file as a separate JSON object.
172///
173/// # Examples
174///
175/// ## Basic Usage with Custom Types
176///
177/// ```ignore
178/// use async_jsonl::{Jsonl, JsonlDeserialize};
179/// use futures::StreamExt;
180/// use serde::Deserialize;
181/// use std::io::Cursor;
182///
183/// #[derive(Deserialize, Debug, PartialEq)]
184/// struct User {
185///     id: u64,
186///     name: String,
187///     email: String,
188/// }
189///
190/// #[tokio::main]
191/// async fn main() -> anyhow::Result<()> {
192///     let data = r#"{"id": 1, "name": "Alice", "email": "alice@example.com"}
193/// {"id": 2, "name": "Bob", "email": "bob@example.com"}"#;
194///     let reader = Jsonl::new(Cursor::new(data.as_bytes()));
195///     
196///     let mut user_stream = reader.deserialize::<User>();
197///     
198///     while let Some(result) = user_stream.next().await {
199///         match result {
200///             Ok(user) => println!("User: {} ({})", user.name, user.email),
201///             Err(e) => eprintln!("Failed to parse user: {}", e),
202///         }
203///     }
204///     
205///     Ok(())
206/// }
207/// ```
208///
209/// ## Error Handling and Filtering
210///
211/// ```ignore
212/// use async_jsonl::{Jsonl, JsonlDeserialize};
213/// use futures::StreamExt;
214/// use serde::Deserialize;
215/// use std::io::Cursor;
216///
217/// #[derive(Deserialize, Debug)]
218/// struct Product {
219///     name: String,
220///     price: f64,
221///     #[serde(default)]
222///     in_stock: bool,
223/// }
224///
225/// #[tokio::main]
226/// async fn main() -> anyhow::Result<()> {
227///     let data = r#"{"name": "Widget A", "price": 19.99, "in_stock": true}
228/// {"name": "Widget B", "price": 29.99, "in_stock": false}"#;
229///     let reader = Jsonl::new(Cursor::new(data.as_bytes()));
230///     
231///     // Filter only successful deserializations and in-stock products
232///     let in_stock_products: Vec<Product> = reader
233///         .deserialize::<Product>()
234///         .filter_map(|result| async move {
235///             match result {
236///                 Ok(product) if product.in_stock => Some(product),
237///                 Ok(_) => None, // Out of stock
238///                 Err(e) => {
239///                     eprintln!("Skipping invalid product: {}", e);
240///                     None
241///                 }
242///             }
243///         })
244///         .collect()
245///         .await;
246///     
247///     println!("Found {} products in stock", in_stock_products.len());
248///     Ok(())
249/// }
250/// ```
251pub trait JsonlDeserialize {
252    /// Deserialize JSON lines into the specified type
253    fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
254    where
255        T: for<'a> Deserialize<'a>;
256}
257
258/// Extension trait specifically for deserializing JSONL to `serde_json::Value` objects.
259///
260/// This trait provides a convenient method to deserialize JSON lines into generic
261/// `serde_json::Value` objects when you don't know the exact structure of the JSON
262/// data ahead of time or when working with heterogeneous JSON objects.
263///
264/// # Examples
265///
266/// ## Basic Usage with Dynamic JSON
267///
268/// ```ignore
269/// use async_jsonl::{Jsonl, JsonlValueDeserialize};
270/// use futures::StreamExt;
271/// use std::io::Cursor;
272///
273/// #[tokio::main]
274/// async fn main() -> anyhow::Result<()> {
275///     let data = r#"{"user_id": 123, "action": "login", "timestamp": "2024-01-01T10:00:00Z"}
276/// {"user_id": 456, "action": "logout", "timestamp": "2024-01-01T11:00:00Z"}
277/// {"user_id": 789, "action": "purchase", "item": "widget", "price": 29.99}"#;
278///     
279///     let reader = Jsonl::new(Cursor::new(data.as_bytes()));
280///     let mut value_stream = reader.deserialize_values();
281///     
282///     while let Some(result) = value_stream.next().await {
283///         match result {
284///             Ok(value) => {
285///                 println!("Event: {}", value["action"]);
286///                 if let Some(price) = value.get("price") {
287///                     println!("  Purchase amount: {}", price);
288///                 }
289///             }
290///             Err(e) => eprintln!("Failed to parse JSON: {}", e),
291///         }
292///     }
293///     
294///     Ok(())
295/// }
296/// ```
297///
298/// ## Processing Mixed JSON Structures
299///
300/// ```ignore
301/// use async_jsonl::{Jsonl, JsonlValueDeserialize};
302/// use futures::StreamExt;
303/// use serde_json::Value;
304/// use std::io::Cursor;
305///
306/// #[tokio::main]
307/// async fn main() -> anyhow::Result<()> {
308///     let mixed_data = r#"{"type": "user", "name": "Alice", "age": 30}
309/// {"type": "product", "name": "Widget", "price": 19.99, "categories": ["tools", "hardware"]}
310/// {"type": "event", "name": "click", "target": "button", "metadata": {"page": "/home"}}"#;
311///     
312///     let reader = Jsonl::new(Cursor::new(mixed_data.as_bytes()));
313///     let values: Vec<Value> = reader
314///         .deserialize_values()
315///         .collect::<Vec<_>>()
316///         .await
317///         .into_iter()
318///         .collect::<Result<Vec<_>, _>>()?;
319///     
320///     for value in values {
321///         match value["type"].as_str() {
322///             Some("user") => println!("User: {} (age {})", value["name"], value["age"]),
323///             Some("product") => println!("Product: {} - ${}", value["name"], value["price"]),
324///             Some("event") => println!("Event: {} on {}", value["name"], value["target"]),
325///             _ => println!("Unknown type: {:?}", value),
326///         }
327///     }
328///     
329///     Ok(())
330/// }
331/// ```
332///
333/// ## Error Handling with Invalid JSON
334///
335/// ```ignore
336/// use async_jsonl::{Jsonl, JsonlValueDeserialize};
337/// use futures::StreamExt;
338/// use std::io::Cursor;
339///
340/// #[tokio::main]
341/// async fn main() -> anyhow::Result<()> {
342///     let data_with_errors = r#"{"valid": "json"}
343/// {invalid json line
344/// {"another": "valid line"}"#;
345///     
346///     let reader = Jsonl::new(Cursor::new(data_with_errors.as_bytes()));
347///     let mut value_stream = reader.deserialize_values();
348///     
349///     let mut valid_count = 0;
350///     let mut error_count = 0;
351///     
352///     while let Some(result) = value_stream.next().await {
353///         match result {
354///             Ok(_) => valid_count += 1,
355///             Err(_) => error_count += 1,
356///         }
357///     }
358///     
359///     println!("Valid JSON lines: {}, Errors: {}", valid_count, error_count);
360///     Ok(())
361/// }
362/// ```
363pub trait JsonlValueDeserialize {
364    /// Deserialize JSON lines into `serde_json::Value` objects.
365    ///
366    /// This method transforms each line of a JSONL stream into `serde_json::Value` objects,
367    /// which can represent any valid JSON structure. This is useful when:
368    ///
369    /// - You don't know the exact structure of the JSON data ahead of time
370    /// - You're working with heterogeneous JSON objects in the same file
371    /// - You want to inspect or transform JSON data dynamically
372    /// - You need to handle mixed or evolving JSON schemas
373    ///
374    /// # Returns
375    ///
376    /// Returns a `Stream` of `anyhow::Result<Value>` where:
377    /// - `Ok(Value)` represents a successfully parsed JSON value
378    /// - `Err(anyhow::Error)` represents parsing errors for invalid JSON lines
379    ///
380    /// # Examples
381    ///
382    /// ```ignore
383    /// use async_jsonl::{Jsonl, JsonlValueDeserialize};
384    /// use futures::StreamExt;
385    /// use std::io::Cursor;
386    ///
387    /// #[tokio::main]
388    /// async fn main() -> anyhow::Result<()> {
389    ///     let data = r#"{"id": 1, "data": {"nested": [1, 2, 3]}}
390    /// {"id": 2, "data": {"different": "structure"}}"#;
391    ///     
392    ///     let reader = Jsonl::new(Cursor::new(data.as_bytes()));
393    ///     let values: Vec<_> = reader
394    ///         .deserialize_values()
395    ///         .collect()
396    ///         .await;
397    ///     
398    ///     for (i, result) in values.iter().enumerate() {
399    ///         match result {
400    ///             Ok(value) => println!("Object {}: ID = {}", i + 1, value["id"]),
401    ///             Err(e) => eprintln!("Error parsing object {}: {}", i + 1, e),
402    ///         }
403    ///     }
404    ///     
405    ///     Ok(())
406    /// }
407    /// ```
408    fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>>;
409}