rebuilderd 0.26.0

rebuilderd - independent build verification daemon
Documentation
use crate::code_migrations::code_migration;
use diesel::connection::{
    CacheSize, Instrumentation, LoadConnection, SimpleConnection, TransactionManager,
};
use diesel::expression::QueryMetadata;
use diesel::migration::Migration;
use diesel::prelude::*;
use diesel::query_builder::{Query, QueryFragment, QueryId};
use diesel::r2d2::{self, ConnectionManager};
use diesel::sql_query;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use rebuilderd_common::errors::*;

pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

pub type Pool = r2d2::Pool<ConnectionManager<SqliteConnectionWrap>>;

pub fn setup(url: &str) -> Result<SqliteConnection> {
    info!("Using database at {:?}", url);
    let mut connection = SqliteConnection::establish(url)?;

    let mut database_schema_changed = false;

    loop {
        let pending_migrations = connection
            .pending_migrations(MIGRATIONS)
            .map_err(|err| anyhow!("Failed to check for pending migrations: {err:#}"))?;

        let Some(next_migration) = pending_migrations.first() else {
            break;
        };

        info!(
            "Starting database migration: {}",
            next_migration.name().version()
        );

        let version = code_migration::run_code_backed_migration(&mut connection, next_migration)
            .map_err(|err| anyhow!("Failed to run pending migration: {err:#}"))?;

        info!("Applied database migration: {version}");
        database_schema_changed = true;
    }

    if database_schema_changed {
        info!("reclaiming disk space (this might take a while)");
        sql_query("VACUUM;").execute(&mut connection)?;

        info!("analyzing new schema and optimizing queries");
        sql_query("ANALYZE;").execute(&mut connection)?;
    }

    Ok(connection)
}

pub fn setup_pool(url: &str) -> Result<Pool> {
    setup(url)?;

    let manager = ConnectionManager::<SqliteConnectionWrap>::new(url);
    let pool = r2d2::Pool::builder()
        .build(manager)
        .context("Failed to create pool")?;
    Ok(pool)
}

pub struct SqliteConnectionWrap(SqliteConnection);

impl LoadConnection for SqliteConnectionWrap {
    type Cursor<'conn, 'query> = <SqliteConnection as LoadConnection>::Cursor<'conn, 'query>;
    type Row<'conn, 'query> = <SqliteConnection as LoadConnection>::Row<'conn, 'query>;

    fn load<'conn, 'query, T>(
        &'conn mut self,
        source: T,
    ) -> QueryResult<Self::Cursor<'conn, 'query>>
    where
        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
        Self::Backend: QueryMetadata<T::SqlType>,
    {
        self.0.load(source)
    }
}

impl std::convert::AsMut<SqliteConnection> for SqliteConnectionWrap {
    fn as_mut(&mut self) -> &mut SqliteConnection {
        &mut self.0
    }
}

impl diesel::r2d2::R2D2Connection for SqliteConnectionWrap {
    fn ping(&mut self) -> QueryResult<()> {
        self.0.ping()
    }

    fn is_broken(&mut self) -> bool {
        self.0.is_broken()
    }
}

impl diesel::connection::ConnectionSealed for SqliteConnectionWrap {}

impl diesel::connection::SimpleConnection for SqliteConnectionWrap {
    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
        self.0.batch_execute(query)
    }
}

impl Connection for SqliteConnectionWrap {
    type Backend = <SqliteConnection as Connection>::Backend;
    type TransactionManager = <SqliteConnection as Connection>::TransactionManager;

    fn establish(database_url: &str) -> ConnectionResult<Self> {
        let mut c = SqliteConnection::establish(database_url).map_err(|err| {
            warn!("establish returned error: {:?}", err);
            err
        })?;

        c.batch_execute(
            "
            PRAGMA busy_timeout = 10000;        -- sleep if the database is busy
            PRAGMA foreign_keys = ON;           -- enforce foreign keys
        ",
        )
        .map_err(|err| {
            warn!("executing pragmas for busy_timeout failed: {:?}", err);
            ConnectionError::CouldntSetupConfiguration(err)
        })?;

        c.batch_execute("
            PRAGMA journal_mode = WAL;          -- better write-concurrency
            PRAGMA synchronous = NORMAL;        -- fsync only in critical moments
            PRAGMA wal_autocheckpoint = 1000;   -- write WAL changes back every 1000 pages, for an in average 1MB WAL file. May affect readers if number is increased
            PRAGMA wal_checkpoint(TRUNCATE);    -- free some space by truncating possibly massive WAL files from the last run.
            PRAGMA cache_size = 134217728;      -- set disk cache size to 128MB
        ").map_err(|err| {
            warn!("executing pragmas for wall mode failed: {:?}", err);
            ConnectionError::CouldntSetupConfiguration(err)
        })?;

        Ok(Self(c))
    }

    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
    where
        T: QueryFragment<Self::Backend> + QueryId,
    {
        self.0.execute_returning_count(source)
    }

    fn transaction_state(
        &mut self,
    ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData {
        self.0.transaction_state()
    }

    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
        self.0.instrumentation()
    }

    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
        self.0.set_instrumentation(instrumentation)
    }

    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
        self.0.set_prepared_statement_cache_size(size);
    }
}