JsonlReader

Trait JsonlReader 

Source
pub trait JsonlReader:
    JsonlDeserialize
    + JsonlValueDeserialize
    + Stream
    + Send
    + Sync {
    type NLines: Stream<Item = Result<String>>;
    type NLinesRev: Stream<Item = Result<String>>;

    // Required methods
    fn first_n<'async_trait>(
        self,
        n: usize,
    ) -> Pin<Box<dyn Future<Output = Result<Self::NLines>> + Send + 'async_trait>>
       where Self: 'async_trait;
    fn last_n<'async_trait>(
        self,
        n: usize,
    ) -> Pin<Box<dyn Future<Output = Result<Self::NLinesRev>> + Send + 'async_trait>>
       where Self: 'async_trait;
    fn count<'async_trait>(
        self,
    ) -> Pin<Box<dyn Future<Output = usize> + Send + 'async_trait>>
       where Self: 'async_trait;
}
Expand description

Main trait for reading JSONL (JSON Lines) files with async capabilities.

This trait provides methods to read and process JSONL files asynchronously. It combines streaming capabilities with deserialization and line selection methods. The trait is implemented by Jsonl<R> where R implements AsyncRead + AsyncSeek + Unpin + Sync + Send.

§Examples

§Reading from a file and getting first n lines

use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize};
use futures::StreamExt;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Person {
    name: String,
    age: u32,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let reader = Jsonl::from_path("people.jsonl").await?;
     
    // Get first 5 lines and deserialize directly
    let first_five = reader.first_n(5).await?;
    let mut stream = first_five.deserialize::<Person>();
     
    while let Some(result) = stream.next().await {
        match result {
            Ok(person) => println!("Found person: {:?}", person),
            Err(e) => eprintln!("Error parsing line: {}", e),
        }
    }
     
    Ok(())
}

§Reading last n lines (tail-like functionality; lines are yielded in reverse order)

use async_jsonl::{Jsonl, JsonlReader};
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let reader = Jsonl::from_path("log.jsonl").await?;
     
    // Get last 10 lines (like tail)
    let last_ten = reader.last_n(10).await?;
     
    let lines: Vec<String> = last_ten
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;
     
    for line in lines {
        println!("{}", line);
    }
     
    Ok(())
}

Required Associated Types§

Source

type NLines: Stream<Item = Result<String>>

Stream type for the first n lines

Source

type NLinesRev: Stream<Item = Result<String>>

Stream type for the last n lines (in reverse order)

Required Methods§

Source

fn first_n<'async_trait>( self, n: usize, ) -> Pin<Box<dyn Future<Output = Result<Self::NLines>> + Send + 'async_trait>>
where Self: 'async_trait,

Get the first n lines from the JSONL stream.

§Arguments
  • n - The number of lines to retrieve from the beginning
§Returns

Returns a stream over the first n lines, where each item is a Result<String>, or an error if reading fails.

§Examples
use async_jsonl::{Jsonl, JsonlReader, JsonlDeserialize};
use futures::StreamExt;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct LogEntry {
    timestamp: String,
    level: String,
    message: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let reader = Jsonl::from_path("data.jsonl").await?;
     
    // Get first 3 lines and deserialize them
    let first_three = reader.first_n(3).await?;
    let entries: Vec<LogEntry> = first_three
        .deserialize::<LogEntry>()
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;
     
    println!("First 3 log entries: {:?}", entries);
    Ok(())
}
Source

fn last_n<'async_trait>( self, n: usize, ) -> Pin<Box<dyn Future<Output = Result<Self::NLinesRev>> + Send + 'async_trait>>
where Self: 'async_trait,

Get the last n lines from the JSONL stream.

§Arguments
  • n - The number of lines to retrieve from the end
§Returns

Returns a stream over the last n lines in reverse order, where each item is a Result<String>, or an error if reading fails.

§Examples
use async_jsonl::{Jsonl, JsonlReader};
use futures::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let reader = Jsonl::from_path("data.jsonl").await?;
     
    let last_two = reader.last_n(2).await?;
    let mut stream = last_two;
     
    while let Some(result) = stream.next().await {
        match result {
            Ok(line) => println!("Line: {}", line),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
     
    Ok(())
}
Source

fn count<'async_trait>( self, ) -> Pin<Box<dyn Future<Output = usize> + Send + 'async_trait>>
where Self: 'async_trait,

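Count the total number of lines in the JSONL stream. This method consumes the reader and resolves to a plain usize; unlike first_n and last_n, its signature has no Result, so a read failure cannot be reported through the return value.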

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<R: AsyncRead + AsyncSeek + Unpin + Sync + Send> JsonlReader for Jsonl<R>

Source§

type NLines = TakeNLines<R>

Source§

type NLinesRev = TakeNLinesReverse