wtx 0.44.1

A collection of different transport implementations and related tools focused primarily on web technologies.
Documentation
use crate::{
  codec::CodecController,
  collection::Vector,
  database::{
    Database, DatabaseTy, FromRecords,
    executor::Executor,
    schema_manager::{DbMigration, Uid, UserMigration, UserMigrationGroup, VERSION},
  },
  misc::Lease,
};
use alloc::string::String;
use core::fmt::Write;

#[cfg(any(feature = "mysql", feature = "postgres"))]
pub(crate) async fn delete_migrations<E, S>(
  buffer_cmd: &mut String,
  executor: &mut E,
  mg: &UserMigrationGroup<S>,
  schema_prefix: &str,
  uid: Uid,
) -> Result<(), <E::Database as CodecController>::Error>
where
  E: Executor,
  S: Lease<str>,
{
  buffer_cmd.write_fmt(format_args!(
      "DELETE FROM {schema_prefix}_wtx_migration WHERE _wtx_migration_mg_uid = {mg_uid} AND uid > {uid}",
      mg_uid = mg.uid(),
    )).map_err(crate::Error::from)?;
  executor.execute_ignored(buffer_cmd.as_str()).await?;
  buffer_cmd.clear();
  Ok(())
}

#[cfg(any(feature = "mysql", feature = "postgres"))]
pub(crate) async fn insert_migrations<'migration, DBS, E, I, S>(
  buffer_cmd: &mut String,
  executor: &mut E,
  mg: &UserMigrationGroup<S>,
  migrations: I,
  schema_prefix: &str,
) -> Result<(), <E::Database as CodecController>::Error>
where
  DBS: Lease<[DatabaseTy]> + 'migration,
  E: Executor,
  I: Clone + Iterator<Item = &'migration UserMigration<DBS, S>>,
  S: Lease<str> + 'migration,
{
  buffer_cmd
    .write_fmt(format_args!(
      "INSERT INTO {schema_prefix}_wtx_migration_group (uid, name, version)
      SELECT * FROM (SELECT {mg_uid} AS uid, '{mg_name}' AS name, {VERSION} AS version) AS tmp
      WHERE NOT EXISTS (
        SELECT 1 FROM {schema_prefix}_wtx_migration_group WHERE uid = {mg_uid}
      );",
      mg_name = mg.name(),
      mg_uid = mg.uid(),
    ))
    .map_err(Into::into)?;
  executor.execute_ignored(buffer_cmd.as_str()).await?;
  buffer_cmd.clear();

  for migration in migrations.clone() {
    buffer_cmd.push_str(migration.sql_up());
  }
  executor
    .transaction(|this| async {
      this.execute_ignored(buffer_cmd.as_str()).await?;
      Ok(((), this))
    })
    .await?;
  buffer_cmd.clear();

  for migration in migrations {
    buffer_cmd
      .write_fmt(format_args!(
        "INSERT INTO {schema_prefix}_wtx_migration (
          uid, _wtx_migration_mg_uid, checksum, name
        ) VALUES (
          {m_uid}, {mg_uid}, '{m_checksum}', '{m_name}'
        );",
        m_checksum = migration.checksum(),
        m_name = migration.name(),
        m_uid = migration.uid(),
        mg_uid = mg.uid(),
        schema_prefix = schema_prefix,
      ))
      .map_err(Into::into)?;
  }
  executor
    .transaction(|this| async {
      this.execute_ignored(buffer_cmd.as_str()).await?;
      Ok(((), this))
    })
    .await?;
  buffer_cmd.clear();

  Ok(())
}

#[cfg(any(feature = "mysql", feature = "postgres"))]
pub(crate) async fn migrations_by_mg_uid_query<'exec, E, ERR, D>(
  buffer_cmd: &mut String,
  executor: &'exec mut E,
  mg_uid: Uid,
  results: &mut Vector<DbMigration>,
  schema_prefix: &str,
) -> Result<(), ERR>
where
  D: Database<Error = ERR>,
  E: Executor<Database = D>,
  ERR: From<crate::Error>,
  DbMigration: FromRecords<'exec, E::Database>,
{
  buffer_cmd.write_fmt(format_args!(
      "SELECT \
        _wtx_migration.uid, \
        _wtx_migration_group.uid as mg_uid, \
        _wtx_migration_group.name as mg_name, \
        _wtx_migration_group.version as mg_version, \
        _wtx_migration.checksum, \
        _wtx_migration.created_on, \
        _wtx_migration.name, \
        _wtx_migration.repeatability \
      FROM \
        {schema_prefix}_wtx_migration_group \
      JOIN \
        {schema_prefix}_wtx_migration ON _wtx_migration._wtx_migration_mg_uid = _wtx_migration_group.uid \
      WHERE \
        _wtx_migration_group.uid = {mg_uid} \
      ORDER BY \
        _wtx_migration.uid ASC;",
    )).map_err(crate::Error::from)?;
  for elem in
    DbMigration::many(&executor.execute_stmt_many(buffer_cmd.as_str(), (), |_elem| Ok(())).await?)
  {
    if let Err(elem) = results.push(elem?) {
      buffer_cmd.clear();
      return Err(elem.into());
    }
  }
  buffer_cmd.clear();
  Ok(())
}