use laminate::FlexValue;
use serde_json::Value;
/// Common async interface for anything that can be queried with SQL text.
///
/// Implementors must be `Send + Sync`. The `async_trait` attribute is what
/// makes the `async fn` declarations below usable in a trait.
///
/// Every method reports failure as a [`DataSourceError`].
// NOTE(review): the exact semantics of each method are defined by the
// implementors, which are not visible from this trait declaration; the
// per-method notes below describe the signatures only — confirm details
// against the concrete sources.
#[async_trait::async_trait]
pub trait DataSource: Send + Sync {
/// Takes SQL text and resolves to a list of [`FlexValue`]s
/// (presumably one per result row — confirm against implementors).
async fn query(&self, sql: &str) -> Result<Vec<FlexValue>, DataSourceError>;
/// Like [`DataSource::query`], but additionally receives `params` as a
/// slice of JSON values (presumably bound to placeholders in `sql`; the
/// placeholder syntax is not specified here — confirm per backend).
async fn query_with(
&self,
sql: &str,
params: &[Value],
) -> Result<Vec<FlexValue>, DataSourceError>;
/// Takes SQL text and resolves to a list of strings
/// (presumably the result set's column names — confirm against implementors).
async fn columns(&self, sql: &str) -> Result<Vec<String>, DataSourceError>;
/// Takes SQL text and resolves to an unsigned 64-bit count
/// (presumably the number of rows the query yields — confirm against implementors).
async fn count(&self, sql: &str) -> Result<u64, DataSourceError>;
}
/// Error type returned by [`DataSource`] methods and by the JSON helpers in
/// this module.
///
/// Each variant carries a human-readable message; the `Display` text is the
/// variant's `#[error]` prefix followed by that message.
#[derive(Debug, thiserror::Error)]
pub enum DataSourceError {
/// Produced by the `From<sqlx::Error>` conversion for sqlx configuration
/// and I/O errors.
#[error("connection failed: {0}")]
ConnectionFailed(String),
/// Produced by the `From<sqlx::Error>` conversion for every sqlx error
/// that is not mapped to [`DataSourceError::ConnectionFailed`].
#[error("query failed: {0}")]
QueryFailed(String),
/// Produced by `read_jsonl` and `read_json_array` when input cannot be
/// parsed, or when the parsed JSON is not an array.
#[error("serialization failed: {0}")]
SerializationFailed(String),
/// Not constructed anywhere in this part of the file; presumably used by
/// backends for operations they cannot perform — confirm against implementors.
#[error("unsupported operation: {0}")]
Unsupported(String),
}
impl From<sqlx::Error> for DataSourceError {
fn from(e: sqlx::Error) -> Self {
match e {
sqlx::Error::Configuration(_) | sqlx::Error::Io(_) => {
DataSourceError::ConnectionFailed(e.to_string())
}
_ => DataSourceError::QueryFailed(e.to_string()),
}
}
}
// Backend modules. Each one is compiled only when the Cargo feature of the
// same name is enabled.
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "mysql")]
pub mod mysql;
// Re-export each backend's source type at this module's level, under the
// same feature gates as the modules above.
// NOTE(review): these types presumably implement `DataSource` — confirm in
// the respective modules.
#[cfg(feature = "postgres")]
pub use postgres::PostgresSource;
#[cfg(feature = "sqlite")]
pub use sqlite::SqliteSource;
#[cfg(feature = "mysql")]
pub use mysql::MysqlSource;
/// Parses JSON Lines text into one [`FlexValue`] per non-blank line.
///
/// Lines that are empty or contain only whitespace are skipped. Every other
/// line is handed, untrimmed, to `FlexValue::from_json`.
///
/// # Errors
///
/// Returns [`DataSourceError::SerializationFailed`] carrying the parser's
/// message for the first line that fails to parse; later lines are not
/// examined.
pub fn read_jsonl(content: &str) -> Result<Vec<FlexValue>, DataSourceError> {
    let mut records = Vec::new();

    for raw_line in content.lines() {
        if raw_line.trim().is_empty() {
            continue;
        }

        let record = FlexValue::from_json(raw_line)
            .map_err(|err| DataSourceError::SerializationFailed(err.to_string()))?;
        records.push(record);
    }

    Ok(records)
}
/// Parses a JSON document whose top-level value is an array and wraps each
/// element in a [`FlexValue`].
///
/// # Errors
///
/// Returns [`DataSourceError::SerializationFailed`] when `json` is not valid
/// JSON (carrying the parser's message), or when the top-level value is
/// anything other than an array.
pub fn read_json_array(json: &str) -> Result<Vec<FlexValue>, DataSourceError> {
    let parsed = serde_json::from_str::<Value>(json)
        .map_err(|err| DataSourceError::SerializationFailed(err.to_string()))?;

    if let Value::Array(elements) = parsed {
        let rows = elements.into_iter().map(FlexValue::new).collect();
        Ok(rows)
    } else {
        Err(DataSourceError::SerializationFailed(
            "expected a JSON array".into(),
        ))
    }
}