carbon-core 0.12.0

Core library for Carbon
Documentation
use solana_instruction::AccountMeta;

use crate::{
    account::AccountMetadata,
    error::CarbonResult,
    instruction::InstructionMetadata,
    postgres::{
        metadata::{AccountRowMetadata, InstructionRowMetadata},
        operations::{Delete, Insert, LookUp, Upsert},
        primitives::{Pubkey, U32},
    },
};

#[derive(sqlx::FromRow, Debug, Clone)]
pub struct AccountRow<
    T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
> {
    #[sqlx(flatten)]
    pub metadata: AccountRowMetadata,
    pub data: sqlx::types::Json<T>,
}

impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > AccountRow<T>
{
    pub fn from_parts(source: T, metadata: AccountMetadata) -> Self {
        Self {
            metadata: metadata.into(),
            data: sqlx::types::Json(source),
        }
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > Insert for AccountRow<T>
{
    async fn insert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> {
        sqlx::query(r#"INSERT INTO accounts (__pubkey, __slot, data) VALUES ($1, $2, $3)"#)
            .bind(self.metadata.pubkey)
            .bind(self.metadata.slot.clone())
            .bind(self.data.clone())
            .execute(pool)
            .await
            .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(())
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > Upsert for AccountRow<T>
{
    async fn upsert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> {
        sqlx::query(r#"INSERT INTO accounts (__pubkey, __slot, data) VALUES ($1, $2, $3) ON CONFLICT (__pubkey) DO UPDATE SET __slot = $2, data = $3"#)
            .bind(self.metadata.pubkey)
            .bind(self.metadata.slot.clone())
            .bind(self.data.clone())
            .execute(pool)
            .await
            .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(())
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > Delete for AccountRow<T>
{
    type Key = Pubkey;

    async fn delete(key: Self::Key, pool: &sqlx::PgPool) -> CarbonResult<()> {
        sqlx::query(r#"DELETE FROM accounts WHERE __pubkey = $1"#)
            .bind(key)
            .execute(pool)
            .await
            .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(())
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > LookUp for AccountRow<T>
{
    type Key = Pubkey;

    async fn lookup(key: Self::Key, pool: &sqlx::PgPool) -> CarbonResult<Option<Self>> {
        let row = sqlx::query_as(r#"SELECT * FROM accounts WHERE __pubkey = $1"#)
            .bind(key)
            .fetch_optional(pool)
            .await
            .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(row)
    }
}

pub struct AccountRowMigrationOperation;

#[async_trait::async_trait]
impl sqlx_migrator::Operation<sqlx::Postgres> for AccountRowMigrationOperation {
    async fn up(
        &self,
        connection: &mut sqlx::PgConnection,
    ) -> Result<(), sqlx_migrator::error::Error> {
        sqlx::query(
            r#"CREATE TABLE IF NOT EXISTS accounts (
            __pubkey BYTEA NOT NULL,
            __slot NUMERIC,
            data JSONB NOT NULL,
            PRIMARY KEY (__pubkey)
        )"#,
        )
        .execute(connection)
        .await?;
        Ok(())
    }

    async fn down(
        &self,
        connection: &mut sqlx::PgConnection,
    ) -> Result<(), sqlx_migrator::error::Error> {
        sqlx::query(r#"DROP TABLE IF EXISTS accounts"#)
            .execute(connection)
            .await?;
        Ok(())
    }
}

pub struct GenericAccountsMigration;

impl sqlx_migrator::Migration<sqlx::Postgres> for GenericAccountsMigration {
    fn app(&self) -> &str {
        "carbon_core"
    }

    fn name(&self) -> &str {
        "generic_accounts"
    }

    fn operations(&self) -> Vec<Box<dyn sqlx_migrator::Operation<sqlx::Postgres>>> {
        vec![Box::new(AccountRowMigrationOperation)]
    }

    fn parents(&self) -> Vec<Box<dyn sqlx_migrator::Migration<sqlx::Postgres>>> {
        vec![]
    }
}

#[derive(sqlx::FromRow, Debug, Clone)]
pub struct InstructionRow<
    T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
> {
    #[sqlx(flatten)]
    pub metadata: InstructionRowMetadata,
    pub data: sqlx::types::Json<T>,
    pub accounts: sqlx::types::Json<Vec<AccountMeta>>,
}

impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > InstructionRow<T>
{
    pub fn from_parts(
        source: T,
        metadata: InstructionMetadata,
        accounts: Vec<AccountMeta>,
    ) -> Self {
        Self {
            metadata: metadata.into(),
            data: sqlx::types::Json(source),
            accounts: sqlx::types::Json(accounts),
        }
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > Insert for InstructionRow<T>
{
    async fn insert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> {
        sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, data, accounts) VALUES ($1, $2, $3, $4, $5, $6)"#)
            .bind(self.metadata.signature.clone())
            .bind(self.metadata.instruction_index)
            .bind(self.metadata.stack_height)
            .bind(self.metadata.slot.clone())
            .bind(self.data.clone())
            .bind(self.accounts.clone())
            .execute(pool)
            .await
            .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(())
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > Upsert for InstructionRow<T>
{
    async fn upsert(&self, pool: &sqlx::PgPool) -> CarbonResult<()> {
        sqlx::query(r#"INSERT INTO instructions (__signature, __instruction_index, __stack_height, __slot, data, accounts) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (__signature, __instruction_index, __stack_height) DO UPDATE SET __slot = $4, data = $5, accounts = $6"#)
            .bind(self.metadata.signature.clone())
            .bind(self.metadata.instruction_index)
            .bind(self.metadata.stack_height)
            .bind(self.metadata.slot.clone())
            .bind(self.data.clone())
            .bind(self.accounts.clone())
            .execute(pool)
            .await
            .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(())
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > Delete for InstructionRow<T>
{
    type Key = (String, U32, U32);

    async fn delete(key: Self::Key, pool: &sqlx::PgPool) -> CarbonResult<()> {
        sqlx::query(r#"DELETE FROM instructions WHERE __signature = $1 AND __instruction_index = $2 AND __stack_height = $3"#)
            .bind(key.0)
            .bind(key.1)
            .bind(key.2)
            .execute(pool)
            .await
            .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(())
    }
}

#[async_trait::async_trait]
impl<
        T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + Sync + Unpin + 'static,
    > LookUp for InstructionRow<T>
{
    type Key = (String, U32, U32);

    async fn lookup(key: Self::Key, pool: &sqlx::PgPool) -> CarbonResult<Option<Self>> {
        let row =
            sqlx::query_as(r#"SELECT * FROM instructions WHERE __signature = $1 AND __instruction_index = $2 AND __stack_height = $3"#)
                .bind(key.0)
                .bind(key.1)
                .bind(key.2)
                .fetch_optional(pool)
                .await
                .map_err(|e| crate::error::Error::Custom(e.to_string()))?;
        Ok(row)
    }
}

pub struct InstructionRowMigrationOperation;

#[async_trait::async_trait]
impl sqlx_migrator::Operation<sqlx::Postgres> for InstructionRowMigrationOperation {
    async fn up(
        &self,
        connection: &mut sqlx::PgConnection,
    ) -> Result<(), sqlx_migrator::error::Error> {
        sqlx::query(
            r#"CREATE TABLE IF NOT EXISTS instructions (
            __signature TEXT NOT NULL,
            __instruction_index BIGINT NOT NULL,
            __stack_height BIGINT NOT NULL,
            __slot NUMERIC,
            data JSONB NOT NULL,
            accounts JSONB NOT NULL,
            PRIMARY KEY (__signature, __instruction_index, __stack_height)
        )"#,
        )
        .execute(connection)
        .await?;
        Ok(())
    }

    async fn down(
        &self,
        connection: &mut sqlx::PgConnection,
    ) -> Result<(), sqlx_migrator::error::Error> {
        sqlx::query(r#"DROP TABLE IF EXISTS instructions"#)
            .execute(connection)
            .await?;
        Ok(())
    }
}

pub struct GenericInstructionMigration;

impl sqlx_migrator::Migration<sqlx::Postgres> for GenericInstructionMigration {
    fn app(&self) -> &str {
        "carbon_core"
    }

    fn name(&self) -> &str {
        "generic_instructions"
    }

    fn operations(&self) -> Vec<Box<dyn sqlx_migrator::Operation<sqlx::Postgres>>> {
        vec![Box::new(InstructionRowMigrationOperation)]
    }

    fn parents(&self) -> Vec<Box<dyn sqlx_migrator::Migration<sqlx::Postgres>>> {
        vec![]
    }
}