aptos_network_sdk/dex/
mod.rs

1pub mod animeswap;
2pub mod auxswap;
3pub mod cellana;
4pub mod liquidswap;
5pub mod pancakeswap;
6pub mod thala;
7use crate::{
8    Aptos,
9    dex::{
10        animeswap::{AnimeSwap, AnimeSwapEventFilters},
11        auxswap::AuxExchange,
12        cellana::{Cellana, CellanaEventConfig},
13        liquidswap::Liquidswap,
14        pancakeswap::{PancakeSwap, PancakeSwapEventFilters},
15        thala::Thala,
16    },
17    event::EventData,
18    global::mainnet::{
19        protocol_address::{
20            ANIMESWAP_PROTOCOL_ADDRESS, AUXSWAP_PROTOCOL_ADDRESS, CELLANASWAP_PROTOCOL_ADDRESS,
21            LIQUIDSWAP_PROTOCOL_ADDRESS, PANCAKESWAP_FACTORY_PROTOCOL_ADDRESS,
22            THALA_PROTOCOL_ADDRESS,
23        },
24        token_address::{APT, THL, USDC, USDT, WORMHOLE_USDC},
25    },
26    wallet::Wallet,
27};
28use serde_json::Value;
29use std::{collections::HashMap, sync::Arc};
30use tokio::sync::broadcast;
31
/// Price of a token on a single DEX, denominated in a base token.
#[derive(Debug, Clone, PartialEq)]
pub struct TokenPrice {
    /// Name of the DEX the price was quoted on.
    pub dex: String,
    /// Address (type string) of the token being priced.
    pub token_address: String,
    /// Token the price is expressed in.
    pub base_token: String,
    /// Quoted price of `token_address` in units of `base_token`.
    pub price: f64,
    /// Liquidity figure reported for the pool backing this price.
    pub liquidity: u64,
    /// Unix timestamp (seconds) at which the price was captured.
    pub timestamp: u64,
}
42
/// Summary of a liquidity pool for a token pair on one DEX.
#[derive(Debug, Clone, PartialEq)]
pub struct LiquidityPool {
    /// Name of the DEX hosting the pool.
    pub dex: String,
    /// First token of the pair.
    pub token_a: String,
    /// Second token of the pair.
    pub token_b: String,
    /// Liquidity figure reported for the pool.
    pub liquidity: u64,
    /// Reserve of `token_a` held by the pool.
    pub reserve_a: u64,
    /// Reserve of `token_b` held by the pool.
    pub reserve_b: u64,
    /// Swap fee as a fraction (for example `0.003` for 0.3%).
    pub fee_rate: f64,
}
54
/// Descriptive metadata of a token.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TokenMetadata {
    /// Address (type string) identifying the token.
    pub address: String,
    /// Human-readable token name.
    pub name: String,
    /// Ticker symbol.
    pub symbol: String,
    /// Number of decimal places used by on-chain amounts.
    pub decimals: u8,
    /// Total supply in base units (`0` when unknown).
    pub supply: u64,
}
64
/// Price of a token pair as quoted by a single DEX.
#[derive(Debug, Clone, PartialEq)]
pub struct DexPrice {
    /// Name of the DEX that produced the quote.
    pub dex: String,
    /// Quoted price (output amount per unit of input).
    pub price: f64,
    /// Output amount quoted for the probed input amount.
    pub amount_out: u64,
}

/// Side-by-side prices of one token pair across several DEXs.
#[derive(Debug, Clone, PartialEq)]
pub struct TokenPriceComparison {
    /// Token being sold.
    pub token_a: String,
    /// Token being bought.
    pub token_b: String,
    /// One entry per DEX that returned a quote.
    pub prices: Vec<DexPrice>,
}
80
/// Stateless entry point for cross-DEX quoting, swapping and price discovery.
///
/// All functionality is exposed through associated functions; the type carries
/// no data.
#[derive(Debug, Clone, Copy, Default)]
pub struct DexAggregator;
82
83impl DexAggregator {
84    /// Find the best price across all DEXs
85    pub async fn find_best_swap(
86        client: Arc<Aptos>,
87        from_token: &str,
88        to_token: &str,
89        amount_in: u64,
90    ) -> Result<DexSwapQuote, String> {
91        let mut quotes = Vec::new();
92        if let Ok(quote) =
93            Self::get_liquidswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
94        {
95            quotes.push(quote);
96        }
97        if let Ok(quote) =
98            Self::get_animeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
99        {
100            quotes.push(quote);
101        }
102        if let Ok(quote) =
103            Self::get_thala_quote(Arc::clone(&client), from_token, to_token, amount_in).await
104        {
105            quotes.push(quote);
106        }
107        if let Ok(quote) =
108            Self::get_pancakeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
109        {
110            quotes.push(quote);
111        }
112        if let Ok(quote) =
113            Self::get_cellana_quote(Arc::clone(&client), from_token, to_token, amount_in).await
114        {
115            quotes.push(quote);
116        }
117        if let Ok(quote) =
118            Self::get_aux_quote(Arc::clone(&client), from_token, to_token, amount_in).await
119        {
120            quotes.push(quote);
121        }
122        if quotes.is_empty() {
123            return Err("No suitable DEX found for this trade".to_string());
124        }
125        // Sort by output amount and select the best quote
126        quotes.sort_by(|a, b| b.amount_out.cmp(&a.amount_out));
127        Ok(quotes.first().unwrap().clone())
128    }
129
130    /// Perform optimal exchange
131    pub async fn exe_best_swap(
132        client: Arc<Aptos>,
133        wallet: Arc<Wallet>,
134        from_token: &str,
135        to_token: &str,
136        amount_in: u64,
137        slippage: f64,
138    ) -> Result<Value, String> {
139        let quote =
140            Self::find_best_swap(Arc::clone(&client), from_token, to_token, amount_in).await?;
141        let min_amount_out = (quote.amount_out as f64 * (1.0 - slippage)) as u64;
142        match quote.dex.as_str() {
143            "Liquidswap" => {
144                Liquidswap::swap_exact_input(
145                    client,
146                    wallet,
147                    from_token,
148                    to_token,
149                    amount_in,
150                    min_amount_out,
151                )
152                .await
153            }
154            "AnimeSwap" => {
155                AnimeSwap::swap_exact_tokens_for_tokens(
156                    client,
157                    wallet,
158                    vec![from_token, to_token],
159                    amount_in,
160                    min_amount_out,
161                )
162                .await
163            }
164            "Thala" => {
165                Thala::swap_exact_input(
166                    client,
167                    wallet,
168                    from_token,
169                    to_token,
170                    amount_in,
171                    min_amount_out,
172                )
173                .await
174            }
175            "PancakeSwap" => {
176                let wallet_address = wallet.address().map_err(|e| e.to_string())?;
177                PancakeSwap::swap_exact_tokens_for_tokens(
178                    client,
179                    wallet,
180                    amount_in,
181                    min_amount_out,
182                    vec![from_token, to_token],
183                    &wallet_address,
184                    Self::get_deadline(300),
185                )
186                .await
187            }
188            "Cellana" => {
189                Cellana::swap(
190                    client,
191                    wallet,
192                    from_token,
193                    to_token,
194                    amount_in,
195                    min_amount_out,
196                )
197                .await
198            }
199            "AuxExchange" => {
200                AuxExchange::swap_exact_input(
201                    client,
202                    wallet,
203                    from_token,
204                    to_token,
205                    amount_in,
206                    min_amount_out,
207                )
208                .await
209            }
210            _ => Err(format!("Unsupported DEX: {}", quote.dex)),
211        }
212    }
213
214    /// Compare prices across multiple DEXs in batches
215    pub async fn compare_all_dex_prices(
216        client: Arc<Aptos>,
217        from_token: &str,
218        to_token: &str,
219        amount_in: u64,
220    ) -> Result<Vec<DexSwapQuote>, String> {
221        let mut quotes = Vec::new();
222        if let Ok(quote) =
223            Self::get_liquidswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
224        {
225            quotes.push(quote);
226        }
227        if let Ok(quote) =
228            Self::get_animeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
229        {
230            quotes.push(quote);
231        }
232        if let Ok(quote) =
233            Self::get_thala_quote(Arc::clone(&client), from_token, to_token, amount_in).await
234        {
235            quotes.push(quote);
236        }
237        if let Ok(quote) =
238            Self::get_pancakeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
239        {
240            quotes.push(quote);
241        }
242        if let Ok(quote) =
243            Self::get_cellana_quote(Arc::clone(&client), from_token, to_token, amount_in).await
244        {
245            quotes.push(quote);
246        }
247        if let Ok(quote) =
248            Self::get_aux_quote(Arc::clone(&client), from_token, to_token, amount_in).await
249        {
250            quotes.push(quote);
251        }
252        quotes.sort_by(|a, b| b.amount_out.cmp(&a.amount_out));
253        Ok(quotes)
254    }
255
256    // How to obtain quotes from various DEXs
257    async fn get_liquidswap_quote(
258        client: Arc<Aptos>,
259        from_token: &str,
260        to_token: &str,
261        amount_in: u64,
262    ) -> Result<DexSwapQuote, String> {
263        match Liquidswap::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
264            Ok(price) => {
265                let amount_out = (price * amount_in as f64) as u64;
266                Ok(DexSwapQuote {
267                    dex: "Liquidswap".to_string(),
268                    amount_out,
269                    price,
270                    dex_address: LIQUIDSWAP_PROTOCOL_ADDRESS.to_string(),
271                })
272            }
273            Err(e) => Err(e),
274        }
275    }
276
277    async fn get_animeswap_quote(
278        client: Arc<Aptos>,
279        from_token: &str,
280        to_token: &str,
281        amount_in: u64,
282    ) -> Result<DexSwapQuote, String> {
283        match AnimeSwap::get_reserves(Arc::clone(&client), from_token, to_token).await {
284            Ok((reserve_in, reserve_out)) => {
285                let amount_out = Self::calculate_amm_output(amount_in, reserve_in, reserve_out);
286                let price = if amount_in > 0 {
287                    amount_out as f64 / amount_in as f64
288                } else {
289                    0.0
290                };
291                Ok(DexSwapQuote {
292                    dex: "AnimeSwap".to_string(),
293                    amount_out,
294                    price,
295                    dex_address: ANIMESWAP_PROTOCOL_ADDRESS.to_string(),
296                })
297            }
298            Err(_) => Err("Failed to get AnimeSwap reserves".to_string()),
299        }
300    }
301
302    async fn get_thala_quote(
303        client: Arc<Aptos>,
304        from_token: &str,
305        to_token: &str,
306        amount_in: u64,
307    ) -> Result<DexSwapQuote, String> {
308        match Thala::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
309            Ok(price) => {
310                let amount_out = (price * amount_in as f64) as u64;
311                Ok(DexSwapQuote {
312                    dex: "Thala".to_string(),
313                    amount_out,
314                    price,
315                    dex_address: THALA_PROTOCOL_ADDRESS.to_string(),
316                })
317            }
318            Err(e) => Err(e),
319        }
320    }
321
322    async fn get_pancakeswap_quote(
323        client: Arc<Aptos>,
324        from_token: &str,
325        to_token: &str,
326        amount_in: u64,
327    ) -> Result<DexSwapQuote, String> {
328        match PancakeSwap::get_reserves(Arc::clone(&client), from_token, to_token).await {
329            Ok((reserve_in, reserve_out)) => {
330                let amount_out = Self::calculate_amm_output(amount_in, reserve_in, reserve_out);
331                let price = if amount_in > 0 {
332                    amount_out as f64 / amount_in as f64
333                } else {
334                    0.0
335                };
336                Ok(DexSwapQuote {
337                    dex: "PancakeSwap".to_string(),
338                    amount_out,
339                    price,
340                    dex_address: PANCAKESWAP_FACTORY_PROTOCOL_ADDRESS.to_string(),
341                })
342            }
343            Err(_) => Err("Failed to get PancakeSwap reserves".to_string()),
344        }
345    }
346
347    async fn get_cellana_quote(
348        client: Arc<Aptos>,
349        from_token: &str,
350        to_token: &str,
351        amount_in: u64,
352    ) -> Result<DexSwapQuote, String> {
353        match Cellana::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
354            Ok(price) => {
355                let amount_out = (price * amount_in as f64) as u64;
356                Ok(DexSwapQuote {
357                    dex: "Cellana".to_string(),
358                    amount_out,
359                    price,
360                    dex_address: CELLANASWAP_PROTOCOL_ADDRESS.to_string(),
361                })
362            }
363            Err(e) => Err(e),
364        }
365    }
366
367    async fn get_aux_quote(
368        client: Arc<Aptos>,
369        from_token: &str,
370        to_token: &str,
371        amount_in: u64,
372    ) -> Result<DexSwapQuote, String> {
373        match AuxExchange::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
374            Ok(amount_out) => {
375                let price = if amount_in > 0 {
376                    amount_out as f64 / amount_in as f64
377                } else {
378                    0.0
379                };
380                Ok(DexSwapQuote {
381                    dex: "AuxExchange".to_string(),
382                    amount_out,
383                    price,
384                    dex_address: AUXSWAP_PROTOCOL_ADDRESS.to_string(),
385                })
386            }
387            Err(e) => {
388                match AuxExchange::get_pool_info(Arc::clone(&client), from_token, to_token).await {
389                    Ok(pool_info) => {
390                        if let (Some(reserve_in), Some(reserve_out)) = (
391                            pool_info
392                                .get("coin_a_reserve")
393                                .and_then(|v| v.as_str())
394                                .and_then(|s| s.parse::<u64>().ok()),
395                            pool_info
396                                .get("coin_b_reserve")
397                                .and_then(|v| v.as_str())
398                                .and_then(|s| s.parse::<u64>().ok()),
399                        ) {
400                            let amount_out =
401                                Self::calculate_amm_output(amount_in, reserve_in, reserve_out);
402                            let price = if amount_in > 0 {
403                                amount_out as f64 / amount_in as f64
404                            } else {
405                                0.0
406                            };
407                            Ok(DexSwapQuote {
408                                dex: "AuxExchange".to_string(),
409                                amount_out,
410                                price,
411                                dex_address: AUXSWAP_PROTOCOL_ADDRESS.to_string(),
412                            })
413                        } else {
414                            Err(format!("Failed to parse pool reserves: {}", e))
415                        }
416                    }
417                    Err(pool_err) => Err(format!(
418                        "Failed to get AuxExchange quote: {} (pool error: {})",
419                        e, pool_err
420                    )),
421                }
422            }
423        }
424    }
425
426    /// Calculate AMM output amount
427    fn calculate_amm_output(amount_in: u64, reserve_in: u64, reserve_out: u64) -> u64 {
428        if reserve_in == 0 || reserve_out == 0 {
429            return 0;
430        }
431        let amount_in_with_fee = amount_in * 997;
432        let numerator = amount_in_with_fee * reserve_out;
433        let denominator = reserve_in * 1000 + amount_in_with_fee;
434        if denominator == 0 {
435            return 0;
436        }
437        numerator / denominator
438    }
439
440    /// Get transaction deadline timestamp
441    fn get_deadline(seconds_from_now: u64) -> u64 {
442        std::time::SystemTime::now()
443            .duration_since(std::time::UNIX_EPOCH)
444            .unwrap()
445            .as_secs()
446            + seconds_from_now
447    }
448
449    /// Get a list of all supported DEXs
450    pub fn get_supported_dexes() -> Vec<DexInfo> {
451        vec![
452            DexInfo {
453                name: "Liquidswap".to_string(),
454                address: LIQUIDSWAP_PROTOCOL_ADDRESS.to_string(),
455                description: "Pontem Network - Largest DEX on Aptos".to_string(),
456                supports_liquidity: true,
457                supports_swap: true,
458                is_amm: true,
459            },
460            DexInfo {
461                name: "AuxExchange".to_string(),
462                address: AUXSWAP_PROTOCOL_ADDRESS.to_string(),
463                description: "Orderbook-based DEX with AMM".to_string(),
464                supports_liquidity: true,
465                supports_swap: true,
466                is_amm: false,
467            },
468            DexInfo {
469                name: "AnimeSwap".to_string(),
470                address: ANIMESWAP_PROTOCOL_ADDRESS.to_string(),
471                description: "Multi-chain DEX with anime theme".to_string(),
472                supports_liquidity: true,
473                supports_swap: true,
474                is_amm: true,
475            },
476            DexInfo {
477                name: "Thala".to_string(),
478                address: THALA_PROTOCOL_ADDRESS.to_string(),
479                description: "DeFi protocol with THL token and staking".to_string(),
480                supports_liquidity: true,
481                supports_swap: true,
482                is_amm: true,
483            },
484            DexInfo {
485                name: "PancakeSwap".to_string(),
486                address: PANCAKESWAP_FACTORY_PROTOCOL_ADDRESS.to_string(),
487                description: "Multi-chain DEX with CAKE token".to_string(),
488                supports_liquidity: true,
489                supports_swap: true,
490                is_amm: true,
491            },
492            DexInfo {
493                name: "Cellana".to_string(),
494                address: CELLANASWAP_PROTOCOL_ADDRESS.to_string(),
495                description: "DEX with CELL token and farming".to_string(),
496                supports_liquidity: true,
497                supports_swap: true,
498                is_amm: true,
499            },
500        ]
501    }
502
503    /// Get the price of a specified token in all DEXs (relative to APT)
504    pub async fn get_token_price(
505        client: Arc<Aptos>,
506        token_address: &str,
507    ) -> Result<Vec<TokenPrice>, String> {
508        let apt_coin = "0x1::aptos_coin::AptosCoin";
509        let mut prices = Vec::new();
510        let dex_checks = vec![
511            (
512                "Liquidswap",
513                Self::get_token_price_on_dex(
514                    Arc::clone(&client),
515                    "Liquidswap",
516                    token_address,
517                    apt_coin,
518                ),
519            ),
520            (
521                "Thala",
522                Self::get_token_price_on_dex(Arc::clone(&client), "Thala", token_address, apt_coin),
523            ),
524            (
525                "PancakeSwap",
526                Self::get_token_price_on_dex(
527                    Arc::clone(&client),
528                    "PancakeSwap",
529                    token_address,
530                    apt_coin,
531                ),
532            ),
533            (
534                "AnimeSwap",
535                Self::get_token_price_on_dex(
536                    Arc::clone(&client),
537                    "AnimeSwap",
538                    token_address,
539                    apt_coin,
540                ),
541            ),
542            (
543                "Cellana",
544                Self::get_token_price_on_dex(
545                    Arc::clone(&client),
546                    "Cellana",
547                    token_address,
548                    apt_coin,
549                ),
550            ),
551        ];
552        for (dex_name, check_future) in dex_checks {
553            if let Ok(price) = check_future.await {
554                prices.push(price);
555            }
556        }
557        prices.sort_by(|a, b| {
558            b.price
559                .partial_cmp(&a.price)
560                .unwrap_or(std::cmp::Ordering::Equal)
561        });
562        Ok(prices)
563    }
564
565    /// Get token prices on a specific DEX
566    async fn get_token_price_on_dex(
567        client: Arc<Aptos>,
568        dex_name: &str,
569        token_address: &str,
570        base_token: &str,
571    ) -> Result<TokenPrice, String> {
572        let amount_in = 1_000_000;
573        let quote = match dex_name {
574            "Liquidswap" => {
575                Self::get_liquidswap_quote(client.clone(), token_address, base_token, amount_in)
576                    .await
577            }
578            "Thala" => {
579                Self::get_thala_quote(client.clone(), token_address, base_token, amount_in).await
580            }
581            "PancakeSwap" => {
582                Self::get_pancakeswap_quote(client.clone(), token_address, base_token, amount_in)
583                    .await
584            }
585            "AnimeSwap" => {
586                Self::get_animeswap_quote(client.clone(), token_address, base_token, amount_in)
587                    .await
588            }
589            "Cellana" => {
590                Self::get_cellana_quote(client.clone(), token_address, base_token, amount_in).await
591            }
592            _ => Err("Unsupported DEX".to_string()),
593        }?;
594        Ok(TokenPrice {
595            dex: dex_name.to_string(),
596            token_address: token_address.to_string(),
597            base_token: base_token.to_string(),
598            price: quote.price,
599            liquidity: Self::get_pool_liquidity(
600                client.clone(),
601                dex_name,
602                token_address,
603                base_token,
604            )
605            .await
606            .unwrap_or(0),
607            timestamp: std::time::SystemTime::now()
608                .duration_since(std::time::UNIX_EPOCH)
609                .unwrap()
610                .as_secs(),
611        })
612    }
613
614    /// Get the total liquidity of the liquidity pool
615    async fn get_pool_liquidity(
616        client: Arc<Aptos>,
617        dex_name: &str,
618        token_a: &str,
619        token_b: &str,
620    ) -> Result<u64, String> {
621        match dex_name {
622            "Liquidswap" => {
623                let pool_info = Liquidswap::get_pool_info(client, token_a, token_b).await?;
624                let reserve_a = pool_info
625                    .get("coin_x_reserve")
626                    .and_then(|v| v.as_str())
627                    .and_then(|s| s.parse().ok())
628                    .unwrap_or(0);
629                let reserve_b = pool_info
630                    .get("coin_y_reserve")
631                    .and_then(|v| v.as_str())
632                    .and_then(|s| s.parse().ok())
633                    .unwrap_or(0);
634                Ok(reserve_a + reserve_b)
635            }
636            "Thala" => {
637                let pool_info = Thala::get_pool_info(client, token_a, token_b).await?;
638                let reserve_a = pool_info
639                    .get("reserve_x")
640                    .and_then(|v| v.as_str())
641                    .and_then(|s| s.parse().ok())
642                    .unwrap_or(0);
643                let reserve_b = pool_info
644                    .get("reserve_y")
645                    .and_then(|v| v.as_str())
646                    .and_then(|s| s.parse().ok())
647                    .unwrap_or(0);
648                Ok(reserve_a + reserve_b)
649            }
650            "AnimeSwap" => {
651                let (reserve_a, reserve_b) =
652                    AnimeSwap::get_reserves(client, token_a, token_b).await?;
653                Ok(reserve_a + reserve_b)
654            }
655            "PancakeSwap" => {
656                let (reserve_a, reserve_b) =
657                    PancakeSwap::get_reserves(client, token_a, token_b).await?;
658                Ok(reserve_a + reserve_b)
659            }
660            "Cellana" => {
661                let pool_info = Cellana::get_pool_info(client, token_a, token_b).await?;
662                let reserve_a = pool_info
663                    .get("reserve_x")
664                    .and_then(|v| v.as_str())
665                    .and_then(|s| s.parse().ok())
666                    .unwrap_or(0);
667                let reserve_b = pool_info
668                    .get("reserve_y")
669                    .and_then(|v| v.as_str())
670                    .and_then(|s| s.parse().ok())
671                    .unwrap_or(0);
672                Ok(reserve_a + reserve_b)
673            }
674            _ => Ok(0),
675        }
676    }
677
678    /// Find the liquidity pools of a token across all DEXs
679    pub async fn find_token_liquidity_pools(
680        client: Arc<Aptos>,
681        token_address: &str,
682    ) -> Result<Vec<LiquidityPool>, String> {
683        let common_tokens = vec![APT, USDC, USDT, WORMHOLE_USDC];
684        let mut pools = Vec::new();
685        for base_token in common_tokens {
686            let dex_checks = vec![
687                (
688                    "Liquidswap",
689                    Self::check_pool_exists(
690                        Arc::clone(&client),
691                        "Liquidswap",
692                        token_address,
693                        base_token,
694                    ),
695                ),
696                (
697                    "Thala",
698                    Self::check_pool_exists(
699                        Arc::clone(&client),
700                        "Thala",
701                        token_address,
702                        base_token,
703                    ),
704                ),
705                (
706                    "PancakeSwap",
707                    Self::check_pool_exists(
708                        Arc::clone(&client),
709                        "PancakeSwap",
710                        token_address,
711                        base_token,
712                    ),
713                ),
714                (
715                    "AnimeSwap",
716                    Self::check_pool_exists(
717                        Arc::clone(&client),
718                        "AnimeSwap",
719                        token_address,
720                        base_token,
721                    ),
722                ),
723                (
724                    "Cellana",
725                    Self::check_pool_exists(
726                        Arc::clone(&client),
727                        "Cellana",
728                        token_address,
729                        base_token,
730                    ),
731                ),
732            ];
733            for (dex_name, check_future) in dex_checks {
734                if let Ok(Some(pool)) = check_future.await {
735                    pools.push(pool);
736                }
737            }
738        }
739        pools.sort_by(|a, b| b.liquidity.cmp(&a.liquidity));
740        Ok(pools)
741    }
742
743    /// Check if a liquidity pool exists on a specific DEX
744    async fn check_pool_exists(
745        client: Arc<Aptos>,
746        dex_name: &str,
747        token_a: &str,
748        token_b: &str,
749    ) -> Result<Option<LiquidityPool>, String> {
750        let liquidity =
751            Self::get_pool_liquidity(Arc::clone(&client), dex_name, token_a, token_b).await;
752        if let Ok(liquidity) = liquidity {
753            if liquidity > 0 {
754                let pool = LiquidityPool {
755                    dex: dex_name.to_string(),
756                    token_a: token_a.to_string(),
757                    token_b: token_b.to_string(),
758                    liquidity,
759                    reserve_a: 0,
760                    reserve_b: 0,
761                    fee_rate: 0.003,
762                };
763                return Ok(Some(pool));
764            }
765        }
766        Ok(None)
767    }
768
769    /// Get the metadata information of the token
770    pub async fn get_token_metadata(
771        client: Arc<Aptos>,
772        token_address: &str,
773    ) -> Result<TokenMetadata, String> {
774        let coin_info_type = format!("0x1::coin::CoinInfo<{}>", token_address);
775        if let Ok(Some(resource)) = client.get_account_resource("0x1", &coin_info_type).await {
776            if let Value::Object(data) = &resource.data {
777                return Ok(TokenMetadata {
778                    address: token_address.to_string(),
779                    name: data
780                        .get("name")
781                        .and_then(|v| v.as_str())
782                        .unwrap_or("")
783                        .to_string(),
784                    symbol: data
785                        .get("symbol")
786                        .and_then(|v| v.as_str())
787                        .unwrap_or("")
788                        .to_string(),
789                    decimals: data.get("decimals").and_then(|v| v.as_u64()).unwrap_or(0) as u8,
790                    supply: data
791                        .get("supply")
792                        .and_then(|v| v.get("vec"))
793                        .and_then(|v| v.as_array())
794                        .and_then(|v| v.get(0))
795                        .and_then(|v| v.get("value"))
796                        .and_then(|v| v.as_str())
797                        .and_then(|s| s.parse().ok())
798                        .unwrap_or(0),
799                });
800            }
801        }
802        Ok(TokenMetadata {
803            address: token_address.to_string(),
804            name: "Unknown".to_string(),
805            symbol: "UNKNOWN".to_string(),
806            decimals: 8,
807            supply: 0,
808        })
809    }
810
811    pub async fn get_top_prices_comparison(
812        client: Arc<Aptos>,
813    ) -> Result<Vec<TokenPriceComparison>, String> {
814        let popular_pairs = vec![
815            (USDC, APT), // USDC/APT
816            (USDT, APT), // USDT/APT
817            (THL, APT),  // THL/APT
818        ];
819        let mut comparisons = Vec::new();
820        for (token_a, token_b) in popular_pairs {
821            let mut prices = Vec::new();
822            let amount_in = 1_000_000;
823            if let Ok(quote) =
824                Self::get_liquidswap_quote(Arc::clone(&client), token_a, token_b, amount_in).await
825            {
826                prices.push(DexPrice {
827                    dex: "Liquidswap".to_string(),
828                    price: quote.price,
829                    amount_out: quote.amount_out,
830                });
831            }
832            if let Ok(quote) =
833                Self::get_thala_quote(Arc::clone(&client), token_a, token_b, amount_in).await
834            {
835                prices.push(DexPrice {
836                    dex: "Thala".to_string(),
837                    price: quote.price,
838                    amount_out: quote.amount_out,
839                });
840            }
841            if let Ok(quote) =
842                Self::get_pancakeswap_quote(Arc::clone(&client), token_a, token_b, amount_in).await
843            {
844                prices.push(DexPrice {
845                    dex: "PancakeSwap".to_string(),
846                    price: quote.price,
847                    amount_out: quote.amount_out,
848                });
849            }
850            if prices.len() > 1 {
851                comparisons.push(TokenPriceComparison {
852                    token_a: token_a.to_string(),
853                    token_b: token_b.to_string(),
854                    prices,
855                });
856            }
857        }
858        Ok(comparisons)
859    }
860}
861
/// Quote for swapping a given input amount on one DEX.
#[derive(Debug, Clone, PartialEq)]
pub struct DexSwapQuote {
    /// Name of the DEX that produced the quote.
    pub dex: String,
    /// Output amount quoted for the requested input amount.
    pub amount_out: u64,
    /// Quoted price (output amount per unit of input).
    pub price: f64,
    /// Protocol address of the DEX.
    pub dex_address: String,
}
869
/// Descriptive record for one DEX.
#[derive(Debug, Clone)]
pub struct DexInfo {
    /// Name of the DEX.
    pub name: String,
    /// Address associated with the DEX.
    pub address: String,
    /// Free-form description.
    pub description: String,
    /// Flag: liquidity operations are supported.
    pub supports_liquidity: bool,
    /// Flag: swaps are supported.
    pub supports_swap: bool,
    /// Flag: the DEX is an AMM (presumably "automated market maker").
    pub is_amm: bool,
}
879
880/// dex event monitor
881pub struct DexEventMonitor {
882    clients: HashMap<String, broadcast::Sender<EventData>>,
883}
884
885impl DexEventMonitor {
886    pub fn new() -> Self {
887        Self {
888            clients: HashMap::new(),
889        }
890    }
891    pub async fn start_monitoring_all_dexes(
892        &mut self,
893        client: Arc<Aptos>,
894    ) -> Result<(), String> {
895        let dexes = vec![
896            "Liquidswap",
897            "Thala",
898            "PancakeSwap",
899            "Cellana",
900            "AnimeSwap",
901            "AuxExchange",
902        ];
903        for dex_name in dexes {
904            let (sender, _) = broadcast::channel(1000);
905            self.clients.insert(dex_name.to_string(), sender);
906        }
907        Self::start_dex_monitoring_task(
908            Arc::clone(&client),
909            "Liquidswap",
910            self.get_sender("Liquidswap"),
911        );
912        Self::start_dex_monitoring_task(Arc::clone(&client), "Thala", self.get_sender("Thala"));
913        Self::start_dex_monitoring_task(
914            Arc::clone(&client),
915            "PancakeSwap",
916            self.get_sender("PancakeSwap"),
917        );
918        Self::start_dex_monitoring_task(Arc::clone(&client), "Cellana", self.get_sender("Cellana"));
919        Self::start_dex_monitoring_task(
920            Arc::clone(&client),
921            "AnimeSwap",
922            self.get_sender("AnimeSwap"),
923        );
924        Self::start_dex_monitoring_task(
925            Arc::clone(&client),
926            "AuxExchange",
927            self.get_sender("AuxExchange"),
928        );
929        Ok(())
930    }
931
932    fn start_dex_monitoring_task(
933        client: Arc<Aptos>,
934        dex_name: &str,
935        sender: Option<broadcast::Sender<EventData>>,
936    ) {
937        if let Some(sender) = sender {
938            let client = Arc::clone(&client);
939            let dex_name = dex_name.to_string();
940            tokio::spawn(async move {
941                match dex_name.as_str() {
942                    "Liquidswap" => {
943                        let _ = Liquidswap::listen_events(client, sender, vec![]).await;
944                    }
945                    "Thala" => {
946                        let _ = Thala::listen_events(client, sender, vec![]).await;
947                    }
948                    "PancakeSwap" => {
949                        let filters = PancakeSwapEventFilters {
950                            min_swap_amount: Some(1000000000),
951                            include_cake_pairs: true,
952                            tracked_pairs: None,
953                        };
954                        let _ = PancakeSwap::listen_events(client, sender, filters).await;
955                    }
956                    "Cellana" => {
957                        let config = CellanaEventConfig {
958                            monitor_cell_pairs: true,
959                            min_swap_amount: 1000000000,
960                            monitor_farming: true,
961                            tracked_tokens: vec![],
962                        };
963                        let _ = Cellana::listen_events(client, sender, config).await;
964                    }
965                    "AnimeSwap" => {
966                        let filters = AnimeSwapEventFilters {
967                            min_swap_amount: Some(1000000000),
968                            tracked_tokens: None,
969                            min_liquidity_amount: Some(500000000),
970                        };
971                        let _ = AnimeSwap::listen_events(client, sender, filters).await;
972                    }
973                    "AuxExchange" => {
974                        let _ = AuxExchange::listen_events(client, sender, vec![]).await;
975                    }
976                    _ => {}
977                }
978            });
979        }
980    }
981
982    fn get_sender(&self, dex_name: &str) -> Option<broadcast::Sender<EventData>> {
983        self.clients.get(dex_name).cloned()
984    }
985
986    pub fn subscribe_to_dex(&self, dex_name: &str) -> Option<broadcast::Receiver<EventData>> {
987        self.clients.get(dex_name).map(|sender| sender.subscribe())
988    }
989
990    pub fn get_all_receivers(&self) -> Vec<(String, broadcast::Receiver<EventData>)> {
991        self.clients
992            .iter()
993            .map(|(name, sender)| (name.clone(), sender.subscribe()))
994            .collect()
995    }
996
997    pub fn publish_to_dex(&self, dex_name: &str, event: EventData) -> Result<(), String> {
998        if let Some(sender) = self.clients.get(dex_name) {
999            let _ = sender.send(event);
1000            Ok(())
1001        } else {
1002            Err(format!("DEX {} not found", dex_name))
1003        }
1004    }
1005}
1006
/// Namespace type for DEX analytics helpers; carries no state.
pub struct DexAnalytics;
1008
1009impl DexAnalytics {
1010    /// analyze dex volume distribution
1011    pub async fn analyze_dex_volume_distribution(
1012        client: Arc<Aptos>,
1013        _time_period_hours: u64,
1014    ) -> Result<HashMap<String, u64>, String> {
1015        let mut volume_map = HashMap::new();
1016        let dex_volume_futures = vec![
1017            (
1018                "Liquidswap",
1019                Self::get_liquidswap_volume(Arc::clone(&client)).await,
1020            ),
1021            ("Thala", Self::get_thala_volume(Arc::clone(&client)).await),
1022            (
1023                "PancakeSwap",
1024                Self::get_pancakeswap_volume(Arc::clone(&client)).await,
1025            ),
1026            (
1027                "AnimeSwap",
1028                Self::get_animeswap_volume(Arc::clone(&client)).await,
1029            ),
1030            (
1031                "Cellana",
1032                Self::get_cellana_volume(Arc::clone(&client)).await,
1033            ),
1034            (
1035                "AuxExchange",
1036                Self::get_aux_volume(Arc::clone(&client)).await,
1037            ),
1038        ];
1039        let mut handles = Vec::new();
1040        for (dex_name, volume_future) in dex_volume_futures {
1041            let handle = tokio::spawn(async move {
1042                match volume_future {
1043                    Ok(volume) => Some((dex_name.to_string(), volume)),
1044                    Err(e) => {
1045                        eprintln!("Failed to get volume for {}: {}", dex_name, e);
1046                        None
1047                    }
1048                }
1049            });
1050            handles.push(handle);
1051        }
1052        for handle in handles {
1053            if let Some((dex_name, volume)) = handle.await.map_err(|e| e.to_string())? {
1054                volume_map.insert(dex_name, volume);
1055            }
1056        }
1057        Ok(volume_map)
1058    }
1059
1060    /// get liquidswap volume
1061    async fn get_liquidswap_volume(client: Arc<Aptos>) -> Result<u64, String> {
1062        let events = crate::dex::liquidswap::Liquidswap::get_swap_events(client).await?;
1063        let total_volume = events
1064            .iter()
1065            .map(|event| {
1066                if let Some(amount_in) = event.event_data.get("amount_in") {
1067                    amount_in
1068                        .as_str()
1069                        .and_then(|s| s.parse::<u64>().ok())
1070                        .unwrap_or(0)
1071                } else if let Some(amount_out) = event.event_data.get("amount_out") {
1072                    amount_out
1073                        .as_str()
1074                        .and_then(|s| s.parse::<u64>().ok())
1075                        .unwrap_or(0)
1076                } else {
1077                    0
1078                }
1079            })
1080            .sum();
1081        Ok(total_volume)
1082    }
1083
1084    /// get thala volume
1085    async fn get_thala_volume(client: Arc<Aptos>) -> Result<u64, String> {
1086        let events = crate::dex::thala::Thala::get_swap_events(client).await?;
1087        let total_volume = events
1088            .iter()
1089            .map(|event| {
1090                if let Some(amount_in) = event.event_data.get("amount_in") {
1091                    amount_in
1092                        .as_str()
1093                        .and_then(|s| s.parse::<u64>().ok())
1094                        .unwrap_or(0)
1095                } else if let Some(amount_x_in) = event.event_data.get("amount_x_in") {
1096                    amount_x_in
1097                        .as_str()
1098                        .and_then(|s| s.parse::<u64>().ok())
1099                        .unwrap_or(0)
1100                } else if let Some(amount_y_in) = event.event_data.get("amount_y_in") {
1101                    amount_y_in
1102                        .as_str()
1103                        .and_then(|s| s.parse::<u64>().ok())
1104                        .unwrap_or(0)
1105                } else {
1106                    0
1107                }
1108            })
1109            .sum();
1110        Ok(total_volume)
1111    }
1112
1113    /// get pancakeswap volume
1114    async fn get_pancakeswap_volume(client: Arc<Aptos>) -> Result<u64, String> {
1115        let events = crate::dex::pancakeswap::PancakeSwap::get_swap_events(client).await?;
1116        let total_volume = events
1117            .iter()
1118            .map(|event| {
1119                if let Some(amount0_in) = event.event_data.get("amount0_in") {
1120                    amount0_in
1121                        .as_str()
1122                        .and_then(|s| s.parse::<u64>().ok())
1123                        .unwrap_or(0)
1124                } else if let Some(amount1_in) = event.event_data.get("amount1_in") {
1125                    amount1_in
1126                        .as_str()
1127                        .and_then(|s| s.parse::<u64>().ok())
1128                        .unwrap_or(0)
1129                } else if let Some(amount0_out) = event.event_data.get("amount0_out") {
1130                    amount0_out
1131                        .as_str()
1132                        .and_then(|s| s.parse::<u64>().ok())
1133                        .unwrap_or(0)
1134                } else if let Some(amount1_out) = event.event_data.get("amount1_out") {
1135                    amount1_out
1136                        .as_str()
1137                        .and_then(|s| s.parse::<u64>().ok())
1138                        .unwrap_or(0)
1139                } else {
1140                    0
1141                }
1142            })
1143            .sum();
1144        Ok(total_volume)
1145    }
1146
1147    /// get animeswap volume
1148    async fn get_animeswap_volume(client: Arc<Aptos>) -> Result<u64, String> {
1149        let events = crate::dex::animeswap::AnimeSwap::get_swap_events(client).await?;
1150        let total_volume = events
1151            .iter()
1152            .map(|event| {
1153                if let Some(amount0_in) = event.event_data.get("amount0_in") {
1154                    amount0_in
1155                        .as_str()
1156                        .and_then(|s| s.parse::<u64>().ok())
1157                        .unwrap_or(0)
1158                } else if let Some(amount1_in) = event.event_data.get("amount1_in") {
1159                    amount1_in
1160                        .as_str()
1161                        .and_then(|s| s.parse::<u64>().ok())
1162                        .unwrap_or(0)
1163                } else if let Some(amount_in) = event.event_data.get("amount_in") {
1164                    amount_in
1165                        .as_str()
1166                        .and_then(|s| s.parse::<u64>().ok())
1167                        .unwrap_or(0)
1168                } else {
1169                    0
1170                }
1171            })
1172            .sum();
1173        Ok(total_volume)
1174    }
1175
1176    /// get cellana volume
1177    async fn get_cellana_volume(client: Arc<Aptos>) -> Result<u64, String> {
1178        let events = crate::dex::cellana::Cellana::get_swap_events(client).await?;
1179        let total_volume = events
1180            .iter()
1181            .map(|event| {
1182                if let Some(amount_in) = event.event_data.get("amount_in") {
1183                    amount_in
1184                        .as_str()
1185                        .and_then(|s| s.parse::<u64>().ok())
1186                        .unwrap_or(0)
1187                } else if let Some(amount_x) = event.event_data.get("amount_x") {
1188                    amount_x
1189                        .as_str()
1190                        .and_then(|s| s.parse::<u64>().ok())
1191                        .unwrap_or(0)
1192                } else if let Some(amount_y) = event.event_data.get("amount_y") {
1193                    amount_y
1194                        .as_str()
1195                        .and_then(|s| s.parse::<u64>().ok())
1196                        .unwrap_or(0)
1197                } else {
1198                    0
1199                }
1200            })
1201            .sum();
1202        Ok(total_volume)
1203    }
1204
1205    /// get aux volume
1206    async fn get_aux_volume(client: Arc<Aptos>) -> Result<u64, String> {
1207        let events = crate::dex::auxswap::AuxExchange::get_swap_events(client).await?;
1208        let total_volume = events
1209            .iter()
1210            .map(|event| {
1211                if let Some(amount_in) = event.event_data.get("amount_in") {
1212                    amount_in
1213                        .as_str()
1214                        .and_then(|s| s.parse::<u64>().ok())
1215                        .unwrap_or(0)
1216                } else if let Some(quantity) = event.event_data.get("quantity") {
1217                    quantity
1218                        .as_str()
1219                        .and_then(|s| s.parse::<u64>().ok())
1220                        .unwrap_or(0)
1221                } else if let Some(amount) = event.event_data.get("amount") {
1222                    amount
1223                        .as_str()
1224                        .and_then(|s| s.parse::<u64>().ok())
1225                        .unwrap_or(0)
1226                } else {
1227                    0
1228                }
1229            })
1230            .sum();
1231        Ok(total_volume)
1232    }
1233
1234    /// get liquidity depth
1235    pub async fn get_liquidity_depth(
1236        _client: Arc<Aptos>,
1237        token_a: &str,
1238        token_b: &str,
1239    ) -> Result<Vec<DexLiquidity>, String> {
1240        let mut liquidity_data = Vec::new();
1241        liquidity_data.push(DexLiquidity {
1242            dex: "Liquidswap".to_string(),
1243            token_a: token_a.to_string(),
1244            token_b: token_b.to_string(),
1245            reserve_a: 500000000000,
1246            reserve_b: 500000000000,
1247            total_liquidity: 1000000000000,
1248        });
1249        liquidity_data.push(DexLiquidity {
1250            dex: "Thala".to_string(),
1251            token_a: token_a.to_string(),
1252            token_b: token_b.to_string(),
1253            reserve_a: 250000000000,
1254            reserve_b: 250000000000,
1255            total_liquidity: 500000000000,
1256        });
1257        liquidity_data.push(DexLiquidity {
1258            dex: "PancakeSwap".to_string(),
1259            token_a: token_a.to_string(),
1260            token_b: token_b.to_string(),
1261            reserve_a: 150000000000,
1262            reserve_b: 150000000000,
1263            total_liquidity: 300000000000,
1264        });
1265        liquidity_data.push(DexLiquidity {
1266            dex: "AnimeSwap".to_string(),
1267            token_a: token_a.to_string(),
1268            token_b: token_b.to_string(),
1269            reserve_a: 100000000000,
1270            reserve_b: 100000000000,
1271            total_liquidity: 200000000000,
1272        });
1273        liquidity_data.push(DexLiquidity {
1274            dex: "Cellana".to_string(),
1275            token_a: token_a.to_string(),
1276            token_b: token_b.to_string(),
1277            reserve_a: 75000000000,
1278            reserve_b: 75000000000,
1279            total_liquidity: 150000000000,
1280        });
1281        liquidity_data.sort_by(|a, b| b.total_liquidity.cmp(&a.total_liquidity));
1282        Ok(liquidity_data)
1283    }
1284}
1285
/// Liquidity figures for one token pair on one DEX.
#[derive(Debug, Clone)]
pub struct DexLiquidity {
    /// Name of the DEX.
    pub dex: String,
    /// First token of the pair.
    pub token_a: String,
    /// Second token of the pair.
    pub token_b: String,
    /// Reserve of `token_a`.
    pub reserve_a: u64,
    /// Reserve of `token_b`.
    pub reserve_b: u64,
    /// Total liquidity figure for the pair.
    pub total_liquidity: u64,
}
1295
/// Namespace type for stateless DEX utility functions.
pub struct DexUtils;
1297
1298impl DexUtils {
1299    pub fn calculate_price_impact(amount_in: u64, reserve_in: u64, reserve_out: u64) -> f64 {
1300        if reserve_in == 0 || reserve_out == 0 {
1301            return 0.0;
1302        }
1303        let amount_out_before = reserve_out as f64 / reserve_in as f64 * amount_in as f64;
1304        let amount_out_after =
1305            DexAggregator::calculate_amm_output(amount_in, reserve_in, reserve_out) as f64;
1306        if amount_out_before == 0.0 {
1307            return 0.0;
1308        }
1309        ((amount_out_before - amount_out_after) / amount_out_before).abs() * 100.0
1310    }
1311
1312    pub fn calculate_optimal_slippage(price_impact: f64) -> f64 {
1313        if price_impact < 0.1 {
1314            0.5
1315        } else if price_impact < 1.0 {
1316            1.0
1317        } else {
1318            2.0
1319        }
1320    }
1321
1322    pub fn format_token_amount(amount: u64, decimals: u8) -> String {
1323        let divisor = 10u64.pow(decimals as u32);
1324        let whole = amount / divisor;
1325        let fractional = amount % divisor;
1326        if fractional == 0 {
1327            format!("{}", whole)
1328        } else {
1329            format!(
1330                "{}.{:0>width$}",
1331                whole,
1332                fractional,
1333                width = decimals as usize
1334            )
1335        }
1336    }
1337
1338    pub async fn validate_token_pair(
1339        client: Arc<Aptos>,
1340        token_a: &str,
1341        token_b: &str,
1342    ) -> Result<Vec<String>, String> {
1343        let mut supported_dexes = Vec::new();
1344        if Liquidswap::get_pool_info(Arc::clone(&client), token_a, token_b)
1345            .await
1346            .is_ok()
1347        {
1348            supported_dexes.push("Liquidswap".to_string());
1349        }
1350
1351        if Thala::get_pool_info(Arc::clone(&client), token_a, token_b)
1352            .await
1353            .is_ok()
1354        {
1355            supported_dexes.push("Thala".to_string());
1356        }
1357
1358        if PancakeSwap::get_reserves(Arc::clone(&client), token_a, token_b)
1359            .await
1360            .is_ok()
1361        {
1362            supported_dexes.push("PancakeSwap".to_string());
1363        }
1364
1365        if AnimeSwap::get_reserves(Arc::clone(&client), token_a, token_b)
1366            .await
1367            .is_ok()
1368        {
1369            supported_dexes.push("AnimeSwap".to_string());
1370        }
1371
1372        if Cellana::get_pool_info(Arc::clone(&client), token_a, token_b)
1373            .await
1374            .is_ok()
1375        {
1376            supported_dexes.push("Cellana".to_string());
1377        }
1378
1379        Ok(supported_dexes)
1380    }
1381}
1382
1383impl Default for PancakeSwapEventFilters {
1384    fn default() -> Self {
1385        Self {
1386            min_swap_amount: Some(1000000000),
1387            include_cake_pairs: true,
1388            tracked_pairs: None,
1389        }
1390    }
1391}
1392
1393impl Default for CellanaEventConfig {
1394    fn default() -> Self {
1395        Self {
1396            monitor_cell_pairs: true,
1397            min_swap_amount: 1000000000,
1398            monitor_farming: true,
1399            tracked_tokens: vec![],
1400        }
1401    }
1402}
1403
1404impl Default for AnimeSwapEventFilters {
1405    fn default() -> Self {
1406        Self {
1407            min_swap_amount: Some(1000000000),
1408            tracked_tokens: None,
1409            min_liquidity_amount: Some(500000000),
1410        }
1411    }
1412}