1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//! PoolSync Core Implementation
//!
//! This module contains the core functionality for synchronizing pools across different
//! blockchain networks and protocols. It includes the main `PoolSync` struct and its
//! associated methods for configuring and executing the synchronization process.
//!
use alloy::providers::Provider;
use alloy::providers::ProviderBuilder;
use alloy::rpc::types::serde_helpers::quantity::vec;
use std::collections::HashMap;
use std::sync::Arc;

use crate::builder::PoolSyncBuilder;
use crate::cache::{read_cache_file, write_cache_file, PoolCache};
use crate::chain::Chain;
use crate::errors::*;
use crate::pools::*;
use crate::rpc::Rpc;

/// The main struct for pool synchronization
pub struct PoolSync {
    /// Map of pool types to their fetcher implementations
    pub fetchers: HashMap<PoolType, Arc<dyn PoolFetcher>>,
    /// The chain to sync on
    pub chain: Chain,
    /// The rate limit of the rpc
    pub rate_limit: u64,
}

impl PoolSync {
    /// Construct a new builder to configure sync parameters
    pub fn builder() -> PoolSyncBuilder {
        PoolSyncBuilder::default()
    }

    /// Synchronizes all added pools for the specified chain
    pub async fn sync_pools(&self) -> Result<(Vec<Pool>, u64), PoolSyncError> {
        // load in the dotenv
        dotenv::dotenv().ok();

        // setup arvhice node provider
        let archive = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(std::env::var("ARCHIVE").unwrap().parse().unwrap()),
        );

        // setup full node provider
        let full = Arc::new(
            ProviderBuilder::new()
                .network::<alloy::network::AnyNetwork>()
                .on_http(std::env::var("FULL").unwrap().parse().unwrap()),
        );

        // create the cache files
        std::fs::create_dir_all("cache").unwrap();

        // create all of the caches
        let mut pool_caches: Vec<PoolCache> = self
            .fetchers
            .keys()
            .map(|pool_type| read_cache_file(pool_type, self.chain).unwrap())
            .collect();

        let mut fully_synced = false;
        let mut last_synced_block = 0;

        while !fully_synced {
            fully_synced = true;
            let end_block = full.get_block_number().await.unwrap();

            for cache in &mut pool_caches {
                let start_block = cache.last_synced_block + 1;
                if start_block <= end_block {
                    fully_synced = false;

                    let fetcher = self.fetchers[&cache.pool_type].clone();

                    // fetch all of the pool addresses
                    let pool_addrs = Rpc::fetch_pool_addrs(
                        start_block,
                        end_block,
                        archive.clone(),
                        fetcher.clone(),
                        self.chain,
                        self.rate_limit,
                    )
                    .await.unwrap();

                    // populate all of the pool data
                    let new_pools = Rpc::populate_pools(
                        pool_addrs,
                        full.clone(),
                        cache.pool_type,
                        self.rate_limit,
                    )
                    .await;

                    // sync old pools up to the current tip
                    // v2: the populate pools will have already got all of the reserves up to end block for the current
                    // set of pools, we do not want to fetch it again since we already have it, so we just process the new logs for 
                    // the old pools since those logs are going to modify the state of them
                    // v3: same for v3, the cache.pools have synced up to start block - 1, so all logs from start block to end block
                    // are new logs that can modify the state of the v3 pools, so fetch the state for them
                    /* 
                    let _ = Rpc::populate_liquidity(
                        start_block,
                        end_block,
                        &mut cache.pools,
                        archive.clone(),
                        cache.pool_type
                    ).await;
                */

                    // populate the state for the new pools, if this is v2 we will already have the state and dont need it,
                    // for the v3 start, we do  not nee dthe swap logs because we will already have an up to date state of 
                    // the tick, sqrt price, and liquidyt from populate pools, but we do need to fill in the tick data
                    cache.pools.extend(new_pools);

                    // we need to do the initial tick sync
                    Rpc::populate_liquidity(
                        start_block,
                        end_block,
                        &mut cache.pools,
                        archive.clone(), 
                        cache.pool_type,
                    ).await.unwrap();

                    cache.last_synced_block = end_block;
                    last_synced_block = end_block;
                }
            }
        }

        // write all of the cache files
        pool_caches
            .iter()
            .for_each(|cache| write_cache_file(cache, self.chain).unwrap());

        // return all the pools
        Ok((pool_caches
            .into_iter()
            .flat_map(|cache| cache.pools)
            .collect(), last_synced_block))
    }
}