Skip to main content

quicknode_cascade/plugins/
ndjson.rs

//! NDJSON file plugin — writes structured data to newline-delimited JSON files.
//!
//! Creates one file per record type: blocks.ndjson, transactions.ndjson,
//! token_transfers.ndjson, account_activity.ndjson. Files are opened in append
//! mode, so a resumed run keeps existing records (and may append duplicates).
5
6use std::io::Write;
7use std::path::PathBuf;
8use std::sync::Mutex;
9
10use crate::solana::{
11    AccountActivityData, BlockData, Plugin, PluginFuture, TokenTransferData, TransactionData,
12};
13
/// Built-in plugin that writes structured data to NDJSON files.
///
/// Creates one file per record type in the specified directory:
/// `blocks.ndjson`, `transactions.ndjson`, `token_transfers.ndjson`, `account_activity.ndjson`.
/// Files are opened in append mode, so resuming a run never truncates existing
/// output; records that are processed again are appended again as duplicates.
///
/// Each writer sits behind its own `Mutex` and is `None` until it has been opened.
#[derive(Debug)]
pub struct NdjsonPlugin {
    /// Directory that holds the four `.ndjson` files.
    dir: PathBuf,
    /// Buffered writer for `blocks.ndjson`; `None` until opened.
    blocks: Mutex<Option<std::io::BufWriter<std::fs::File>>>,
    /// Buffered writer for `transactions.ndjson`; `None` until opened.
    transactions: Mutex<Option<std::io::BufWriter<std::fs::File>>>,
    /// Buffered writer for `token_transfers.ndjson`; `None` until opened.
    token_transfers: Mutex<Option<std::io::BufWriter<std::fs::File>>>,
    /// Buffered writer for `account_activity.ndjson`; `None` until opened.
    account_activity: Mutex<Option<std::io::BufWriter<std::fs::File>>>,
}
26
27impl NdjsonPlugin {
28    /// Create a new NDJSON plugin that writes to the specified directory.
29    /// The directory is created on `on_load` if it doesn't exist.
30    pub fn new(dir: &str) -> Self {
31        Self {
32            dir: PathBuf::from(dir),
33            blocks: Mutex::new(None),
34            transactions: Mutex::new(None),
35            token_transfers: Mutex::new(None),
36            account_activity: Mutex::new(None),
37        }
38    }
39
40    fn open(&self, name: &str) -> std::io::Result<std::io::BufWriter<std::fs::File>> {
41        let path = self.dir.join(format!("{}.ndjson", name));
42        let file = std::fs::OpenOptions::new()
43            .create(true)
44            .append(true)
45            .open(path)?;
46        Ok(std::io::BufWriter::new(file))
47    }
48}
49
50impl Plugin for NdjsonPlugin {
51    fn name(&self) -> &'static str {
52        "ndjson"
53    }
54
55    fn on_load<'a>(&'a self) -> PluginFuture<'a> {
56        Box::pin(async move {
57            std::fs::create_dir_all(&self.dir)
58                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
59            *self.blocks.lock().unwrap() = Some(
60                self.open("blocks")
61                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
62            );
63            *self.transactions.lock().unwrap() = Some(
64                self.open("transactions")
65                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
66            );
67            *self.token_transfers.lock().unwrap() = Some(
68                self.open("token_transfers")
69                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
70            );
71            *self.account_activity.lock().unwrap() = Some(
72                self.open("account_activity")
73                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
74            );
75            tracing::info!("NDJSON plugin ready: {}", self.dir.display());
76            Ok(())
77        })
78    }
79
80    fn on_block<'a>(&'a self, block: &'a BlockData) -> PluginFuture<'a> {
81        Box::pin(async move {
82            let mut f = self.blocks.lock().unwrap();
83            if let Some(ref mut w) = *f {
84                serde_json::to_writer(&mut *w, block)
85                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
86                writeln!(w)
87                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
88            }
89            Ok(())
90        })
91    }
92
93    fn on_transaction<'a>(&'a self, tx: &'a TransactionData) -> PluginFuture<'a> {
94        Box::pin(async move {
95            let mut f = self.transactions.lock().unwrap();
96            if let Some(ref mut w) = *f {
97                serde_json::to_writer(&mut *w, tx)
98                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
99                writeln!(w)
100                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
101            }
102            Ok(())
103        })
104    }
105
106    fn on_token_transfer<'a>(&'a self, transfer: &'a TokenTransferData) -> PluginFuture<'a> {
107        Box::pin(async move {
108            let mut f = self.token_transfers.lock().unwrap();
109            if let Some(ref mut w) = *f {
110                serde_json::to_writer(&mut *w, transfer)
111                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
112                writeln!(w)
113                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
114            }
115            Ok(())
116        })
117    }
118
119    fn on_account_activity<'a>(&'a self, activity: &'a AccountActivityData) -> PluginFuture<'a> {
120        Box::pin(async move {
121            let mut f = self.account_activity.lock().unwrap();
122            if let Some(ref mut w) = *f {
123                serde_json::to_writer(&mut *w, activity)
124                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
125                writeln!(w)
126                    .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
127            }
128            Ok(())
129        })
130    }
131
132    fn on_exit<'a>(&'a self) -> PluginFuture<'a> {
133        Box::pin(async move {
134            for file_mutex in [
135                &self.blocks,
136                &self.transactions,
137                &self.token_transfers,
138                &self.account_activity,
139            ] {
140                let mut f = file_mutex.lock().unwrap();
141                if let Some(ref mut w) = *f {
142                    w.flush()
143                        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
144                }
145            }
146            Ok(())
147        })
148    }
149}