quicknode_cascade/runner.rs
1//! CascadeRunner — the builder-pattern engine that wires everything together.
2//!
3//! Chain-agnostic runner that handles parallel fetching, retry-forever logic,
4//! plugin dispatch, cursor management, and graceful shutdown.
5
6use anyhow::Result;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use crate::solana::Plugin;
11use crate::source;
12
13enum RunMode {
14 Backfill { start: u64, end: u64 },
15 Live { start_slot: Option<u64> },
16}
17
18/// Builder-pattern runner for the streaming engine.
19///
20/// Handles all parallel fetching, retry-forever logic, plugin dispatch,
21/// cursor management, and graceful shutdown.
22///
23/// # Example
24///
25/// ```rust,no_run
26/// use quicknode_cascade::{CascadeRunner, solana};
27///
28/// struct Logger;
29/// impl solana::Plugin for Logger {
30/// fn name(&self) -> &'static str { "logger" }
31/// fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
32/// Box::pin(async move {
33/// println!("slot {}", block.slot);
34/// Ok(())
35/// })
36/// }
37/// }
38///
39/// CascadeRunner::solana_mainnet()
40/// .auth_token("your-jwt-token")
41/// .backfill(400_000_000, 400_000_010)
42/// .with_plugin(Box::new(Logger))
43/// .run()
44/// .expect("done");
45/// ```
46pub struct CascadeRunner {
47 source: String,
48 auth_token: Option<String>,
49 plugins: Vec<Box<dyn Plugin>>,
50 concurrency: usize,
51 encoding: String,
52 mode: RunMode,
53 cursor_file: String,
54 tip_buffer: u64,
55}
56
57impl CascadeRunner {
58 /// Create a runner for Solana mainnet via QuickNode Cascade.
59 pub fn solana_mainnet() -> Self {
60 Self::chain("solana-mainnet")
61 }
62
63 /// Create a runner for Solana devnet via QuickNode Cascade.
64 pub fn solana_devnet() -> Self {
65 Self::chain("solana-devnet")
66 }
67
68 /// Create a runner for a named chain.
69 ///
70 /// The chain name maps to the Cascade endpoint:
71 /// `https://{chain}-cascade.quiknode.io`
72 ///
73 /// # Examples
74 ///
75 /// ```rust
76 /// use quicknode_cascade::CascadeRunner;
77 ///
78 /// // These are equivalent:
79 /// let a = CascadeRunner::solana_mainnet();
80 /// let b = CascadeRunner::chain("solana-mainnet");
81 /// ```
82 pub fn chain(name: &str) -> Self {
83 let source = format!("https://{}-cascade.quiknode.io", name);
84 Self {
85 source,
86 auth_token: None,
87 plugins: Vec::new(),
88 concurrency: 10,
89 encoding: "json".to_string(),
90 mode: RunMode::Live { start_slot: None },
91 cursor_file: "cursor.json".to_string(),
92 tip_buffer: 100,
93 }
94 }
95
96 /// Set the authentication token (JWT) for the Cascade endpoint.
97 ///
98 /// The token is sent as `Authorization: Bearer <token>` on every request.
99 pub fn auth_token(mut self, token: &str) -> Self {
100 self.auth_token = Some(token.to_string());
101 self
102 }
103
104 /// Override the source URL directly (for custom RPC endpoints).
105 ///
106 /// Use this when pointing at your own Solana validator or a non-Cascade RPC.
107 pub fn source_url(mut self, url: &str) -> Self {
108 self.source = url.to_string();
109 self
110 }
111
112 /// Register a plugin. Multiple plugins can be registered; each sees all events.
113 pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
114 self.plugins.push(plugin);
115 self
116 }
117
118 /// Set parallel fetch concurrency (default: 10).
119 pub fn concurrency(mut self, n: usize) -> Self {
120 self.concurrency = n;
121 self
122 }
123
124 /// Set the encoding for fetching blocks (default: "json").
125 ///
126 /// In `"json"` mode, the runner extracts structured data and calls
127 /// `on_block`/`on_transaction`/`on_token_transfer`/`on_account_activity`.
128 ///
129 /// In any other mode (`"jsonParsed"`, `"base64"`, etc.), it calls
130 /// `on_raw_block` with the full JSON-RPC response.
131 pub fn encoding(mut self, encoding: &str) -> Self {
132 self.encoding = encoding.to_string();
133 self
134 }
135
136 /// Set the runner to backfill a specific slot range.
137 pub fn backfill(mut self, start: u64, end: u64) -> Self {
138 self.mode = RunMode::Backfill { start, end };
139 self
140 }
141
142 /// Set the runner to follow the chain tip in real-time.
143 pub fn live(mut self) -> Self {
144 self.mode = RunMode::Live { start_slot: None };
145 self
146 }
147
148 /// Set the runner to follow the chain tip, starting from a specific slot.
149 pub fn live_from(mut self, slot: u64) -> Self {
150 self.mode = RunMode::Live {
151 start_slot: Some(slot),
152 };
153 self
154 }
155
156 /// Set the cursor file path for resume support (default: "cursor.json").
157 pub fn cursor_file(mut self, path: &str) -> Self {
158 self.cursor_file = path.to_string();
159 self
160 }
161
162 /// Set the tip buffer for live mode (default: 100 slots).
163 ///
164 /// Keeps the client N slots behind the chain tip to ensure data
165 /// availability on Cascade. Set to 0 for direct RPC endpoints.
166 pub fn tip_buffer(mut self, slots: u64) -> Self {
167 self.tip_buffer = slots;
168 self
169 }
170
171 /// Run the engine. Blocks until completion (backfill) or shutdown signal (live).
172 ///
173 /// Creates a tokio runtime internally. If called from within an async
174 /// context, use [`run_async`](Self::run_async) instead.
175 pub fn run(self) -> Result<()> {
176 let rt = tokio::runtime::Builder::new_multi_thread()
177 .enable_all()
178 .build()?;
179 rt.block_on(self.run_async())
180 }
181
182 /// Async version of [`run`](Self::run).
183 ///
184 /// Use this from within an existing tokio runtime.
185 pub async fn run_async(self) -> Result<()> {
186 if self.plugins.is_empty() {
187 return Err(anyhow::anyhow!(
188 "No plugins registered. Call .with_plugin() before .run()"
189 ));
190 }
191
192 let plugins: Vec<Arc<dyn Plugin>> =
193 self.plugins.into_iter().map(Arc::from).collect();
194 let plugins = Arc::new(plugins);
195
196 let shutdown = Arc::new(AtomicBool::new(false));
197 let shutdown_clone = shutdown.clone();
198 tokio::spawn(async move {
199 tokio::signal::ctrl_c().await.ok();
200 tracing::info!("Shutdown signal received");
201 shutdown_clone.store(true, Ordering::SeqCst);
202 });
203
204 for plugin in plugins.iter() {
205 tracing::info!("Loading plugin: {}", plugin.name());
206 if let Err(e) = plugin.on_load().await {
207 return Err(anyhow::anyhow!(
208 "Plugin '{}' on_load failed: {}",
209 plugin.name(),
210 e
211 ));
212 }
213 }
214
215 let result = match self.mode {
216 RunMode::Backfill { start, end } => {
217 source::backfill::run_backfill(
218 &self.source,
219 self.auth_token.as_deref(),
220 plugins.clone(),
221 &self.encoding,
222 start,
223 end,
224 self.concurrency,
225 &self.cursor_file,
226 shutdown,
227 )
228 .await
229 }
230 RunMode::Live { start_slot } => {
231 source::live::run_live(
232 &self.source,
233 self.auth_token.as_deref(),
234 plugins.clone(),
235 &self.encoding,
236 start_slot,
237 &self.cursor_file,
238 shutdown,
239 self.tip_buffer,
240 self.concurrency,
241 )
242 .await
243 }
244 };
245
246 for plugin in plugins.iter() {
247 if let Err(e) = plugin.on_exit().await {
248 tracing::warn!("Plugin '{}' on_exit error: {}", plugin.name(), e);
249 }
250 }
251
252 result
253 }
254}