rsql_driver_libsql 0.19.3

rsql libsql driver
Documentation
use crate::metadata;
use async_trait::async_trait;
use file_type::FileType;
use libsql::Builder;
use libsql::replication::Frames;
use rsql_driver::Error::IoError;
use rsql_driver::{Metadata, QueryResult, Result, ToSql, Value};
use std::collections::HashMap;
use std::fmt::Debug;
use url::Url;

#[derive(Debug)]
pub struct Driver;

#[async_trait]
impl rsql_driver::Driver for Driver {
    fn identifier(&self) -> &'static str {
        "libsql"
    }

    async fn connect(&self, url: &str) -> Result<Box<dyn rsql_driver::Connection>> {
        let connection = Connection::new(url).await?;
        Ok(Box::new(connection))
    }

    fn supports_file_type(&self, _file_type: &FileType) -> bool {
        false
    }
}

pub struct Connection {
    url: String,
    connection: libsql::Connection,
}

impl Connection {
    pub(crate) async fn new(url: &str) -> Result<Connection> {
        let parsed_url = Url::parse(url)?;
        let host = parsed_url.host();
        let file_name = parsed_url.path();

        let database = if let Some(host) = host {
            let params: HashMap<String, String> = parsed_url.query_pairs().into_owned().collect();
            let auth_token = params.get("auth_token").map_or("", |value| value.as_str());
            let database_url = format!("libsql://{host}");
            Builder::new_remote(database_url, auth_token.to_string())
                .build()
                .await
                .map_err(|error| IoError(error.to_string()))?
        } else if file_name.is_empty() {
            Builder::new_local(":memory:")
                .build()
                .await
                .map_err(|error| IoError(error.to_string()))?
        } else {
            let db = Builder::new_local_replica(file_name)
                .build()
                .await
                .map_err(|error| IoError(error.to_string()))?;
            let frames = Frames::Vec(vec![]);
            db.sync_frames(frames)
                .await
                .map_err(|error| IoError(error.to_string()))?;
            db
        };

        let connection = database
            .connect()
            .map_err(|error| IoError(error.to_string()))?;

        Ok(Connection {
            url: url.to_string(),
            connection,
        })
    }
}

#[async_trait]
impl rsql_driver::Connection for Connection {
    fn url(&self) -> &String {
        &self.url
    }

    async fn execute(&mut self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
        let values = rsql_driver::to_values(params);
        let libsql_params = to_libsql_params(&values);
        let rows = self
            .connection
            .execute(sql, libsql_params)
            .await
            .map_err(|error| IoError(error.to_string()))?;
        Ok(rows)
    }

    async fn query(&mut self, sql: &str, params: &[&dyn ToSql]) -> Result<Box<dyn QueryResult>> {
        let values = rsql_driver::to_values(params);
        let libsql_params = to_libsql_params(&values);
        let statement = self
            .connection
            .prepare(sql)
            .await
            .map_err(|error| IoError(error.to_string()))?;
        let columns: Vec<String> = statement
            .columns()
            .iter()
            .map(|column| column.name().to_string())
            .collect();

        let mut query_rows = statement
            .query(libsql_params)
            .await
            .map_err(|error| IoError(error.to_string()))?;
        let mut rows = Vec::new();
        while let Some(query_row) = query_rows
            .next()
            .await
            .map_err(|error| IoError(error.to_string()))?
        {
            let mut row = Vec::new();
            for (index, _column_name) in columns.iter().enumerate() {
                let index = i32::try_from(index)?;
                let value = crate::results::convert_to_value(&query_row, index)?;
                row.push(value);
            }
            rows.push(row);
        }

        let query_result = crate::results::LibSqlQueryResult::new(columns, rows);
        Ok(Box::new(query_result))
    }

    async fn metadata(&mut self) -> Result<Metadata> {
        metadata::get_metadata(self).await
    }
}

fn to_libsql_params(values: &[Value]) -> Vec<libsql::Value> {
    values
        .iter()
        .map(|value| match value {
            Value::Null => libsql::Value::Null,
            Value::Bool(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I8(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I16(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I32(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I64(v) => libsql::Value::Integer(*v),
            Value::U8(v) => libsql::Value::Integer(i64::from(*v)),
            Value::U16(v) => libsql::Value::Integer(i64::from(*v)),
            Value::U32(v) => libsql::Value::Integer(i64::from(*v)),
            Value::U64(v) => libsql::Value::Integer(*v as i64),
            Value::F32(v) => libsql::Value::Real(f64::from(*v)),
            Value::F64(v) => libsql::Value::Real(*v),
            Value::String(v) => libsql::Value::Text(v.clone()),
            Value::Bytes(v) => libsql::Value::Blob(v.clone()),
            _ => libsql::Value::Text(value.to_string()),
        })
        .collect()
}

impl Debug for Connection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Connection")
            .field("url", &self.url)
            .finish_non_exhaustive()
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use rsql_driver::Driver;

    const DATABASE_URL: &str = "libsql://";

    #[tokio::test]
    async fn test_debug() -> Result<()> {
        let driver = crate::Driver;
        let connection = driver.connect(DATABASE_URL).await?;

        assert!(format!("{connection:?}").contains("Connection"));
        assert!(format!("{connection:?}").contains(DATABASE_URL));
        Ok(())
    }

    #[tokio::test]
    async fn test_driver_connect() -> Result<()> {
        let driver = crate::Driver;
        let mut connection = driver.connect(DATABASE_URL).await?;
        assert_eq!(DATABASE_URL, connection.url());
        connection.close().await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_connection_interface() -> Result<()> {
        let driver = crate::Driver;
        let mut connection = driver.connect(DATABASE_URL).await?;

        let _ = connection
            .execute("CREATE TABLE person (id INTEGER, name TEXT)", &[])
            .await?;

        let rows = connection
            .execute("INSERT INTO person (id, name) VALUES (1, 'foo')", &[])
            .await?;
        assert_eq!(rows, 1);

        let mut query_result = connection.query("SELECT id, name FROM person", &[]).await?;
        assert_eq!(query_result.columns(), vec!["id", "name"]);
        assert_eq!(
            query_result.next().await.cloned(),
            Some(vec![Value::I64(1), Value::String("foo".to_string())])
        );
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(())
    }

    /// Reference: <https://www.sqlite.org/datatype3.html>
    #[tokio::test]
    async fn test_table_data_types() -> Result<()> {
        let driver = crate::Driver;
        let mut connection = driver.connect(DATABASE_URL).await?;

        let _ = connection
            .execute(
                "CREATE TABLE t1(t TEXT, nu NUMERIC, i INTEGER, r REAL, no BLOB)",
                &[],
            )
            .await?;

        let rows = connection
            .execute(
                "INSERT INTO t1 (t, nu, i, r, no) VALUES ('foo', 123, 456, 789.123, x'2a')",
                &[],
            )
            .await?;
        assert_eq!(rows, 1);

        let mut query_result = connection
            .query("SELECT t, nu, i, r, no FROM t1", &[])
            .await?;
        assert_eq!(query_result.columns(), vec!["t", "nu", "i", "r", "no"]);
        assert_eq!(
            query_result.next().await.cloned(),
            Some(vec![
                Value::String("foo".to_string()),
                Value::I64(123),
                Value::I64(456),
                Value::F64(789.123),
                Value::Bytes(vec![42])
            ])
        );
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(())
    }

    async fn test_data_type(sql: &str) -> Result<Option<Value>> {
        let driver = crate::Driver;
        let mut connection = driver.connect(DATABASE_URL).await?;
        let mut query_result = connection.query(sql, &[]).await?;
        let mut value: Option<Value> = None;

        assert_eq!(query_result.columns().len(), 1);

        if let Some(row) = query_result.next().await {
            assert_eq!(row.len(), 1);

            value = row.first().cloned();
        }
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(value)
    }

    #[tokio::test]
    async fn test_data_type_bytes() -> Result<()> {
        assert_eq!(
            test_data_type("SELECT x'2a'").await?,
            Some(Value::Bytes(vec![42]))
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_data_type_i64() -> Result<()> {
        assert_eq!(
            test_data_type("SELECT 2147483647").await?,
            Some(Value::I64(2_147_483_647))
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_data_type_f64() -> Result<()> {
        assert_eq!(
            test_data_type("SELECT 12345.6789").await?,
            Some(Value::F64(12_345.678_9))
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_data_type_string() -> Result<()> {
        assert_eq!(
            test_data_type("SELECT 'foo'").await?,
            Some(Value::String("foo".to_string()))
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_execute_with_params() -> Result<()> {
        let driver = crate::Driver;
        let mut connection = driver.connect(DATABASE_URL).await?;

        let _ = connection
            .execute(
                "CREATE TABLE test_params (id INTEGER, name TEXT, score REAL)",
                &[],
            )
            .await?;

        let rows = connection
            .execute(
                "INSERT INTO test_params (id, name, score) VALUES (?, ?, ?)",
                &[&1i64, &"Alice", &95.5f64],
            )
            .await?;
        assert_eq!(rows, 1);

        let mut query_result = connection
            .query(
                "SELECT id, name, score FROM test_params WHERE id = ?",
                &[&1i64],
            )
            .await?;

        assert_eq!(
            query_result.next().await.cloned(),
            Some(vec![
                Value::I64(1),
                Value::String("Alice".to_string()),
                Value::F64(95.5),
            ])
        );
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(())
    }
}