Skip to main content

quicknode_cascade/
runner.rs

1//! CascadeRunner — the builder-pattern engine that wires everything together.
2//!
3//! Chain-agnostic runner that handles parallel fetching, retry-forever logic,
4//! plugin dispatch, cursor management, and graceful shutdown.
5
6use anyhow::Result;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Arc;
9
10use crate::solana::Plugin;
11use crate::source;
12
13enum RunMode {
14    Backfill { start: u64, end: u64 },
15    Live { start_slot: Option<u64> },
16}
17
18/// Builder-pattern runner for the streaming engine.
19///
20/// Handles all parallel fetching, retry-forever logic, plugin dispatch,
21/// cursor management, and graceful shutdown.
22///
23/// # Example
24///
25/// ```rust,no_run
26/// use quicknode_cascade::{CascadeRunner, solana};
27///
28/// struct Logger;
29/// impl solana::Plugin for Logger {
30///     fn name(&self) -> &'static str { "logger" }
31///     fn on_block<'a>(&'a self, block: &'a solana::BlockData) -> solana::PluginFuture<'a> {
32///         Box::pin(async move {
33///             println!("slot {}", block.slot);
34///             Ok(())
35///         })
36///     }
37/// }
38///
39/// CascadeRunner::solana_mainnet()
40///     .auth_token("your-jwt-token")
41///     .backfill(400_000_000, 400_000_010)
42///     .with_plugin(Box::new(Logger))
43///     .run()
44///     .expect("done");
45/// ```
46pub struct CascadeRunner {
47    source: String,
48    auth_token: Option<String>,
49    plugins: Vec<Box<dyn Plugin>>,
50    concurrency: usize,
51    encoding: String,
52    mode: RunMode,
53    cursor_file: String,
54    tip_buffer: u64,
55}
56
57impl CascadeRunner {
58    /// Create a runner for Solana mainnet via QuickNode Cascade.
59    pub fn solana_mainnet() -> Self {
60        Self::chain("solana-mainnet")
61    }
62
63    /// Create a runner for Solana devnet via QuickNode Cascade.
64    pub fn solana_devnet() -> Self {
65        Self::chain("solana-devnet")
66    }
67
68    /// Create a runner for a named chain.
69    ///
70    /// The chain name maps to the Cascade endpoint:
71    /// `https://{chain}-cascade.quiknode.io`
72    ///
73    /// # Examples
74    ///
75    /// ```rust
76    /// use quicknode_cascade::CascadeRunner;
77    ///
78    /// // These are equivalent:
79    /// let a = CascadeRunner::solana_mainnet();
80    /// let b = CascadeRunner::chain("solana-mainnet");
81    /// ```
82    pub fn chain(name: &str) -> Self {
83        let source = format!("https://{}-cascade.quiknode.io", name);
84        Self {
85            source,
86            auth_token: None,
87            plugins: Vec::new(),
88            concurrency: 10,
89            encoding: "json".to_string(),
90            mode: RunMode::Live { start_slot: None },
91            cursor_file: "cursor.json".to_string(),
92            tip_buffer: 100,
93        }
94    }
95
96    /// Set the authentication token (JWT) for the Cascade endpoint.
97    ///
98    /// The token is sent as `Authorization: Bearer <token>` on every request.
99    pub fn auth_token(mut self, token: &str) -> Self {
100        self.auth_token = Some(token.to_string());
101        self
102    }
103
104    /// Override the source URL directly (for custom RPC endpoints).
105    ///
106    /// Use this when pointing at your own Solana validator or a non-Cascade RPC.
107    pub fn source_url(mut self, url: &str) -> Self {
108        self.source = url.to_string();
109        self
110    }
111
112    /// Register a plugin. Multiple plugins can be registered; each sees all events.
113    pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
114        self.plugins.push(plugin);
115        self
116    }
117
118    /// Set parallel fetch concurrency (default: 10).
119    pub fn concurrency(mut self, n: usize) -> Self {
120        self.concurrency = n;
121        self
122    }
123
124    /// Set the encoding for fetching blocks (default: "json").
125    ///
126    /// `on_raw` fires for ALL encodings with the raw JSON.
127    ///
128    /// In `"json"` mode, the runner additionally extracts structured data and calls
129    /// `on_block`/`on_transaction`/`on_token_transfer`/`on_account_activity`.
130    ///
131    /// In any other mode (`"jsonParsed"`, `"base64"`, etc.), only `on_raw` fires.
132    pub fn encoding(mut self, encoding: &str) -> Self {
133        self.encoding = encoding.to_string();
134        self
135    }
136
137    /// Set the runner to backfill a specific slot range.
138    pub fn backfill(mut self, start: u64, end: u64) -> Self {
139        self.mode = RunMode::Backfill { start, end };
140        self
141    }
142
143    /// Set the runner to follow the chain tip in real-time.
144    pub fn live(mut self) -> Self {
145        self.mode = RunMode::Live { start_slot: None };
146        self
147    }
148
149    /// Set the runner to follow the chain tip, starting from a specific slot.
150    pub fn live_from(mut self, slot: u64) -> Self {
151        self.mode = RunMode::Live {
152            start_slot: Some(slot),
153        };
154        self
155    }
156
157    /// Set the cursor file path for resume support (default: "cursor.json").
158    pub fn cursor_file(mut self, path: &str) -> Self {
159        self.cursor_file = path.to_string();
160        self
161    }
162
163    /// Set the tip buffer for live mode (default: 100 slots).
164    ///
165    /// Keeps the client N slots behind the chain tip to ensure data
166    /// availability on Cascade. Set to 0 for direct RPC endpoints.
167    pub fn tip_buffer(mut self, slots: u64) -> Self {
168        self.tip_buffer = slots;
169        self
170    }
171
172    /// Run the engine. Blocks until completion (backfill) or shutdown signal (live).
173    ///
174    /// Creates a tokio runtime internally. If called from within an async
175    /// context, use [`run_async`](Self::run_async) instead.
176    pub fn run(self) -> Result<()> {
177        let rt = tokio::runtime::Builder::new_multi_thread()
178            .enable_all()
179            .build()?;
180        rt.block_on(self.run_async())
181    }
182
183    /// Async version of [`run`](Self::run).
184    ///
185    /// Use this from within an existing tokio runtime.
186    pub async fn run_async(self) -> Result<()> {
187        if self.plugins.is_empty() {
188            return Err(anyhow::anyhow!(
189                "No plugins registered. Call .with_plugin() before .run()"
190            ));
191        }
192        if self.concurrency == 0 {
193            return Err(anyhow::anyhow!("Concurrency must be at least 1"));
194        }
195
196        let plugins: Vec<Arc<dyn Plugin>> =
197            self.plugins.into_iter().map(Arc::from).collect();
198        let plugins = Arc::new(plugins);
199
200        let shutdown = Arc::new(AtomicBool::new(false));
201        let shutdown_clone = shutdown.clone();
202        tokio::spawn(async move {
203            tokio::signal::ctrl_c().await.ok();
204            tracing::info!("Shutdown signal received");
205            shutdown_clone.store(true, Ordering::SeqCst);
206        });
207
208        for (i, plugin) in plugins.iter().enumerate() {
209            tracing::info!("Loading plugin: {}", plugin.name());
210            if let Err(e) = plugin.on_load().await {
211                for loaded in plugins[..i].iter() {
212                    loaded.on_exit().await.ok();
213                }
214                return Err(anyhow::anyhow!(
215                    "Plugin '{}' on_load failed: {}",
216                    plugin.name(),
217                    e
218                ));
219            }
220        }
221
222        let result = match self.mode {
223            RunMode::Backfill { start, end } => {
224                source::backfill::run_backfill(
225                    &self.source,
226                    self.auth_token.as_deref(),
227                    plugins.clone(),
228                    &self.encoding,
229                    start,
230                    end,
231                    self.concurrency,
232                    &self.cursor_file,
233                    shutdown,
234                )
235                .await
236            }
237            RunMode::Live { start_slot } => {
238                source::live::run_live(
239                    &self.source,
240                    self.auth_token.as_deref(),
241                    plugins.clone(),
242                    &self.encoding,
243                    start_slot,
244                    &self.cursor_file,
245                    shutdown,
246                    self.tip_buffer,
247                    self.concurrency,
248                )
249                .await
250            }
251        };
252
253        for plugin in plugins.iter() {
254            if let Err(e) = plugin.on_exit().await {
255                tracing::warn!("Plugin '{}' on_exit error: {}", plugin.name(), e);
256            }
257        }
258
259        result
260    }
261}