use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde_json::{json, Value};
use tokio::sync::Mutex;
use tracing::warn;
use super::types::Webhook;
use crate::enhanced::{parse_enhanced_tx, EnhancedTransaction};
use crate::upstream::UpstreamClient;
pub const POLL_INTERVAL: Duration = Duration::from_millis(500);
#[async_trait]
pub trait PostClient: Send + Sync {
async fn post_json(&self, url: &str, auth: Option<&str>, body: &Value) -> Result<(), String>;
}
#[allow(clippy::implicit_hasher)]
pub async fn tick_once<U, P>(
webhook: &Webhook,
upstream: &U,
poster: &P,
cursors: &Mutex<HashMap<String, String>>,
) -> Vec<EnhancedTransaction>
where
U: UpstreamClient + ?Sized,
P: PostClient + ?Sized,
{
let mut out: Vec<EnhancedTransaction> = Vec::new();
for address in &webhook.account_addresses {
let cursor = cursors.lock().await.get(address).cloned();
let Some((fresh_sigs, newest_signature)) =
fetch_new_signatures(upstream, address, cursor.as_deref(), webhook).await
else {
continue;
};
if let Some(sig) = newest_signature {
cursors.lock().await.insert(address.clone(), sig);
}
for sig in fresh_sigs {
if let Some(etx) = fetch_enhanced(upstream, &sig).await {
out.push(etx);
}
}
}
if !webhook.transaction_types.is_empty() {
out.retain(|e| webhook.transaction_types.iter().any(|t| t == &e.tx_type));
}
if !out.is_empty() {
let payload = json!(out);
if let Err(e) = poster
.post_json(
&webhook.webhook_url,
webhook.auth_header.as_deref(),
&payload,
)
.await
{
warn!(webhook = %webhook.webhook_id, err = %e, "webhook delivery failed");
}
}
out
}
async fn fetch_new_signatures<U: UpstreamClient + ?Sized>(
upstream: &U,
address: &str,
cursor: Option<&str>,
webhook: &Webhook,
) -> Option<(Vec<String>, Option<String>)> {
let mut opts = serde_json::Map::new();
opts.insert("limit".into(), json!(50));
if let Some(c) = cursor {
opts.insert("until".into(), json!(c));
}
let raw = upstream
.rpc_call(
"getSignaturesForAddress",
json!([address, serde_json::Value::Object(opts)]),
)
.await
.ok()?;
let parsed: Value = serde_json::from_slice(&raw).ok()?;
let arr = parsed.as_array().cloned().unwrap_or_default();
if arr.is_empty() {
return Some((Vec::new(), None));
}
let newest_sig = arr
.first()
.and_then(|e| e.get("signature"))
.and_then(Value::as_str)
.map(String::from);
let want_status = webhook.txn_status.as_deref().unwrap_or("all");
let mut out = Vec::with_capacity(arr.len());
for entry in arr.into_iter().rev() {
let Some(signature) = entry.get("signature").and_then(Value::as_str) else {
continue;
};
let err = entry.get("err").cloned().filter(|v| !v.is_null());
let failed = err.is_some();
let keep = match want_status {
"success" => !failed,
"failed" => failed,
_ => true,
};
if !keep {
continue;
}
out.push(signature.to_string());
}
Some((out, newest_sig))
}
async fn fetch_enhanced<U: UpstreamClient + ?Sized>(
upstream: &U,
signature: &str,
) -> Option<EnhancedTransaction> {
let params = json!([
signature,
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
"commitment": "confirmed"
}
]);
let raw = upstream.rpc_call("getTransaction", params).await.ok()?;
if raw.is_empty() || raw == b"null" {
return None;
}
let parsed: Value = serde_json::from_slice(&raw).ok()?;
parse_enhanced_tx(signature, &parsed)
}
pub fn spawn_delivery_task<U, P>(
webhook: Webhook,
upstream: Arc<U>,
poster: Arc<P>,
) -> tokio::task::JoinHandle<()>
where
U: UpstreamClient + ?Sized + 'static,
P: PostClient + ?Sized + 'static,
{
tokio::spawn(async move {
let cursors: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
loop {
tokio::time::sleep(POLL_INTERVAL).await;
let _events = tick_once(&webhook, &*upstream, &*poster, &cursors).await;
}
})
}