use std::{collections::HashMap, fmt::Write};
use async_trait::async_trait;
use fraiseql_error::{FraiseQLError, Result};
use futures::stream::StreamExt;
use super::{
traits::DatabaseAdapter,
types::{DatabaseType, JsonbValue, PoolMetrics, sql_hints::OrderByClause},
where_clause::WhereClause,
where_sql_generator::WhereSqlGenerator,
wire_pool::WireClientFactory,
};
/// Database adapter that serves view queries through fraiseql-wire clients.
///
/// A client is obtained from the factory for each query; results are streamed
/// back and collected into memory.
#[derive(Debug, Clone)]
pub struct FraiseWireAdapter {
    /// Creates the wire client used by each query.
    factory: WireClientFactory,
    /// Chunk size handed to every query builder.
    chunk_size: usize,
}
impl FraiseWireAdapter {
    /// Creates an adapter whose clients connect with `connection_string`.
    ///
    /// The chunk size starts at 1024; use
    /// [`with_chunk_size`](Self::with_chunk_size) to change it.
    #[must_use]
    pub fn new(connection_string: impl Into<String>) -> Self {
        Self {
            factory: WireClientFactory::new(connection_string),
            chunk_size: 1024,
        }
    }

    /// Returns the adapter with its chunk size replaced by `chunk_size`.
    #[must_use]
    pub const fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Renders a `SELECT data FROM <view>` statement with the optional
    /// `WHERE`, `OFFSET` and `LIMIT` parts appended in that order.
    ///
    /// # Errors
    ///
    /// Returns the error produced by [`WhereSqlGenerator::to_sql`] when the
    /// WHERE clause cannot be rendered.
    fn build_query(
        &self,
        view: &str,
        where_clause: Option<&WhereClause>,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> Result<String> {
        // NOTE(review): `view` is interpolated verbatim; this assumes callers
        // only pass trusted view names — confirm against the call sites.
        let mut sql = format!("SELECT data FROM {view} ");

        if let Some(clause) = where_clause {
            let where_sql = WhereSqlGenerator::to_sql(clause)?;
            sql.push_str("WHERE ");
            sql.push_str(&where_sql);
            sql.push(' ');
        }
        if let Some(offset_val) = offset {
            write!(sql, "OFFSET {offset_val} ").expect("write to String");
        }
        if let Some(limit_val) = limit {
            write!(sql, "LIMIT {limit_val}").expect("write to String");
        }

        // Each part leaves a trailing separator; strip whatever is left over.
        Ok(sql.trim().to_string())
    }

    /// Runs a view query and applies `offset` / `limit` on the client side
    /// while draining the result stream.
    ///
    /// The first `offset` rows are read and discarded, then at most `limit`
    /// rows are collected. A `limit` of `Some(0)` yields an empty result.
    ///
    /// # Errors
    ///
    /// Returns an error when the WHERE clause cannot be rendered, when a
    /// client cannot be created, when the query fails to start, or when any
    /// streamed row (including a skipped one) is an error.
    async fn execute_manual_query(
        &self,
        view: &str,
        where_clause: Option<&WhereClause>,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> Result<Vec<JsonbValue>> {
        // The rendered statement is not sent anywhere: the wire builder takes
        // the view and the WHERE fragment separately. Rendering it first still
        // rejects an invalid WHERE clause before any connection is opened.
        let _sql = self.build_query(view, where_clause, limit, offset)?;

        // A zero limit can never produce rows. Previously the limit was only
        // checked after a row had been pushed, so `Some(0)` returned one row.
        if limit == Some(0) {
            return Ok(Vec::new());
        }

        let client = self.factory.create_client().await?;
        let mut builder = client.query::<serde_json::Value>(view).chunk_size(self.chunk_size);
        if let Some(clause) = where_clause {
            builder = builder.where_sql(WhereSqlGenerator::to_sql(clause)?);
        }

        let mut stream = builder.execute().await.map_err(|e| FraiseQLError::Database {
            message: format!("fraiseql-wire query failed: {e}"),
            sql_state: None,
        })?;

        let skip = offset.unwrap_or(0) as usize;
        // No limit is modelled as a cap that the result length cannot reach.
        let cap = limit.map_or(usize::MAX, |l| l as usize);

        let mut rows = Vec::new();
        let mut skipped: usize = 0;
        while let Some(item) = stream.next().await {
            // Errors are surfaced even for rows that would be skipped.
            let json = item.map_err(|e| FraiseQLError::Database {
                message: format!("Stream error: {e}"),
                sql_state: None,
            })?;

            if skipped < skip {
                skipped += 1;
                continue;
            }

            rows.push(JsonbValue::new(json));
            if rows.len() >= cap {
                // Stop reading as soon as the requested page is full.
                break;
            }
        }

        Ok(rows)
    }
}
#[async_trait]
impl DatabaseAdapter for FraiseWireAdapter {
    /// Runs a view query; the projection hint is accepted but not used.
    ///
    /// Delegates to [`execute_where_query`](Self::execute_where_query) with
    /// the remaining arguments unchanged.
    async fn execute_with_projection(
        &self,
        view: &str,
        _projection: Option<&crate::types::SqlProjectionHint>,
        where_clause: Option<&WhereClause>,
        limit: Option<u32>,
        offset: Option<u32>,
        order_by: Option<&[OrderByClause]>,
    ) -> Result<Vec<JsonbValue>> {
        self.execute_where_query(view, where_clause, limit, offset, order_by).await
    }

    /// Streams every row of `view` matching `where_clause` into a vector.
    ///
    /// When `limit` or `offset` is set the work is handed to
    /// `execute_manual_query`, which paginates while draining the stream.
    /// The `_order_by` argument is not applied by this adapter.
    ///
    /// # Errors
    ///
    /// Returns an error when a client cannot be created, when the WHERE
    /// clause cannot be rendered, when the query fails to start, or when a
    /// streamed row is an error.
    async fn execute_where_query(
        &self,
        view: &str,
        where_clause: Option<&WhereClause>,
        limit: Option<u32>,
        offset: Option<u32>,
        _order_by: Option<&[OrderByClause]>,
    ) -> Result<Vec<JsonbValue>> {
        // Decide on the paginated path before touching the factory: that path
        // creates its own client, so building one here first would open a
        // connection only to drop it unused.
        if limit.is_some() || offset.is_some() {
            return self.execute_manual_query(view, where_clause, limit, offset).await;
        }

        let client = self.factory.create_client().await?;
        let mut builder = client.query::<serde_json::Value>(view).chunk_size(self.chunk_size);
        if let Some(clause) = where_clause {
            builder = builder.where_sql(WhereSqlGenerator::to_sql(clause)?);
        }

        let mut stream = builder.execute().await.map_err(|e| FraiseQLError::Database {
            message: format!("fraiseql-wire query failed: {e}"),
            sql_state: None,
        })?;

        let mut rows = Vec::new();
        while let Some(item) = stream.next().await {
            let json = item.map_err(|e| FraiseQLError::Database {
                message: format!("Stream error: {e}"),
                sql_state: None,
            })?;
            rows.push(JsonbValue::new(json));
        }
        Ok(rows)
    }

    /// Reports this adapter as a PostgreSQL backend.
    fn database_type(&self) -> DatabaseType {
        DatabaseType::PostgreSQL
    }

    /// Checks only that the configured connection string is non-empty.
    ///
    /// No connection is opened by this check.
    ///
    /// # Errors
    ///
    /// Returns [`FraiseQLError::Database`] when the connection string is empty.
    async fn health_check(&self) -> Result<()> {
        if self.factory.connection_string().is_empty() {
            return Err(FraiseQLError::Database {
                message: "Connection string is empty".to_string(),
                sql_state: None,
            });
        }
        Ok(())
    }

    /// Always reports zero for every pool counter.
    ///
    /// NOTE(review): presumably because clients are created per query rather
    /// than pooled — confirm against `WireClientFactory`.
    fn pool_metrics(&self) -> PoolMetrics {
        PoolMetrics {
            total_connections: 0,
            idle_connections: 0,
            active_connections: 0,
            waiting_requests: 0,
        }
    }

    /// Unsupported by this adapter.
    ///
    /// # Errors
    ///
    /// Always returns [`FraiseQLError::Database`].
    async fn execute_raw_query(
        &self,
        _sql: &str,
    ) -> Result<Vec<HashMap<String, serde_json::Value>>> {
        Err(FraiseQLError::Database {
            message: "fraiseql-wire does not support arbitrary SQL queries. Use execute_where_query instead.".to_string(),
            sql_state: None,
        })
    }

    /// Unsupported by this adapter.
    ///
    /// # Errors
    ///
    /// Always returns [`FraiseQLError::Database`].
    async fn execute_parameterized_aggregate(
        &self,
        _sql: &str,
        _params: &[serde_json::Value],
    ) -> Result<Vec<HashMap<String, serde_json::Value>>> {
        Err(FraiseQLError::Database {
            message: "fraiseql-wire does not support aggregate SQL queries.".to_string(),
            sql_state: None,
        })
    }
}
#[cfg(test)]
mod tests {
    // A failed `unwrap` in a test is simply a failed test.
    #![allow(clippy::unwrap_used)]

    use super::*;

    /// Connection string shared by every test; no connection is ever opened.
    const TEST_DSN: &str = "postgres://localhost/test";

    /// Builds an adapter with the default settings.
    fn default_adapter() -> FraiseWireAdapter {
        FraiseWireAdapter::new(TEST_DSN)
    }

    /// A fresh adapter reports PostgreSQL and the default chunk size.
    #[test]
    fn test_adapter_creation() {
        let subject = default_adapter();
        assert_eq!(subject.database_type(), DatabaseType::PostgreSQL);
        assert_eq!(subject.chunk_size, 1024);
    }

    /// `with_chunk_size` overrides the default chunk size.
    #[test]
    fn test_adapter_with_chunk_size() {
        let subject = default_adapter().with_chunk_size(512);
        assert_eq!(subject.chunk_size, 512);
    }

    /// With no optional parts the statement is a bare SELECT.
    #[test]
    fn test_build_query_simple() {
        let rendered = default_adapter().build_query("v_user", None, None, None).unwrap();
        assert_eq!(rendered, "SELECT data FROM v_user");
    }

    /// OFFSET is rendered before LIMIT.
    #[test]
    fn test_build_query_with_limit_offset() {
        let rendered = default_adapter()
            .build_query("v_user", None, Some(10), Some(5))
            .unwrap();
        assert_eq!(rendered, "SELECT data FROM v_user OFFSET 5 LIMIT 10");
    }

    /// Every pool counter is reported as zero.
    #[test]
    fn test_pool_metrics() {
        let snapshot = default_adapter().pool_metrics();
        assert_eq!(snapshot.total_connections, 0);
        assert_eq!(snapshot.idle_connections, 0);
        assert_eq!(snapshot.active_connections, 0);
        assert_eq!(snapshot.waiting_requests, 0);
    }
}