quicknode_cascade/runner.rs
1//! CascadeRunner — the builder-pattern engine that wires everything together.
2//!
3//! Chain-agnostic runner that handles parallel fetching, retry-forever logic,
4//! plugin dispatch, cursor management, and graceful shutdown.
5
6use anyhow::Result;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use crate::solana::Plugin;
11use crate::source;
12
/// Which slot range the runner processes, as selected through the builder.
///
/// The enum only carries plain integers, so it is `Copy` and can be logged
/// or compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunMode {
    /// Process a fixed slot range; `start` and `end` are forwarded unchanged
    /// to the backfill source.
    Backfill { start: u64, end: u64 },
    /// Follow the chain tip. `start_slot` is an explicit starting slot;
    /// `None` leaves the choice of starting point to the live source.
    Live { start_slot: Option<u64> },
}
17
18/// Builder-pattern runner for the streaming engine.
19///
20/// Handles all parallel fetching, retry-forever logic, plugin dispatch,
21/// cursor management, and graceful shutdown.
22///
23/// # Example
24///
25/// ```rust,no_run
26/// use quicknode_cascade::{CascadeRunner, solana};
27///
28/// struct Logger;
29/// impl solana::Plugin for Logger {
30/// fn name(&self) -> &'static str { "logger" }
31/// fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
32/// Box::pin(async move {
33/// println!("slot {}", block.slot);
34/// Ok(())
35/// })
36/// }
37/// }
38///
39/// CascadeRunner::solana_mainnet()
40/// .auth_token("your-jwt-token")
41/// .backfill(400_000_000, 400_000_010)
42/// .with_plugin(Box::new(Logger))
43/// .run()
44/// .expect("done");
45/// ```
46pub struct CascadeRunner {
47 source: String,
48 auth_token: Option<String>,
49 plugins: Vec<Box<dyn Plugin>>,
50 concurrency: usize,
51 encoding: String,
52 mode: RunMode,
53 cursor_file: String,
54 tip_buffer: u64,
55}
56
57impl CascadeRunner {
58 /// Create a runner for Solana mainnet via QuickNode Cascade.
59 pub fn solana_mainnet() -> Self {
60 Self::chain("solana-mainnet")
61 }
62
63 /// Create a runner for Solana devnet via QuickNode Cascade.
64 pub fn solana_devnet() -> Self {
65 Self::chain("solana-devnet")
66 }
67
68 /// Create a runner for a named chain.
69 ///
70 /// The chain name maps to the Cascade endpoint:
71 /// `https://{chain}-cascade.quiknode.io`
72 ///
73 /// # Examples
74 ///
75 /// ```rust
76 /// use quicknode_cascade::CascadeRunner;
77 ///
78 /// // These are equivalent:
79 /// let a = CascadeRunner::solana_mainnet();
80 /// let b = CascadeRunner::chain("solana-mainnet");
81 /// ```
82 pub fn chain(name: &str) -> Self {
83 let source = format!("https://{}-cascade.quiknode.io", name);
84 Self {
85 source,
86 auth_token: None,
87 plugins: Vec::new(),
88 concurrency: 10,
89 encoding: "json".to_string(),
90 mode: RunMode::Live { start_slot: None },
91 cursor_file: "cursor.json".to_string(),
92 tip_buffer: 100,
93 }
94 }
95
96 /// Set the authentication token (JWT) for the Cascade endpoint.
97 ///
98 /// The token is sent as `Authorization: Bearer <token>` on every request.
99 pub fn auth_token(mut self, token: &str) -> Self {
100 self.auth_token = Some(token.to_string());
101 self
102 }
103
104 /// Override the source URL directly (for custom RPC endpoints).
105 ///
106 /// Use this when pointing at your own Solana validator or a non-Cascade RPC.
107 pub fn source_url(mut self, url: &str) -> Self {
108 self.source = url.to_string();
109 self
110 }
111
112 /// Register a plugin. Multiple plugins can be registered; each sees all events.
113 pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
114 self.plugins.push(plugin);
115 self
116 }
117
118 /// Set parallel fetch concurrency (default: 10).
119 pub fn concurrency(mut self, n: usize) -> Self {
120 self.concurrency = n;
121 self
122 }
123
124 /// Set the encoding for fetching blocks (default: "json").
125 ///
126 /// `on_raw` fires for ALL encodings with the raw JSON.
127 ///
128 /// In `"json"` mode, the runner additionally extracts structured data and calls
129 /// `on_block`/`on_transaction`/`on_token_transfer`/`on_account_activity`.
130 ///
131 /// In any other mode (`"jsonParsed"`, `"base64"`, etc.), only `on_raw` fires.
132 pub fn encoding(mut self, encoding: &str) -> Self {
133 self.encoding = encoding.to_string();
134 self
135 }
136
137 /// Set the runner to backfill a specific slot range.
138 pub fn backfill(mut self, start: u64, end: u64) -> Self {
139 self.mode = RunMode::Backfill { start, end };
140 self
141 }
142
143 /// Set the runner to follow the chain tip in real-time.
144 pub fn live(mut self) -> Self {
145 self.mode = RunMode::Live { start_slot: None };
146 self
147 }
148
149 /// Set the runner to follow the chain tip, starting from a specific slot.
150 pub fn live_from(mut self, slot: u64) -> Self {
151 self.mode = RunMode::Live {
152 start_slot: Some(slot),
153 };
154 self
155 }
156
157 /// Set the cursor file path for resume support (default: "cursor.json").
158 pub fn cursor_file(mut self, path: &str) -> Self {
159 self.cursor_file = path.to_string();
160 self
161 }
162
163 /// Set the tip buffer for live mode (default: 100 slots).
164 ///
165 /// Keeps the client N slots behind the chain tip to ensure data
166 /// availability on Cascade. Set to 0 for direct RPC endpoints.
167 pub fn tip_buffer(mut self, slots: u64) -> Self {
168 self.tip_buffer = slots;
169 self
170 }
171
172 /// Run the engine. Blocks until completion (backfill) or shutdown signal (live).
173 ///
174 /// Creates a tokio runtime internally. If called from within an async
175 /// context, use [`run_async`](Self::run_async) instead.
176 pub fn run(self) -> Result<()> {
177 let rt = tokio::runtime::Builder::new_multi_thread()
178 .enable_all()
179 .build()?;
180 rt.block_on(self.run_async())
181 }
182
183 /// Async version of [`run`](Self::run).
184 ///
185 /// Use this from within an existing tokio runtime.
186 pub async fn run_async(self) -> Result<()> {
187 if self.plugins.is_empty() {
188 return Err(anyhow::anyhow!(
189 "No plugins registered. Call .with_plugin() before .run()"
190 ));
191 }
192 if self.concurrency == 0 {
193 return Err(anyhow::anyhow!("Concurrency must be at least 1"));
194 }
195
196 let plugins: Vec<Arc<dyn Plugin>> =
197 self.plugins.into_iter().map(Arc::from).collect();
198 let plugins = Arc::new(plugins);
199
200 let shutdown = Arc::new(AtomicBool::new(false));
201 let shutdown_clone = shutdown.clone();
202 tokio::spawn(async move {
203 tokio::signal::ctrl_c().await.ok();
204 tracing::info!("Shutdown signal received");
205 shutdown_clone.store(true, Ordering::SeqCst);
206 });
207
208 for (i, plugin) in plugins.iter().enumerate() {
209 tracing::info!("Loading plugin: {}", plugin.name());
210 if let Err(e) = plugin.on_load().await {
211 for loaded in plugins[..i].iter() {
212 loaded.on_exit().await.ok();
213 }
214 return Err(anyhow::anyhow!(
215 "Plugin '{}' on_load failed: {}",
216 plugin.name(),
217 e
218 ));
219 }
220 }
221
222 let result = match self.mode {
223 RunMode::Backfill { start, end } => {
224 source::backfill::run_backfill(
225 &self.source,
226 self.auth_token.as_deref(),
227 plugins.clone(),
228 &self.encoding,
229 start,
230 end,
231 self.concurrency,
232 &self.cursor_file,
233 shutdown,
234 )
235 .await
236 }
237 RunMode::Live { start_slot } => {
238 source::live::run_live(
239 &self.source,
240 self.auth_token.as_deref(),
241 plugins.clone(),
242 &self.encoding,
243 start_slot,
244 &self.cursor_file,
245 shutdown,
246 self.tip_buffer,
247 self.concurrency,
248 )
249 .await
250 }
251 };
252
253 for plugin in plugins.iter() {
254 if let Err(e) = plugin.on_exit().await {
255 tracing::warn!("Plugin '{}' on_exit error: {}", plugin.name(), e);
256 }
257 }
258
259 result
260 }
261}