athena_rs 3.22.1

Hyper-performant polyglot database driver
Documentation
use crate::api::gateway::contracts::{D1MigrationAppliedStatementResult, D1MigrationPlanEntry};

/// One group of migration statements that is executed together.
///
/// Instances are produced by [`chunk_statements`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct D1MigrationBatch {
    /// Zero-based position of this batch in the sequence of batches.
    pub index: usize,
    /// `statement_index` values of the plan entries that were placed in this batch.
    pub statement_indexes: Vec<usize>,
    /// SQL text of the batch: the statements joined with `";\n"` and terminated by `;`.
    pub sql: String,
}

/// Splits the executable statements of a migration plan into batches.
///
/// Each entry's `target_sql` is normalised by trimming surrounding whitespace and
/// removing any trailing run of semicolons and whitespace. Entries that are empty
/// after normalisation (blank, or consisting only of `;`) are skipped entirely, so
/// they never occupy a slot in a batch and never yield a batch whose SQL is a bare `;`.
///
/// # Parameters
/// - `statements`: plan entries, processed in the order given.
/// - `batch_size`: maximum number of statements per batch; `0` is treated as `1`.
///
/// # Returns
/// Batches numbered from `0` in order. Each batch lists the `statement_index` of the
/// entries it contains and carries their SQL joined with `";\n"` plus a final `;`.
/// The result is empty when no entry has executable SQL.
pub fn chunk_statements(
    statements: &[D1MigrationPlanEntry],
    batch_size: usize,
) -> Vec<D1MigrationBatch> {
    // Normalise once and borrow from the plan instead of cloning every SQL string.
    let targets: Vec<(usize, &str)> = statements
        .iter()
        .filter_map(|entry| {
            let sql = entry
                .target_sql
                .trim()
                .trim_end_matches(|c: char| c == ';' || c.is_whitespace());
            if sql.is_empty() {
                None
            } else {
                Some((entry.statement_index, sql))
            }
        })
        .collect();

    // `chunks` panics on a size of zero, hence the clamp. An empty `targets`
    // simply yields no chunks, so no separate early return is needed.
    targets
        .chunks(batch_size.max(1))
        .enumerate()
        .map(|(index, chunk)| {
            let statement_indexes = chunk
                .iter()
                .map(|(statement_index, _)| *statement_index)
                .collect();
            let mut sql = chunk
                .iter()
                .map(|(_, sql)| *sql)
                .collect::<Vec<_>>()
                .join(";\n");
            sql.push(';');

            D1MigrationBatch {
                index,
                statement_indexes,
                sql,
            }
        })
        .collect()
}

/// Builds one result record per statement contained in `batches`.
///
/// For every statement index listed in a batch, the matching plan entry (the first
/// entry in `plan` with that `statement_index`) supplies `source_sql` and
/// `target_sql`; both are empty strings when no entry matches.
///
/// # Parameters
/// - `plan`: plan entries used to resolve the SQL text of each statement.
/// - `batches`: batches whose statements should be reported, in output order.
/// - `status`, `duration_ms`, `rows_affected`: copied unchanged into every result.
///
/// # Returns
/// Results ordered by batch, then by the order of `statement_indexes` within a batch.
pub fn plan_batch_results(
    plan: &[D1MigrationPlanEntry],
    batches: &[D1MigrationBatch],
    status: &str,
    duration_ms: Option<u64>,
    rows_affected: Option<u64>,
) -> Vec<D1MigrationAppliedStatementResult> {
    // Index the plan once instead of scanning it twice for every statement.
    // `or_insert` keeps the first entry per index, matching a front-to-back search.
    let mut by_index: std::collections::HashMap<usize, &D1MigrationPlanEntry> =
        std::collections::HashMap::with_capacity(plan.len());
    for entry in plan {
        by_index.entry(entry.statement_index).or_insert(entry);
    }
    let lookup = &by_index;

    batches
        .iter()
        .flat_map(|batch| {
            batch.statement_indexes.iter().map(move |&statement_index| {
                let (source_sql, target_sql) = lookup
                    .get(&statement_index)
                    .map(|entry| (entry.source_sql.clone(), entry.target_sql.clone()))
                    .unwrap_or_default();
                D1MigrationAppliedStatementResult {
                    statement_index,
                    source_sql,
                    target_sql,
                    batch_index: batch.index,
                    rows_affected,
                    duration_ms,
                    status: status.to_string(),
                }
            })
        })
        .collect()
}