Skip to main content

ethcli/aggregator/
portfolio.rs

1//! Portfolio aggregation from multiple API sources
2//!
3//! This module fetches wallet token balances from multiple sources in parallel
4//! and merges them into a unified view.
5
6use super::{
7    chain_map::normalize_chain_for_source, get_cached_config, AggregatedResult, LatencyMeasure,
8    SourceResult,
9};
10use futures::future::join_all;
11use secrecy::ExposeSecret;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14
15/// Supported portfolio data sources
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub enum PortfolioSource {
18    /// Query all sources in parallel
19    All,
20    /// Alchemy Portfolio API
21    Alchemy,
22    /// Moralis Wallet API
23    Moralis,
24    /// Dune SIM Balances API
25    DuneSim,
26    /// Uniswap V3 LP positions via The Graph
27    Uniswap,
28    /// Yearn vault positions via Kong API
29    Yearn,
30}
31
32impl PortfolioSource {
33    pub fn name(&self) -> &'static str {
34        match self {
35            PortfolioSource::All => "all",
36            PortfolioSource::Alchemy => "alchemy",
37            PortfolioSource::Moralis => "moralis",
38            PortfolioSource::DuneSim => "dsim",
39            PortfolioSource::Uniswap => "uniswap",
40            PortfolioSource::Yearn => "yearn",
41        }
42    }
43}
44
45/// Portfolio token balance (distinct from normalize::PortfolioBalance)
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct PortfolioBalance {
48    /// Token contract address (checksummed) or "native"
49    pub address: String,
50    /// Token symbol
51    pub symbol: String,
52    /// Token name
53    pub name: Option<String>,
54    /// Chain/network
55    pub chain: String,
56    /// Raw balance as string (full precision)
57    pub balance_raw: String,
58    /// Balance formatted with decimals
59    pub balance_formatted: f64,
60    /// Token decimals
61    pub decimals: u8,
62    /// USD value (if available)
63    pub usd_value: Option<f64>,
64    /// Token price in USD
65    pub price_usd: Option<f64>,
66    /// Is spam token
67    pub is_spam: Option<bool>,
68    /// Logo URL
69    pub logo: Option<String>,
70}
71
72impl PortfolioBalance {
73    pub fn new(address: &str, symbol: &str, chain: &str, balance_raw: &str, decimals: u8) -> Self {
74        let balance_formatted = parse_balance(balance_raw, decimals);
75        Self {
76            address: address.to_string(),
77            symbol: symbol.to_string(),
78            name: None,
79            chain: chain.to_string(),
80            balance_raw: balance_raw.to_string(),
81            balance_formatted,
82            decimals,
83            usd_value: None,
84            price_usd: None,
85            is_spam: None,
86            logo: None,
87        }
88    }
89
90    pub fn with_name(mut self, name: Option<String>) -> Self {
91        self.name = name;
92        self
93    }
94
95    pub fn with_usd_value(mut self, usd_value: Option<f64>) -> Self {
96        self.usd_value = usd_value;
97        self
98    }
99
100    pub fn with_price_usd(mut self, price_usd: Option<f64>) -> Self {
101        self.price_usd = price_usd;
102        self
103    }
104
105    pub fn with_is_spam(mut self, is_spam: Option<bool>) -> Self {
106        self.is_spam = is_spam;
107        self
108    }
109
110    pub fn with_logo(mut self, logo: Option<String>) -> Self {
111        self.logo = logo;
112        self
113    }
114}
115
116/// Aggregated portfolio result (distinct from normalize::PortfolioResult)
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct PortfolioResult {
119    /// Total portfolio value in USD
120    pub total_usd_value: f64,
121    /// Merged token balances (deduplicated by address+chain)
122    pub tokens: Vec<MergedToken>,
123    /// Chains covered in the query
124    pub chains_covered: Vec<String>,
125    /// Number of unique tokens
126    pub token_count: usize,
127}
128
129/// A token merged from multiple sources
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MergedToken {
132    /// Token contract address
133    pub address: String,
134    /// Token symbol
135    pub symbol: String,
136    /// Token name
137    pub name: Option<String>,
138    /// Chain
139    pub chain: String,
140    /// Balance (highest precision found)
141    pub balance: f64,
142    /// Balance raw string
143    pub balance_raw: String,
144    /// Decimals
145    pub decimals: u8,
146    /// USD value (average across sources if different)
147    pub usd_value: Option<f64>,
148    /// Price USD
149    pub price_usd: Option<f64>,
150    /// Logo URL
151    pub logo: Option<String>,
152    /// Sources that reported this token
153    pub found_in: Vec<String>,
154}
155
156/// Fetch portfolio from all available sources in parallel
157pub async fn fetch_portfolio_all(
158    address: &str,
159    chains: &[&str],
160) -> AggregatedResult<Vec<PortfolioBalance>, PortfolioResult> {
161    let sources = vec![
162        PortfolioSource::Alchemy,
163        PortfolioSource::Moralis,
164        PortfolioSource::DuneSim,
165        PortfolioSource::Uniswap,
166        PortfolioSource::Yearn,
167    ];
168
169    fetch_portfolio_parallel(address, chains, &sources).await
170}
171
172/// Fetch portfolio from specified sources in parallel
173pub async fn fetch_portfolio_parallel(
174    address: &str,
175    chains: &[&str],
176    sources: &[PortfolioSource],
177) -> AggregatedResult<Vec<PortfolioBalance>, PortfolioResult> {
178    let start = LatencyMeasure::start();
179
180    // Build futures for each source
181    let futures: Vec<_> = sources
182        .iter()
183        .filter(|s| **s != PortfolioSource::All)
184        .map(|source| {
185            let address = address.to_string();
186            let chains: Vec<String> = chains.iter().map(|c| c.to_string()).collect();
187            let source = *source;
188            async move { fetch_portfolio_from_source(&address, &chains, source).await }
189        })
190        .collect();
191
192    // Execute ALL in parallel
193    let results: Vec<SourceResult<Vec<PortfolioBalance>>> = join_all(futures).await;
194
195    // Merge and deduplicate tokens across sources
196    let aggregation = merge_portfolio_results(&results);
197
198    AggregatedResult::new(aggregation, results, start.elapsed_ms())
199}
200
201/// Fetch portfolio from a single source
202pub async fn fetch_portfolio_from_source(
203    address: &str,
204    chains: &[String],
205    source: PortfolioSource,
206) -> SourceResult<Vec<PortfolioBalance>> {
207    let measure = LatencyMeasure::start();
208
209    match source {
210        PortfolioSource::Alchemy => fetch_alchemy_portfolio(address, chains, measure).await,
211        PortfolioSource::Moralis => fetch_moralis_portfolio(address, chains, measure).await,
212        PortfolioSource::DuneSim => fetch_dsim_portfolio(address, chains, measure).await,
213        PortfolioSource::Uniswap => fetch_uniswap_portfolio(address, chains, measure).await,
214        PortfolioSource::Yearn => fetch_yearn_portfolio(address, chains, measure).await,
215        PortfolioSource::All => SourceResult::error("all", "Use fetch_portfolio_all instead", 0),
216    }
217}
218
219/// Fetch portfolio from Alchemy
220async fn fetch_alchemy_portfolio(
221    address: &str,
222    chains: &[String],
223    measure: LatencyMeasure,
224) -> SourceResult<Vec<PortfolioBalance>> {
225    // Get API key from config
226    let config = get_cached_config();
227    let api_key = match config
228        .as_ref()
229        .and_then(|c| c.alchemy.as_ref())
230        .map(|a| a.api_key.expose_secret().to_string())
231    {
232        Some(key) => key,
233        None => match std::env::var("ALCHEMY_API_KEY") {
234            Ok(key) => key,
235            Err(_) => {
236                return SourceResult::error(
237                    "alchemy",
238                    "ALCHEMY_API_KEY not configured",
239                    measure.elapsed_ms(),
240                )
241            }
242        },
243    };
244
245    // Convert chains to Alchemy network names
246    let networks: Vec<String> = chains
247        .iter()
248        .map(|c| normalize_chain_for_source("alchemy", c))
249        .collect();
250    let networks_refs: Vec<&str> = networks.iter().map(|s| s.as_str()).collect();
251
252    // Use default network for client initialization
253    let network_str = networks
254        .first()
255        .map(|s| s.as_str())
256        .unwrap_or("eth-mainnet");
257    let network = crate::cli::simulate::AlchemyArgs::parse_network(network_str);
258    let client = match alcmy::Client::new(&api_key, network) {
259        Ok(c) => c,
260        Err(e) => {
261            return SourceResult::error(
262                "alchemy",
263                format!("Client creation error: {}", e),
264                measure.elapsed_ms(),
265            );
266        }
267    };
268
269    // Build address-network pairs
270    let addr_networks: Vec<(&str, &[&str])> = vec![(address, networks_refs.as_slice())];
271
272    match client.portfolio().get_token_balances(&addr_networks).await {
273        Ok(response) => {
274            let mut balances = Vec::new();
275            for token in &response.data.tokens {
276                // Use token_address for ERC20s, or "ETH" for native token
277                let token_addr = token
278                    .token_address
279                    .as_deref()
280                    .unwrap_or("0x0000000000000000000000000000000000000000");
281                let balance = PortfolioBalance::new(
282                    token_addr,
283                    token.symbol.as_deref().unwrap_or("???"),
284                    &token.network,
285                    &token.token_balance,
286                    token.decimals.unwrap_or(18),
287                )
288                .with_name(token.name.clone())
289                .with_logo(token.logo.clone());
290                balances.push(balance);
291            }
292            SourceResult::success("alchemy", balances, measure.elapsed_ms())
293        }
294        Err(e) => SourceResult::error("alchemy", format!("API error: {}", e), measure.elapsed_ms()),
295    }
296}
297
298/// Fetch portfolio from Moralis
299async fn fetch_moralis_portfolio(
300    address: &str,
301    chains: &[String],
302    measure: LatencyMeasure,
303) -> SourceResult<Vec<PortfolioBalance>> {
304    // Get API key from config
305    let config = get_cached_config();
306    let api_key = match config
307        .as_ref()
308        .and_then(|c| c.moralis.as_ref())
309        .map(|m| m.api_key.expose_secret().to_string())
310    {
311        Some(key) => key,
312        None => match std::env::var("MORALIS_API_KEY") {
313            Ok(key) => key,
314            Err(_) => {
315                return SourceResult::error(
316                    "moralis",
317                    "MORALIS_API_KEY not configured",
318                    measure.elapsed_ms(),
319                )
320            }
321        },
322    };
323
324    let client = match mrls::Client::new(&api_key) {
325        Ok(c) => c,
326        Err(e) => {
327            return SourceResult::error(
328                "moralis",
329                format!("Client error: {}", e),
330                measure.elapsed_ms(),
331            )
332        }
333    };
334
335    // Moralis queries chains in parallel for better performance
336    let chain_futures: Vec<_> = chains
337        .iter()
338        .map(|chain| {
339            let client = client.clone();
340            let chain = chain.clone();
341            let address = address.to_string();
342            async move {
343                let chain_name = normalize_chain_for_source("moralis", &chain);
344                let query = mrls::wallet::WalletQuery::new().chain(chain_name);
345
346                match client
347                    .wallet()
348                    .get_token_balances(&address, Some(&query))
349                    .await
350                {
351                    Ok(tokens) => {
352                        let balances: Vec<PortfolioBalance> = tokens
353                            .into_iter()
354                            .map(|token| {
355                                PortfolioBalance::new(
356                                    &token.token_address,
357                                    token.symbol.as_deref().unwrap_or("???"),
358                                    &chain,
359                                    &token.balance,
360                                    token.decimals.unwrap_or(18),
361                                )
362                                .with_name(token.name.clone())
363                                .with_usd_value(token.usd_value)
364                                .with_price_usd(token.usd_price)
365                                .with_is_spam(token.possible_spam)
366                                .with_logo(token.logo.clone())
367                            })
368                            .collect();
369                        Ok(balances)
370                    }
371                    Err(e) => {
372                        // Log error but continue with other chains
373                        eprintln!("Moralis error for chain {}: {}", chain, e);
374                        Err(e)
375                    }
376                }
377            }
378        })
379        .collect();
380
381    // Execute all chain queries in parallel
382    let results = join_all(chain_futures).await;
383
384    // Flatten successful results
385    let all_balances: Vec<PortfolioBalance> = results
386        .into_iter()
387        .filter_map(|r| r.ok())
388        .flatten()
389        .collect();
390
391    // Return empty success instead of error when no balances found
392    // An empty wallet is a valid state, not an error
393    SourceResult::success("moralis", all_balances, measure.elapsed_ms())
394}
395
396/// Fetch portfolio from Dune SIM
397async fn fetch_dsim_portfolio(
398    address: &str,
399    chains: &[String],
400    measure: LatencyMeasure,
401) -> SourceResult<Vec<PortfolioBalance>> {
402    // Get API key from config
403    let config = get_cached_config();
404    let api_key = match config
405        .as_ref()
406        .and_then(|c| c.dune_sim.as_ref())
407        .map(|d| d.api_key.expose_secret().to_string())
408    {
409        Some(key) => key,
410        None => match std::env::var("DUNE_SIM_API_KEY") {
411            Ok(key) => key,
412            Err(_) => {
413                return SourceResult::error(
414                    "dsim",
415                    "DUNE_SIM_API_KEY not configured",
416                    measure.elapsed_ms(),
417                )
418            }
419        },
420    };
421
422    let client = match dnsim::Client::new(&api_key) {
423        Ok(c) => c,
424        Err(e) => {
425            return SourceResult::error(
426                "dsim",
427                format!("Client error: {}", e),
428                measure.elapsed_ms(),
429            )
430        }
431    };
432
433    // Build chain IDs filter
434    let chain_ids: Vec<&str> = chains.iter().filter_map(|c| chain_to_id(c)).collect();
435
436    let options = if chain_ids.is_empty() {
437        dnsim::balances::BalancesOptions::new()
438    } else {
439        let mut opts = dnsim::balances::BalancesOptions::new();
440        opts.chain_ids = Some(chain_ids.join(","));
441        opts
442    };
443
444    match client.balances().get_with_options(address, &options).await {
445        Ok(response) => {
446            let balances: Vec<PortfolioBalance> = response
447                .balances
448                .iter()
449                .map(|b| {
450                    let mut balance = PortfolioBalance::new(
451                        &b.address, &b.symbol, &b.chain, &b.amount, b.decimals,
452                    )
453                    .with_name(b.name.clone())
454                    .with_usd_value(b.value_usd)
455                    .with_price_usd(b.price_usd);
456
457                    if let Some(ref meta) = b.token_metadata {
458                        balance = balance.with_logo(meta.logo.clone());
459                    }
460
461                    balance
462                })
463                .collect();
464            SourceResult::success("dsim", balances, measure.elapsed_ms())
465        }
466        Err(e) => SourceResult::error("dsim", format!("API error: {}", e), measure.elapsed_ms()),
467    }
468}
469
470/// Fetch Uniswap LP positions (V2, V3, and V4)
471async fn fetch_uniswap_portfolio(
472    address: &str,
473    chains: &[String],
474    measure: LatencyMeasure,
475) -> SourceResult<Vec<PortfolioBalance>> {
476    // Get API key from config
477    let config = get_cached_config();
478    let api_key = match config
479        .as_ref()
480        .and_then(|c| c.thegraph.as_ref())
481        .map(|g| g.api_key.expose_secret().to_string())
482    {
483        Some(key) => key,
484        None => match std::env::var("THEGRAPH_API_KEY") {
485            Ok(key) => key,
486            Err(_) => {
487                return SourceResult::error(
488                    "uniswap",
489                    "THEGRAPH_API_KEY not configured",
490                    measure.elapsed_ms(),
491                )
492            }
493        },
494    };
495
496    // Query all chains in parallel for better performance
497    let chain_futures: Vec<_> = chains
498        .iter()
499        .map(|chain| {
500            let api_key = api_key.clone();
501            let address = address.to_string();
502            let chain = chain.clone();
503            async move {
504                let mut balances = Vec::new();
505                let chain_lower = chain.to_lowercase();
506
507                // === V2 Positions (Ethereum mainnet only) ===
508                if matches!(
509                    chain_lower.as_str(),
510                    "ethereum" | "mainnet" | "eth" | "eth-mainnet"
511                ) {
512                    if let Ok(client) =
513                        unswp::SubgraphClient::new(unswp::SubgraphConfig::mainnet_v2(&api_key))
514                    {
515                        if let Ok(positions) = client.get_positions_v2(&address).await {
516                            for pos in positions {
517                                let lp_balance: f64 =
518                                    pos.liquidity_token_balance.parse().unwrap_or(0.0);
519                                if lp_balance <= 0.0 {
520                                    continue;
521                                }
522
523                                // Calculate share of pool
524                                let total_supply: f64 =
525                                    pos.pair.total_supply.parse().unwrap_or(1.0);
526                                let share = if total_supply > 0.0 {
527                                    lp_balance / total_supply
528                                } else {
529                                    0.0
530                                };
531
532                                // Estimate USD value from reserves
533                                let usd_value = pos
534                                    .pair
535                                    .reserve_usd
536                                    .as_ref()
537                                    .and_then(|r| r.parse::<f64>().ok())
538                                    .map(|reserve_usd| reserve_usd * share);
539
540                                let symbol = format!(
541                                    "UNI-V2 {}/{}",
542                                    pos.pair.token0.symbol, pos.pair.token1.symbol
543                                );
544
545                                let balance = PortfolioBalance::new(
546                                    &pos.pair.id,
547                                    &symbol,
548                                    &chain,
549                                    &pos.liquidity_token_balance,
550                                    18,
551                                )
552                                .with_name(Some(format!(
553                                    "Uniswap V2 LP: {}/{}",
554                                    pos.pair.token0.symbol, pos.pair.token1.symbol
555                                )))
556                                .with_usd_value(usd_value);
557
558                                balances.push(balance);
559                            }
560                        }
561                    }
562                }
563
564                // === V3 Positions ===
565                let v3_config = match chain_lower.as_str() {
566                    "ethereum" | "mainnet" | "eth" | "eth-mainnet" => {
567                        Some(unswp::SubgraphConfig::mainnet_v3(&api_key))
568                    }
569                    "arbitrum" | "arb" | "arb-mainnet" | "arbitrum-mainnet" => {
570                        Some(unswp::SubgraphConfig::arbitrum_v3(&api_key))
571                    }
572                    "optimism" | "op" | "op-mainnet" | "optimism-mainnet" => {
573                        Some(unswp::SubgraphConfig::optimism_v3(&api_key))
574                    }
575                    "polygon" | "matic" | "polygon-mainnet" => Some(
576                        unswp::SubgraphConfig::mainnet_v3(&api_key)
577                            .with_subgraph_id(unswp::subgraph_ids::POLYGON_V3),
578                    ),
579                    "base" | "base-mainnet" => Some(unswp::SubgraphConfig::base_v3(&api_key)),
580                    _ => None,
581                };
582
583                if let Some(config) = v3_config {
584                    if let Ok(client) = unswp::SubgraphClient::new(config) {
585                        if let Ok(positions) = client.get_positions(&address).await {
586                            for pos in positions {
587                                let liquidity: u128 = pos.liquidity.parse().unwrap_or(0);
588                                if liquidity == 0 {
589                                    continue;
590                                }
591
592                                let net_token0: f64 = pos.deposited_token0.parse().unwrap_or(0.0)
593                                    - pos.withdrawn_token0.parse().unwrap_or(0.0);
594                                let net_token1: f64 = pos.deposited_token1.parse().unwrap_or(0.0)
595                                    - pos.withdrawn_token1.parse().unwrap_or(0.0);
596
597                                let usd_value = estimate_lp_usd_value(
598                                    &pos.pool.token0.symbol,
599                                    &pos.pool.token1.symbol,
600                                    net_token0,
601                                    net_token1,
602                                );
603
604                                let fee_tier: f64 =
605                                    pos.pool.fee_tier.parse().unwrap_or(0.0) / 10000.0;
606
607                                let symbol = format!(
608                                    "UNI-V3 {}/{} ({}%)",
609                                    pos.pool.token0.symbol, pos.pool.token1.symbol, fee_tier
610                                );
611
612                                let balance = PortfolioBalance::new(
613                                    &pos.id,
614                                    &symbol,
615                                    &chain,
616                                    &pos.liquidity,
617                                    18,
618                                )
619                                .with_name(Some(format!(
620                                    "Uniswap V3 LP: {}/{}",
621                                    pos.pool.token0.symbol, pos.pool.token1.symbol
622                                )))
623                                .with_usd_value(usd_value);
624
625                                balances.push(balance);
626                            }
627                        }
628                    }
629                }
630
631                // === V4 Positions ===
632                let v4_config = match chain_lower.as_str() {
633                    "ethereum" | "mainnet" | "eth" | "eth-mainnet" => {
634                        Some(unswp::SubgraphConfig::mainnet_v4(&api_key))
635                    }
636                    "arbitrum" | "arb" | "arb-mainnet" | "arbitrum-mainnet" => {
637                        Some(unswp::SubgraphConfig::arbitrum_v4(&api_key))
638                    }
639                    "base" | "base-mainnet" => Some(unswp::SubgraphConfig::base_v4(&api_key)),
640                    "polygon" | "matic" | "polygon-mainnet" => Some(
641                        unswp::SubgraphConfig::mainnet_v4(&api_key)
642                            .with_subgraph_id(unswp::subgraph_ids::POLYGON_V4),
643                    ),
644                    _ => None,
645                };
646
647                if let Some(config) = v4_config {
648                    if let Ok(client) = unswp::SubgraphClient::new(config) {
649                        if let Ok(positions) = client.get_positions_v4(&address).await {
650                            for pos in positions {
651                                let liquidity: u128 = pos.liquidity.parse().unwrap_or(0);
652                                if liquidity == 0 {
653                                    continue;
654                                }
655
656                                // V4 has TVL in USD directly on pool
657                                let usd_value =
658                                    pos.pool.total_value_locked_usd.as_ref().and_then(|tvl| {
659                                        // Estimate position value as fraction of pool TVL
660                                        // This is rough - actual calculation would need more data
661                                        tvl.parse::<f64>().ok()
662                                    });
663
664                                let fee: f64 = pos.pool.fee.parse().unwrap_or(0.0) / 10000.0;
665
666                                let symbol = format!(
667                                    "UNI-V4 {}/{} ({}%)",
668                                    pos.pool.token0.symbol, pos.pool.token1.symbol, fee
669                                );
670
671                                let balance = PortfolioBalance::new(
672                                    &pos.id,
673                                    &symbol,
674                                    &chain,
675                                    &pos.liquidity,
676                                    18,
677                                )
678                                .with_name(Some(format!(
679                                    "Uniswap V4 LP: {}/{}",
680                                    pos.pool.token0.symbol, pos.pool.token1.symbol
681                                )))
682                                .with_usd_value(usd_value);
683
684                                balances.push(balance);
685                            }
686                        }
687                    }
688                }
689
690                balances
691            }
692        })
693        .collect();
694
695    // Execute all chain queries in parallel and flatten results
696    let results = join_all(chain_futures).await;
697    let all_balances: Vec<PortfolioBalance> = results.into_iter().flatten().collect();
698
699    // Return empty success instead of error when no LP positions found
700    // An empty position list is a valid state, not an error
701    SourceResult::success("uniswap", all_balances, measure.elapsed_ms())
702}
703
704/// Fetch Yearn vault positions via Kong API + on-chain multicall
705///
706/// Kong API no longer provides user balance data, so we:
707/// 1. Get vault list from Kong (which vaults exist)
708/// 2. Use multicall to batch balanceOf(user) calls on-chain
709/// 3. Filter to non-zero balances and get vault details
710/// 4. Use pricePerShare + underlying token price for accurate USD values
711async fn fetch_yearn_portfolio(
712    address: &str,
713    chains: &[String],
714    measure: LatencyMeasure,
715) -> SourceResult<Vec<PortfolioBalance>> {
716    use crate::config::Chain;
717    use crate::rpc::multicall::{selectors, MulticallBuilder};
718    use crate::rpc::Endpoint;
719    use alloy::primitives::Address;
720
721    // Create ykong client (no API key needed)
722    let client = match ykong::Client::new() {
723        Ok(c) => c,
724        Err(e) => {
725            return SourceResult::error(
726                "yearn",
727                format!("Client error: {}", e),
728                measure.elapsed_ms(),
729            )
730        }
731    };
732
733    // Parse user address
734    let user_address: Address = match address.parse() {
735        Ok(a) => a,
736        Err(_) => {
737            return SourceResult::error("yearn", "Invalid address format", measure.elapsed_ms())
738        }
739    };
740
741    // Query all chains in parallel for better performance
742    let chain_futures: Vec<_> = chains
743        .iter()
744        .filter_map(|chain| {
745            let chain_id = chain_name_to_id(chain)?;
746            Some((chain.clone(), chain_id))
747        })
748        .map(|(chain, chain_id)| {
749            let client = client.clone();
750            async move {
751                let mut balances = Vec::new();
752
753                // Get all vaults for this chain from Kong
754                let vaults = match client.vaults().by_chain(chain_id).await {
755                    Ok(v) => v,
756                    Err(e) => {
757                        eprintln!("Yearn: Failed to get vault list for chain {}: {}", chain, e);
758                        return balances;
759                    }
760                };
761
762                if vaults.is_empty() {
763                    return balances;
764                }
765
766                // Get RPC endpoint for this chain
767                let target_chain = Chain::from_chain_id(chain_id);
768                let config = match get_cached_config() {
769                    Some(c) => c,
770                    None => return balances,
771                };
772
773                let chain_endpoints: Vec<_> = config
774                    .endpoints
775                    .iter()
776                    .filter(|e| e.enabled && e.chain == target_chain)
777                    .cloned()
778                    .collect();
779
780                if chain_endpoints.is_empty() {
781                    eprintln!("Yearn: No RPC endpoint configured for {}", chain);
782                    return balances;
783                }
784
785                let endpoint = match Endpoint::new(chain_endpoints[0].clone(), 30, None) {
786                    Ok(e) => e,
787                    Err(_) => return balances,
788                };
789
790                let provider = endpoint.provider();
791
792                // Build multicall for balanceOf(user) on all vaults
793                // Process in batches to avoid RPC limits (max ~500 calls per batch)
794                const BATCH_SIZE: usize = 200;
795                let mut vault_balances: Vec<(ykong::Vault, u128)> = Vec::new();
796
797                for batch in vaults.chunks(BATCH_SIZE) {
798                    let mut builder = MulticallBuilder::new();
799
800                    for vault in batch {
801                        if let Ok(vault_addr) = vault.address.parse::<Address>() {
802                            builder = builder.add_call_allow_failure(
803                                vault_addr,
804                                selectors::balance_of(user_address),
805                            );
806                        }
807                    }
808
809                    if builder.is_empty() {
810                        continue;
811                    }
812
813                    // Execute multicall
814                    match builder.execute_with_retry(provider, 2).await {
815                        Ok(results) => {
816                            for (i, result) in results.iter().enumerate() {
817                                if let Some(balance) = result.decode_uint256() {
818                                    if !balance.is_zero() {
819                                        // Store vault with non-zero balance
820                                        let balance_u128: u128 =
821                                            balance.try_into().unwrap_or(u128::MAX);
822                                        vault_balances.push((batch[i].clone(), balance_u128));
823                                    }
824                                }
825                            }
826                        }
827                        Err(e) => {
828                            eprintln!("Yearn: Multicall failed for {}: {}", chain, e);
829                        }
830                    }
831                }
832
833                // Collect unique underlying token addresses for price lookup
834                let underlying_tokens: Vec<String> = vault_balances
835                    .iter()
836                    .filter_map(|(vault, _)| {
837                        vault
838                            .token
839                            .clone()
840                            .or_else(|| vault.asset.as_ref().map(|a| a.address.clone()))
841                    })
842                    .collect::<std::collections::HashSet<_>>()
843                    .into_iter()
844                    .collect();
845
846                // Fetch underlying token prices from Kong in parallel
847                let price_futures: Vec<_> = underlying_tokens
848                    .iter()
849                    .map(|token_addr| {
850                        let client = client.clone();
851                        let token_addr = token_addr.clone();
852                        async move {
853                            match client.prices().usd(chain_id, &token_addr).await {
854                                Ok(Some(price)) => Some((token_addr.to_lowercase(), price)),
855                                _ => None,
856                            }
857                        }
858                    })
859                    .collect();
860
861                let price_results = join_all(price_futures).await;
862                let underlying_prices: HashMap<String, f64> =
863                    price_results.into_iter().flatten().collect();
864
865                // Convert to PortfolioBalance for vaults with non-zero balances
866                for (vault, balance_raw) in vault_balances {
867                    let symbol = vault
868                        .symbol
869                        .clone()
870                        .unwrap_or_else(|| format!("yv-{}", &vault.address[..8]));
871                    let name = vault.name.clone();
872                    let decimals: u8 = vault
873                        .decimals
874                        .as_ref()
875                        .and_then(|d| d.parse().ok())
876                        .unwrap_or(18);
877
878                    let balance_str = balance_raw.to_string();
879
880                    // Get underlying token address and its price
881                    let underlying_addr = vault
882                        .token
883                        .as_ref()
884                        .or_else(|| vault.asset.as_ref().map(|a| &a.address));
885                    let underlying_price = underlying_addr
886                        .and_then(|addr| underlying_prices.get(&addr.to_lowercase()))
887                        .copied();
888
889                    // Calculate USD value using pricePerShare method (preferred)
890                    // or fall back to TVL method
891                    let (usd_value, price_usd) = calculate_yearn_position_value_with_price(
892                        &balance_str,
893                        decimals,
894                        vault.price_per_share.as_deref(),
895                        underlying_price,
896                        vault.tvl.as_ref().and_then(|t| t.close),
897                        vault.total_supply.as_deref(),
898                    );
899
900                    let balance = PortfolioBalance::new(
901                        &vault.address,
902                        &symbol,
903                        &chain,
904                        &balance_str,
905                        decimals,
906                    )
907                    .with_name(name)
908                    .with_usd_value(usd_value)
909                    .with_price_usd(price_usd);
910
911                    balances.push(balance);
912                }
913
914                balances
915            }
916        })
917        .collect();
918
919    // Execute all chain queries in parallel and flatten results
920    let results = join_all(chain_futures).await;
921    let all_balances: Vec<PortfolioBalance> = results.into_iter().flatten().collect();
922
923    // Return empty success instead of error when no vault positions found
924    // An empty position list is a valid state, not an error
925    SourceResult::success("yearn", all_balances, measure.elapsed_ms())
926}
927
928/// Calculate USD value and price of a Yearn vault position
929///
930/// Returns (usd_value, price_per_share_usd)
931///
932/// Uses pricePerShare + underlying token price (preferred method) or
933/// falls back to TVL / totalSupply method.
934fn calculate_yearn_position_value_with_price(
935    balance_raw: &str,
936    decimals: u8,
937    price_per_share: Option<&str>,
938    underlying_price_usd: Option<f64>,
939    tvl_usd: Option<f64>,
940    total_supply: Option<&str>,
941) -> (Option<f64>, Option<f64>) {
942    let balance = parse_balance(balance_raw, decimals);
943    if balance <= 0.0 {
944        return (None, None);
945    }
946
947    // Method 1 (preferred): Use pricePerShare + underlying token price
948    // pricePerShare tells us how many underlying tokens per vault share
949    // Formula: usd_value = vault_shares * pricePerShare * underlying_price
950    if let (Some(pps_str), Some(underlying_price)) = (price_per_share, underlying_price_usd) {
951        // Parse pricePerShare (it's in raw format, same decimals as vault)
952        let pps = parse_balance(pps_str, decimals);
953        if pps > 0.0 {
954            // Price per vault share in USD = pricePerShare * underlying_price
955            let share_price_usd = pps * underlying_price;
956            let usd_value = balance * share_price_usd;
957            return (Some(usd_value), Some(share_price_usd));
958        }
959    }
960
961    // Method 2 (fallback): Use TVL and total supply to estimate share value
962    // This is less accurate but works when we don't have underlying price
963    if let (Some(tvl), Some(supply_str)) = (tvl_usd, total_supply) {
964        if let Ok(supply_raw) = supply_str.parse::<u128>() {
965            let total_supply = supply_raw as f64 / 10f64.powi(decimals as i32);
966            if total_supply > 0.0 {
967                let share_price_usd = tvl / total_supply;
968                let usd_value = balance * share_price_usd;
969                return (Some(usd_value), Some(share_price_usd));
970            }
971        }
972    }
973
974    (None, None)
975}
976
977/// Convert chain name to chain ID for Yearn
978fn chain_name_to_id(chain: &str) -> Option<u64> {
979    match chain.to_lowercase().as_str() {
980        "ethereum" | "eth" | "mainnet" | "eth-mainnet" => Some(1),
981        "polygon" | "matic" | "polygon-mainnet" => Some(137),
982        "arbitrum" | "arb" | "arbitrum-mainnet" | "arb-mainnet" => Some(42161),
983        "optimism" | "op" | "optimism-mainnet" | "op-mainnet" => Some(10),
984        "base" | "base-mainnet" => Some(8453),
985        "fantom" | "ftm" => Some(250),
986        "gnosis" | "xdai" => Some(100),
987        _ => None,
988    }
989}
990
991/// Maximum number of unique tokens to track (prevents unbounded memory growth)
992/// Reduced from 10,000 to 2,000 to limit memory usage for large portfolios
993const MAX_UNIQUE_TOKENS: usize = 2_000;
994
995/// Merge portfolio results from multiple sources
996fn merge_portfolio_results(results: &[SourceResult<Vec<PortfolioBalance>>]) -> PortfolioResult {
997    // Key: (lowercase address, chain) -> Vec<(source, balance)>
998    // Pre-allocate with reasonable capacity based on expected token count
999    let estimated_tokens: usize = results
1000        .iter()
1001        .filter_map(|r| r.data.as_ref())
1002        .map(|d| d.len())
1003        .sum();
1004    let initial_capacity = estimated_tokens.min(MAX_UNIQUE_TOKENS);
1005    let mut token_map: HashMap<(String, String), Vec<(&str, &PortfolioBalance)>> =
1006        HashMap::with_capacity(initial_capacity);
1007    let mut chains_set: std::collections::HashSet<String> = std::collections::HashSet::new();
1008
1009    for result in results {
1010        if let Some(balances) = &result.data {
1011            for balance in balances {
1012                // Cap total unique tokens to prevent unbounded memory growth
1013                let key = (balance.address.to_lowercase(), balance.chain.to_lowercase());
1014                if !token_map.contains_key(&key) && token_map.len() >= MAX_UNIQUE_TOKENS {
1015                    // Skip new tokens once limit is reached
1016                    continue;
1017                }
1018                chains_set.insert(balance.chain.clone());
1019                token_map
1020                    .entry(key)
1021                    .or_default()
1022                    .push((&result.source, balance));
1023            }
1024        }
1025    }
1026
1027    let mut tokens: Vec<MergedToken> = token_map
1028        .into_iter()
1029        .map(|((addr, chain), entries)| {
1030            // Take the first entry as base
1031            let first = entries[0].1;
1032            let found_in: Vec<String> = entries.iter().map(|(s, _)| s.to_string()).collect();
1033
1034            // Average USD values across sources that have them
1035            // Filter out NaN/Infinity values to prevent propagation
1036            let usd_values: Vec<f64> = entries
1037                .iter()
1038                .filter_map(|(_, b)| b.usd_value)
1039                .filter(|v| v.is_finite())
1040                .collect();
1041            let avg_usd_value = if usd_values.is_empty() {
1042                None
1043            } else {
1044                let avg = usd_values.iter().sum::<f64>() / usd_values.len() as f64;
1045                if avg.is_finite() {
1046                    Some(avg)
1047                } else {
1048                    None
1049                }
1050            };
1051
1052            // Filter out NaN/Infinity values from prices
1053            let prices: Vec<f64> = entries
1054                .iter()
1055                .filter_map(|(_, b)| b.price_usd)
1056                .filter(|v| v.is_finite())
1057                .collect();
1058            let avg_price = if prices.is_empty() {
1059                None
1060            } else {
1061                let avg = prices.iter().sum::<f64>() / prices.len() as f64;
1062                if avg.is_finite() {
1063                    Some(avg)
1064                } else {
1065                    None
1066                }
1067            };
1068
1069            // Take highest precision balance
1070            let best_balance = entries
1071                .iter()
1072                .max_by(|(_, a), (_, b)| {
1073                    a.balance_formatted
1074                        .partial_cmp(&b.balance_formatted)
1075                        .unwrap_or(std::cmp::Ordering::Equal)
1076                })
1077                .map(|(_, b)| b)
1078                .unwrap_or(&first);
1079
1080            // Pick best name (prefer non-None)
1081            let name = entries
1082                .iter()
1083                .find_map(|(_, b)| b.name.clone())
1084                .or_else(|| first.name.clone());
1085
1086            // Pick best logo
1087            let logo = entries
1088                .iter()
1089                .find_map(|(_, b)| b.logo.clone())
1090                .or_else(|| first.logo.clone());
1091
1092            MergedToken {
1093                address: addr,
1094                symbol: first.symbol.clone(),
1095                name,
1096                chain,
1097                balance: best_balance.balance_formatted,
1098                balance_raw: best_balance.balance_raw.clone(),
1099                decimals: first.decimals,
1100                usd_value: avg_usd_value,
1101                price_usd: avg_price,
1102                logo,
1103                found_in,
1104            }
1105        })
1106        .collect();
1107
1108    // Sort by USD value descending
1109    tokens.sort_by(|a, b| {
1110        b.usd_value
1111            .unwrap_or(0.0)
1112            .partial_cmp(&a.usd_value.unwrap_or(0.0))
1113            .unwrap_or(std::cmp::Ordering::Equal)
1114    });
1115
1116    let total_usd_value: f64 = tokens.iter().filter_map(|t| t.usd_value).sum();
1117    let token_count = tokens.len();
1118    let chains_covered: Vec<String> = chains_set.into_iter().collect();
1119
1120    PortfolioResult {
1121        total_usd_value,
1122        tokens,
1123        chains_covered,
1124        token_count,
1125    }
1126}
1127
1128/// Parse balance string to f64 with decimals.
1129///
1130/// # Precision Note
1131///
1132/// This function converts token balances to f64, which has 53 bits of mantissa precision.
1133/// Balances exceeding 2^53 (~9 quadrillion) will lose precision. For most tokens with
1134/// 18 decimals, this corresponds to ~9 million tokens - more than sufficient for typical
1135/// portfolio display. For tokens with fewer decimals or extremely large supplies,
1136/// the raw balance string (`balance_raw`) should be used for precise calculations.
1137fn parse_balance(balance: &str, decimals: u8) -> f64 {
1138    // Handle hex strings
1139    let balance = if let Some(stripped) = balance.strip_prefix("0x") {
1140        u128::from_str_radix(stripped, 16)
1141            .map(|v| v.to_string())
1142            .unwrap_or_else(|_| balance.to_string())
1143    } else {
1144        balance.to_string()
1145    };
1146
1147    // Parse as u128 and divide by 10^decimals
1148    if let Ok(raw) = balance.parse::<u128>() {
1149        let divisor = 10u128.pow(decimals as u32);
1150        raw as f64 / divisor as f64
1151    } else {
1152        balance.parse::<f64>().unwrap_or(0.0)
1153    }
1154}
1155
1156/// Estimate USD value for LP positions based on token composition
1157/// This is a simplified estimation - for stablecoin pairs, uses 1:1 USD
1158/// For other pairs, returns None (would need price oracle for accuracy)
1159fn estimate_lp_usd_value(
1160    token0_symbol: &str,
1161    token1_symbol: &str,
1162    net_token0: f64,
1163    net_token1: f64,
1164) -> Option<f64> {
1165    let stables = [
1166        "USDC", "USDT", "DAI", "FRAX", "LUSD", "TUSD", "GUSD", "USDP",
1167    ];
1168
1169    let t0_upper = token0_symbol.to_uppercase();
1170    let t1_upper = token1_symbol.to_uppercase();
1171
1172    let t0_is_stable = stables.iter().any(|s| t0_upper.contains(s));
1173    let t1_is_stable = stables.iter().any(|s| t1_upper.contains(s));
1174
1175    if t0_is_stable && t1_is_stable {
1176        // Both stablecoins - sum directly
1177        Some(net_token0 + net_token1)
1178    } else if t0_is_stable {
1179        // Only token0 is stable - report just the stable portion (conservative)
1180        Some(net_token0 * 2.0) // Approximate: double the stable amount
1181    } else if t1_is_stable {
1182        // Only token1 is stable - report just the stable portion (conservative)
1183        Some(net_token1 * 2.0) // Approximate: double the stable amount
1184    } else {
1185        // Neither is stable - we'd need price data
1186        None
1187    }
1188}
1189
1190/// Map chain name to chain ID for dsim
1191fn chain_to_id(chain: &str) -> Option<&'static str> {
1192    match chain.to_lowercase().as_str() {
1193        "ethereum" | "eth" | "mainnet" | "eth-mainnet" => Some("1"),
1194        "polygon" | "matic" | "polygon-mainnet" => Some("137"),
1195        "arbitrum" | "arb" | "arbitrum-mainnet" | "arb-mainnet" => Some("42161"),
1196        "optimism" | "op" | "optimism-mainnet" | "op-mainnet" => Some("10"),
1197        "base" | "base-mainnet" => Some("8453"),
1198        "avalanche" | "avax" => Some("43114"),
1199        "bsc" | "bnb" => Some("56"),
1200        _ => None,
1201    }
1202}