// async_jsonl/async_jsonl.rs
use futures::Stream;
use serde::Deserialize;
use serde_json::Value;
use tokio::io::{BufReader, Lines};

6/// Iterator to read JSONL file as raw JSON strings
7pub struct Jsonl<R> {
8 pub(crate) lines: Lines<BufReader<R>>,
9}
10
11/// Main trait for reading JSONL (JSON Lines) files with async capabilities.
12///
13/// This trait provides methods to read and process JSONL files asynchronously.
14/// It combines streaming capabilities with deserialization and line selection methods.
15/// The trait is implemented by `Jsonl<R>` where `R` implements `AsyncRead + AsyncSeek`.
16///
17/// # Examples
18///
19/// ## Reading from a file and getting first n lines
20///
21/// ```ignore
22/// use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize};
23/// use futures::StreamExt;
24/// use serde::Deserialize;
25///
26/// #[derive(Deserialize, Debug)]
27/// struct Person {
28/// name: String,
29/// age: u32,
30/// }
31///
32/// #[tokio::main]
33/// async fn main() -> anyhow::Result<()> {
34/// let reader = Jsonl::from_path("people.jsonl").await?;
35///
36/// // Get first 5 lines and deserialize directly
37/// let first_five = reader.first_n(5).await?;
38/// let mut stream = first_five.deserialize::<Person>();
39///
40/// while let Some(result) = stream.next().await {
41/// match result {
42/// Ok(person) => println!("Found person: {:?}", person),
43/// Err(e) => eprintln!("Error parsing line: {}", e),
44/// }
45/// }
46///
47/// Ok(())
48/// }
49/// ```
50///
51/// ## Reading last n lines (tail-like functionality)
52///
53/// ```ignore
54/// use async_jsonl::{Jsonl, JsonlReader};
55/// use futures::StreamExt;
56///
57/// #[tokio::main]
58/// async fn main() -> anyhow::Result<()> {
59/// let reader = Jsonl::from_path("log.jsonl").await?;
60///
61/// // Get last 10 lines (like tail)
62/// let last_ten = reader.last_n(10).await?;
63///
64/// let lines: Vec<String> = last_ten
65/// .collect::<Vec<_>>()
66/// .await
67/// .into_iter()
68/// .collect::<Result<Vec<_>, _>>()?;
69///
70/// for line in lines {
71/// println!("{}", line);
72/// }
73///
74/// Ok(())
75/// }
76/// ```
77#[async_trait::async_trait]
78pub trait JsonlReader: JsonlDeserialize + JsonlValueDeserialize + Stream + Send + Sync {
79 /// Stream type for the first n lines
80 type NLines: Stream<Item = anyhow::Result<String>>;
81 /// Stream type for the last n lines (in reverse order)
82 type NLinesRev: Stream<Item = anyhow::Result<String>>;
83
84 /// Get the first `n` lines from the JSONL stream.
85 ///
86 /// # Arguments
87 ///
88 /// * `n` - The number of lines to retrieve from the beginning
89 ///
90 /// # Returns
91 ///
92 /// Returns a stream of the first `n` lines as `String`s, or an error if reading fails.
93 ///
94 /// # Examples
95 ///
96 /// ```ignore
97 /// use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize};
98 /// use futures::StreamExt;
99 /// use serde::Deserialize;
100 ///
101 /// #[derive(Deserialize, Debug)]
102 /// struct LogEntry {
103 /// timestamp: String,
104 /// level: String,
105 /// message: String,
106 /// }
107 ///
108 /// #[tokio::main]
109 /// async fn main() -> anyhow::Result<()> {
110 /// let reader = Jsonl::from_path("data.jsonl").await?;
111 ///
112 /// // Get first 3 lines and deserialize them
113 /// let first_three = reader.first_n(3).await?;
114 /// let entries: Vec<LogEntry> = first_three
115 /// .deserialize::<LogEntry>()
116 /// .collect::<Vec<_>>()
117 /// .await
118 /// .into_iter()
119 /// .collect::<Result<Vec<_>, _>>()?;
120 ///
121 /// println!("First 3 log entries: {:?}", entries);
122 /// Ok(())
123 /// }
124 /// ```
125 async fn first_n(self, n: usize) -> anyhow::Result<Self::NLines>;
126
127 /// Get the last `n` lines from the JSONL stream.
128 ///
129 /// # Arguments
130 ///
131 /// * `n` - The number of lines to retrieve from the end
132 ///
133 /// # Returns
134 ///
135 /// Returns a stream of the last `n` lines as `String`s in reverse order,
136 /// or an error if reading fails.
137 ///
138 /// # Examples
139 ///
140 /// ```ignore
141 /// use async_jsonl::{Jsonl, JsonlReader};
142 /// use futures::StreamExt;
143 ///
144 /// #[tokio::main]
145 /// async fn main() -> anyhow::Result<()> {
146 /// let reader = Jsonl::from_path("data.jsonl").await?;
147 ///
148 /// let last_two = reader.last_n(2).await?;
149 /// let mut stream = last_two;
150 ///
151 /// while let Some(result) = stream.next().await {
152 /// match result {
153 /// Ok(line) => println!("Line: {}", line),
154 /// Err(e) => eprintln!("Error: {}", e),
155 /// }
156 /// }
157 ///
158 /// Ok(())
159 /// }
160 /// ```
161 async fn last_n(self, n: usize) -> anyhow::Result<Self::NLinesRev>;
162
163 /// Count the total number of lines in the JSONL stream.
164 async fn count(self) -> usize;
165}
166
167/// Extension trait to add deserialization capabilities to JSONL readers.
168///
169/// This trait provides methods to deserialize JSON lines into strongly-typed Rust structures.
170/// It works with any type that implements `serde::Deserialize` and processes each line
171/// of a JSONL file as a separate JSON object.
172///
173/// # Examples
174///
175/// ## Basic Usage with Custom Types
176///
177/// ```ignore
178/// use async_jsonl::{Jsonl, JsonlDeserialize};
179/// use futures::StreamExt;
180/// use serde::Deserialize;
181/// use std::io::Cursor;
182///
183/// #[derive(Deserialize, Debug, PartialEq)]
184/// struct User {
185/// id: u64,
186/// name: String,
187/// email: String,
188/// }
189///
190/// #[tokio::main]
191/// async fn main() -> anyhow::Result<()> {
192/// let data = r#"{"id": 1, "name": "Alice", "email": "alice@example.com"}
193/// {"id": 2, "name": "Bob", "email": "bob@example.com"}"#;
194/// let reader = Jsonl::new(Cursor::new(data.as_bytes()));
195///
196/// let mut user_stream = reader.deserialize::<User>();
197///
198/// while let Some(result) = user_stream.next().await {
199/// match result {
200/// Ok(user) => println!("User: {} ({})", user.name, user.email),
201/// Err(e) => eprintln!("Failed to parse user: {}", e),
202/// }
203/// }
204///
205/// Ok(())
206/// }
207/// ```
208///
209/// ## Error Handling and Filtering
210///
211/// ```ignore
212/// use async_jsonl::{Jsonl, JsonlDeserialize};
213/// use futures::StreamExt;
214/// use serde::Deserialize;
215/// use std::io::Cursor;
216///
217/// #[derive(Deserialize, Debug)]
218/// struct Product {
219/// name: String,
220/// price: f64,
221/// #[serde(default)]
222/// in_stock: bool,
223/// }
224///
225/// #[tokio::main]
226/// async fn main() -> anyhow::Result<()> {
227/// let data = r#"{"name": "Widget A", "price": 19.99, "in_stock": true}
228/// {"name": "Widget B", "price": 29.99, "in_stock": false}"#;
229/// let reader = Jsonl::new(Cursor::new(data.as_bytes()));
230///
231/// // Filter only successful deserializations and in-stock products
232/// let in_stock_products: Vec<Product> = reader
233/// .deserialize::<Product>()
234/// .filter_map(|result| async move {
235/// match result {
236/// Ok(product) if product.in_stock => Some(product),
237/// Ok(_) => None, // Out of stock
238/// Err(e) => {
239/// eprintln!("Skipping invalid product: {}", e);
240/// None
241/// }
242/// }
243/// })
244/// .collect()
245/// .await;
246///
247/// println!("Found {} products in stock", in_stock_products.len());
248/// Ok(())
249/// }
250/// ```
251pub trait JsonlDeserialize {
252 /// Deserialize JSON lines into the specified type
253 fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
254 where
255 T: for<'a> Deserialize<'a>;
256}
257
258/// Extension trait specifically for deserializing JSONL to `serde_json::Value` objects.
259///
260/// This trait provides a convenient method to deserialize JSON lines into generic
261/// `serde_json::Value` objects when you don't know the exact structure of the JSON
262/// data ahead of time or when working with heterogeneous JSON objects.
263///
264/// # Examples
265///
266/// ## Basic Usage with Dynamic JSON
267///
268/// ```ignore
269/// use async_jsonl::{Jsonl, JsonlValueDeserialize};
270/// use futures::StreamExt;
271/// use std::io::Cursor;
272///
273/// #[tokio::main]
274/// async fn main() -> anyhow::Result<()> {
275/// let data = r#"{"user_id": 123, "action": "login", "timestamp": "2024-01-01T10:00:00Z"}
276/// {"user_id": 456, "action": "logout", "timestamp": "2024-01-01T11:00:00Z"}
277/// {"user_id": 789, "action": "purchase", "item": "widget", "price": 29.99}"#;
278///
279/// let reader = Jsonl::new(Cursor::new(data.as_bytes()));
280/// let mut value_stream = reader.deserialize_values();
281///
282/// while let Some(result) = value_stream.next().await {
283/// match result {
284/// Ok(value) => {
285/// println!("Event: {}", value["action"]);
286/// if let Some(price) = value.get("price") {
287/// println!(" Purchase amount: {}", price);
288/// }
289/// }
290/// Err(e) => eprintln!("Failed to parse JSON: {}", e),
291/// }
292/// }
293///
294/// Ok(())
295/// }
296/// ```
297///
298/// ## Processing Mixed JSON Structures
299///
300/// ```ignore
301/// use async_jsonl::{Jsonl, JsonlValueDeserialize};
302/// use futures::StreamExt;
303/// use serde_json::Value;
304/// use std::io::Cursor;
305///
306/// #[tokio::main]
307/// async fn main() -> anyhow::Result<()> {
308/// let mixed_data = r#"{"type": "user", "name": "Alice", "age": 30}
309/// {"type": "product", "name": "Widget", "price": 19.99, "categories": ["tools", "hardware"]}
310/// {"type": "event", "name": "click", "target": "button", "metadata": {"page": "/home"}}"#;
311///
312/// let reader = Jsonl::new(Cursor::new(mixed_data.as_bytes()));
313/// let values: Vec<Value> = reader
314/// .deserialize_values()
315/// .collect::<Vec<_>>()
316/// .await
317/// .into_iter()
318/// .collect::<Result<Vec<_>, _>>()?;
319///
320/// for value in values {
321/// match value["type"].as_str() {
322/// Some("user") => println!("User: {} (age {})", value["name"], value["age"]),
323/// Some("product") => println!("Product: {} - ${}", value["name"], value["price"]),
324/// Some("event") => println!("Event: {} on {}", value["name"], value["target"]),
325/// _ => println!("Unknown type: {:?}", value),
326/// }
327/// }
328///
329/// Ok(())
330/// }
331/// ```
332///
333/// ## Error Handling with Invalid JSON
334///
335/// ```ignore
336/// use async_jsonl::{Jsonl, JsonlValueDeserialize};
337/// use futures::StreamExt;
338/// use std::io::Cursor;
339///
340/// #[tokio::main]
341/// async fn main() -> anyhow::Result<()> {
342/// let data_with_errors = r#"{"valid": "json"}
343/// {invalid json line
344/// {"another": "valid line"}"#;
345///
346/// let reader = Jsonl::new(Cursor::new(data_with_errors.as_bytes()));
347/// let mut value_stream = reader.deserialize_values();
348///
349/// let mut valid_count = 0;
350/// let mut error_count = 0;
351///
352/// while let Some(result) = value_stream.next().await {
353/// match result {
354/// Ok(_) => valid_count += 1,
355/// Err(_) => error_count += 1,
356/// }
357/// }
358///
359/// println!("Valid JSON lines: {}, Errors: {}", valid_count, error_count);
360/// Ok(())
361/// }
362/// ```
363pub trait JsonlValueDeserialize {
364 /// Deserialize JSON lines into `serde_json::Value` objects.
365 ///
366 /// This method transforms each line of a JSONL stream into `serde_json::Value` objects,
367 /// which can represent any valid JSON structure. This is useful when:
368 ///
369 /// - You don't know the exact structure of the JSON data ahead of time
370 /// - You're working with heterogeneous JSON objects in the same file
371 /// - You want to inspect or transform JSON data dynamically
372 /// - You need to handle mixed or evolving JSON schemas
373 ///
374 /// # Returns
375 ///
376 /// Returns a `Stream` of `anyhow::Result<Value>` where:
377 /// - `Ok(Value)` represents a successfully parsed JSON value
378 /// - `Err(anyhow::Error)` represents parsing errors for invalid JSON lines
379 ///
380 /// # Examples
381 ///
382 /// ```ignore
383 /// use async_jsonl::{Jsonl, JsonlValueDeserialize};
384 /// use futures::StreamExt;
385 /// use std::io::Cursor;
386 ///
387 /// #[tokio::main]
388 /// async fn main() -> anyhow::Result<()> {
389 /// let data = r#"{"id": 1, "data": {"nested": [1, 2, 3]}}
390 /// {"id": 2, "data": {"different": "structure"}}"#;
391 ///
392 /// let reader = Jsonl::new(Cursor::new(data.as_bytes()));
393 /// let values: Vec<_> = reader
394 /// .deserialize_values()
395 /// .collect()
396 /// .await;
397 ///
398 /// for (i, result) in values.iter().enumerate() {
399 /// match result {
400 /// Ok(value) => println!("Object {}: ID = {}", i + 1, value["id"]),
401 /// Err(e) => eprintln!("Error parsing object {}: {}", i + 1, e),
402 /// }
403 /// }
404 ///
405 /// Ok(())
406 /// }
407 /// ```
408 fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>>;
409}