use crate::metadata;
use async_trait::async_trait;
use file_type::FileType;
use libsql::Builder;
use libsql::replication::Frames;
use rsql_driver::Error::IoError;
use rsql_driver::{Metadata, QueryResult, Result, ToSql, Value};
use std::collections::HashMap;
use std::fmt::Debug;
use url::Url;
/// Driver for libSQL databases; it reports the identifier `libsql` and hands
/// out [`Connection`] values through the `rsql_driver::Driver` trait.
#[derive(Debug)]
pub struct Driver;
#[async_trait]
impl rsql_driver::Driver for Driver {
    /// Returns the identifier of this driver: `libsql`.
    fn identifier(&self) -> &'static str {
        "libsql"
    }

    /// Opens a connection for `url` and returns it boxed as a trait object.
    ///
    /// # Errors
    /// Propagates any error returned by `Connection::new`.
    async fn connect(&self, url: &str) -> Result<Box<dyn rsql_driver::Connection>> {
        Connection::new(url)
            .await
            .map(|opened| Box::new(opened) as Box<dyn rsql_driver::Connection>)
    }

    /// Always returns `false`: this driver does not claim any file type.
    fn supports_file_type(&self, _file_type: &FileType) -> bool {
        false
    }
}
/// A libSQL connection together with the URL string it was created from.
pub struct Connection {
/// URL exactly as passed to `Connection::new`; returned by `url()` and shown by `Debug`.
url: String,
/// Underlying libSQL connection handle used to prepare and execute statements.
connection: libsql::Connection,
}
impl Connection {
    /// Opens a libSQL connection described by `url`.
    ///
    /// The kind of database is chosen from the parsed URL:
    /// - a host is present: a remote database at `libsql://{host}` (plus
    ///   `:{port}` when the URL carries an explicit port), authenticated with the
    ///   `auth_token` query parameter (an empty token when the parameter is absent);
    /// - no host and an empty path: a local in-memory database (`:memory:`);
    /// - no host and a non-empty path: a local replica stored at that path.
    ///
    /// # Errors
    /// Returns an error if `url` cannot be parsed, or an `IoError` carrying the
    /// libSQL error message if building, syncing, or connecting to the database
    /// fails.
    pub(crate) async fn new(url: &str) -> Result<Connection> {
        let parsed_url = Url::parse(url)?;
        let file_name = parsed_url.path();
        let database = if let Some(host) = parsed_url.host() {
            let params: HashMap<String, String> =
                parsed_url.query_pairs().into_owned().collect();
            let auth_token = params.get("auth_token").cloned().unwrap_or_default();
            // Keep an explicit port: rebuilding the URL from the host alone would
            // silently point the connection at the endpoint's default port.
            let database_url = match parsed_url.port() {
                Some(port) => format!("libsql://{host}:{port}"),
                None => format!("libsql://{host}"),
            };
            Builder::new_remote(database_url, auth_token)
                .build()
                .await
                .map_err(|error| IoError(error.to_string()))?
        } else if file_name.is_empty() {
            Builder::new_local(":memory:")
                .build()
                .await
                .map_err(|error| IoError(error.to_string()))?
        } else {
            let db = Builder::new_local_replica(file_name)
                .build()
                .await
                .map_err(|error| IoError(error.to_string()))?;
            // NOTE(review): an empty frame list is applied right after building the
            // replica; presumably this initializes it for local use — confirm.
            let frames = Frames::Vec(vec![]);
            db.sync_frames(frames)
                .await
                .map_err(|error| IoError(error.to_string()))?;
            db
        };
        let connection = database
            .connect()
            .map_err(|error| IoError(error.to_string()))?;
        Ok(Connection {
            url: url.to_string(),
            connection,
        })
    }
}
#[async_trait]
impl rsql_driver::Connection for Connection {
    /// Returns the URL this connection was created with.
    fn url(&self) -> &String {
        &self.url
    }

    /// Executes `sql` with the positional `params` and returns the `u64` count
    /// reported by libSQL's `execute`.
    ///
    /// # Errors
    /// Returns an `IoError` carrying the libSQL error message on failure.
    async fn execute(&mut self, sql: &str, params: &[&dyn ToSql]) -> Result<u64> {
        let bound = to_libsql_params(&rsql_driver::to_values(params));
        self.connection
            .execute(sql, bound)
            .await
            .map_err(|err| IoError(err.to_string()))
    }

    /// Prepares `sql`, runs it with the positional `params`, and reads every
    /// returned row into memory before handing back the result set.
    ///
    /// # Errors
    /// Returns an `IoError` carrying the libSQL error message if preparing,
    /// querying, or fetching a row fails, and propagates any error raised while
    /// converting a column index or a cell value.
    async fn query(&mut self, sql: &str, params: &[&dyn ToSql]) -> Result<Box<dyn QueryResult>> {
        let bound = to_libsql_params(&rsql_driver::to_values(params));
        let statement = match self.connection.prepare(sql).await {
            Ok(prepared) => prepared,
            Err(err) => return Err(IoError(err.to_string())),
        };

        // Column names are captured up front; their count drives row decoding.
        let columns: Vec<String> = statement
            .columns()
            .iter()
            .map(|col| col.name().to_string())
            .collect();
        let column_count = columns.len();

        let mut cursor = statement
            .query(bound)
            .await
            .map_err(|err| IoError(err.to_string()))?;

        let mut rows = Vec::new();
        while let Some(record) = cursor
            .next()
            .await
            .map_err(|err| IoError(err.to_string()))?
        {
            let mut cells = Vec::with_capacity(column_count);
            for position in 0..column_count {
                // The row accessor takes an `i32` column index.
                let index = i32::try_from(position)?;
                cells.push(crate::results::convert_to_value(&record, index)?);
            }
            rows.push(cells);
        }

        Ok(Box::new(crate::results::LibSqlQueryResult::new(columns, rows)))
    }

    /// Collects database metadata by delegating to `metadata::get_metadata`.
    async fn metadata(&mut self) -> Result<Metadata> {
        metadata::get_metadata(self).await
    }
}
/// Converts driver-level [`Value`]s into the libSQL values bound to a statement.
///
/// Mapping:
/// - `Null` becomes `Null`;
/// - `Bool` and every integer variant become `Integer`;
/// - `F32` / `F64` become `Real`;
/// - `String` becomes `Text` and `Bytes` becomes `Blob`;
/// - any other variant is sent as `Text` using its `to_string()` rendering.
///
/// A `U64` larger than `i64::MAX` does not fit libSQL's signed 64-bit `Integer`.
/// It is sent as its decimal `Text` rendering instead of being wrapped into a
/// negative number, mirroring the text fallback used for other variants.
fn to_libsql_params(values: &[Value]) -> Vec<libsql::Value> {
    values
        .iter()
        .map(|value| match value {
            Value::Null => libsql::Value::Null,
            Value::Bool(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I8(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I16(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I32(v) => libsql::Value::Integer(i64::from(*v)),
            Value::I64(v) => libsql::Value::Integer(*v),
            Value::U8(v) => libsql::Value::Integer(i64::from(*v)),
            Value::U16(v) => libsql::Value::Integer(i64::from(*v)),
            Value::U32(v) => libsql::Value::Integer(i64::from(*v)),
            // Checked conversion: `*v as i64` would wrap values above `i64::MAX`.
            Value::U64(v) => i64::try_from(*v).map_or_else(
                |_| libsql::Value::Text(v.to_string()),
                libsql::Value::Integer,
            ),
            Value::F32(v) => libsql::Value::Real(f64::from(*v)),
            Value::F64(v) => libsql::Value::Real(*v),
            Value::String(v) => libsql::Value::Text(v.clone()),
            Value::Bytes(v) => libsql::Value::Blob(v.clone()),
            _ => libsql::Value::Text(value.to_string()),
        })
        .collect()
}
impl Debug for Connection {
    /// Formats the connection as a `Connection` struct showing only the `url`
    /// field; the remaining state is elided with `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendering = f.debug_struct("Connection");
        rendering.field("url", &self.url);
        rendering.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use rsql_driver::Driver;

    /// URL with neither host nor path, used by every test in this module.
    const DATABASE_URL: &str = "libsql://";

    /// Opens a connection to [`DATABASE_URL`] through the driver entry point.
    async fn open() -> Result<Box<dyn rsql_driver::Connection>> {
        crate::Driver.connect(DATABASE_URL).await
    }

    /// The `Debug` rendering names the type and includes the URL.
    #[tokio::test]
    async fn test_debug() -> Result<()> {
        let connection = open().await?;
        let rendered = format!("{connection:?}");
        assert!(rendered.contains("Connection"));
        assert!(rendered.contains(DATABASE_URL));
        Ok(())
    }

    /// A connection reports the URL it was opened with and can be closed.
    #[tokio::test]
    async fn test_driver_connect() -> Result<()> {
        let mut connection = open().await?;
        assert_eq!(DATABASE_URL, connection.url());
        connection.close().await?;
        Ok(())
    }

    /// Round trip through `execute` and `query` on a two-column table.
    #[tokio::test]
    async fn test_connection_interface() -> Result<()> {
        let mut connection = open().await?;
        connection
            .execute("CREATE TABLE person (id INTEGER, name TEXT)", &[])
            .await?;

        let inserted = connection
            .execute("INSERT INTO person (id, name) VALUES (1, 'foo')", &[])
            .await?;
        assert_eq!(inserted, 1);

        let mut query_result = connection.query("SELECT id, name FROM person", &[]).await?;
        assert_eq!(query_result.columns(), vec!["id", "name"]);
        let expected = vec![Value::I64(1), Value::String("foo".to_string())];
        assert_eq!(query_result.next().await.cloned(), Some(expected));
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(())
    }

    /// Each declared column type comes back as the expected `Value` variant.
    #[tokio::test]
    async fn test_table_data_types() -> Result<()> {
        let mut connection = open().await?;
        connection
            .execute(
                "CREATE TABLE t1(t TEXT, nu NUMERIC, i INTEGER, r REAL, no BLOB)",
                &[],
            )
            .await?;

        let inserted = connection
            .execute(
                "INSERT INTO t1 (t, nu, i, r, no) VALUES ('foo', 123, 456, 789.123, x'2a')",
                &[],
            )
            .await?;
        assert_eq!(inserted, 1);

        let mut query_result = connection
            .query("SELECT t, nu, i, r, no FROM t1", &[])
            .await?;
        assert_eq!(query_result.columns(), vec!["t", "nu", "i", "r", "no"]);
        let expected = vec![
            Value::String("foo".to_string()),
            Value::I64(123),
            Value::I64(456),
            Value::F64(789.123),
            Value::Bytes(vec![42]),
        ];
        assert_eq!(query_result.next().await.cloned(), Some(expected));
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(())
    }

    /// Runs a single-column query and returns the only cell of its first row,
    /// or `None` when the query yields no rows. Asserts that there is exactly
    /// one column and at most one row.
    async fn test_data_type(sql: &str) -> Result<Option<Value>> {
        let mut connection = open().await?;
        let mut query_result = connection.query(sql, &[]).await?;
        assert_eq!(query_result.columns().len(), 1);

        let value = match query_result.next().await {
            Some(row) => {
                assert_eq!(row.len(), 1);
                row.first().cloned()
            }
            None => None,
        };
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(value)
    }

    /// A blob literal is read back as `Value::Bytes`.
    #[tokio::test]
    async fn test_data_type_bytes() -> Result<()> {
        let value = test_data_type("SELECT x'2a'").await?;
        assert_eq!(value, Some(Value::Bytes(vec![42])));
        Ok(())
    }

    /// An integer literal is read back as `Value::I64`.
    #[tokio::test]
    async fn test_data_type_i64() -> Result<()> {
        let value = test_data_type("SELECT 2147483647").await?;
        assert_eq!(value, Some(Value::I64(2_147_483_647)));
        Ok(())
    }

    /// A decimal literal is read back as `Value::F64`.
    #[tokio::test]
    async fn test_data_type_f64() -> Result<()> {
        let value = test_data_type("SELECT 12345.6789").await?;
        assert_eq!(value, Some(Value::F64(12_345.678_9)));
        Ok(())
    }

    /// A string literal is read back as `Value::String`.
    #[tokio::test]
    async fn test_data_type_string() -> Result<()> {
        let value = test_data_type("SELECT 'foo'").await?;
        assert_eq!(value, Some(Value::String("foo".to_string())));
        Ok(())
    }

    /// Positional parameters are bound for both `execute` and `query`.
    #[tokio::test]
    async fn test_execute_with_params() -> Result<()> {
        let mut connection = open().await?;
        connection
            .execute(
                "CREATE TABLE test_params (id INTEGER, name TEXT, score REAL)",
                &[],
            )
            .await?;

        let inserted = connection
            .execute(
                "INSERT INTO test_params (id, name, score) VALUES (?, ?, ?)",
                &[&1i64, &"Alice", &95.5f64],
            )
            .await?;
        assert_eq!(inserted, 1);

        let mut query_result = connection
            .query(
                "SELECT id, name, score FROM test_params WHERE id = ?",
                &[&1i64],
            )
            .await?;
        let expected = vec![
            Value::I64(1),
            Value::String("Alice".to_string()),
            Value::F64(95.5),
        ];
        assert_eq!(query_result.next().await.cloned(), Some(expected));
        assert!(query_result.next().await.is_none());

        connection.close().await?;
        Ok(())
    }
}