fuel-indexer 0.2.2

Fuel Indexer
use crate::{
    executor::{ExecutorSource, NativeIndexExecutor, WasmIndexExecutor},
    Database, IndexerConfig, IndexerResult, Manifest,
};
use async_std::sync::{Arc, Mutex};
use fuel_indexer_database::{queries, types::IndexAssetType, IndexerConnectionPool};
use fuel_indexer_lib::{defaults, utils::ServiceRequest};
use fuel_indexer_schema::db::manager::SchemaManager;
use fuel_indexer_types::abi::BlockData;
use futures::{
    stream::{FuturesUnordered, StreamExt},
    Future,
};
use std::collections::HashMap;
use std::marker::Send;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::{
    sync::mpsc::Receiver,
    task::JoinHandle,
    time::{sleep, Duration},
};
use tracing::{debug, error, info, warn};

/// Service that registers index executors and waits on the tasks running them.
///
/// Every executor is tracked under the uid of its manifest by two entries:
/// a join handle in `handles` and a stop flag in `killers`.
pub struct IndexerService {
    /// Service configuration; its `fuel_node`, `database` and
    /// `stop_idle_indexers` values are read whenever an executor is created.
    config: IndexerConfig,
    /// Connection pool from which registry/database connections are acquired.
    pool: IndexerConnectionPool,
    /// Schema manager built from a clone of `pool`; used to create the schema
    /// of an index while it is being registered.
    manager: SchemaManager,
    /// String form of `config.database`, computed once in `new`.
    database_url: String,
    /// Join handles of the spawned executors, keyed by manifest uid.
    handles: HashMap<String, JoinHandle<()>>,
    /// Optional receiver of `ServiceRequest`s; handed to the service task
    /// spawned by `run`.
    rx: Option<Receiver<ServiceRequest>>,
    /// Flags returned together with each executor handle, keyed by manifest
    /// uid. The service task stores `true` in a flag to serve a stop request.
    killers: HashMap<String, Arc<AtomicBool>>,
}

impl IndexerService {
    pub async fn new(
        config: IndexerConfig,
        pool: IndexerConnectionPool,
        rx: Option<Receiver<ServiceRequest>>,
    ) -> IndexerResult<IndexerService> {
        let database_url = config.database.to_string();

        let manager = SchemaManager::new(pool.clone());

        Ok(IndexerService {
            config,
            pool,
            manager,
            database_url,
            handles: HashMap::default(),
            killers: HashMap::default(),
            rx,
        })
    }

    /// Registers the index described by `manifest` and starts a WASM executor
    /// for it.
    ///
    /// In order, this:
    /// 1. registers the index (namespace + identifier) in the database,
    /// 2. creates the GraphQL schema of the manifest's namespace,
    /// 3. serializes the manifest,
    /// 4. spawns the executor from the manifest,
    /// 5. stores the schema, manifest and WASM bytes as assets of the index,
    /// 6. records the executor's join handle and stop flag under the
    ///    manifest uid.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any of the steps above. When storing
    /// an asset fails after the executor has already been spawned, the
    /// executor's stop flag is set before the error is returned, so the
    /// executor is not left running without being tracked by the service.
    pub async fn register_index_from_manifest(
        &mut self,
        manifest: Manifest,
    ) -> IndexerResult<()> {
        let mut conn = self.pool.acquire().await?;
        let index =
            queries::register_index(&mut conn, &manifest.namespace, &manifest.identifier)
                .await?;
        let schema = manifest.graphql_schema()?;
        let schema_bytes = schema.as_bytes().to_vec();

        self.manager
            .new_schema(&manifest.namespace, &schema, &mut conn)
            .await?;

        // Serialize before spawning: a failure here must not leave a running
        // executor behind.
        let manifest_bytes = manifest.to_bytes()?;

        let (handle, exec_source, killer) = WasmIndexExecutor::create(
            &self.config.fuel_node,
            &self.database_url,
            &manifest,
            ExecutorSource::Manifest,
            self.config.stop_idle_indexers,
        )
        .await?;

        // Assets are stored in this order: schema, manifest, WASM module.
        let assets = vec![
            (IndexAssetType::Schema, schema_bytes),
            (IndexAssetType::Manifest, manifest_bytes),
            (IndexAssetType::Wasm, exec_source.to_vec()),
        ];

        for (asset_type, bytes) in assets {
            info!(
                "Registering Asset({:?}) for Index({})",
                asset_type,
                index.uid()
            );

            let stored = queries::register_index_asset(
                &mut conn,
                &manifest.namespace,
                &manifest.identifier,
                bytes,
                asset_type,
            )
            .await;

            if let Err(e) = stored {
                // The executor is already running but will never be tracked;
                // ask it to stop instead of leaking it.
                killer.store(true, Ordering::SeqCst);
                return Err(e.into());
            }
        }

        let uid = manifest.uid();
        info!("Registered Index({})", &uid);
        self.handles.insert(uid.clone(), handle);
        self.killers.insert(uid, killer);

        Ok(())
    }

    /// Starts a WASM executor for every index stored in the registry.
    ///
    /// For each registered index the latest assets are loaded, the manifest
    /// is parsed from its stored bytes, and an executor is created from the
    /// stored WASM bytes. The resulting join handle and stop flag are recorded
    /// under the manifest uid.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered; indices after the failing one are
    /// not processed.
    pub async fn register_indices_from_registry(&mut self) -> IndexerResult<()> {
        let mut conn = self.pool.acquire().await?;
        let registered = queries::registered_indices(&mut conn).await?;

        for entry in registered {
            let latest = queries::latest_assets_for_index(&mut conn, &entry.id).await?;
            let manifest = Manifest::from_slice(&latest.manifest.bytes)?;
            let database_url = self.config.database.to_string();

            let (handle, _module_bytes, killer) = WasmIndexExecutor::create(
                &self.config.fuel_node,
                &database_url,
                &manifest,
                ExecutorSource::Registry(latest.wasm.bytes),
                self.config.stop_idle_indexers,
            )
            .await?;

            let uid = manifest.uid();
            info!("Registered Index({})", uid);
            self.handles.insert(uid.clone(), handle);
            self.killers.insert(uid, killer);
        }

        Ok(())
    }

    /// Registers the index described by `manifest` and starts a native
    /// executor for it.
    ///
    /// The index is registered in the database and the GraphQL schema of the
    /// manifest's namespace is created. A `NativeIndexExecutor` is then
    /// created with `handle_events` as its handler function, and its join
    /// handle and stop flag are recorded under the manifest uid.
    ///
    /// Unlike the WASM registration path, no assets are stored for the index.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while acquiring a connection,
    /// registering the index, building or creating the schema, or creating
    /// the executor.
    pub async fn register_native_index<
        T: Future<Output = IndexerResult<()>> + Send + 'static,
    >(
        &mut self,
        manifest: Manifest,
        handle_events: fn(Vec<BlockData>, Arc<Mutex<Database>>) -> T,
    ) -> IndexerResult<()> {
        let mut conn = self.pool.acquire().await?;
        // Only the side effect of registering matters here; the returned
        // record is not used.
        let _index =
            queries::register_index(&mut conn, &manifest.namespace, &manifest.identifier)
                .await?;
        let schema = manifest.graphql_schema()?;

        self.manager
            .new_schema(&manifest.namespace, &schema, &mut conn)
            .await?;

        // `manifest` is moved into the executor below, so take the uid first.
        let uid = manifest.uid();
        let (handle, _module_bytes, killer) = NativeIndexExecutor::<T>::create(
            &self.database_url,
            &self.config.fuel_node,
            manifest,
            self.config.stop_idle_indexers,
            handle_events,
        )
        .await?;

        info!("Registered NativeIndex({})", uid);

        self.handles.insert(uid.clone(), handle);
        self.killers.insert(uid, killer);
        Ok(())
    }

    pub async fn run(self) {
        let IndexerService {
            handles,
            rx,
            pool,
            config,
            killers,
            ..
        } = self;

        let futs = Arc::new(Mutex::new(FuturesUnordered::from_iter(
            handles.into_values(),
        )));

        tokio::spawn(create_service_task(
            rx,
            config.clone(),
            pool.clone(),
            futs.clone(),
            killers,
        ))
        .await
        .unwrap();

        while let Some(fut) = futs.lock().await.next().await {
            info!("Retired a future {fut:?}");
        }
    }
}

async fn create_service_task(
    rx: Option<Receiver<ServiceRequest>>,
    config: IndexerConfig,
    pool: IndexerConnectionPool,
    futs: Arc<Mutex<FuturesUnordered<JoinHandle<()>>>>,
    mut killers: HashMap<String, Arc<AtomicBool>>,
) {
    if let Some(mut rx) = rx {
        loop {
            let futs = futs.lock().await;
            match rx.try_recv() {
                Ok(service_request) => match service_request {
                    ServiceRequest::AssetReload(request) => {
                        let mut conn = pool
                            .acquire()
                            .await
                            .expect("Failed to acquire connection from pool");

                        match queries::index_id_for(
                            &mut conn,
                            &request.namespace,
                            &request.identifier,
                        )
                        .await
                        {
                            Ok(id) => {
                                let assets =
                                    queries::latest_assets_for_index(&mut conn, &id)
                                        .await
                                        .expect("Could not get latest assets for index");

                                let manifest: Manifest =
                                    serde_yaml::from_slice(&assets.manifest.bytes)
                                        .expect("Failed to deserialize manifest");

                                let (handle, _module_bytes, killer) = WasmIndexExecutor::create(
                                    &config.fuel_node,
                                    &config.database.to_string(),
                                    &manifest,
                                    ExecutorSource::Registry(assets.wasm.bytes),
                                    config.stop_idle_indexers,
                                )
                                .await
                                .expect(
                                    "Failed to spawn executor from index asset registry",
                                );

                                futs.push(handle);
                                killers.insert(manifest.uid(), killer);
                            }
                            Err(e) => {
                                error!(
                                    "Failed to find Index({}.{}): {}",
                                    &request.namespace, &request.identifier, e
                                );

                                continue;
                            }
                        }
                    }
                    ServiceRequest::IndexStop(request) => {
                        let uid = format!("{}.{}", request.namespace, request.identifier);

                        if let Some(killer) = killers.remove(&uid) {
                            killer.store(true, Ordering::SeqCst);
                        } else {
                            warn!("Stop Indexer: No indexer with the name Index({uid})");
                        }
                    }
                },
                Err(e) => {
                    debug!("No service request to handle: {e:?}");
                    sleep(Duration::from_secs(defaults::IDLE_SERVICE_WAIT_SECS)).await;
                }
            }
        }
    }
}