use anyhow::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use crate::solana::Plugin;
use crate::source;
/// Selects which source driver `CascadeRunner` hands control to.
///
/// All payloads are plain integers, so the type is cheaply copyable and can be
/// logged or compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RunMode {
    /// Process a fixed slot range. `start` and `end` are forwarded unchanged to
    /// the backfill driver; whether `end` is inclusive is decided there.
    Backfill { start: u64, end: u64 },
    /// Follow the chain continuously. `start_slot` is forwarded unchanged to
    /// the live driver; `None` leaves the starting point up to that driver.
    Live { start_slot: Option<u64> },
}
/// Builder-style runner that wires a set of plugins to a Cascade data source.
///
/// Values are configured through the chained setter methods and consumed by
/// `run` / `run_async`.
pub struct CascadeRunner {
    /// Base URL of the data source, forwarded to the source driver.
    source: String,

    /// Optional auth token, forwarded to the source driver as `Option<&str>`.
    auth_token: Option<String>,

    /// Registered plugins, in registration order.
    plugins: Vec<Box<dyn Plugin>>,

    /// Concurrency value forwarded to the source driver; `run_async` rejects 0.
    concurrency: usize,

    /// Encoding name forwarded to the source driver.
    encoding: String,

    /// Whether to run a bounded backfill or follow the chain live.
    mode: RunMode,

    /// Cursor file path forwarded to the source driver.
    // NOTE(review): presumably used to checkpoint/resume progress — confirm in `source`.
    cursor_file: String,

    /// Slot count forwarded to the live driver only.
    // NOTE(review): presumably a distance to keep behind the chain tip — confirm in `source::live`.
    tip_buffer: u64,
}
impl CascadeRunner {
pub fn solana_mainnet() -> Self {
Self::chain("solana-mainnet")
}
pub fn solana_devnet() -> Self {
Self::chain("solana-devnet")
}
pub fn chain(name: &str) -> Self {
let source = format!("https://{}-cascade.quiknode.io", name);
Self {
source,
auth_token: None,
plugins: Vec::new(),
concurrency: 10,
encoding: "json".to_string(),
mode: RunMode::Live { start_slot: None },
cursor_file: "cursor.json".to_string(),
tip_buffer: 100,
}
}
pub fn auth_token(mut self, token: &str) -> Self {
self.auth_token = Some(token.to_string());
self
}
pub fn source_url(mut self, url: &str) -> Self {
self.source = url.to_string();
self
}
pub fn with_plugin(mut self, plugin: Box<dyn Plugin>) -> Self {
self.plugins.push(plugin);
self
}
pub fn concurrency(mut self, n: usize) -> Self {
self.concurrency = n;
self
}
pub fn encoding(mut self, encoding: &str) -> Self {
self.encoding = encoding.to_string();
self
}
pub fn backfill(mut self, start: u64, end: u64) -> Self {
self.mode = RunMode::Backfill { start, end };
self
}
pub fn live(mut self) -> Self {
self.mode = RunMode::Live { start_slot: None };
self
}
pub fn live_from(mut self, slot: u64) -> Self {
self.mode = RunMode::Live {
start_slot: Some(slot),
};
self
}
pub fn cursor_file(mut self, path: &str) -> Self {
self.cursor_file = path.to_string();
self
}
pub fn tip_buffer(mut self, slots: u64) -> Self {
self.tip_buffer = slots;
self
}
pub fn run(self) -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(self.run_async())
}
pub async fn run_async(self) -> Result<()> {
if self.plugins.is_empty() {
return Err(anyhow::anyhow!(
"No plugins registered. Call .with_plugin() before .run()"
));
}
if self.concurrency == 0 {
return Err(anyhow::anyhow!("Concurrency must be at least 1"));
}
let plugins: Vec<Arc<dyn Plugin>> =
self.plugins.into_iter().map(Arc::from).collect();
let plugins = Arc::new(plugins);
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tracing::info!("Shutdown signal received");
shutdown_clone.store(true, Ordering::SeqCst);
});
for (i, plugin) in plugins.iter().enumerate() {
tracing::info!("Loading plugin: {}", plugin.name());
if let Err(e) = plugin.on_load().await {
for loaded in plugins[..i].iter() {
loaded.on_exit().await.ok();
}
return Err(anyhow::anyhow!(
"Plugin '{}' on_load failed: {}",
plugin.name(),
e
));
}
}
let result = match self.mode {
RunMode::Backfill { start, end } => {
source::backfill::run_backfill(
&self.source,
self.auth_token.as_deref(),
plugins.clone(),
&self.encoding,
start,
end,
self.concurrency,
&self.cursor_file,
shutdown,
)
.await
}
RunMode::Live { start_slot } => {
source::live::run_live(
&self.source,
self.auth_token.as_deref(),
plugins.clone(),
&self.encoding,
start_slot,
&self.cursor_file,
shutdown,
self.tip_buffer,
self.concurrency,
)
.await
}
};
for plugin in plugins.iter() {
if let Err(e) = plugin.on_exit().await {
tracing::warn!("Plugin '{}' on_exit error: {}", plugin.name(), e);
}
}
result
}
}