chaindexing 0.1.80

Index any EVM chain and query in SQL
Documentation
// TODO: Rewrite after migrating to tokio-postgres

use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures_util::Stream;
use pin_project_lite::pin_project;

use futures_util::FutureExt;
use serde::Deserialize;
use tokio::sync::Mutex;

use crate::{ChaindexingRepo, ChaindexingRepoClient, ContractAddress, LoadsDataWithRawQuery};

/// One chunk of contract addresses; this is the item type yielded by the stream.
type DataStream = Vec<ContractAddress>;

/// States of the hand-written state machine driven by `poll_next` of
/// `ContractAddressesStream`.
enum ContractAddressesStreamState {
    /// Initial state: the future that resolves the id bounds has not been created yet.
    GetFromAndTo,
    /// A boxed future resolving to the `(from, to)` id bounds is in flight.
    PollFromAndToFuture(Pin<Box<dyn Future<Output = (i64, i64)> + Send>>),
    /// The `(from, to)` bounds are known; the query for the next chunk has not been created yet.
    GetDataStreamFuture((i64, i64)),
    /// A chunk query is in flight. The two integers are the `from` to use for the following
    /// chunk and the overall `to` bound, in that order.
    PollDataStreamFuture((Pin<Box<dyn Future<Output = DataStream> + Send>>, i64, i64)),
}

pin_project!(
    /// A `Stream` yielding the contract addresses of a single chain in chunks, loaded through
    /// a shared repo client.
    pub struct ContractAddressesStream {
        // Chain whose contract addresses are streamed; interpolated into the SQL queries.
        chain_id_: i64,
        // Optional lower id bound. `new` leaves it `None`, in which case `MIN(id)` is queried.
        from: Option<i64>,
        // Optional upper id bound. `new` leaves it `None`, in which case `MAX(id)` is queried.
        to: Option<i64>,
        // Width of the id window added to `from` for each chunk query (500 by default).
        chunk_size: i64,
        // Shared client; locked for the duration of every query the stream issues.
        client: Arc<Mutex<ChaindexingRepoClient>>,
        // Current position in the polling state machine.
        state: ContractAddressesStreamState,
    }
);

impl ContractAddressesStream {
    /// Creates a stream over the contract addresses of `chain_id_`.
    ///
    /// The stream keeps its own handle to `client` (the `Arc` is cloned, not the client),
    /// starts with no explicit id bounds and uses a chunk size of 500.
    pub fn new(client: &Arc<Mutex<ChaindexingRepoClient>>, chain_id_: i64) -> Self {
        let client = Arc::clone(client);
        let state = ContractAddressesStreamState::GetFromAndTo;

        Self {
            chain_id_,
            from: None,
            to: None,
            chunk_size: 500,
            client,
            state,
        }
    }

    /// Builder-style setter replacing the default chunk size with `chunk_size`.
    pub fn with_chunk_size(self, chunk_size: i64) -> Self {
        Self { chunk_size, ..self }
    }
}

/// Streams the contract addresses of one chain as a sequence of `Vec<ContractAddress>` chunks,
/// each covering a consecutive window of ids.
///
/// The implementation is a small state machine:
/// 1. `GetFromAndTo` creates a future resolving the id bounds: the stream's own `from` / `to`
///    when set, otherwise `MIN(id)` / `MAX(id)` for the chain (0 when the query yields nothing).
/// 2. `PollFromAndToFuture` drives that future to completion.
/// 3. `GetDataStreamFuture` creates the query for the next id window, or ends the stream once
///    `from > to`.
/// 4. `PollDataStreamFuture` drives the query, yields its rows and returns to step 3.
///
/// Every state transition that returns `Poll::Pending` wakes the task first so that the
/// executor polls the stream again and the new state gets processed.
impl Stream for ContractAddressesStream {
    type Item = DataStream;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let chain_id_ = *this.chain_id_;
        let from = *this.from;
        let to = *this.to;

        match this.state {
            ContractAddressesStreamState::GetFromAndTo => {
                let client = this.client.clone();

                *this.state = ContractAddressesStreamState::PollFromAndToFuture(
                    async move {
                        // The client stays locked while both bound queries run.
                        let client = client.lock().await;

                        // Row shape shared by the MIN and MAX queries; each query fills only
                        // the field matching its aggregate.
                        #[derive(Deserialize)]
                        struct MinOrMax {
                            min: Option<i64>,
                            max: Option<i64>,
                        }

                        let from = match from {
                            Some(from) => from,
                            None => {
                                let query = format!(
                                    "
                                SELECT MIN(id) FROM chaindexing_contract_addresses 
                                WHERE chain_id = {chain_id_}"
                                );

                                let min_or_max: Option<MinOrMax> =
                                    ChaindexingRepo::load_data(&client, &query).await;

                                min_or_max.and_then(|mm| mm.min).unwrap_or(0)
                            }
                        };

                        let to = match to {
                            Some(to) => to,
                            None => {
                                let query = format!(
                                    "
                                SELECT MAX(id) FROM chaindexing_contract_addresses 
                                WHERE chain_id = {chain_id_}"
                                );

                                let min_or_max: Option<MinOrMax> =
                                    ChaindexingRepo::load_data(&client, &query).await;

                                min_or_max.and_then(|mm| mm.max).unwrap_or(0)
                            }
                        };

                        (from, to)
                    }
                    .boxed(),
                );

                cx.waker().wake_by_ref();
                Poll::Pending
            }
            ContractAddressesStreamState::PollFromAndToFuture(from_and_to_future) => {
                let (from, to): (i64, i64) =
                    futures_util::ready!(from_and_to_future.as_mut().poll(cx));

                *this.state = ContractAddressesStreamState::GetDataStreamFuture((from, to));

                cx.waker().wake_by_ref();

                Poll::Pending
            }
            ContractAddressesStreamState::GetDataStreamFuture((from, to)) => {
                let client = this.client.clone();
                let from = *from;
                let to = *to;

                // Every id up to `to` has been covered: the stream is exhausted.
                if from > to {
                    return Poll::Ready(None);
                }

                // A negative chunk size would move the cursor backwards and never terminate,
                // so the step is clamped to be non-negative. Saturating arithmetic keeps very
                // large chunk sizes (e.g. `i64::MAX`) from overflowing.
                let step = (*this.chunk_size).max(0);
                let chunk_limit = from.saturating_add(step);

                // `BETWEEN` is inclusive on both ends, so the next chunk has to start one past
                // `chunk_limit`; starting at `chunk_limit` itself would yield that row twice.
                let next_from = chunk_limit.saturating_add(1);

                let data_stream_future = async move {
                    let client = client.lock().await;

                    // The upper bound is not clamped to `to`, so the last chunk may include
                    // rows with ids above `to` if such rows exist when the query runs.
                    let query = format!(
                        "
                        SELECT * FROM chaindexing_contract_addresses 
                        WHERE chain_id = {chain_id_} AND id BETWEEN {from} AND {chunk_limit}
                        "
                    );

                    let addresses: Vec<ContractAddress> =
                        ChaindexingRepo::load_data_list(&client, &query).await;

                    addresses
                }
                .boxed();

                *this.state = ContractAddressesStreamState::PollDataStreamFuture((
                    data_stream_future,
                    next_from,
                    to,
                ));

                cx.waker().wake_by_ref();

                Poll::Pending
            }
            ContractAddressesStreamState::PollDataStreamFuture((
                data_stream_future,
                next_from,
                to,
            )) => {
                let streamed_data = futures_util::ready!(data_stream_future.as_mut().poll(cx));

                *this.state = ContractAddressesStreamState::GetDataStreamFuture((*next_from, *to));

                cx.waker().wake_by_ref();

                Poll::Ready(Some(streamed_data))
            }
        }
    }
}