carbon-core 0.12.0

Core library for Carbon
Documentation
use solana_instruction::AccountMeta;
use std::sync::Arc;

use crate::{
    account::{AccountMetadata, AccountProcessorInputType},
    error::CarbonResult,
    instruction::{InstructionMetadata, InstructionProcessorInputType},
    metrics::MetricsCollection,
    postgres::{
        operations::Upsert,
        rows::{AccountRow, InstructionRow},
    },
};

pub struct PostgresAccountProcessor<T, W> {
    pool: sqlx::PgPool,
    _phantom: std::marker::PhantomData<(T, W)>,
}

impl<T, W> PostgresAccountProcessor<T, W> {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait::async_trait]
impl<T, W> crate::processor::Processor for PostgresAccountProcessor<T, W>
where
    T: Clone + Send + Sync + 'static,
    W: From<(T, AccountMetadata)> + Upsert + Send + 'static,
{
    type InputType = AccountProcessorInputType<T>;

    async fn process(
        &mut self,
        input: Self::InputType,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let (metadata, decoded_account, _raw) = input;

        let start = std::time::Instant::now();

        let wrapper = W::from((decoded_account.data, metadata));

        match wrapper.upsert(&self.pool).await {
            Ok(()) => {
                metrics
                    .increment_counter("postgres.accounts.upsert.upserted", 1)
                    .await?;
                metrics
                    .record_histogram(
                        "postgres.accounts.upsert.duration_milliseconds",
                        start.elapsed().as_millis() as f64,
                    )
                    .await?;
                Ok(())
            }
            Err(e) => {
                metrics
                    .increment_counter("postgres.accounts.upsert.failed", 1)
                    .await?;
                return Err(e);
            }
        }
    }
}

pub struct PostgresJsonAccountProcessor<T> {
    pool: sqlx::PgPool,
    _phantom: std::marker::PhantomData<T>,
}

impl<T> PostgresJsonAccountProcessor<T> {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait::async_trait]
impl<T> crate::processor::Processor for PostgresJsonAccountProcessor<T>
where
    T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
{
    type InputType = AccountProcessorInputType<T>;

    async fn process(
        &mut self,
        input: Self::InputType,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let (metadata, decoded_account, _raw) = input;

        let account_row = AccountRow::from_parts(decoded_account.data, metadata);

        let start = std::time::Instant::now();

        match account_row.upsert(&self.pool).await {
            Ok(()) => {
                metrics
                    .increment_counter("postgres.accounts.upsert.upserted", 1)
                    .await?;
                metrics
                    .record_histogram(
                        "postgres.accounts.upsert.duration_milliseconds",
                        start.elapsed().as_millis() as f64,
                    )
                    .await?;
                Ok(())
            }
            Err(e) => {
                metrics
                    .increment_counter("postgres.accounts.upsert.failed", 1)
                    .await?;
                return Err(e);
            }
        }
    }
}

pub struct PostgresInstructionProcessor<T, W> {
    pool: sqlx::PgPool,
    _phantom: std::marker::PhantomData<(T, W)>,
}

impl<T, W> PostgresInstructionProcessor<T, W> {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait::async_trait]
impl<T, W> crate::processor::Processor for PostgresInstructionProcessor<T, W>
where
    T: Clone + Send + Sync + 'static,
    W: From<(T, InstructionMetadata, Vec<AccountMeta>)> + Upsert + Send + 'static,
{
    type InputType = InstructionProcessorInputType<T>;

    async fn process(
        &mut self,
        input: Self::InputType,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let (metadata, decoded_instruction, _nested_instructions, _raw) = input;

        let start = std::time::Instant::now();

        let wrapper = W::from((
            decoded_instruction.data,
            metadata,
            decoded_instruction.accounts,
        ));

        match wrapper.upsert(&self.pool).await {
            Ok(()) => {
                metrics
                    .increment_counter("postgres.instructions.upsert.upserted", 1)
                    .await?;
                metrics
                    .record_histogram(
                        "postgres.instructions.upsert.duration_milliseconds",
                        start.elapsed().as_millis() as f64,
                    )
                    .await?;
                Ok(())
            }
            Err(e) => {
                metrics
                    .increment_counter("postgres.instructions.upsert.failed", 1)
                    .await?;
                return Err(e);
            }
        }
    }
}

pub struct PostgresJsonInstructionProcessor<T> {
    pool: sqlx::PgPool,
    _phantom: std::marker::PhantomData<T>,
}

impl<T> PostgresJsonInstructionProcessor<T> {
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait::async_trait]
impl<T> crate::processor::Processor for PostgresJsonInstructionProcessor<T>
where
    T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
{
    type InputType = InstructionProcessorInputType<T>;

    async fn process(
        &mut self,
        input: Self::InputType,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let (metadata, decoded_instruction, _nested_instructions, _raw) = input;

        let instruction_row = InstructionRow::from_parts(
            decoded_instruction.data,
            metadata,
            decoded_instruction.accounts,
        );

        let start = std::time::Instant::now();

        match instruction_row.upsert(&self.pool).await {
            Ok(()) => {
                metrics
                    .increment_counter("postgres.instructions.upsert.upserted", 1)
                    .await?;
                metrics
                    .record_histogram(
                        "postgres.instructions.upsert.duration_milliseconds",
                        start.elapsed().as_millis() as f64,
                    )
                    .await?;
                Ok(())
            }
            Err(e) => {
                metrics
                    .increment_counter("postgres.instructions.upsert.failed", 1)
                    .await?;
                return Err(e);
            }
        }
    }
}