1pub mod animeswap;
2pub mod auxswap;
3pub mod cellana;
4pub mod liquidswap;
5pub mod pancakeswap;
6pub mod thala;
7use crate::{
8 Aptos,
9 dex::{
10 animeswap::{AnimeSwap, AnimeSwapEventFilters},
11 auxswap::AuxExchange,
12 cellana::{Cellana, CellanaEventConfig},
13 liquidswap::Liquidswap,
14 pancakeswap::{PancakeSwap, PancakeSwapEventFilters},
15 thala::Thala,
16 },
17 event::EventData,
18 global::mainnet::{
19 protocol_address::{
20 ANIMESWAP_PROTOCOL_ADDRESS, AUXSWAP_PROTOCOL_ADDRESS, CELLANASWAP_PROTOCOL_ADDRESS,
21 LIQUIDSWAP_PROTOCOL_ADDRESS, PANCAKESWAP_FACTORY_PROTOCOL_ADDRESS,
22 THALA_PROTOCOL_ADDRESS,
23 },
24 token_address::{APT, THL, USDC, USDT, WORMHOLE_USDC},
25 },
26 wallet::Wallet,
27};
28use serde_json::Value;
29use std::{collections::HashMap, sync::Arc};
30use tokio::sync::broadcast;
31
/// Price of a token on a single DEX, as assembled by
/// `DexAggregator::get_token_price_on_dex`.
#[derive(Debug, Clone)]
pub struct TokenPrice {
    /// Name of the DEX the price was quoted on.
    pub dex: String,
    /// Address of the token being priced.
    pub token_address: String,
    /// Token the price is quoted against.
    pub base_token: String,
    /// Price copied from the underlying swap quote.
    pub price: f64,
    /// Value reported by `DexAggregator::get_pool_liquidity`; 0 when that lookup fails.
    pub liquidity: u64,
    /// Creation time in seconds since the Unix epoch.
    pub timestamp: u64,
}
42
/// A liquidity pool discovered for a token pair on one DEX.
#[derive(Debug, Clone)]
pub struct LiquidityPool {
    /// Name of the DEX hosting the pool.
    pub dex: String,
    /// First token of the pair.
    pub token_a: String,
    /// Second token of the pair.
    pub token_b: String,
    /// Liquidity figure for the pool.
    pub liquidity: u64,
    /// Reserve of `token_a`.
    /// NOTE(review): `DexAggregator::check_pool_exists` always writes 0 here.
    pub reserve_a: u64,
    /// Reserve of `token_b`.
    /// NOTE(review): `DexAggregator::check_pool_exists` always writes 0 here.
    pub reserve_b: u64,
    /// Swap fee rate; `DexAggregator::check_pool_exists` hard-codes 0.003.
    pub fee_rate: f64,
}
54
/// Descriptive metadata for a token, as returned by
/// `DexAggregator::get_token_metadata`.
#[derive(Debug, Clone)]
pub struct TokenMetadata {
    /// Token address the metadata was requested for.
    pub address: String,
    /// Token name.
    pub name: String,
    /// Token ticker symbol.
    pub symbol: String,
    /// Number of decimals.
    pub decimals: u8,
    /// Token supply; 0 when it could not be read.
    pub supply: u64,
}
64
/// One DEX's entry inside a [`TokenPriceComparison`].
#[derive(Debug, Clone)]
pub struct DexPrice {
    /// Name of the DEX that produced the quote.
    pub dex: String,
    /// Price taken from the DEX quote.
    pub price: f64,
    /// Output amount taken from the DEX quote.
    pub amount_out: u64,
}
72
/// Prices for one token pair collected from several DEXes.
#[derive(Debug, Clone)]
pub struct TokenPriceComparison {
    /// Input token of the compared pair.
    pub token_a: String,
    /// Output token of the compared pair.
    pub token_b: String,
    /// One entry per DEX that returned a quote.
    pub prices: Vec<DexPrice>,
}
80
/// Namespace type for cross-DEX quoting, swapping and token lookups; holds no state.
pub struct DexAggregator;
82
83impl DexAggregator {
84 pub async fn find_best_swap(
86 client: Arc<Aptos>,
87 from_token: &str,
88 to_token: &str,
89 amount_in: u64,
90 ) -> Result<DexSwapQuote, String> {
91 let mut quotes = Vec::new();
92 if let Ok(quote) =
93 Self::get_liquidswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
94 {
95 quotes.push(quote);
96 }
97 if let Ok(quote) =
98 Self::get_animeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
99 {
100 quotes.push(quote);
101 }
102 if let Ok(quote) =
103 Self::get_thala_quote(Arc::clone(&client), from_token, to_token, amount_in).await
104 {
105 quotes.push(quote);
106 }
107 if let Ok(quote) =
108 Self::get_pancakeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
109 {
110 quotes.push(quote);
111 }
112 if let Ok(quote) =
113 Self::get_cellana_quote(Arc::clone(&client), from_token, to_token, amount_in).await
114 {
115 quotes.push(quote);
116 }
117 if let Ok(quote) =
118 Self::get_aux_quote(Arc::clone(&client), from_token, to_token, amount_in).await
119 {
120 quotes.push(quote);
121 }
122 if quotes.is_empty() {
123 return Err("No suitable DEX found for this trade".to_string());
124 }
125 quotes.sort_by(|a, b| b.amount_out.cmp(&a.amount_out));
127 Ok(quotes.first().unwrap().clone())
128 }
129
130 pub async fn exe_best_swap(
132 client: Arc<Aptos>,
133 wallet: Arc<Wallet>,
134 from_token: &str,
135 to_token: &str,
136 amount_in: u64,
137 slippage: f64,
138 ) -> Result<Value, String> {
139 let quote =
140 Self::find_best_swap(Arc::clone(&client), from_token, to_token, amount_in).await?;
141 let min_amount_out = (quote.amount_out as f64 * (1.0 - slippage)) as u64;
142 match quote.dex.as_str() {
143 "Liquidswap" => {
144 Liquidswap::swap_exact_input(
145 client,
146 wallet,
147 from_token,
148 to_token,
149 amount_in,
150 min_amount_out,
151 )
152 .await
153 }
154 "AnimeSwap" => {
155 AnimeSwap::swap_exact_tokens_for_tokens(
156 client,
157 wallet,
158 vec![from_token, to_token],
159 amount_in,
160 min_amount_out,
161 )
162 .await
163 }
164 "Thala" => {
165 Thala::swap_exact_input(
166 client,
167 wallet,
168 from_token,
169 to_token,
170 amount_in,
171 min_amount_out,
172 )
173 .await
174 }
175 "PancakeSwap" => {
176 let wallet_address = wallet.address().map_err(|e| e.to_string())?;
177 PancakeSwap::swap_exact_tokens_for_tokens(
178 client,
179 wallet,
180 amount_in,
181 min_amount_out,
182 vec![from_token, to_token],
183 &wallet_address,
184 Self::get_deadline(300),
185 )
186 .await
187 }
188 "Cellana" => {
189 Cellana::swap(
190 client,
191 wallet,
192 from_token,
193 to_token,
194 amount_in,
195 min_amount_out,
196 )
197 .await
198 }
199 "AuxExchange" => {
200 AuxExchange::swap_exact_input(
201 client,
202 wallet,
203 from_token,
204 to_token,
205 amount_in,
206 min_amount_out,
207 )
208 .await
209 }
210 _ => Err(format!("Unsupported DEX: {}", quote.dex)),
211 }
212 }
213
214 pub async fn compare_all_dex_prices(
216 client: Arc<Aptos>,
217 from_token: &str,
218 to_token: &str,
219 amount_in: u64,
220 ) -> Result<Vec<DexSwapQuote>, String> {
221 let mut quotes = Vec::new();
222 if let Ok(quote) =
223 Self::get_liquidswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
224 {
225 quotes.push(quote);
226 }
227 if let Ok(quote) =
228 Self::get_animeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
229 {
230 quotes.push(quote);
231 }
232 if let Ok(quote) =
233 Self::get_thala_quote(Arc::clone(&client), from_token, to_token, amount_in).await
234 {
235 quotes.push(quote);
236 }
237 if let Ok(quote) =
238 Self::get_pancakeswap_quote(Arc::clone(&client), from_token, to_token, amount_in).await
239 {
240 quotes.push(quote);
241 }
242 if let Ok(quote) =
243 Self::get_cellana_quote(Arc::clone(&client), from_token, to_token, amount_in).await
244 {
245 quotes.push(quote);
246 }
247 if let Ok(quote) =
248 Self::get_aux_quote(Arc::clone(&client), from_token, to_token, amount_in).await
249 {
250 quotes.push(quote);
251 }
252 quotes.sort_by(|a, b| b.amount_out.cmp(&a.amount_out));
253 Ok(quotes)
254 }
255
256 async fn get_liquidswap_quote(
258 client: Arc<Aptos>,
259 from_token: &str,
260 to_token: &str,
261 amount_in: u64,
262 ) -> Result<DexSwapQuote, String> {
263 match Liquidswap::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
264 Ok(price) => {
265 let amount_out = (price * amount_in as f64) as u64;
266 Ok(DexSwapQuote {
267 dex: "Liquidswap".to_string(),
268 amount_out,
269 price,
270 dex_address: LIQUIDSWAP_PROTOCOL_ADDRESS.to_string(),
271 })
272 }
273 Err(e) => Err(e),
274 }
275 }
276
277 async fn get_animeswap_quote(
278 client: Arc<Aptos>,
279 from_token: &str,
280 to_token: &str,
281 amount_in: u64,
282 ) -> Result<DexSwapQuote, String> {
283 match AnimeSwap::get_reserves(Arc::clone(&client), from_token, to_token).await {
284 Ok((reserve_in, reserve_out)) => {
285 let amount_out = Self::calculate_amm_output(amount_in, reserve_in, reserve_out);
286 let price = if amount_in > 0 {
287 amount_out as f64 / amount_in as f64
288 } else {
289 0.0
290 };
291 Ok(DexSwapQuote {
292 dex: "AnimeSwap".to_string(),
293 amount_out,
294 price,
295 dex_address: ANIMESWAP_PROTOCOL_ADDRESS.to_string(),
296 })
297 }
298 Err(_) => Err("Failed to get AnimeSwap reserves".to_string()),
299 }
300 }
301
302 async fn get_thala_quote(
303 client: Arc<Aptos>,
304 from_token: &str,
305 to_token: &str,
306 amount_in: u64,
307 ) -> Result<DexSwapQuote, String> {
308 match Thala::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
309 Ok(price) => {
310 let amount_out = (price * amount_in as f64) as u64;
311 Ok(DexSwapQuote {
312 dex: "Thala".to_string(),
313 amount_out,
314 price,
315 dex_address: THALA_PROTOCOL_ADDRESS.to_string(),
316 })
317 }
318 Err(e) => Err(e),
319 }
320 }
321
322 async fn get_pancakeswap_quote(
323 client: Arc<Aptos>,
324 from_token: &str,
325 to_token: &str,
326 amount_in: u64,
327 ) -> Result<DexSwapQuote, String> {
328 match PancakeSwap::get_reserves(Arc::clone(&client), from_token, to_token).await {
329 Ok((reserve_in, reserve_out)) => {
330 let amount_out = Self::calculate_amm_output(amount_in, reserve_in, reserve_out);
331 let price = if amount_in > 0 {
332 amount_out as f64 / amount_in as f64
333 } else {
334 0.0
335 };
336 Ok(DexSwapQuote {
337 dex: "PancakeSwap".to_string(),
338 amount_out,
339 price,
340 dex_address: PANCAKESWAP_FACTORY_PROTOCOL_ADDRESS.to_string(),
341 })
342 }
343 Err(_) => Err("Failed to get PancakeSwap reserves".to_string()),
344 }
345 }
346
347 async fn get_cellana_quote(
348 client: Arc<Aptos>,
349 from_token: &str,
350 to_token: &str,
351 amount_in: u64,
352 ) -> Result<DexSwapQuote, String> {
353 match Cellana::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
354 Ok(price) => {
355 let amount_out = (price * amount_in as f64) as u64;
356 Ok(DexSwapQuote {
357 dex: "Cellana".to_string(),
358 amount_out,
359 price,
360 dex_address: CELLANASWAP_PROTOCOL_ADDRESS.to_string(),
361 })
362 }
363 Err(e) => Err(e),
364 }
365 }
366
367 async fn get_aux_quote(
368 client: Arc<Aptos>,
369 from_token: &str,
370 to_token: &str,
371 amount_in: u64,
372 ) -> Result<DexSwapQuote, String> {
373 match AuxExchange::get_price(Arc::clone(&client), from_token, to_token, amount_in).await {
374 Ok(amount_out) => {
375 let price = if amount_in > 0 {
376 amount_out as f64 / amount_in as f64
377 } else {
378 0.0
379 };
380 Ok(DexSwapQuote {
381 dex: "AuxExchange".to_string(),
382 amount_out,
383 price,
384 dex_address: AUXSWAP_PROTOCOL_ADDRESS.to_string(),
385 })
386 }
387 Err(e) => {
388 match AuxExchange::get_pool_info(Arc::clone(&client), from_token, to_token).await {
389 Ok(pool_info) => {
390 if let (Some(reserve_in), Some(reserve_out)) = (
391 pool_info
392 .get("coin_a_reserve")
393 .and_then(|v| v.as_str())
394 .and_then(|s| s.parse::<u64>().ok()),
395 pool_info
396 .get("coin_b_reserve")
397 .and_then(|v| v.as_str())
398 .and_then(|s| s.parse::<u64>().ok()),
399 ) {
400 let amount_out =
401 Self::calculate_amm_output(amount_in, reserve_in, reserve_out);
402 let price = if amount_in > 0 {
403 amount_out as f64 / amount_in as f64
404 } else {
405 0.0
406 };
407 Ok(DexSwapQuote {
408 dex: "AuxExchange".to_string(),
409 amount_out,
410 price,
411 dex_address: AUXSWAP_PROTOCOL_ADDRESS.to_string(),
412 })
413 } else {
414 Err(format!("Failed to parse pool reserves: {}", e))
415 }
416 }
417 Err(pool_err) => Err(format!(
418 "Failed to get AuxExchange quote: {} (pool error: {})",
419 e, pool_err
420 )),
421 }
422 }
423 }
424 }
425
426 fn calculate_amm_output(amount_in: u64, reserve_in: u64, reserve_out: u64) -> u64 {
428 if reserve_in == 0 || reserve_out == 0 {
429 return 0;
430 }
431 let amount_in_with_fee = amount_in * 997;
432 let numerator = amount_in_with_fee * reserve_out;
433 let denominator = reserve_in * 1000 + amount_in_with_fee;
434 if denominator == 0 {
435 return 0;
436 }
437 numerator / denominator
438 }
439
440 fn get_deadline(seconds_from_now: u64) -> u64 {
442 std::time::SystemTime::now()
443 .duration_since(std::time::UNIX_EPOCH)
444 .unwrap()
445 .as_secs()
446 + seconds_from_now
447 }
448
449 pub fn get_supported_dexes() -> Vec<DexInfo> {
451 vec![
452 DexInfo {
453 name: "Liquidswap".to_string(),
454 address: LIQUIDSWAP_PROTOCOL_ADDRESS.to_string(),
455 description: "Pontem Network - Largest DEX on Aptos".to_string(),
456 supports_liquidity: true,
457 supports_swap: true,
458 is_amm: true,
459 },
460 DexInfo {
461 name: "AuxExchange".to_string(),
462 address: AUXSWAP_PROTOCOL_ADDRESS.to_string(),
463 description: "Orderbook-based DEX with AMM".to_string(),
464 supports_liquidity: true,
465 supports_swap: true,
466 is_amm: false,
467 },
468 DexInfo {
469 name: "AnimeSwap".to_string(),
470 address: ANIMESWAP_PROTOCOL_ADDRESS.to_string(),
471 description: "Multi-chain DEX with anime theme".to_string(),
472 supports_liquidity: true,
473 supports_swap: true,
474 is_amm: true,
475 },
476 DexInfo {
477 name: "Thala".to_string(),
478 address: THALA_PROTOCOL_ADDRESS.to_string(),
479 description: "DeFi protocol with THL token and staking".to_string(),
480 supports_liquidity: true,
481 supports_swap: true,
482 is_amm: true,
483 },
484 DexInfo {
485 name: "PancakeSwap".to_string(),
486 address: PANCAKESWAP_FACTORY_PROTOCOL_ADDRESS.to_string(),
487 description: "Multi-chain DEX with CAKE token".to_string(),
488 supports_liquidity: true,
489 supports_swap: true,
490 is_amm: true,
491 },
492 DexInfo {
493 name: "Cellana".to_string(),
494 address: CELLANASWAP_PROTOCOL_ADDRESS.to_string(),
495 description: "DEX with CELL token and farming".to_string(),
496 supports_liquidity: true,
497 supports_swap: true,
498 is_amm: true,
499 },
500 ]
501 }
502
503 pub async fn get_token_price(
505 client: Arc<Aptos>,
506 token_address: &str,
507 ) -> Result<Vec<TokenPrice>, String> {
508 let apt_coin = "0x1::aptos_coin::AptosCoin";
509 let mut prices = Vec::new();
510 let dex_checks = vec![
511 (
512 "Liquidswap",
513 Self::get_token_price_on_dex(
514 Arc::clone(&client),
515 "Liquidswap",
516 token_address,
517 apt_coin,
518 ),
519 ),
520 (
521 "Thala",
522 Self::get_token_price_on_dex(Arc::clone(&client), "Thala", token_address, apt_coin),
523 ),
524 (
525 "PancakeSwap",
526 Self::get_token_price_on_dex(
527 Arc::clone(&client),
528 "PancakeSwap",
529 token_address,
530 apt_coin,
531 ),
532 ),
533 (
534 "AnimeSwap",
535 Self::get_token_price_on_dex(
536 Arc::clone(&client),
537 "AnimeSwap",
538 token_address,
539 apt_coin,
540 ),
541 ),
542 (
543 "Cellana",
544 Self::get_token_price_on_dex(
545 Arc::clone(&client),
546 "Cellana",
547 token_address,
548 apt_coin,
549 ),
550 ),
551 ];
552 for (dex_name, check_future) in dex_checks {
553 if let Ok(price) = check_future.await {
554 prices.push(price);
555 }
556 }
557 prices.sort_by(|a, b| {
558 b.price
559 .partial_cmp(&a.price)
560 .unwrap_or(std::cmp::Ordering::Equal)
561 });
562 Ok(prices)
563 }
564
565 async fn get_token_price_on_dex(
567 client: Arc<Aptos>,
568 dex_name: &str,
569 token_address: &str,
570 base_token: &str,
571 ) -> Result<TokenPrice, String> {
572 let amount_in = 1_000_000;
573 let quote = match dex_name {
574 "Liquidswap" => {
575 Self::get_liquidswap_quote(client.clone(), token_address, base_token, amount_in)
576 .await
577 }
578 "Thala" => {
579 Self::get_thala_quote(client.clone(), token_address, base_token, amount_in).await
580 }
581 "PancakeSwap" => {
582 Self::get_pancakeswap_quote(client.clone(), token_address, base_token, amount_in)
583 .await
584 }
585 "AnimeSwap" => {
586 Self::get_animeswap_quote(client.clone(), token_address, base_token, amount_in)
587 .await
588 }
589 "Cellana" => {
590 Self::get_cellana_quote(client.clone(), token_address, base_token, amount_in).await
591 }
592 _ => Err("Unsupported DEX".to_string()),
593 }?;
594 Ok(TokenPrice {
595 dex: dex_name.to_string(),
596 token_address: token_address.to_string(),
597 base_token: base_token.to_string(),
598 price: quote.price,
599 liquidity: Self::get_pool_liquidity(
600 client.clone(),
601 dex_name,
602 token_address,
603 base_token,
604 )
605 .await
606 .unwrap_or(0),
607 timestamp: std::time::SystemTime::now()
608 .duration_since(std::time::UNIX_EPOCH)
609 .unwrap()
610 .as_secs(),
611 })
612 }
613
614 async fn get_pool_liquidity(
616 client: Arc<Aptos>,
617 dex_name: &str,
618 token_a: &str,
619 token_b: &str,
620 ) -> Result<u64, String> {
621 match dex_name {
622 "Liquidswap" => {
623 let pool_info = Liquidswap::get_pool_info(client, token_a, token_b).await?;
624 let reserve_a = pool_info
625 .get("coin_x_reserve")
626 .and_then(|v| v.as_str())
627 .and_then(|s| s.parse().ok())
628 .unwrap_or(0);
629 let reserve_b = pool_info
630 .get("coin_y_reserve")
631 .and_then(|v| v.as_str())
632 .and_then(|s| s.parse().ok())
633 .unwrap_or(0);
634 Ok(reserve_a + reserve_b)
635 }
636 "Thala" => {
637 let pool_info = Thala::get_pool_info(client, token_a, token_b).await?;
638 let reserve_a = pool_info
639 .get("reserve_x")
640 .and_then(|v| v.as_str())
641 .and_then(|s| s.parse().ok())
642 .unwrap_or(0);
643 let reserve_b = pool_info
644 .get("reserve_y")
645 .and_then(|v| v.as_str())
646 .and_then(|s| s.parse().ok())
647 .unwrap_or(0);
648 Ok(reserve_a + reserve_b)
649 }
650 "AnimeSwap" => {
651 let (reserve_a, reserve_b) =
652 AnimeSwap::get_reserves(client, token_a, token_b).await?;
653 Ok(reserve_a + reserve_b)
654 }
655 "PancakeSwap" => {
656 let (reserve_a, reserve_b) =
657 PancakeSwap::get_reserves(client, token_a, token_b).await?;
658 Ok(reserve_a + reserve_b)
659 }
660 "Cellana" => {
661 let pool_info = Cellana::get_pool_info(client, token_a, token_b).await?;
662 let reserve_a = pool_info
663 .get("reserve_x")
664 .and_then(|v| v.as_str())
665 .and_then(|s| s.parse().ok())
666 .unwrap_or(0);
667 let reserve_b = pool_info
668 .get("reserve_y")
669 .and_then(|v| v.as_str())
670 .and_then(|s| s.parse().ok())
671 .unwrap_or(0);
672 Ok(reserve_a + reserve_b)
673 }
674 _ => Ok(0),
675 }
676 }
677
678 pub async fn find_token_liquidity_pools(
680 client: Arc<Aptos>,
681 token_address: &str,
682 ) -> Result<Vec<LiquidityPool>, String> {
683 let common_tokens = vec![APT, USDC, USDT, WORMHOLE_USDC];
684 let mut pools = Vec::new();
685 for base_token in common_tokens {
686 let dex_checks = vec![
687 (
688 "Liquidswap",
689 Self::check_pool_exists(
690 Arc::clone(&client),
691 "Liquidswap",
692 token_address,
693 base_token,
694 ),
695 ),
696 (
697 "Thala",
698 Self::check_pool_exists(
699 Arc::clone(&client),
700 "Thala",
701 token_address,
702 base_token,
703 ),
704 ),
705 (
706 "PancakeSwap",
707 Self::check_pool_exists(
708 Arc::clone(&client),
709 "PancakeSwap",
710 token_address,
711 base_token,
712 ),
713 ),
714 (
715 "AnimeSwap",
716 Self::check_pool_exists(
717 Arc::clone(&client),
718 "AnimeSwap",
719 token_address,
720 base_token,
721 ),
722 ),
723 (
724 "Cellana",
725 Self::check_pool_exists(
726 Arc::clone(&client),
727 "Cellana",
728 token_address,
729 base_token,
730 ),
731 ),
732 ];
733 for (dex_name, check_future) in dex_checks {
734 if let Ok(Some(pool)) = check_future.await {
735 pools.push(pool);
736 }
737 }
738 }
739 pools.sort_by(|a, b| b.liquidity.cmp(&a.liquidity));
740 Ok(pools)
741 }
742
743 async fn check_pool_exists(
745 client: Arc<Aptos>,
746 dex_name: &str,
747 token_a: &str,
748 token_b: &str,
749 ) -> Result<Option<LiquidityPool>, String> {
750 let liquidity =
751 Self::get_pool_liquidity(Arc::clone(&client), dex_name, token_a, token_b).await;
752 if let Ok(liquidity) = liquidity {
753 if liquidity > 0 {
754 let pool = LiquidityPool {
755 dex: dex_name.to_string(),
756 token_a: token_a.to_string(),
757 token_b: token_b.to_string(),
758 liquidity,
759 reserve_a: 0,
760 reserve_b: 0,
761 fee_rate: 0.003,
762 };
763 return Ok(Some(pool));
764 }
765 }
766 Ok(None)
767 }
768
769 pub async fn get_token_metadata(
771 client: Arc<Aptos>,
772 token_address: &str,
773 ) -> Result<TokenMetadata, String> {
774 let coin_info_type = format!("0x1::coin::CoinInfo<{}>", token_address);
775 if let Ok(Some(resource)) = client.get_account_resource("0x1", &coin_info_type).await {
776 if let Value::Object(data) = &resource.data {
777 return Ok(TokenMetadata {
778 address: token_address.to_string(),
779 name: data
780 .get("name")
781 .and_then(|v| v.as_str())
782 .unwrap_or("")
783 .to_string(),
784 symbol: data
785 .get("symbol")
786 .and_then(|v| v.as_str())
787 .unwrap_or("")
788 .to_string(),
789 decimals: data.get("decimals").and_then(|v| v.as_u64()).unwrap_or(0) as u8,
790 supply: data
791 .get("supply")
792 .and_then(|v| v.get("vec"))
793 .and_then(|v| v.as_array())
794 .and_then(|v| v.get(0))
795 .and_then(|v| v.get("value"))
796 .and_then(|v| v.as_str())
797 .and_then(|s| s.parse().ok())
798 .unwrap_or(0),
799 });
800 }
801 }
802 Ok(TokenMetadata {
803 address: token_address.to_string(),
804 name: "Unknown".to_string(),
805 symbol: "UNKNOWN".to_string(),
806 decimals: 8,
807 supply: 0,
808 })
809 }
810
811 pub async fn get_top_prices_comparison(
812 client: Arc<Aptos>,
813 ) -> Result<Vec<TokenPriceComparison>, String> {
814 let popular_pairs = vec![
815 (USDC, APT), (USDT, APT), (THL, APT), ];
819 let mut comparisons = Vec::new();
820 for (token_a, token_b) in popular_pairs {
821 let mut prices = Vec::new();
822 let amount_in = 1_000_000;
823 if let Ok(quote) =
824 Self::get_liquidswap_quote(Arc::clone(&client), token_a, token_b, amount_in).await
825 {
826 prices.push(DexPrice {
827 dex: "Liquidswap".to_string(),
828 price: quote.price,
829 amount_out: quote.amount_out,
830 });
831 }
832 if let Ok(quote) =
833 Self::get_thala_quote(Arc::clone(&client), token_a, token_b, amount_in).await
834 {
835 prices.push(DexPrice {
836 dex: "Thala".to_string(),
837 price: quote.price,
838 amount_out: quote.amount_out,
839 });
840 }
841 if let Ok(quote) =
842 Self::get_pancakeswap_quote(Arc::clone(&client), token_a, token_b, amount_in).await
843 {
844 prices.push(DexPrice {
845 dex: "PancakeSwap".to_string(),
846 price: quote.price,
847 amount_out: quote.amount_out,
848 });
849 }
850 if prices.len() > 1 {
851 comparisons.push(TokenPriceComparison {
852 token_a: token_a.to_string(),
853 token_b: token_b.to_string(),
854 prices,
855 });
856 }
857 }
858 Ok(comparisons)
859 }
860}
861
/// A swap quote obtained from one DEX for a given input amount.
#[derive(Debug, Clone)]
pub struct DexSwapQuote {
    /// Name of the quoting DEX; `DexAggregator::exe_best_swap` dispatches on this value.
    pub dex: String,
    /// Quoted output amount for the requested input.
    pub amount_out: u64,
    /// Quoted price; the reserve-based quotes compute it as `amount_out / amount_in`.
    pub price: f64,
    /// Protocol address constant associated with the DEX.
    pub dex_address: String,
}
869
/// Static description of a supported DEX, as listed by
/// `DexAggregator::get_supported_dexes`.
#[derive(Debug, Clone)]
pub struct DexInfo {
    /// Display name of the DEX.
    pub name: String,
    /// Protocol address constant of the DEX.
    pub address: String,
    /// Short human-readable description.
    pub description: String,
    /// Whether liquidity operations are reported as supported.
    pub supports_liquidity: bool,
    /// Whether swaps are reported as supported.
    pub supports_swap: bool,
    /// Whether the DEX is classified as an AMM.
    pub is_amm: bool,
}
879
/// Holds one broadcast channel per monitored DEX and hands out subscriptions to them.
pub struct DexEventMonitor {
    /// Broadcast sender for each DEX, keyed by DEX name.
    clients: HashMap<String, broadcast::Sender<EventData>>,
}
884
885impl DexEventMonitor {
886 pub fn new() -> Self {
887 Self {
888 clients: HashMap::new(),
889 }
890 }
891 pub async fn start_monitoring_all_dexes(
892 &mut self,
893 client: Arc<Aptos>,
894 ) -> Result<(), String> {
895 let dexes = vec![
896 "Liquidswap",
897 "Thala",
898 "PancakeSwap",
899 "Cellana",
900 "AnimeSwap",
901 "AuxExchange",
902 ];
903 for dex_name in dexes {
904 let (sender, _) = broadcast::channel(1000);
905 self.clients.insert(dex_name.to_string(), sender);
906 }
907 Self::start_dex_monitoring_task(
908 Arc::clone(&client),
909 "Liquidswap",
910 self.get_sender("Liquidswap"),
911 );
912 Self::start_dex_monitoring_task(Arc::clone(&client), "Thala", self.get_sender("Thala"));
913 Self::start_dex_monitoring_task(
914 Arc::clone(&client),
915 "PancakeSwap",
916 self.get_sender("PancakeSwap"),
917 );
918 Self::start_dex_monitoring_task(Arc::clone(&client), "Cellana", self.get_sender("Cellana"));
919 Self::start_dex_monitoring_task(
920 Arc::clone(&client),
921 "AnimeSwap",
922 self.get_sender("AnimeSwap"),
923 );
924 Self::start_dex_monitoring_task(
925 Arc::clone(&client),
926 "AuxExchange",
927 self.get_sender("AuxExchange"),
928 );
929 Ok(())
930 }
931
932 fn start_dex_monitoring_task(
933 client: Arc<Aptos>,
934 dex_name: &str,
935 sender: Option<broadcast::Sender<EventData>>,
936 ) {
937 if let Some(sender) = sender {
938 let client = Arc::clone(&client);
939 let dex_name = dex_name.to_string();
940 tokio::spawn(async move {
941 match dex_name.as_str() {
942 "Liquidswap" => {
943 let _ = Liquidswap::listen_events(client, sender, vec![]).await;
944 }
945 "Thala" => {
946 let _ = Thala::listen_events(client, sender, vec![]).await;
947 }
948 "PancakeSwap" => {
949 let filters = PancakeSwapEventFilters {
950 min_swap_amount: Some(1000000000),
951 include_cake_pairs: true,
952 tracked_pairs: None,
953 };
954 let _ = PancakeSwap::listen_events(client, sender, filters).await;
955 }
956 "Cellana" => {
957 let config = CellanaEventConfig {
958 monitor_cell_pairs: true,
959 min_swap_amount: 1000000000,
960 monitor_farming: true,
961 tracked_tokens: vec![],
962 };
963 let _ = Cellana::listen_events(client, sender, config).await;
964 }
965 "AnimeSwap" => {
966 let filters = AnimeSwapEventFilters {
967 min_swap_amount: Some(1000000000),
968 tracked_tokens: None,
969 min_liquidity_amount: Some(500000000),
970 };
971 let _ = AnimeSwap::listen_events(client, sender, filters).await;
972 }
973 "AuxExchange" => {
974 let _ = AuxExchange::listen_events(client, sender, vec![]).await;
975 }
976 _ => {}
977 }
978 });
979 }
980 }
981
982 fn get_sender(&self, dex_name: &str) -> Option<broadcast::Sender<EventData>> {
983 self.clients.get(dex_name).cloned()
984 }
985
986 pub fn subscribe_to_dex(&self, dex_name: &str) -> Option<broadcast::Receiver<EventData>> {
987 self.clients.get(dex_name).map(|sender| sender.subscribe())
988 }
989
990 pub fn get_all_receivers(&self) -> Vec<(String, broadcast::Receiver<EventData>)> {
991 self.clients
992 .iter()
993 .map(|(name, sender)| (name.clone(), sender.subscribe()))
994 .collect()
995 }
996
997 pub fn publish_to_dex(&self, dex_name: &str, event: EventData) -> Result<(), String> {
998 if let Some(sender) = self.clients.get(dex_name) {
999 let _ = sender.send(event);
1000 Ok(())
1001 } else {
1002 Err(format!("DEX {} not found", dex_name))
1003 }
1004 }
1005}
1006
/// Namespace type for volume and liquidity analytics across DEXes; holds no state.
pub struct DexAnalytics;
1008
1009impl DexAnalytics {
1010 pub async fn analyze_dex_volume_distribution(
1012 client: Arc<Aptos>,
1013 _time_period_hours: u64,
1014 ) -> Result<HashMap<String, u64>, String> {
1015 let mut volume_map = HashMap::new();
1016 let dex_volume_futures = vec![
1017 (
1018 "Liquidswap",
1019 Self::get_liquidswap_volume(Arc::clone(&client)).await,
1020 ),
1021 ("Thala", Self::get_thala_volume(Arc::clone(&client)).await),
1022 (
1023 "PancakeSwap",
1024 Self::get_pancakeswap_volume(Arc::clone(&client)).await,
1025 ),
1026 (
1027 "AnimeSwap",
1028 Self::get_animeswap_volume(Arc::clone(&client)).await,
1029 ),
1030 (
1031 "Cellana",
1032 Self::get_cellana_volume(Arc::clone(&client)).await,
1033 ),
1034 (
1035 "AuxExchange",
1036 Self::get_aux_volume(Arc::clone(&client)).await,
1037 ),
1038 ];
1039 let mut handles = Vec::new();
1040 for (dex_name, volume_future) in dex_volume_futures {
1041 let handle = tokio::spawn(async move {
1042 match volume_future {
1043 Ok(volume) => Some((dex_name.to_string(), volume)),
1044 Err(e) => {
1045 eprintln!("Failed to get volume for {}: {}", dex_name, e);
1046 None
1047 }
1048 }
1049 });
1050 handles.push(handle);
1051 }
1052 for handle in handles {
1053 if let Some((dex_name, volume)) = handle.await.map_err(|e| e.to_string())? {
1054 volume_map.insert(dex_name, volume);
1055 }
1056 }
1057 Ok(volume_map)
1058 }
1059
1060 async fn get_liquidswap_volume(client: Arc<Aptos>) -> Result<u64, String> {
1062 let events = crate::dex::liquidswap::Liquidswap::get_swap_events(client).await?;
1063 let total_volume = events
1064 .iter()
1065 .map(|event| {
1066 if let Some(amount_in) = event.event_data.get("amount_in") {
1067 amount_in
1068 .as_str()
1069 .and_then(|s| s.parse::<u64>().ok())
1070 .unwrap_or(0)
1071 } else if let Some(amount_out) = event.event_data.get("amount_out") {
1072 amount_out
1073 .as_str()
1074 .and_then(|s| s.parse::<u64>().ok())
1075 .unwrap_or(0)
1076 } else {
1077 0
1078 }
1079 })
1080 .sum();
1081 Ok(total_volume)
1082 }
1083
1084 async fn get_thala_volume(client: Arc<Aptos>) -> Result<u64, String> {
1086 let events = crate::dex::thala::Thala::get_swap_events(client).await?;
1087 let total_volume = events
1088 .iter()
1089 .map(|event| {
1090 if let Some(amount_in) = event.event_data.get("amount_in") {
1091 amount_in
1092 .as_str()
1093 .and_then(|s| s.parse::<u64>().ok())
1094 .unwrap_or(0)
1095 } else if let Some(amount_x_in) = event.event_data.get("amount_x_in") {
1096 amount_x_in
1097 .as_str()
1098 .and_then(|s| s.parse::<u64>().ok())
1099 .unwrap_or(0)
1100 } else if let Some(amount_y_in) = event.event_data.get("amount_y_in") {
1101 amount_y_in
1102 .as_str()
1103 .and_then(|s| s.parse::<u64>().ok())
1104 .unwrap_or(0)
1105 } else {
1106 0
1107 }
1108 })
1109 .sum();
1110 Ok(total_volume)
1111 }
1112
1113 async fn get_pancakeswap_volume(client: Arc<Aptos>) -> Result<u64, String> {
1115 let events = crate::dex::pancakeswap::PancakeSwap::get_swap_events(client).await?;
1116 let total_volume = events
1117 .iter()
1118 .map(|event| {
1119 if let Some(amount0_in) = event.event_data.get("amount0_in") {
1120 amount0_in
1121 .as_str()
1122 .and_then(|s| s.parse::<u64>().ok())
1123 .unwrap_or(0)
1124 } else if let Some(amount1_in) = event.event_data.get("amount1_in") {
1125 amount1_in
1126 .as_str()
1127 .and_then(|s| s.parse::<u64>().ok())
1128 .unwrap_or(0)
1129 } else if let Some(amount0_out) = event.event_data.get("amount0_out") {
1130 amount0_out
1131 .as_str()
1132 .and_then(|s| s.parse::<u64>().ok())
1133 .unwrap_or(0)
1134 } else if let Some(amount1_out) = event.event_data.get("amount1_out") {
1135 amount1_out
1136 .as_str()
1137 .and_then(|s| s.parse::<u64>().ok())
1138 .unwrap_or(0)
1139 } else {
1140 0
1141 }
1142 })
1143 .sum();
1144 Ok(total_volume)
1145 }
1146
1147 async fn get_animeswap_volume(client: Arc<Aptos>) -> Result<u64, String> {
1149 let events = crate::dex::animeswap::AnimeSwap::get_swap_events(client).await?;
1150 let total_volume = events
1151 .iter()
1152 .map(|event| {
1153 if let Some(amount0_in) = event.event_data.get("amount0_in") {
1154 amount0_in
1155 .as_str()
1156 .and_then(|s| s.parse::<u64>().ok())
1157 .unwrap_or(0)
1158 } else if let Some(amount1_in) = event.event_data.get("amount1_in") {
1159 amount1_in
1160 .as_str()
1161 .and_then(|s| s.parse::<u64>().ok())
1162 .unwrap_or(0)
1163 } else if let Some(amount_in) = event.event_data.get("amount_in") {
1164 amount_in
1165 .as_str()
1166 .and_then(|s| s.parse::<u64>().ok())
1167 .unwrap_or(0)
1168 } else {
1169 0
1170 }
1171 })
1172 .sum();
1173 Ok(total_volume)
1174 }
1175
1176 async fn get_cellana_volume(client: Arc<Aptos>) -> Result<u64, String> {
1178 let events = crate::dex::cellana::Cellana::get_swap_events(client).await?;
1179 let total_volume = events
1180 .iter()
1181 .map(|event| {
1182 if let Some(amount_in) = event.event_data.get("amount_in") {
1183 amount_in
1184 .as_str()
1185 .and_then(|s| s.parse::<u64>().ok())
1186 .unwrap_or(0)
1187 } else if let Some(amount_x) = event.event_data.get("amount_x") {
1188 amount_x
1189 .as_str()
1190 .and_then(|s| s.parse::<u64>().ok())
1191 .unwrap_or(0)
1192 } else if let Some(amount_y) = event.event_data.get("amount_y") {
1193 amount_y
1194 .as_str()
1195 .and_then(|s| s.parse::<u64>().ok())
1196 .unwrap_or(0)
1197 } else {
1198 0
1199 }
1200 })
1201 .sum();
1202 Ok(total_volume)
1203 }
1204
1205 async fn get_aux_volume(client: Arc<Aptos>) -> Result<u64, String> {
1207 let events = crate::dex::auxswap::AuxExchange::get_swap_events(client).await?;
1208 let total_volume = events
1209 .iter()
1210 .map(|event| {
1211 if let Some(amount_in) = event.event_data.get("amount_in") {
1212 amount_in
1213 .as_str()
1214 .and_then(|s| s.parse::<u64>().ok())
1215 .unwrap_or(0)
1216 } else if let Some(quantity) = event.event_data.get("quantity") {
1217 quantity
1218 .as_str()
1219 .and_then(|s| s.parse::<u64>().ok())
1220 .unwrap_or(0)
1221 } else if let Some(amount) = event.event_data.get("amount") {
1222 amount
1223 .as_str()
1224 .and_then(|s| s.parse::<u64>().ok())
1225 .unwrap_or(0)
1226 } else {
1227 0
1228 }
1229 })
1230 .sum();
1231 Ok(total_volume)
1232 }
1233
1234 pub async fn get_liquidity_depth(
1236 _client: Arc<Aptos>,
1237 token_a: &str,
1238 token_b: &str,
1239 ) -> Result<Vec<DexLiquidity>, String> {
1240 let mut liquidity_data = Vec::new();
1241 liquidity_data.push(DexLiquidity {
1242 dex: "Liquidswap".to_string(),
1243 token_a: token_a.to_string(),
1244 token_b: token_b.to_string(),
1245 reserve_a: 500000000000,
1246 reserve_b: 500000000000,
1247 total_liquidity: 1000000000000,
1248 });
1249 liquidity_data.push(DexLiquidity {
1250 dex: "Thala".to_string(),
1251 token_a: token_a.to_string(),
1252 token_b: token_b.to_string(),
1253 reserve_a: 250000000000,
1254 reserve_b: 250000000000,
1255 total_liquidity: 500000000000,
1256 });
1257 liquidity_data.push(DexLiquidity {
1258 dex: "PancakeSwap".to_string(),
1259 token_a: token_a.to_string(),
1260 token_b: token_b.to_string(),
1261 reserve_a: 150000000000,
1262 reserve_b: 150000000000,
1263 total_liquidity: 300000000000,
1264 });
1265 liquidity_data.push(DexLiquidity {
1266 dex: "AnimeSwap".to_string(),
1267 token_a: token_a.to_string(),
1268 token_b: token_b.to_string(),
1269 reserve_a: 100000000000,
1270 reserve_b: 100000000000,
1271 total_liquidity: 200000000000,
1272 });
1273 liquidity_data.push(DexLiquidity {
1274 dex: "Cellana".to_string(),
1275 token_a: token_a.to_string(),
1276 token_b: token_b.to_string(),
1277 reserve_a: 75000000000,
1278 reserve_b: 75000000000,
1279 total_liquidity: 150000000000,
1280 });
1281 liquidity_data.sort_by(|a, b| b.total_liquidity.cmp(&a.total_liquidity));
1282 Ok(liquidity_data)
1283 }
1284}
1285
/// Liquidity figures for one token pair on one DEX, as returned by
/// `DexAnalytics::get_liquidity_depth`.
#[derive(Debug, Clone)]
pub struct DexLiquidity {
    /// Name of the DEX.
    pub dex: String,
    /// First token of the pair.
    pub token_a: String,
    /// Second token of the pair.
    pub token_b: String,
    /// Reserve of `token_a`.
    pub reserve_a: u64,
    /// Reserve of `token_b`.
    pub reserve_b: u64,
    /// Total liquidity; `get_liquidity_depth` sorts its result by this field.
    pub total_liquidity: u64,
}
1295
/// Namespace type for price-impact, slippage and formatting helpers; holds no state.
pub struct DexUtils;
1297
1298impl DexUtils {
1299 pub fn calculate_price_impact(amount_in: u64, reserve_in: u64, reserve_out: u64) -> f64 {
1300 if reserve_in == 0 || reserve_out == 0 {
1301 return 0.0;
1302 }
1303 let amount_out_before = reserve_out as f64 / reserve_in as f64 * amount_in as f64;
1304 let amount_out_after =
1305 DexAggregator::calculate_amm_output(amount_in, reserve_in, reserve_out) as f64;
1306 if amount_out_before == 0.0 {
1307 return 0.0;
1308 }
1309 ((amount_out_before - amount_out_after) / amount_out_before).abs() * 100.0
1310 }
1311
1312 pub fn calculate_optimal_slippage(price_impact: f64) -> f64 {
1313 if price_impact < 0.1 {
1314 0.5
1315 } else if price_impact < 1.0 {
1316 1.0
1317 } else {
1318 2.0
1319 }
1320 }
1321
1322 pub fn format_token_amount(amount: u64, decimals: u8) -> String {
1323 let divisor = 10u64.pow(decimals as u32);
1324 let whole = amount / divisor;
1325 let fractional = amount % divisor;
1326 if fractional == 0 {
1327 format!("{}", whole)
1328 } else {
1329 format!(
1330 "{}.{:0>width$}",
1331 whole,
1332 fractional,
1333 width = decimals as usize
1334 )
1335 }
1336 }
1337
1338 pub async fn validate_token_pair(
1339 client: Arc<Aptos>,
1340 token_a: &str,
1341 token_b: &str,
1342 ) -> Result<Vec<String>, String> {
1343 let mut supported_dexes = Vec::new();
1344 if Liquidswap::get_pool_info(Arc::clone(&client), token_a, token_b)
1345 .await
1346 .is_ok()
1347 {
1348 supported_dexes.push("Liquidswap".to_string());
1349 }
1350
1351 if Thala::get_pool_info(Arc::clone(&client), token_a, token_b)
1352 .await
1353 .is_ok()
1354 {
1355 supported_dexes.push("Thala".to_string());
1356 }
1357
1358 if PancakeSwap::get_reserves(Arc::clone(&client), token_a, token_b)
1359 .await
1360 .is_ok()
1361 {
1362 supported_dexes.push("PancakeSwap".to_string());
1363 }
1364
1365 if AnimeSwap::get_reserves(Arc::clone(&client), token_a, token_b)
1366 .await
1367 .is_ok()
1368 {
1369 supported_dexes.push("AnimeSwap".to_string());
1370 }
1371
1372 if Cellana::get_pool_info(Arc::clone(&client), token_a, token_b)
1373 .await
1374 .is_ok()
1375 {
1376 supported_dexes.push("Cellana".to_string());
1377 }
1378
1379 Ok(supported_dexes)
1380 }
1381}
1382
1383impl Default for PancakeSwapEventFilters {
1384 fn default() -> Self {
1385 Self {
1386 min_swap_amount: Some(1000000000),
1387 include_cake_pairs: true,
1388 tracked_pairs: None,
1389 }
1390 }
1391}
1392
1393impl Default for CellanaEventConfig {
1394 fn default() -> Self {
1395 Self {
1396 monitor_cell_pairs: true,
1397 min_swap_amount: 1000000000,
1398 monitor_farming: true,
1399 tracked_tokens: vec![],
1400 }
1401 }
1402}
1403
1404impl Default for AnimeSwapEventFilters {
1405 fn default() -> Self {
1406 Self {
1407 min_swap_amount: Some(1000000000),
1408 tracked_tokens: None,
1409 min_liquidity_amount: Some(500000000),
1410 }
1411 }
1412}