// quicknode_cascade/plugins/ndjson.rs
use std::io::Write;
use std::path::PathBuf;
use std::sync::Mutex;

use crate::solana::{
    AccountActivityData, BlockData, Plugin, PluginFuture, TokenTransferData, TransactionData,
};
13
/// Buffered handle to one output file.
type NdjsonSink = std::io::BufWriter<std::fs::File>;

/// State of the NDJSON plugin: an output directory plus one writer slot per
/// record kind.
///
/// Each slot holds `None` until a writer is stored in it. The `Mutex` around
/// every slot is what allows the slots to be filled and used through `&self`.
pub struct NdjsonPlugin {
    /// Directory the `.ndjson` files are opened in.
    dir: PathBuf,
    /// Writer slot for block records.
    blocks: Mutex<Option<NdjsonSink>>,
    /// Writer slot for transaction records.
    transactions: Mutex<Option<NdjsonSink>>,
    /// Writer slot for token-transfer records.
    token_transfers: Mutex<Option<NdjsonSink>>,
    /// Writer slot for account-activity records.
    account_activity: Mutex<Option<NdjsonSink>>,
}
26
27impl NdjsonPlugin {
28 pub fn new(dir: &str) -> Self {
31 Self {
32 dir: PathBuf::from(dir),
33 blocks: Mutex::new(None),
34 transactions: Mutex::new(None),
35 token_transfers: Mutex::new(None),
36 account_activity: Mutex::new(None),
37 }
38 }
39
40 fn open(&self, name: &str) -> std::io::Result<std::io::BufWriter<std::fs::File>> {
41 let path = self.dir.join(format!("{}.ndjson", name));
42 let file = std::fs::OpenOptions::new()
43 .create(true)
44 .append(true)
45 .open(path)?;
46 Ok(std::io::BufWriter::new(file))
47 }
48}
49
50impl Plugin for NdjsonPlugin {
51 fn name(&self) -> &'static str {
52 "ndjson"
53 }
54
55 fn on_load<'a>(&'a self) -> PluginFuture<'a> {
56 Box::pin(async move {
57 std::fs::create_dir_all(&self.dir)
58 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
59 *self.blocks.lock().unwrap() = Some(
60 self.open("blocks")
61 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
62 );
63 *self.transactions.lock().unwrap() = Some(
64 self.open("transactions")
65 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
66 );
67 *self.token_transfers.lock().unwrap() = Some(
68 self.open("token_transfers")
69 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
70 );
71 *self.account_activity.lock().unwrap() = Some(
72 self.open("account_activity")
73 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
74 );
75 tracing::info!("NDJSON plugin ready: {}", self.dir.display());
76 Ok(())
77 })
78 }
79
80 fn on_block<'a>(&'a self, block: &'a BlockData) -> PluginFuture<'a> {
81 Box::pin(async move {
82 let mut f = self.blocks.lock().unwrap();
83 if let Some(ref mut w) = *f {
84 serde_json::to_writer(&mut *w, block)
85 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
86 writeln!(w)
87 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
88 }
89 Ok(())
90 })
91 }
92
93 fn on_transaction<'a>(&'a self, tx: &'a TransactionData) -> PluginFuture<'a> {
94 Box::pin(async move {
95 let mut f = self.transactions.lock().unwrap();
96 if let Some(ref mut w) = *f {
97 serde_json::to_writer(&mut *w, tx)
98 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
99 writeln!(w)
100 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
101 }
102 Ok(())
103 })
104 }
105
106 fn on_token_transfer<'a>(&'a self, transfer: &'a TokenTransferData) -> PluginFuture<'a> {
107 Box::pin(async move {
108 let mut f = self.token_transfers.lock().unwrap();
109 if let Some(ref mut w) = *f {
110 serde_json::to_writer(&mut *w, transfer)
111 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
112 writeln!(w)
113 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
114 }
115 Ok(())
116 })
117 }
118
119 fn on_account_activity<'a>(&'a self, activity: &'a AccountActivityData) -> PluginFuture<'a> {
120 Box::pin(async move {
121 let mut f = self.account_activity.lock().unwrap();
122 if let Some(ref mut w) = *f {
123 serde_json::to_writer(&mut *w, activity)
124 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
125 writeln!(w)
126 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
127 }
128 Ok(())
129 })
130 }
131
132 fn on_exit<'a>(&'a self) -> PluginFuture<'a> {
133 Box::pin(async move {
134 for file_mutex in [
135 &self.blocks,
136 &self.transactions,
137 &self.token_transfers,
138 &self.account_activity,
139 ] {
140 let mut f = file_mutex.lock().unwrap();
141 if let Some(ref mut w) = *f {
142 w.flush()
143 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
144 }
145 }
146 Ok(())
147 })
148 }
149}