sqlw-backend 0.1.0

Database executor implementations for sqlw
Documentation
//! MySQL async executor implementation for sqlw using mysql_async crate.

use std::future::Future;
use std::sync::Arc;

use crate::error::DBError;
use mysql_async::prelude::Queryable;
use mysql_async::{Opts, Pool};
use sqlw::{FromRow, Query, RowCell, RowError, RowLike, Value};

/// A borrowed row from a MySQL query result.
pub struct MySqlRowRef<'a> {
    row: &'a mysql_async::Row,
}

impl<'a> MySqlRowRef<'a> {
    /// Creates a new row reference from a `mysql_async` row.
    pub fn new(row: &'a mysql_async::Row) -> Self {
        Self { row }
    }

    fn column_index(&self, name: &str) -> Result<usize, RowError> {
        self.row
            .columns()
            .iter()
            .position(|col| col.name_str() == name)
            .ok_or_else(|| RowError::ColumnNotFound {
                name: name.to_string(),
            })
    }

    fn get_value_by_index(&self, index: usize) -> Result<Value, RowError> {
        let columns = self.row.columns();
        let column = columns
            .get(index)
            .ok_or_else(|| RowError::Any(format!("Column index {} not found", index)))?;

        use mysql_async::consts::ColumnType;

        match column.column_type() {
            ColumnType::MYSQL_TYPE_VAR_STRING
            | ColumnType::MYSQL_TYPE_STRING
            | ColumnType::MYSQL_TYPE_VARCHAR
            | ColumnType::MYSQL_TYPE_BLOB => {
                let val: Option<String> = self.row.get(index);
                match val {
                    Some(s) => Ok(Value::Text(s)),
                    None => Ok(Value::Null),
                }
            }
            ColumnType::MYSQL_TYPE_TINY
            | ColumnType::MYSQL_TYPE_SHORT
            | ColumnType::MYSQL_TYPE_INT24
            | ColumnType::MYSQL_TYPE_LONG => {
                let val: Option<i32> = self.row.get(index);
                match val {
                    Some(v) => Ok(Value::Int(v as i64)),
                    None => Ok(Value::Null),
                }
            }
            ColumnType::MYSQL_TYPE_LONGLONG => {
                let val: Option<i64> = self.row.get(index);
                match val {
                    Some(v) => Ok(Value::Int(v)),
                    None => Ok(Value::Null),
                }
            }
            ColumnType::MYSQL_TYPE_FLOAT | ColumnType::MYSQL_TYPE_DOUBLE => {
                let val: Option<f64> = self.row.get(index);
                match val {
                    Some(v) => Ok(Value::Float(v)),
                    None => Ok(Value::Null),
                }
            }
            _ => {
                let val: Option<String> = self.row.get(index);
                match val {
                    Some(s) => Ok(Value::Text(s)),
                    None => Ok(Value::Null),
                }
            }
        }
    }
}

impl<'a> RowLike for MySqlRowRef<'a> {
    fn cell<'b>(&'b self, name: &str) -> Result<RowCell<'b>, RowError> {
        let index = self.column_index(name)?;
        Ok(RowCell::Owned(self.get_value_by_index(index)?))
    }
}

/// An async MySQL executor using `mysql_async` crate.
///
/// Provides native async database operations via the [`QueryExecutor`](sqlw::QueryExecutor) trait.
/// This is the recommended MySQL implementation for high-concurrency scenarios.
#[derive(Debug, Clone)]
pub struct MySqlExecutor {
    /// Connection pool for MySQL async
    pool: Arc<Pool>,
}

impl MySqlExecutor {
    /// Creates a new `MySqlExecutor` with the given connector function.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use sqlw_backend::mysql_async::MySqlExecutor;
    ///
    /// let executor = MySqlExecutor::new(|| async {
    ///     let opts = Opts::from_url("mysql://root:password@localhost:3307/db_name")?;
    ///     let pool = Pool::new(opts);
    ///     Ok(pool)
    /// }).await?;
    /// ```
    pub async fn new<F, Fut>(connector: F) -> Result<Self, DBError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<Pool, mysql_async::Error>> + Send + 'static,
    {
        let pool = connector()
            .await
            .map_err(|e| DBError::Connection(e.into()))?;
        Ok(MySqlExecutor {
            pool: Arc::new(pool),
        })
    }

    /// Returns a reference to the underlying MySQL async pool.
    pub fn pool(&self) -> &Pool {
        &self.pool
    }

    /// Creates a new executor directly from a connection URL.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use sqlw_backend::mysql_async::MySqlExecutor;
    ///
    /// let executor = MySqlExecutor::from_url("mysql://root:password@localhost:3307/db_name").await?;
    /// ```
    pub async fn from_url(url: &str) -> Result<Self, DBError> {
        let opts = Opts::from_url(url).map_err(|e| DBError::Connection(e.into()))?;
        let pool = Pool::new(opts);
        Ok(Self {
            pool: Arc::new(pool),
        })
    }
}

impl sqlw::QueryExecutor for MySqlExecutor {
    type Error = DBError;

    fn query_void(&self, query: Query) -> impl Future<Output = Result<(), DBError>> {
        let pool = Arc::clone(&self.pool);
        async move {
            let (sql, args) = query.split();

            let mysql_params: Vec<mysql_async::Value> =
                args.into_iter().map(sqlw_to_mysql_value).collect();

            let mut conn = pool
                .get_conn()
                .await
                .map_err(|e| DBError::Execution(e.into()))?;

            conn.exec_drop(&sql, mysql_params)
                .await
                .map_err(|e| DBError::Execution(e.into()))
        }
    }

    fn query_one<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> impl Future<Output = Result<Option<T>, DBError>> {
        let pool = Arc::clone(&self.pool);
        async move {
            let (sql, args) = query.split();

            let mysql_params: Vec<mysql_async::Value> =
                args.into_iter().map(sqlw_to_mysql_value).collect();

            let mut conn = pool
                .get_conn()
                .await
                .map_err(|e| DBError::Execution(e.into()))?;

            let result: Option<mysql_async::Row> = conn
                .exec_first(&sql, mysql_params)
                .await
                .map_err(|e| DBError::Execution(e.into()))?;

            match result {
                Some(row) => {
                    let row_ref = MySqlRowRef::new(&row);
                    T::from_row(&row_ref).map(Some).map_err(DBError::Mapping)
                }
                None => Ok(None),
            }
        }
    }

    fn query_list<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> impl Future<Output = Result<Vec<T>, DBError>> {
        let pool = Arc::clone(&self.pool);
        async move {
            let (sql, args) = query.split();

            let mysql_params: Vec<mysql_async::Value> =
                args.into_iter().map(sqlw_to_mysql_value).collect();

            let mut conn = pool
                .get_conn()
                .await
                .map_err(|e| DBError::Execution(e.into()))?;

            let rows: Vec<mysql_async::Row> = conn
                .exec(&sql, mysql_params)
                .await
                .map_err(|e| DBError::Execution(e.into()))?;

            let mut results = Vec::new();
            for row in rows {
                let row_ref = MySqlRowRef::new(&row);
                results.push(T::from_row(&row_ref)?);
            }

            Ok(results)
        }
    }
}

fn sqlw_to_mysql_value(value: Value) -> mysql_async::Value {
    match value {
        Value::Text(s) => mysql_async::Value::Bytes(s.into_bytes()),
        Value::Int(i) => mysql_async::Value::Int(i),
        Value::Float(f) => mysql_async::Value::Double(f),
        Value::Bool(b) => mysql_async::Value::Int(i64::from(b)),
        Value::Blob(b) => mysql_async::Value::Bytes(b),
        Value::Null => mysql_async::Value::NULL,
    }
}