athena_rs 3.26.1

Hyper-performant polyglot database driver
Documentation
//! Compatibility adapter for gateway contracts.
//!
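//! The portable gateway-domain DTOs and normalization helpers now live in the
//! `athena-gateway` crate. This module re-exports them so the historical import
//! paths stay stable, and adds what is specific to `athena_rs`: the D1 migration
//! request/response DTOs and the conversions between the gateway SQL execution
//! mode and the PostgreSQL driver transaction mode.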

use crate::drivers::postgresql::raw_sql::PostgresSqlTransactionMode;
use serde::{Deserialize, Serialize};

pub use athena_gateway::{
    GATEWAY_DEFERRED_KIND_DELETE, GATEWAY_DEFERRED_KIND_FETCH, GATEWAY_DEFERRED_KIND_INSERT,
    GATEWAY_DEFERRED_KIND_QUERY, GATEWAY_DEFERRED_KIND_RPC, GATEWAY_DEFERRED_KIND_UPDATE,
    GatewayApiRequest, GatewayApiRequestPayload, GatewayDeferredRequest, GatewayDeleteRequest,
    GatewayDeleteResourceIdPlan, GatewayFetchRequest, GatewayInsertRequest, GatewayOperationKind,
    GatewayRequestCondition, GatewayRowsMeta, GatewayRowsResponse, GatewayRpcFilter,
    GatewayRpcFilterOperator, GatewayRpcOrder, GatewayRpcRequest, GatewaySqlExecutionMode,
    GatewaySqlExecutionRequest, GatewaySqlRequest, GatewayUpdateRequest, extract_update_payload,
    normalize_delete_resource_id_column_name, normalize_delete_resource_id_lookup_table,
    normalize_gateway_schema_name, parse_columns_from_body, parse_conditions_from_body,
    plan_delete_resource_id_resolution, qualify_gateway_table_name, schema_name_from_body,
};

// NOTE(review): presumably this names the dialect of the SQL being migrated
// (the source side) — confirm against the migration handler.
/// SQL dialect selector carried by the `dialect` field of `D1MigrationRequest`.
///
/// `rename_all = "lowercase"` puts the variant on the wire as `"postgresql"`.
/// `PostgreSQL` is currently the only variant, and it is also what
/// `default_d1_migration_dialect` returns when the request omits the key.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum D1MigrationDialect {
    /// Wire value `"postgresql"`.
    PostgreSQL,
}

/// Outcome tag stored in the `action` field of `D1MigrationPlanEntry`.
///
/// Variants are serialized in lowercase: `"converted"`, `"skipped"`, `"warning"`.
// NOTE(review): the exact rule that selects each variant lives in the converter,
// which is not part of this module — confirm the semantics there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum D1MigrationAction {
    /// Wire value `"converted"`.
    Converted,
    /// Wire value `"skipped"`.
    Skipped,
    /// Wire value `"warning"`.
    Warning,
}

/// Serde `default` provider that yields `true`.
///
/// Referenced by name from the `dry_run` and `strict` fields of
/// `D1MigrationRequest`, so both flags are on when their key is omitted.
const fn default_true() -> bool {
    true
}

/// Serde `default` provider that yields an empty `String`.
///
/// Referenced by name from the `schema_sql` field of `D1MigrationRequest`, so a
/// request without that key deserializes with empty SQL text instead of failing.
fn default_empty_string() -> String {
    String::default()
}

/// Serde `default` provider for the `dialect` field of `D1MigrationRequest`.
///
/// Returns `D1MigrationDialect::PostgreSQL`, the enum's only variant.
fn default_d1_migration_dialect() -> D1MigrationDialect {
    D1MigrationDialect::PostgreSQL
}

/// Request body describing a D1 migration.
///
/// Keys are camelCase on the wire (`dbName`, `schemaSql`, `dryRun`,
/// `batchSize`). The snake_case spellings declared as aliases below are also
/// accepted when deserializing; serialization always emits the camelCase names.
/// Unknown keys are ignored because `deny_unknown_fields` is not set.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationRequest {
    /// Required. NOTE(review): the set of accepted driver identifiers is decided
    /// by the caller of this DTO — confirm there.
    pub driver: String,
    /// Required. Accepted as `dbName` or `db_name`.
    #[serde(alias = "db_name")]
    pub db_name: String,
    /// SQL text; an empty string when the key is omitted. Accepted as
    /// `schemaSql` or `schema_sql`.
    #[serde(default = "default_empty_string", alias = "schema_sql")]
    pub schema_sql: String,
    /// Dialect selector; `postgresql` when the key is omitted.
    #[serde(default = "default_d1_migration_dialect")]
    pub dialect: D1MigrationDialect,
    /// Defaults to `true` when omitted. Accepted as `dryRun` or `dry_run`.
    #[serde(default = "default_true", alias = "dry_run")]
    pub dry_run: bool,
    /// Defaults to `true` when omitted. Accepted as `strict` or `strict_mode`.
    ///
    /// NOTE(review): the camelCase spelling `strictMode` is *not* accepted and
    /// would be silently ignored — confirm that this is intended.
    #[serde(default = "default_true", alias = "strict_mode")]
    pub strict: bool,
    /// Optional; `None` when omitted. Accepted as `batchSize` or `batch_size`.
    #[serde(alias = "batch_size")]
    pub batch_size: Option<usize>,
    /// Optional; `None` when omitted.
    // No alias needed: the camelCase rename of `files` is already `files`.
    pub files: Option<Vec<String>>,
    /// Optional; `None` when omitted.
    // No alias needed: the camelCase rename of `statements` is already `statements`.
    pub statements: Option<Vec<String>>,
}

/// Position record embedded as `source_range` in `D1MigrationPlanEntry`,
/// `D1MigrationDiagnostic` and `D1MigrationUnsupportedFeature`.
///
/// Serialized with camelCase keys: `statementIndex`, `startLine`, `endLine`,
/// `startColumn`, `endColumn`.
// NOTE(review): whether lines/columns are 0- or 1-based, and whether the end
// position is inclusive, is not established in this module — confirm with the
// code that produces these ranges.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationSourceRange {
    /// Wire key `statementIndex`.
    pub statement_index: usize,
    /// Wire key `startLine`.
    pub start_line: usize,
    /// Wire key `endLine`.
    pub end_line: usize,
    /// Wire key `startColumn`.
    pub start_column: usize,
    /// Wire key `endColumn`.
    pub end_column: usize,
}

/// One element of the `statements` list in `D1MigrationPreviewResponse` and
/// `D1MigrationConversion`.
///
/// Serialized with camelCase keys: `action`, `statementIndex`, `sourceRange`,
/// `sourceSql`, `targetSql`, `applyOrder`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationPlanEntry {
    /// Outcome tag for this entry (`"converted"`, `"skipped"` or `"warning"` on the wire).
    pub action: D1MigrationAction,
    /// The same-named value also exists inside `source_range`; nothing in this
    /// module enforces that the two agree.
    pub statement_index: usize,
    /// Position record for this entry.
    pub source_range: D1MigrationSourceRange,
    /// NOTE(review): presumably the statement text as submitted — confirm.
    pub source_sql: String,
    /// NOTE(review): presumably the statement text after conversion — confirm.
    pub target_sql: String,
    /// NOTE(review): presumably the position at which the statement is applied — confirm.
    pub apply_order: usize,
}

/// Diagnostic record used for both the `warnings` and the `errors` lists of the
/// migration responses; `D1MigrationWarning` and `D1MigrationError` are aliases
/// of this type.
///
/// Serialized with camelCase keys: `code`, `message`, `statementIndex`,
/// `sourceRange`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationDiagnostic {
    /// Free-form string; this module defines no fixed set of codes.
    pub code: String,
    /// Free-form diagnostic text.
    pub message: String,
    /// The same-named value also exists inside `source_range`; nothing in this
    /// module enforces that the two agree.
    pub statement_index: usize,
    /// Position record the diagnostic refers to.
    pub source_range: D1MigrationSourceRange,
}

/// Summary record exposed as `source_meta` by `D1MigrationPreviewResponse` and
/// `D1MigrationConversion`.
///
/// Serialized with camelCase keys: `statementCount`, `fingerprint`,
/// `estimatedApplyOrder`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationSourceMeta {
    /// Wire key `statementCount`.
    pub statement_count: usize,
    /// NOTE(review): the hashing scheme and its input are not visible in this
    /// module — confirm with the producer before relying on its format.
    pub fingerprint: String,
    /// NOTE(review): presumably a list of statement indices — confirm.
    pub estimated_apply_order: Vec<usize>,
}

/// Alias of `D1MigrationDiagnostic`; the two names are interchangeable to the compiler.
pub type D1MigrationWarning = D1MigrationDiagnostic;
/// Alias of `D1MigrationDiagnostic`; the two names are interchangeable to the compiler.
pub type D1MigrationError = D1MigrationDiagnostic;

/// Preview response DTO; also exported under the alias `D1MigrationPreview`.
///
/// Serialized with camelCase keys: `status`, `originalSql`, `convertedSql`,
/// `statements`, `warnings`, `errors`, `sourceMeta`. It derives both
/// `Serialize` and `Deserialize`, so it can be round-tripped.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationPreviewResponse {
    /// Free-form string; this module defines no fixed set of status values.
    pub status: String,
    /// Wire key `originalSql`.
    pub original_sql: String,
    /// Wire key `convertedSql`.
    pub converted_sql: String,
    /// Per-statement plan entries.
    pub statements: Vec<D1MigrationPlanEntry>,
    /// Diagnostics reported as warnings.
    pub warnings: Vec<D1MigrationDiagnostic>,
    /// Diagnostics reported as errors.
    pub errors: Vec<D1MigrationDiagnostic>,
    /// Summary record; wire key `sourceMeta`.
    pub source_meta: D1MigrationSourceMeta,
}

/// One element of `per_statement_results` in `D1MigrationExecutionResponse`.
///
/// Serialize-only (no `Deserialize` derive). Keys are camelCase:
/// `statementIndex`, `sourceSql`, `targetSql`, `batchIndex`, `rowsAffected`,
/// `durationMs`, `status`. The two `Option` fields serialize as `null` when
/// `None`, since no `skip_serializing_if` is configured.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationAppliedStatementResult {
    /// Wire key `statementIndex`.
    pub statement_index: usize,
    /// NOTE(review): presumably the statement text as submitted — confirm.
    pub source_sql: String,
    /// NOTE(review): presumably the statement text that was executed — confirm.
    pub target_sql: String,
    /// Wire key `batchIndex`.
    pub batch_index: usize,
    /// Wire key `rowsAffected`; `null` when not available.
    pub rows_affected: Option<u64>,
    /// Wire key `durationMs`; `null` when not available.
    pub duration_ms: Option<u64>,
    /// Free-form string; this module defines no fixed set of status values.
    pub status: String,
}

/// Execution response DTO; also exported under the alias
/// `D1MigrationExecutionResult`.
///
/// Serialize-only: unlike `D1MigrationPreviewResponse` it does not derive
/// `Deserialize`. Keys are camelCase: `status`, `planId`, `originalSql`,
/// `convertedSql`, `perStatementResults`, `warnings`, `errors`.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationExecutionResponse {
    /// Free-form string; this module defines no fixed set of status values.
    pub status: String,
    /// Wire key `planId`; serialized as `null` when `None`.
    pub plan_id: Option<String>,
    /// Wire key `originalSql`.
    pub original_sql: String,
    /// Wire key `convertedSql`.
    pub converted_sql: String,
    /// One result record per executed statement; wire key `perStatementResults`.
    pub per_statement_results: Vec<D1MigrationAppliedStatementResult>,
    /// Diagnostics reported as warnings.
    pub warnings: Vec<D1MigrationDiagnostic>,
    /// Diagnostics reported as errors.
    pub errors: Vec<D1MigrationDiagnostic>,
}

/// Conversion result DTO.
///
/// Serialized with camelCase keys: `convertedSql`, `originalSql`,
/// `statementCount`, `estimatedApplyOrder`, `statements`, `warnings`, `errors`,
/// `sourceMeta`.
///
/// `statement_count` and `estimated_apply_order` appear both at the top level
/// and inside `source_meta`; nothing in this module keeps the two copies in sync.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationConversion {
    /// Wire key `convertedSql`.
    pub converted_sql: String,
    /// Wire key `originalSql`.
    pub original_sql: String,
    /// Also present as `source_meta.statement_count`.
    pub statement_count: usize,
    /// Also present as `source_meta.estimated_apply_order`.
    pub estimated_apply_order: Vec<usize>,
    /// Per-statement plan entries.
    pub statements: Vec<D1MigrationPlanEntry>,
    /// Diagnostics reported as warnings.
    pub warnings: Vec<D1MigrationDiagnostic>,
    /// Diagnostics reported as errors.
    pub errors: Vec<D1MigrationDiagnostic>,
    /// Summary record; wire key `sourceMeta`.
    pub source_meta: D1MigrationSourceMeta,
}

/// Report record for a feature that could not be handled.
///
/// Serialize-only (no `Deserialize` derive). Keys are camelCase: `code`,
/// `message`, `sourceRange`, `suggestion`. No other type in this module embeds
/// it.
// NOTE(review): where this record is produced and returned is not visible in
// this module — confirm with the converter.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct D1MigrationUnsupportedFeature {
    /// Free-form string; this module defines no fixed set of codes.
    pub code: String,
    /// Free-form description.
    pub message: String,
    /// Position record the report refers to.
    pub source_range: D1MigrationSourceRange,
    /// Optional hint; serialized as `null` when `None`.
    pub suggestion: Option<String>,
}

/// Alias of `D1MigrationPreviewResponse`; the two names are interchangeable to the compiler.
pub type D1MigrationPreview = D1MigrationPreviewResponse;
/// Alias of `D1MigrationExecutionResponse`; the two names are interchangeable to the compiler.
pub type D1MigrationExecutionResult = D1MigrationExecutionResponse;

/// Translates a gateway-level SQL execution mode into the PostgreSQL driver's
/// transaction mode.
///
/// Each gateway variant maps to the driver variant of the same name. The
/// `match` has no wildcard arm, so adding a gateway variant is a compile error
/// here until the new variant is mapped.
pub fn gateway_sql_execution_mode_to_transaction_mode(
    value: GatewaySqlExecutionMode,
) -> PostgresSqlTransactionMode {
    match value {
        GatewaySqlExecutionMode::PerStatement => PostgresSqlTransactionMode::PerStatement,
        GatewaySqlExecutionMode::SingleTransaction => PostgresSqlTransactionMode::SingleTransaction,
    }
}

/// Translates the PostgreSQL driver's transaction mode back into the
/// gateway-level SQL execution mode.
///
/// Inverse of `gateway_sql_execution_mode_to_transaction_mode`: each driver
/// variant maps to the gateway variant of the same name. The `match` has no
/// wildcard arm, so adding a driver variant is a compile error here until the
/// new variant is mapped.
pub fn postgres_sql_transaction_mode_to_gateway_execution_mode(
    value: PostgresSqlTransactionMode,
) -> GatewaySqlExecutionMode {
    match value {
        PostgresSqlTransactionMode::PerStatement => GatewaySqlExecutionMode::PerStatement,
        PostgresSqlTransactionMode::SingleTransaction => GatewaySqlExecutionMode::SingleTransaction,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every gateway execution mode maps to the driver mode of the same name.
    #[test]
    fn gateway_sql_execution_mode_maps_to_driver_mode() {
        assert_eq!(
            gateway_sql_execution_mode_to_transaction_mode(GatewaySqlExecutionMode::PerStatement),
            PostgresSqlTransactionMode::PerStatement
        );
        assert_eq!(
            gateway_sql_execution_mode_to_transaction_mode(
                GatewaySqlExecutionMode::SingleTransaction
            ),
            PostgresSqlTransactionMode::SingleTransaction
        );
    }

    /// Every driver transaction mode maps back to the gateway mode of the same name.
    #[test]
    fn driver_transaction_mode_maps_to_gateway_execution_mode() {
        // `matches!` keeps the test independent of whether the re-exported
        // gateway enum implements `PartialEq`/`Debug`.
        assert!(matches!(
            postgres_sql_transaction_mode_to_gateway_execution_mode(
                PostgresSqlTransactionMode::PerStatement
            ),
            GatewaySqlExecutionMode::PerStatement
        ));
        assert!(matches!(
            postgres_sql_transaction_mode_to_gateway_execution_mode(
                PostgresSqlTransactionMode::SingleTransaction
            ),
            GatewaySqlExecutionMode::SingleTransaction
        ));
    }

    /// A request carrying only the required keys picks up every declared default.
    #[test]
    fn d1_migration_request_applies_defaults_for_omitted_keys() {
        let request: D1MigrationRequest = serde_json::from_value(serde_json::json!({
            "driver": "d1",
            "dbName": "main",
        }))
        .expect("minimal request deserializes");

        assert_eq!(request.driver, "d1");
        assert_eq!(request.db_name, "main");
        assert!(request.schema_sql.is_empty());
        assert_eq!(request.dialect, D1MigrationDialect::PostgreSQL);
        assert!(request.dry_run);
        assert!(request.strict);
        assert!(request.batch_size.is_none());
        assert!(request.files.is_none());
        assert!(request.statements.is_none());
    }

    /// The snake_case aliases are accepted alongside the camelCase wire names.
    #[test]
    fn d1_migration_request_accepts_snake_case_aliases() {
        let request: D1MigrationRequest = serde_json::from_value(serde_json::json!({
            "driver": "d1",
            "db_name": "main",
            "schema_sql": "CREATE TABLE users (id SERIAL PRIMARY KEY);",
            "dialect": "postgresql",
            "dry_run": false,
            "strict_mode": false,
            "batch_size": 25,
            "files": ["001_init.sql"],
            "statements": ["SELECT 1;"],
        }))
        .expect("snake_case request deserializes");

        assert_eq!(request.db_name, "main");
        assert_eq!(
            request.schema_sql,
            "CREATE TABLE users (id SERIAL PRIMARY KEY);"
        );
        assert_eq!(request.dialect, D1MigrationDialect::PostgreSQL);
        assert!(!request.dry_run);
        assert!(!request.strict);
        assert_eq!(request.batch_size, Some(25));
        assert_eq!(request.files, Some(vec!["001_init.sql".to_string()]));
        assert_eq!(request.statements, Some(vec!["SELECT 1;".to_string()]));
    }

    /// The execution response and everything nested in it use camelCase keys.
    #[test]
    fn d1_migration_execution_response_serializes_camel_case_contract() {
        let response = D1MigrationExecutionResponse {
            status: "applied".to_string(),
            plan_id: Some("plan_123".to_string()),
            original_sql: "CREATE TABLE users (id SERIAL PRIMARY KEY);".to_string(),
            converted_sql: "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT);".to_string(),
            per_statement_results: vec![D1MigrationAppliedStatementResult {
                statement_index: 0,
                source_sql: "CREATE TABLE users (id SERIAL PRIMARY KEY);".to_string(),
                target_sql: "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT);"
                    .to_string(),
                batch_index: 0,
                rows_affected: Some(0),
                duration_ms: Some(12),
                status: "applied".to_string(),
            }],
            warnings: vec![D1MigrationDiagnostic {
                code: "type.serial".to_string(),
                message: "serial mapped to AUTOINCREMENT".to_string(),
                statement_index: 0,
                source_range: D1MigrationSourceRange {
                    statement_index: 0,
                    start_line: 1,
                    end_line: 1,
                    start_column: 1,
                    end_column: 10,
                },
            }],
            errors: Vec::new(),
        };

        let value = serde_json::to_value(response).expect("response serializes");
        assert_eq!(value["status"], serde_json::json!("applied"));
        assert_eq!(value["planId"], serde_json::json!("plan_123"));
        assert_eq!(
            value["originalSql"],
            serde_json::json!("CREATE TABLE users (id SERIAL PRIMARY KEY);")
        );
        assert_eq!(
            value["convertedSql"],
            serde_json::json!("CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT);")
        );
        assert_eq!(
            value["perStatementResults"][0]["statementIndex"],
            serde_json::json!(0)
        );
        assert_eq!(
            value["perStatementResults"][0]["batchIndex"],
            serde_json::json!(0)
        );
        assert_eq!(
            value["perStatementResults"][0]["rowsAffected"],
            serde_json::json!(0)
        );
        assert_eq!(
            value["perStatementResults"][0]["durationMs"],
            serde_json::json!(12)
        );
        assert_eq!(
            value["perStatementResults"][0]["status"],
            serde_json::json!("applied")
        );
        assert_eq!(
            value["warnings"][0]["code"],
            serde_json::json!("type.serial")
        );
        assert_eq!(
            value["warnings"][0]["statementIndex"],
            serde_json::json!(0)
        );
        assert_eq!(
            value["warnings"][0]["sourceRange"]["endColumn"],
            serde_json::json!(10)
        );
        // An empty diagnostics list is emitted as an empty array, not omitted.
        assert_eq!(value["errors"], serde_json::json!([]));
    }
}