Skip to main content

quicknode_cascade/
runner.rs

1//! CascadeRunner — the builder-pattern engine that wires everything together.
2//!
3//! Chain-agnostic runner that handles parallel fetching, retry-forever logic,
4//! plugin dispatch, cursor management, and graceful shutdown.
5
6use anyhow::Result;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use crate::solana::Plugin;
11use crate::source;
12
/// How the runner traverses the chain once it is started.
enum RunMode {
    /// Process the slot range given by `start` and `end`.
    Backfill {
        start: u64,
        end: u64,
    },
    /// Follow the chain tip. `start_slot` is `Some` when the caller asked to
    /// begin at a specific slot and `None` otherwise.
    Live {
        start_slot: Option<u64>,
    },
}
17
18/// Builder-pattern runner for the streaming engine.
19///
20/// Handles all parallel fetching, retry-forever logic, plugin dispatch,
21/// cursor management, and graceful shutdown.
22///
23/// # Example
24///
25/// ```rust,no_run
26/// use quicknode_cascade::{CascadeRunner, solana};
27///
28/// struct Logger;
29/// impl solana::Plugin for Logger {
30///     fn name(&self) -> &'static str { "logger" }
31///     fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
32///         Box::pin(async move {
33///             println!("slot {}", block.slot);
34///             Ok(())
35///         })
36///     }
37/// }
38///
39/// CascadeRunner::solana_mainnet()
40///     .auth_token("your-jwt-token")
41///     .backfill(400_000_000, 400_000_010)
42///     .with_plugin(Box::new(Logger))
43///     .run()
44///     .expect("done");
45/// ```
46pub struct CascadeRunner {
47    source: String,
48    auth_token: Option<String>,
49    plugins: Vec<Box<dyn Plugin>>,
50    concurrency: usize,
51    encoding: String,
52    mode: RunMode,
53    cursor_file: String,
54    tip_buffer: u64,
55}
56
57impl CascadeRunner {
58    /// Create a runner for Solana mainnet via QuickNode Cascade.
59    pub fn solana_mainnet() -> Self {
60        Self::chain("solana-mainnet")
61    }
62
63    /// Create a runner for Solana devnet via QuickNode Cascade.
64    pub fn solana_devnet() -> Self {
65        Self::chain("solana-devnet")
66    }
67
68    /// Create a runner for a named chain.
69    ///
70    /// The chain name maps to the Cascade endpoint:
71    /// `https://{chain}-cascade.quiknode.io`
72    ///
73    /// # Examples
74    ///
75    /// ```rust
76    /// use quicknode_cascade::CascadeRunner;
77    ///
78    /// // These are equivalent:
79    /// let a = CascadeRunner::solana_mainnet();
80    /// let b = CascadeRunner::chain("solana-mainnet");
81    /// ```
82    pub fn chain(name: &str) -> Self {
83        let source = format!("https://{}-cascade.quiknode.io", name);
84        Self {
85            source,
86            auth_token: None,
87            plugins: Vec::new(),
88            concurrency: 10,
89            encoding: "json".to_string(),
90            mode: RunMode::Live { start_slot: None },
91            cursor_file: "cursor.json".to_string(),
92            tip_buffer: 100,
93        }
94    }
95
96    /// Set the authentication token (JWT) for the Cascade endpoint.
97    ///
98    /// The token is sent as `Authorization: Bearer <token>` on every request.
99    pub fn auth_token(mut self, token: &str) -> Self {
100        self.auth_token = Some(token.to_string());
101        self
102    }
103
104    /// Override the source URL directly (for custom RPC endpoints).
105    ///
106    /// Use this when pointing at your own Solana validator or a non-Cascade RPC.
107    pub fn source_url(mut self, url: &str) -> Self {
108        self.source = url.to_string();
109        self
110    }
111
112    /// Register a plugin. Multiple plugins can be registered; each sees all events.
113    pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
114        self.plugins.push(plugin);
115        self
116    }
117
118    /// Set parallel fetch concurrency (default: 10).
119    pub fn concurrency(mut self, n: usize) -> Self {
120        self.concurrency = n;
121        self
122    }
123
124    /// Set the encoding for fetching blocks (default: "json").
125    ///
126    /// In `"json"` mode, the runner extracts structured data and calls
127    /// `on_block`/`on_transaction`/`on_token_transfer`/`on_account_activity`.
128    ///
129    /// In any other mode (`"jsonParsed"`, `"base64"`, etc.), it calls
130    /// `on_raw_block` with the full JSON-RPC response.
131    pub fn encoding(mut self, encoding: &str) -> Self {
132        self.encoding = encoding.to_string();
133        self
134    }
135
136    /// Set the runner to backfill a specific slot range.
137    pub fn backfill(mut self, start: u64, end: u64) -> Self {
138        self.mode = RunMode::Backfill { start, end };
139        self
140    }
141
142    /// Set the runner to follow the chain tip in real-time.
143    pub fn live(mut self) -> Self {
144        self.mode = RunMode::Live { start_slot: None };
145        self
146    }
147
148    /// Set the runner to follow the chain tip, starting from a specific slot.
149    pub fn live_from(mut self, slot: u64) -> Self {
150        self.mode = RunMode::Live {
151            start_slot: Some(slot),
152        };
153        self
154    }
155
156    /// Set the cursor file path for resume support (default: "cursor.json").
157    pub fn cursor_file(mut self, path: &str) -> Self {
158        self.cursor_file = path.to_string();
159        self
160    }
161
162    /// Set the tip buffer for live mode (default: 100 slots).
163    ///
164    /// Keeps the client N slots behind the chain tip to ensure data
165    /// availability on Cascade. Set to 0 for direct RPC endpoints.
166    pub fn tip_buffer(mut self, slots: u64) -> Self {
167        self.tip_buffer = slots;
168        self
169    }
170
171    /// Run the engine. Blocks until completion (backfill) or shutdown signal (live).
172    ///
173    /// Creates a tokio runtime internally. If called from within an async
174    /// context, use [`run_async`](Self::run_async) instead.
175    pub fn run(self) -> Result<()> {
176        let rt = tokio::runtime::Builder::new_multi_thread()
177            .enable_all()
178            .build()?;
179        rt.block_on(self.run_async())
180    }
181
182    /// Async version of [`run`](Self::run).
183    ///
184    /// Use this from within an existing tokio runtime.
185    pub async fn run_async(self) -> Result<()> {
186        if self.plugins.is_empty() {
187            return Err(anyhow::anyhow!(
188                "No plugins registered. Call .with_plugin() before .run()"
189            ));
190        }
191
192        let plugins: Vec<Arc<dyn Plugin>> =
193            self.plugins.into_iter().map(Arc::from).collect();
194        let plugins = Arc::new(plugins);
195
196        let shutdown = Arc::new(AtomicBool::new(false));
197        let shutdown_clone = shutdown.clone();
198        tokio::spawn(async move {
199            tokio::signal::ctrl_c().await.ok();
200            tracing::info!("Shutdown signal received");
201            shutdown_clone.store(true, Ordering::SeqCst);
202        });
203
204        for plugin in plugins.iter() {
205            tracing::info!("Loading plugin: {}", plugin.name());
206            if let Err(e) = plugin.on_load().await {
207                return Err(anyhow::anyhow!(
208                    "Plugin '{}' on_load failed: {}",
209                    plugin.name(),
210                    e
211                ));
212            }
213        }
214
215        let result = match self.mode {
216            RunMode::Backfill { start, end } => {
217                source::backfill::run_backfill(
218                    &self.source,
219                    self.auth_token.as_deref(),
220                    plugins.clone(),
221                    &self.encoding,
222                    start,
223                    end,
224                    self.concurrency,
225                    &self.cursor_file,
226                    shutdown,
227                )
228                .await
229            }
230            RunMode::Live { start_slot } => {
231                source::live::run_live(
232                    &self.source,
233                    self.auth_token.as_deref(),
234                    plugins.clone(),
235                    &self.encoding,
236                    start_slot,
237                    &self.cursor_file,
238                    shutdown,
239                    self.tip_buffer,
240                    self.concurrency,
241                )
242                .await
243            }
244        };
245
246        for plugin in plugins.iter() {
247            if let Err(e) = plugin.on_exit().await {
248                tracing::warn!("Plugin '{}' on_exit error: {}", plugin.name(), e);
249            }
250        }
251
252        result
253    }
254}