use camel_component_api::CamelError;
use sqlx::any::AnyArguments;
use sqlx::any::AnyRow;
use sqlx::query::Query;
use sqlx::{Any, Column, Row};
use tracing::warn;
pub(crate) fn row_to_json(row: &AnyRow) -> Result<serde_json::Value, CamelError> {
let mut map = serde_json::Map::new();
for (i, column) in row.columns().iter().enumerate() {
let name = column.name().to_string();
let value = if let Ok(Some(v)) = row.try_get::<Option<i64>, _>(i) {
serde_json::Value::Number(v.into())
} else if let Ok(Some(v)) = row.try_get::<Option<i32>, _>(i) {
serde_json::Value::Number(v.into())
} else if let Ok(Some(v)) = row.try_get::<Option<f64>, _>(i) {
serde_json::Number::from_f64(v)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null)
} else if let Ok(Some(v)) = row.try_get::<Option<bool>, _>(i) {
serde_json::Value::Bool(v)
} else if let Ok(Some(v)) = row.try_get::<Option<String>, _>(i) {
serde_json::Value::String(v)
} else if let Ok(Some(v)) = row.try_get::<Option<Vec<u8>>, _>(i) {
if let Ok(text) = std::str::from_utf8(&v) {
if let Ok(date) = chrono::NaiveDate::parse_from_str(text, "%Y-%m-%d") {
serde_json::Value::String(date.format("%Y-%m-%d").to_string())
} else if let Ok(dt) =
chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%d %H:%M:%S%.f")
{
serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%S%.f").to_string())
} else if let Ok(dt) =
chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%d %H:%M:%S")
{
serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%S").to_string())
} else if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(text) {
serde_json::Value::String(
dt.with_timezone(&chrono::Utc)
.format("%Y-%m-%dT%H:%M:%SZ")
.to_string(),
)
} else {
serde_json::Value::String(text.to_string())
}
} else {
serde_json::Value::Null
}
} else if matches!(row.try_get::<Option<String>, _>(i), Ok(None)) {
serde_json::Value::Null
} else {
warn!(
column = column.name(),
"Could not decode column value, falling back to null"
);
serde_json::Value::Null
};
map.insert(name, value);
}
Ok(serde_json::Value::Object(map))
}
pub(crate) fn bind_json_values<'q>(
mut query: Query<'q, Any, AnyArguments<'q>>,
values: &'q [serde_json::Value],
) -> Query<'q, Any, AnyArguments<'q>> {
for value in values {
query = match value {
serde_json::Value::Null => query.bind(Option::<String>::None),
serde_json::Value::Bool(b) => query.bind(*b),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
query.bind(i)
} else if let Some(f) = n.as_f64() {
query.bind(f)
} else {
query.bind(n.to_string())
}
}
serde_json::Value::String(s) => query.bind(s.as_str()),
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
query.bind(value.to_string())
}
};
}
query
}
pub(crate) fn is_retryable_sqlx_error(e: &sqlx::Error) -> bool {
match e {
sqlx::Error::Io(_) => true,
sqlx::Error::PoolTimedOut => true,
sqlx::Error::PoolClosed => false,
sqlx::Error::Configuration(_) => false,
sqlx::Error::Tls(_) => false,
sqlx::Error::Database(db_err) => {
let msg = db_err.message().to_lowercase();
msg.contains("connection refused")
|| msg.contains("cannot connect")
|| msg.contains("too many connections")
|| msg.contains("no connection to the server")
|| msg.contains("connection reset")
|| msg.contains("broken pipe")
|| msg.contains("server closed the connection")
|| msg.contains("timed out")
}
_ => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn is_retryable_io_error() {
let err = sqlx::Error::Io(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
"refused",
));
assert!(is_retryable_sqlx_error(&err));
}
#[test]
fn is_retryable_pool_timed_out() {
let err = sqlx::Error::PoolTimedOut;
assert!(is_retryable_sqlx_error(&err));
}
#[test]
fn is_not_retryable_pool_closed() {
let err = sqlx::Error::PoolClosed;
assert!(!is_retryable_sqlx_error(&err));
}
#[test]
fn is_not_retryable_configuration() {
let err = sqlx::Error::Configuration("unknown driver".into());
assert!(!is_retryable_sqlx_error(&err));
}
#[test]
fn is_not_retryable_tls() {
let err = sqlx::Error::Tls(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"certificate error",
)));
assert!(!is_retryable_sqlx_error(&err));
}
#[test]
fn is_retryable_database_connection_refused() {
let err = sqlx::Error::Database(Box::new(MockDbError {
message: "connection refused".into(),
code: None,
}));
assert!(is_retryable_sqlx_error(&err));
}
#[test]
fn is_retryable_database_too_many_connections() {
let err = sqlx::Error::Database(Box::new(MockDbError {
message: "too many connections".into(),
code: None,
}));
assert!(is_retryable_sqlx_error(&err));
}
#[test]
fn is_not_retryable_database_syntax_error() {
let err = sqlx::Error::Database(Box::new(MockDbError {
message: "syntax error at or near 'SELECT'".into(),
code: None,
}));
assert!(!is_retryable_sqlx_error(&err));
}
#[test]
fn is_not_retryable_database_unknown_host() {
let err = sqlx::Error::Database(Box::new(MockDbError {
message: "could not translate host name to address: unknown host".into(),
code: None,
}));
assert!(!is_retryable_sqlx_error(&err));
}
struct MockDbError {
message: String,
code: Option<Box<dyn std::fmt::Display + Send + Sync>>,
}
impl std::fmt::Debug for MockDbError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MockDbError")
.field("message", &self.message)
.finish()
}
}
impl std::fmt::Display for MockDbError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl std::error::Error for MockDbError {}
impl sqlx::error::DatabaseError for MockDbError {
fn message(&self) -> &str {
&self.message
}
fn kind(&self) -> sqlx::error::ErrorKind {
sqlx::error::ErrorKind::Other
}
fn as_error(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
self
}
fn as_error_mut(&mut self) -> &mut (dyn std::error::Error + Send + Sync + 'static) {
self
}
fn into_error(self: Box<Self>) -> Box<dyn std::error::Error + Send + Sync + 'static> {
self
}
}
}