use std::marker::PhantomData;
use std::path::PathBuf;
use rusqlite::{Connection, OpenFlags};
use crate::{CacheError, Uri, UriError};
/// Errors produced while opening or reading a SQLite database through
/// [`SqlReader`] and [`SqlTable`].
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum SqlError {
/// An I/O failure; display and source are forwarded from the wrapped
/// [`std::io::Error`].
#[error(transparent)]
Io(#[from] std::io::Error),
/// A boxed error from the SQL layer. In this file it wraps the `rusqlite`
/// errors returned when opening a connection, preparing a statement, or
/// reading rows.
#[error("sql error: {0}")]
Sql(#[source] Box<dyn std::error::Error + Send + Sync>),
/// No table with the requested name is listed in `sqlite_master`; carries
/// the requested name.
#[error("table not found: {0}")]
TableNotFound(String),
/// A fetched row could not be converted into the requested row type; in
/// this file it wraps the `serde_json` error from that conversion.
#[error("deserialize error: {0}")]
Deserialize(#[source] Box<dyn std::error::Error + Send + Sync>),
/// A requested row range does not fit within the table's recorded row
/// count.
#[error("out of bounds: {start}..{end} (rows {rows})")]
OutOfBounds {
/// Start of the requested range.
start: usize,
/// End (exclusive) of the requested range.
end: usize,
/// Row count the range was checked against.
rows: usize,
},
/// A URI error, forwarded transparently. Also built directly when the
/// effective URI yields no filesystem path (`as_path()` returns `None`).
#[error(transparent)]
Uri(#[from] UriError),
/// A cache error, forwarded transparently from [`CacheError`].
#[error(transparent)]
Cache(#[from] CacheError),
}
/// Handle to a SQLite database file.
///
/// No connection is stored; the methods on this type open a fresh read-only
/// connection to `path` each time they are called.
#[derive(Debug)]
pub struct SqlReader {
/// Filesystem path of the database, obtained from `effective_uri.as_path()`.
path: PathBuf,
/// The URI exactly as supplied by the caller.
original_uri: Uri,
/// The URI returned by `force_cache()` on `original_uri`; the one actually
/// opened.
effective_uri: Uri,
}
impl SqlReader {
pub fn from(uri: impl Into<Uri>) -> Result<Self, SqlError> {
let original_uri = uri.into();
let effective_uri = original_uri.force_cache()?;
let path = effective_uri.as_path().ok_or_else(|| {
SqlError::Uri(crate::UriError::InvalidUri(effective_uri.clone()))
})?;
let _conn = Connection::open_with_flags(
&path,
OpenFlags::SQLITE_OPEN_READ_ONLY,
)
.map_err(|e| SqlError::Sql(Box::new(e)))?;
Ok(SqlReader {
path,
original_uri,
effective_uri,
})
}
pub fn tables(&self) -> Result<Vec<String>, SqlError> {
let conn = Connection::open_with_flags(
&self.path,
OpenFlags::SQLITE_OPEN_READ_ONLY,
)
.map_err(|e| SqlError::Sql(Box::new(e)))?;
let mut stmt = conn
.prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
.map_err(|e| SqlError::Sql(Box::new(e)))?;
let names = stmt
.query_map([], |row| row.get::<_, String>(0))
.map_err(|e| SqlError::Sql(Box::new(e)))?
.collect::<Result<Vec<_>, _>>()
.map_err(|e| SqlError::Sql(Box::new(e)))?;
Ok(names)
}
pub fn from_table<T>(&self, name: &str) -> Result<SqlTable<T>, SqlError> {
let conn = Connection::open_with_flags(
&self.path,
OpenFlags::SQLITE_OPEN_READ_ONLY,
)
.map_err(|e| SqlError::Sql(Box::new(e)))?;
let exists: bool = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
rusqlite::params![name],
|row| row.get::<_, i64>(0),
)
.map_err(|e| SqlError::Sql(Box::new(e)))?
> 0;
if !exists {
return Err(SqlError::TableNotFound(name.to_string()));
}
let rows: usize =
conn.query_row(
&format!("SELECT COUNT(*) FROM \"{}\"", name),
[],
|row| row.get::<_, i64>(0),
)
.map_err(|e| SqlError::Sql(Box::new(e)))? as usize;
let mut pragma_stmt = conn
.prepare(&format!("PRAGMA table_info(\"{}\")", name))
.map_err(|e| SqlError::Sql(Box::new(e)))?;
let cols = pragma_stmt
.query_map([], |_| Ok(()))
.map_err(|e| SqlError::Sql(Box::new(e)))?
.count();
Ok(SqlTable {
db_path: self.path.clone(),
table_name: name.to_string(),
rows,
cols,
_marker: PhantomData,
})
}
pub fn original_uri(&self) -> &Uri {
&self.original_uri
}
pub fn effective_uri(&self) -> &Uri {
&self.effective_uri
}
}
/// Typed handle to a single table inside a SQLite database file.
///
/// `T` is the type each row is deserialized into; it is carried only as a
/// marker and no `T` value is stored.
#[derive(Debug)]
pub struct SqlTable<T> {
/// Path of the database file that is reopened for each query.
db_path: PathBuf,
/// Name of the table this handle reads from.
table_name: String,
/// Row count recorded when the handle was created.
rows: usize,
/// Column count recorded when the handle was created (number of
/// `PRAGMA table_info` rows).
cols: usize,
/// Ties the handle to its row type without storing one.
_marker: PhantomData<T>,
}
impl<T: for<'de> serde::Deserialize<'de>> SqlTable<T> {
#[must_use]
pub fn shape(&self) -> (usize, usize) {
(self.rows, self.cols)
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.rows == 0
}
pub fn read_all(&self) -> Result<Vec<T>, SqlError> {
self.query(self.rows, 0)
}
pub fn read_range(
&self,
range: std::ops::Range<usize>,
) -> Result<Vec<T>, SqlError> {
if range.end > self.rows {
return Err(SqlError::OutOfBounds {
start: range.start,
end: range.end,
rows: self.rows,
});
}
let limit = range.end - range.start;
let offset = range.start;
self.query(limit, offset)
}
fn query(&self, limit: usize, offset: usize) -> Result<Vec<T>, SqlError> {
let conn = Connection::open_with_flags(
&self.db_path,
OpenFlags::SQLITE_OPEN_READ_ONLY,
)
.map_err(|e| SqlError::Sql(Box::new(e)))?;
let sql = format!(
"SELECT * FROM \"{}\" LIMIT {} OFFSET {}",
self.table_name, limit, offset
);
let mut stmt =
conn.prepare(&sql).map_err(|e| SqlError::Sql(Box::new(e)))?;
let col_names: Vec<String> = stmt
.column_names()
.into_iter()
.map(|s| s.to_string())
.collect();
let rows = stmt
.query_map([], |row| {
let mut map = serde_json::Map::new();
for (i, col) in col_names.iter().enumerate() {
let val: rusqlite::types::Value = row.get(i)?;
map.insert(col.clone(), rusqlite_value_to_json(val));
}
Ok(map)
})
.map_err(|e| SqlError::Sql(Box::new(e)))?;
let mut result = Vec::with_capacity(limit);
for row_result in rows {
let map = row_result.map_err(|e| SqlError::Sql(Box::new(e)))?;
let value = serde_json::Value::Object(map);
let item = serde_json::from_value::<T>(value)
.map_err(|e| SqlError::Deserialize(Box::new(e)))?;
result.push(item);
}
Ok(result)
}
}
fn rusqlite_value_to_json(v: rusqlite::types::Value) -> serde_json::Value {
match v {
rusqlite::types::Value::Null => serde_json::Value::Null,
rusqlite::types::Value::Integer(i) => serde_json::json!(i),
rusqlite::types::Value::Real(f) => serde_json::Number::from_f64(f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
rusqlite::types::Value::Text(s) => serde_json::Value::String(s),
rusqlite::types::Value::Blob(_) => serde_json::Value::Null,
}
}