use std::collections::BTreeMap;
use std::sync::Arc;
use thiserror::Error;
use tidepool_core::{compute_proof, ProofError, TreeState};
use crate::cache::{CacheError, CacheStore, SearchFilter};
use crate::cnft::{CnftStore, LeafRecord};
use crate::upstream::UpstreamClient;
use super::cnft_to_das::leaf_record_to_das_asset;
use super::decoder::AccountDecoder;
use super::fetch::{fetch_and_cache_asset, FetchError};
use super::types::{
DasAsset, DasAssetProof, DasBalances, DasNftEditionEntry, DasNftEditions, DasTokenAccount,
DasTokenAccounts, DasTokenBalance,
};
#[derive(Debug, Error)]
pub enum DasError {
#[error(transparent)]
Store(#[from] crate::cnft::store::StoreError),
#[error(transparent)]
Cache(#[from] CacheError),
#[error(transparent)]
Fetch(#[from] FetchError),
#[error("proof computation failed: {0}")]
Proof(ProofError),
#[error("upstream error: {0}")]
Upstream(String),
#[error("bad request: {0}")]
BadRequest(String),
}
impl From<ProofError> for DasError {
fn from(e: ProofError) -> Self {
Self::Proof(e)
}
}
pub type DasResult<T> = Result<T, DasError>;
pub async fn get_asset<S: CnftStore + ?Sized>(
cnft: &S,
asset_id: &[u8; 32],
) -> DasResult<Option<DasAsset>> {
if let Some(record) = cnft.get_leaf(asset_id).await? {
return Ok(Some(leaf_record_to_das_asset(&record)));
}
Ok(None)
}
pub async fn get_asset_full<S, C, U>(
cnft: &S,
cache: &C,
upstream: &U,
decoders: &[Arc<dyn AccountDecoder>],
asset_id_b58: &str,
) -> DasResult<Option<DasAsset>>
where
S: CnftStore + ?Sized,
C: CacheStore + ?Sized,
U: UpstreamClient + ?Sized,
{
if let Some(id_bytes) = try_decode_bs58_32(asset_id_b58) {
if let Some(record) = cnft.get_leaf(&id_bytes).await? {
return Ok(Some(leaf_record_to_das_asset(&record)));
}
}
if let Some(cached) = cache.get_asset(asset_id_b58).await? {
return Ok(Some(cached));
}
Ok(fetch_and_cache_asset(upstream, cache, decoders, asset_id_b58).await?)
}
pub async fn get_asset_batch<S, C, U>(
cnft: &S,
cache: &C,
upstream: &U,
decoders: &[Arc<dyn AccountDecoder>],
asset_ids_b58: &[String],
) -> DasResult<Vec<Option<DasAsset>>>
where
S: CnftStore + ?Sized,
C: CacheStore + ?Sized,
U: UpstreamClient + ?Sized,
{
let mut out = Vec::with_capacity(asset_ids_b58.len());
for id in asset_ids_b58 {
out.push(get_asset_full(cnft, cache, upstream, decoders, id).await?);
}
Ok(out)
}
pub async fn get_assets_by_owner<C: CacheStore + ?Sized>(
cache: &C,
owner: &str,
) -> DasResult<Vec<DasAsset>> {
Ok(cache.get_assets_by_owner(owner).await?)
}
pub async fn get_assets_by_authority<C: CacheStore + ?Sized>(
cache: &C,
authority: &str,
) -> DasResult<Vec<DasAsset>> {
Ok(cache.get_assets_by_authority(authority).await?)
}
pub async fn get_assets_by_creator<C: CacheStore + ?Sized>(
cache: &C,
creator: &str,
only_verified: bool,
) -> DasResult<Vec<DasAsset>> {
Ok(cache.get_assets_by_creator(creator, only_verified).await?)
}
pub async fn get_assets_by_group<C: CacheStore + ?Sized>(
cache: &C,
group_key: &str,
group_value: &str,
) -> DasResult<Vec<DasAsset>> {
Ok(cache.get_assets_by_group(group_key, group_value).await?)
}
pub async fn search_assets<C: CacheStore + ?Sized>(
cache: &C,
filter: &SearchFilter,
) -> DasResult<Vec<DasAsset>> {
Ok(cache.search_assets(filter).await?)
}
fn try_decode_bs58_32(s: &str) -> Option<[u8; 32]> {
let bytes = bs58::decode(s).into_vec().ok()?;
bytes.try_into().ok()
}
pub async fn get_asset_proof<S: CnftStore + ?Sized>(
cnft: &S,
asset_id: &[u8; 32],
) -> DasResult<Option<DasAssetProof>> {
let Some(leaf) = cnft.get_leaf(asset_id).await? else {
return Ok(None);
};
let Some(tree_info) = cnft.get_tree(&leaf.tree).await? else {
return Ok(None);
};
let state = build_tree_state(cnft, &leaf.tree, tree_info.depth).await?;
let proof = compute_proof(&state, leaf.leaf_index)?;
Ok(Some(DasAssetProof {
root: bs58::encode(proof.root).into_string(),
proof: proof
.proof
.iter()
.map(|n| bs58::encode(n).into_string())
.collect(),
node_index: proof.node_index,
leaf: bs58::encode(proof.leaf).into_string(),
tree_id: bs58::encode(leaf.tree).into_string(),
last_indexed_slot: 0,
}))
}
pub async fn get_asset_proof_batch<S: CnftStore + ?Sized>(
cnft: &S,
asset_ids: &[[u8; 32]],
) -> DasResult<Vec<Option<DasAssetProof>>> {
let mut results: Vec<Option<DasAssetProof>> = Vec::with_capacity(asset_ids.len());
let mut leaves: Vec<Option<LeafRecord>> = Vec::with_capacity(asset_ids.len());
let mut trees_needed: BTreeMap<[u8; 32], u8> = BTreeMap::new();
for id in asset_ids {
let leaf = cnft.get_leaf(id).await?;
if let Some(ref l) = leaf {
if let std::collections::btree_map::Entry::Vacant(e) = trees_needed.entry(l.tree) {
e.insert(0);
}
}
leaves.push(leaf);
}
let mut tree_states: BTreeMap<[u8; 32], TreeState> = BTreeMap::new();
for tree in trees_needed.keys().copied().collect::<Vec<_>>() {
let Some(info) = cnft.get_tree(&tree).await? else {
continue;
};
let state = build_tree_state(cnft, &tree, info.depth).await?;
tree_states.insert(tree, state);
}
for (i, id) in asset_ids.iter().enumerate() {
let Some(leaf) = leaves[i].as_ref() else {
results.push(None);
continue;
};
let Some(state) = tree_states.get(&leaf.tree) else {
results.push(None);
continue;
};
let proof = compute_proof(state, leaf.leaf_index)?;
results.push(Some(DasAssetProof {
root: bs58::encode(proof.root).into_string(),
proof: proof
.proof
.iter()
.map(|n| bs58::encode(n).into_string())
.collect(),
node_index: proof.node_index,
leaf: bs58::encode(proof.leaf).into_string(),
tree_id: bs58::encode(leaf.tree).into_string(),
last_indexed_slot: 0,
}));
let _ = id;
}
Ok(results)
}
pub async fn get_nft_editions<C, U>(
cache: &C,
upstream: &U,
decoders: &[Arc<dyn AccountDecoder>],
master_mint: &str,
page: u64,
limit: u64,
) -> DasResult<Option<DasNftEditions>>
where
C: CacheStore + ?Sized,
U: UpstreamClient + ?Sized,
{
if cache.get_master_edition(master_mint).await?.is_none() {
let _ = fetch_and_cache_asset(upstream, cache, decoders, master_mint).await;
}
let Some(master) = cache.get_master_edition(master_mint).await? else {
return Ok(None);
};
let all_prints = cache
.list_print_editions(&master.master_edition_pda)
.await?;
let total = all_prints.len() as u64;
let limit = limit.max(1);
let start = page.saturating_sub(1).saturating_mul(limit);
let end = start.saturating_add(limit).min(total);
let editions: Vec<DasNftEditionEntry> = if start >= total {
Vec::new()
} else {
all_prints[start as usize..end as usize]
.iter()
.map(|r| DasNftEditionEntry {
mint: r.print_mint.clone(),
edition_address: r.print_edition_pda.clone(),
edition: r.edition_num,
})
.collect()
};
Ok(Some(DasNftEditions {
total,
limit,
page,
master_edition_address: master.master_edition_pda.clone(),
supply: master.supply,
max_supply: master.max_supply,
editions,
}))
}
pub const SPL_TOKEN_PROGRAM_ID: &str = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";
pub const TOKEN_2022_PROGRAM_ID: &str = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb";
#[derive(Debug, Clone, Default)]
pub struct TokenAccountsFilter {
pub owner: Option<String>,
pub mint: Option<String>,
pub page: u64,
pub limit: u64,
pub show_zero_balance: bool,
}
pub async fn get_token_accounts<U: UpstreamClient + ?Sized>(
upstream: &U,
filter: &TokenAccountsFilter,
) -> DasResult<DasTokenAccounts> {
let limit = filter.limit.max(1);
let page = filter.page.max(1);
if filter.owner.is_none() && filter.mint.is_none() {
return Err(DasError::BadRequest(
"getTokenAccounts requires at least one of `owner` or `mint`".into(),
));
}
let mut accounts: Vec<DasTokenAccount> = Vec::new();
for program_id in [SPL_TOKEN_PROGRAM_ID, TOKEN_2022_PROGRAM_ID] {
let entries = if let Some(owner) = &filter.owner {
fetch_by_owner(upstream, owner, filter.mint.as_deref(), program_id).await?
} else {
fetch_by_mint(upstream, filter.mint.as_deref().unwrap(), program_id).await?
};
accounts.extend(entries);
}
if !filter.show_zero_balance {
accounts.retain(|a| a.amount > 0);
}
accounts.sort_by(|a, b| a.address.cmp(&b.address));
let total = accounts.len() as u64;
let start = page.saturating_sub(1).saturating_mul(limit);
let end = start.saturating_add(limit).min(total);
let page_accounts: Vec<DasTokenAccount> = if start >= total {
Vec::new()
} else {
accounts[start as usize..end as usize].to_vec()
};
Ok(DasTokenAccounts {
total,
limit,
page,
token_accounts: page_accounts,
})
}
async fn fetch_by_owner<U: UpstreamClient + ?Sized>(
upstream: &U,
owner: &str,
mint: Option<&str>,
program_id: &str,
) -> DasResult<Vec<DasTokenAccount>> {
let filter = if let Some(m) = mint {
serde_json::json!({ "mint": m })
} else {
serde_json::json!({ "programId": program_id })
};
let params = serde_json::json!([
owner,
filter,
{ "encoding": "jsonParsed", "commitment": "confirmed" }
]);
let raw = upstream
.rpc_call("getTokenAccountsByOwner", params)
.await
.map_err(|e| DasError::Upstream(e.to_string()))?;
Ok(parse_token_account_list(&raw))
}
async fn fetch_by_mint<U: UpstreamClient + ?Sized>(
upstream: &U,
mint: &str,
program_id: &str,
) -> DasResult<Vec<DasTokenAccount>> {
let params = serde_json::json!([
program_id,
{
"encoding": "jsonParsed",
"commitment": "confirmed",
"filters": [
{ "dataSize": 165 },
{ "memcmp": { "offset": 0, "bytes": mint } }
]
}
]);
let raw = upstream
.rpc_call("getProgramAccounts", params)
.await
.map_err(|e| DasError::Upstream(e.to_string()))?;
Ok(parse_token_account_list(&raw))
}
fn parse_token_account_list(raw: &[u8]) -> Vec<DasTokenAccount> {
use serde_json::Value;
let value: Value = serde_json::from_slice(raw).unwrap_or(Value::Null);
let arr = if let Some(inner) = value.get("value") {
inner.as_array().cloned().unwrap_or_default()
} else {
value.as_array().cloned().unwrap_or_default()
};
let mut out = Vec::with_capacity(arr.len());
for entry in arr {
let Some(address) = entry.get("pubkey").and_then(Value::as_str) else {
continue;
};
let info = entry
.pointer("/account/data/parsed/info")
.cloned()
.unwrap_or(Value::Null);
let Some(mint) = info.get("mint").and_then(Value::as_str) else {
continue;
};
let Some(owner) = info.get("owner").and_then(Value::as_str) else {
continue;
};
let amount = info
.get("tokenAmount")
.and_then(|t| t.get("amount"))
.and_then(Value::as_str)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let delegate = info
.get("delegate")
.and_then(Value::as_str)
.map(String::from);
let delegated_amount = info
.get("delegatedAmount")
.and_then(|t| t.get("amount"))
.and_then(Value::as_str)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let frozen = info
.get("state")
.and_then(Value::as_str)
.is_some_and(|s| s == "frozen");
out.push(DasTokenAccount {
address: address.to_string(),
mint: mint.to_string(),
owner: owner.to_string(),
amount,
delegated_amount,
frozen,
delegate,
});
}
out
}
pub async fn get_balances<U: UpstreamClient + ?Sized>(
upstream: &U,
owner: &str,
) -> DasResult<DasBalances> {
let lamports = fetch_native_balance(upstream, owner).await?;
let mut tokens: Vec<DasTokenBalance> = Vec::new();
for program_id in [SPL_TOKEN_PROGRAM_ID, TOKEN_2022_PROGRAM_ID] {
tokens.extend(fetch_token_balances(upstream, owner, program_id).await?);
}
tokens.sort_by(|a, b| {
a.mint
.cmp(&b.mint)
.then_with(|| a.token_account.cmp(&b.token_account))
});
Ok(DasBalances {
native_balance: lamports,
tokens,
})
}
async fn fetch_native_balance<U: UpstreamClient + ?Sized>(
upstream: &U,
owner: &str,
) -> DasResult<u64> {
let raw = upstream
.rpc_call("getBalance", serde_json::json!([owner]))
.await
.map_err(|e| DasError::Upstream(e.to_string()))?;
let parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap_or(serde_json::Value::Null);
let lamports = parsed
.get("value")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0);
Ok(lamports)
}
async fn fetch_token_balances<U: UpstreamClient + ?Sized>(
upstream: &U,
owner: &str,
program_id: &str,
) -> DasResult<Vec<DasTokenBalance>> {
let params = serde_json::json!([
owner,
{ "programId": program_id },
{ "encoding": "jsonParsed", "commitment": "confirmed" }
]);
let raw = upstream
.rpc_call("getTokenAccountsByOwner", params)
.await
.map_err(|e| DasError::Upstream(e.to_string()))?;
Ok(parse_token_balance_list(&raw))
}
fn parse_token_balance_list(raw: &[u8]) -> Vec<DasTokenBalance> {
use serde_json::Value;
let value: Value = serde_json::from_slice(raw).unwrap_or(Value::Null);
let arr = value
.get("value")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
let mut out = Vec::with_capacity(arr.len());
for entry in arr {
let Some(token_account) = entry.get("pubkey").and_then(Value::as_str) else {
continue;
};
let info = entry
.pointer("/account/data/parsed/info")
.cloned()
.unwrap_or(Value::Null);
let Some(mint) = info.get("mint").and_then(Value::as_str) else {
continue;
};
let token_amount = info.get("tokenAmount").cloned().unwrap_or(Value::Null);
let amount = token_amount
.get("amount")
.and_then(Value::as_str)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
if amount == 0 {
continue;
}
let decimals = token_amount
.get("decimals")
.and_then(Value::as_u64)
.unwrap_or(0) as u8;
out.push(DasTokenBalance {
token_account: token_account.to_string(),
mint: mint.to_string(),
amount,
decimals,
price_in_usd: None,
total_price: None,
});
}
out
}
async fn build_tree_state<S: CnftStore + ?Sized>(
cnft: &S,
tree: &[u8; 32],
depth: u8,
) -> DasResult<TreeState> {
let mut leaves = BTreeMap::new();
for rec in cnft.list_leaves(tree).await? {
if !rec.burned {
leaves.insert(rec.leaf_index, rec.leaf_hash);
}
}
Ok(TreeState { depth, leaves })
}