use super::super::accounts::DEFAULT_ACCOUNT_ID;
use super::super::DbPool;
use crate::error::StorageError;
/// One row of the `connections` table, as loaded by the query functions in this file.
///
/// Derives `serde::Serialize`, so a value can be serialized as-is.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Connection {
/// Value of the `id` column; the lookup, update and delete statements filter on it.
pub id: i64,
/// Account identifier stored with the row; `insert_connection` always writes
/// `DEFAULT_ACCOUNT_ID` here.
pub account_id: String,
/// Connector kind, kept as a free-form string; `get_connections_by_type` filters on it.
pub connector_type: String,
/// Optional e-mail address stored with the connection; `None` when the column is NULL.
pub account_email: Option<String>,
/// Optional display name stored with the connection; `None` when the column is NULL.
pub display_name: Option<String>,
/// Status string; the list queries in this file only return rows where it is `'active'`.
pub status: String,
/// Raw contents of the `metadata_json` column (presumably JSON text; it is not parsed here).
pub metadata_json: String,
/// Value of the `created_at` column as text. NOTE(review): the format comes from the
/// table schema, which is not visible here.
pub created_at: String,
/// Value of the `updated_at` column as text; the UPDATE statements in this file set it
/// to `datetime('now')`.
pub updated_at: String,
}
/// Raw tuple shape decoded from a `connections` SELECT.
///
/// Element order must match the column list used by the SELECT statements in this
/// file: `id, account_id, connector_type, account_email, display_name, status,
/// metadata_json, created_at, updated_at`. `row_to_connection` maps the elements to
/// `Connection` fields by position.
type ConnectionRow = (
i64,
String,
String,
Option<String>,
Option<String>,
String,
String,
String,
String,
);
/// Converts a positional [`ConnectionRow`] tuple into a named [`Connection`].
///
/// The tuple is unpacked in SELECT column order, so each binding below lines up
/// with the struct field of the same name.
fn row_to_connection(row: ConnectionRow) -> Connection {
    let (
        id,
        account_id,
        connector_type,
        account_email,
        display_name,
        status,
        metadata_json,
        created_at,
        updated_at,
    ) = row;

    Connection {
        id,
        account_id,
        connector_type,
        account_email,
        display_name,
        status,
        metadata_json,
        created_at,
        updated_at,
    }
}
/// Inserts a new row into `connections` and returns its generated `id`.
///
/// `account_id` is always bound to [`DEFAULT_ACCOUNT_ID`]. The statement supplies
/// only `connector_type`, `account_email` and `display_name`; every other column is
/// left to whatever the table definition provides.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// statement fails.
pub async fn insert_connection(
    pool: &DbPool,
    connector_type: &str,
    account_email: Option<&str>,
    display_name: Option<&str>,
) -> Result<i64, StorageError> {
    let inserted = sqlx::query_as::<_, (i64,)>(
        "INSERT INTO connections (account_id, connector_type, account_email, display_name) \
         VALUES (?, ?, ?, ?) \
         RETURNING id",
    )
    .bind(DEFAULT_ACCOUNT_ID)
    .bind(connector_type)
    .bind(account_email)
    .bind(display_name)
    .fetch_one(pool)
    .await;

    match inserted {
        // `RETURNING id` yields a single-column row; unwrap the one-element tuple.
        Ok((new_id,)) => Ok(new_id),
        Err(source) => Err(StorageError::Query { source }),
    }
}
/// Looks up a single connection by its `id`, regardless of its status.
///
/// Returns `Ok(None)` when no row has the given `id`.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// query fails.
pub async fn get_connection(pool: &DbPool, id: i64) -> Result<Option<Connection>, StorageError> {
    let sql = "SELECT id, account_id, connector_type, account_email, display_name, \
               status, metadata_json, created_at, updated_at \
               FROM connections WHERE id = ?";

    let found = sqlx::query_as::<_, ConnectionRow>(sql)
        .bind(id)
        .fetch_optional(pool)
        .await
        .map_err(|source| StorageError::Query { source })?;

    Ok(found.map(row_to_connection))
}
/// Lists every connection whose status is `'active'`, ordered by `id`.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// query fails.
pub async fn get_connections(pool: &DbPool) -> Result<Vec<Connection>, StorageError> {
    let fetched = sqlx::query_as::<_, ConnectionRow>(
        "SELECT id, account_id, connector_type, account_email, display_name, \
         status, metadata_json, created_at, updated_at \
         FROM connections WHERE status = 'active' ORDER BY id",
    )
    .fetch_all(pool)
    .await;

    match fetched {
        Ok(rows) => Ok(rows.into_iter().map(row_to_connection).collect()),
        Err(source) => Err(StorageError::Query { source }),
    }
}
/// Sets the `status` of the connection with the given `id` and refreshes its
/// `updated_at` to `datetime('now')`.
///
/// The execution result is discarded, so the number of affected rows is not
/// inspected.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// statement fails.
pub async fn update_connection_status(
    pool: &DbPool,
    id: i64,
    status: &str,
) -> Result<(), StorageError> {
    let statement =
        sqlx::query("UPDATE connections SET status = ?, updated_at = datetime('now') WHERE id = ?")
            .bind(status)
            .bind(id);

    match statement.execute(pool).await {
        Ok(_) => Ok(()),
        Err(source) => Err(StorageError::Query { source }),
    }
}
/// Deletes the connection row with the given `id`.
///
/// The execution result is discarded, so the number of affected rows is not
/// inspected.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// statement fails.
pub async fn delete_connection(pool: &DbPool, id: i64) -> Result<(), StorageError> {
    let outcome = sqlx::query("DELETE FROM connections WHERE id = ?")
        .bind(id)
        .execute(pool)
        .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(source) => Err(StorageError::Query { source }),
    }
}
pub async fn get_connections_by_type(
pool: &DbPool,
connector_type: &str,
) -> Result<Vec<Connection>, StorageError> {
let rows: Vec<ConnectionRow> = sqlx::query_as(
"SELECT id, account_id, connector_type, account_email, display_name, \
status, metadata_json, created_at, updated_at \
FROM connections WHERE connector_type = ? AND status = 'active' ORDER BY id",
)
.bind(connector_type)
.fetch_all(pool)
.await
.map_err(|e| StorageError::Query { source: e })?;
Ok(rows.into_iter().map(row_to_connection).collect())
}
/// Writes `ciphertext` into the `encrypted_credentials` column of the connection
/// with the given `id` and refreshes `updated_at` to `datetime('now')`.
///
/// The bytes are stored exactly as passed in; this function performs no
/// encryption itself. The execution result is discarded, so the number of
/// affected rows is not inspected.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// statement fails.
pub async fn store_encrypted_credentials(
    pool: &DbPool,
    id: i64,
    ciphertext: &[u8],
) -> Result<(), StorageError> {
    let sql = "UPDATE connections SET encrypted_credentials = ?, updated_at = datetime('now') \
               WHERE id = ?";

    let _done = sqlx::query(sql)
        .bind(ciphertext)
        .bind(id)
        .execute(pool)
        .await
        .map_err(|source| StorageError::Query { source })?;

    Ok(())
}
/// Reads the `encrypted_credentials` bytes stored for the connection with the
/// given `id`.
///
/// Returns `Ok(None)` both when no row has that `id` and when the row exists but
/// the column is NULL; the two cases are not distinguished.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// query fails.
pub async fn read_encrypted_credentials(
    pool: &DbPool,
    id: i64,
) -> Result<Option<Vec<u8>>, StorageError> {
    let fetched = sqlx::query_as::<_, (Option<Vec<u8>>,)>(
        "SELECT encrypted_credentials FROM connections WHERE id = ?",
    )
    .bind(id)
    .fetch_optional(pool)
    .await
    .map_err(|source| StorageError::Query { source })?;

    // Outer `Option` = row present, inner `Option` = column non-NULL; collapse both.
    let credentials = match fetched {
        Some((blob,)) => blob,
        None => None,
    };
    Ok(credentials)
}
/// Replaces the `metadata_json` column of the connection with the given `id` and
/// refreshes `updated_at` to `datetime('now')`.
///
/// The string is stored as given; it is not parsed or validated here. The
/// execution result is discarded, so the number of affected rows is not
/// inspected.
///
/// # Errors
///
/// Returns [`StorageError::Query`] wrapping the underlying sqlx error if the
/// statement fails.
pub async fn update_connection_metadata(
    pool: &DbPool,
    id: i64,
    metadata_json: &str,
) -> Result<(), StorageError> {
    let statement = sqlx::query(
        "UPDATE connections SET metadata_json = ?, updated_at = datetime('now') WHERE id = ?",
    )
    .bind(metadata_json)
    .bind(id);

    if let Err(source) = statement.execute(pool).await {
        return Err(StorageError::Query { source });
    }
    Ok(())
}