rust_bus 3.0.6

bus — Lightweight CQRS Library for Rust
Documentation
use crate::BusError;
use crate::contracts::bus_event::IBusEvent;
use crate::contracts::enums::{Field, Period, Replace, State, Timestamp, ToColumn};
use crate::contracts::meta::BusMetadata;
use crate::dispatch::dispatch_db::ValidatedJob;
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ConnectionTrait};
use std::collections::HashMap;
use std::hash::Hasher;
use twox_hash::XxHash3_64;
use uuid::Uuid;

pub(crate) async fn insert_sea_mysql<TEvent>(
    txn: &sea_orm::DatabaseTransaction,
    jobs: Vec<ValidatedJob<'_>>,
    event: &TEvent,
    metadata: &BusMetadata,
) -> Result<(), BusError>
where
    TEvent: IBusEvent,
{
    let now = Utc::now();
    let event_json = serde_json::to_value(event)?;
    let metadata_json = serde_json::to_value(metadata)?;

    for job in jobs.iter() {
        let mut hasher = XxHash3_64::with_seed(0);
        hasher.write(job.reg.handler_identity.as_bytes());
        hasher.write(job.reg.event_identity.as_bytes());
        let hash_type_name = hasher.finish() as i64;

        let mut sql =
            "SELECT id, state FROM bus_jobs USE INDEX(idx_bus_jobs_hash_type_name) WHERE hash_type_name = ? AND type_name_handler = ? AND type_name_event = ? "
                .to_string();

        let mut params: Vec<sea_orm::Value> = vec![
            hash_type_name.into(),
            job.reg.handler_identity.into(),
            job.reg.event_identity.into(),
        ];

        let mut update_case: HashMap<&str, Vec<&str>> = HashMap::new();

        if let Some(ref unique) = job.reg.unique {
            if let Period::Duration { field, duration } = &unique.period {
                match field {
                    Timestamp::InsertedAt => sql.push_str(" AND inserted_at > ? "),
                    Timestamp::ScheduledAt => sql.push_str(" AND scheduled_at > ? "),
                };

                let duration_since = now
                    - chrono::Duration::from_std(*duration).map_err(|_| {
                        BusError::Configuration(format!(
                            "period.duration has incorrect value for DataBase Handler '{}'",
                            &job.reg.handler_identity
                        ))
                    })?;

                params.push(duration_since.into());
            }

            for field in unique.fields {
                match field {
                    Field::Queue => {
                        sql.push_str(" AND queue = ? ");
                        params.push(job.reg.queue.into());
                    }
                    Field::Priority => {
                        sql.push_str(" AND priority = ? ");
                        params.push(job.priority.into());
                    }
                    Field::MaxAttempts => {
                        sql.push_str(" AND max_attempts = ? ");
                        params.push(job.max_attempts.into());
                    }
                    Field::ExecutionTimeout => {
                        sql.push_str(" AND execution_timeout_sec = ? ");
                        params.push(job.timeout_sec.into());
                    }
                    Field::Tags => {
                        let tags_json = serde_json::to_string(&job.tags_json)
                            .unwrap_or_else(|_| "[]".to_string());

                        sql.push_str(" AND JSON_CONTAINS(tags, ?) ");
                        params.push(tags_json.into());
                    }
                    Field::Meta => {
                        let meta_json_string = serde_json::to_string(&metadata_json)
                            .unwrap_or_else(|_| "{}".to_string());

                        sql.push_str(" AND JSON_CONTAINS(meta, ?) ");
                        params.push(meta_json_string.into());
                    }
                    Field::Event => {
                        sql.push_str(" AND payload = CAST(? AS JSON) ");
                        let event_string = serde_json::to_string(&event_json)?;
                        params.push(event_string.into());
                    }
                }
            }

            for key in unique.keys.iter() {
                let path = format!("$.\"{}\"", key.replace('\\', "\\\\").replace('"', "\\\""));

                if let Some(val) = event_json.get(*key) {
                    match val {
                        serde_json::Value::Null => {
                            sql.push_str(" AND JSON_EXTRACT(payload, ?) IS NULL ");
                            params.push(path.into());
                        }
                        serde_json::Value::String(s) => {
                            sql.push_str(" AND payload->>? = ? ");
                            params.push(path.into());
                            params.push(s.clone().into());
                        }
                        serde_json::Value::Number(n) => {
                            sql.push_str(" AND payload->>? = ? ");
                            params.push(path.into());
                            params.push(n.to_string().into());
                        }
                        serde_json::Value::Bool(b) => {
                            sql.push_str(" AND payload->>? = ? ");
                            params.push(path.into());
                            params.push(if *b { "true" } else { "false" }.into());
                        }
                        _ => {
                            sql.push_str(" AND JSON_EXTRACT(payload, ?) = CAST(? AS JSON) ");
                            params.push(path.into());
                            params.push(serde_json::to_string(val)?.into());
                        }
                    }
                } else {
                    sql.push_str(" AND JSON_CONTAINS_PATH(payload, 'one', ?) = 0 ");
                    params.push(path.into());
                }
            }

            let states_strings: Vec<&str> = unique
                .states
                .iter()
                .map(|s| match s {
                    State::Available => "available",
                    State::Scheduled => "scheduled",
                    State::Executing => "executing",
                    State::Retryable => "retryable",
                    State::Completed => "completed",
                    State::Cancelled => "cancelled",
                    State::Discarded => "discarded",
                })
                .collect();

            if !states_strings.is_empty() {
                let placeholders: String = states_strings
                    .iter()
                    .map(|_| "?")
                    .collect::<Vec<_>>()
                    .join(", ");

                sql.push_str(" AND bus_jobs.state IN (");
                sql.push_str(&placeholders);
                sql.push_str(") ");

                for state in states_strings {
                    params.push(state.into());
                }
            }

            for replace_item in unique.replace.iter() {
                let (state, columns): (&str, Box<dyn Iterator<Item = &'static str> + '_>) =
                    match replace_item {
                        Replace::Available(f) => {
                            ("available", Box::new(f.iter().map(|f| f.to_column())))
                        }
                        Replace::Scheduled(f) => {
                            ("scheduled", Box::new(f.iter().map(|f| f.to_column())))
                        }
                        Replace::Executing(f) => {
                            ("executing", Box::new(f.iter().map(|f| f.to_column())))
                        }
                        Replace::Retryable(f) => {
                            ("retryable", Box::new(f.iter().map(|f| f.to_column())))
                        }
                        Replace::Completed(f) => {
                            ("completed", Box::new(f.iter().map(|f| f.to_column())))
                        }
                        Replace::Cancelled(f) => {
                            ("cancelled", Box::new(f.iter().map(|f| f.to_column())))
                        }
                        Replace::Discarded(f) => {
                            ("discarded", Box::new(f.iter().map(|f| f.to_column())))
                        }
                    };

                for col in columns {
                    update_case.entry(col).or_default().push(state);
                }
            }
        }

        sql.push_str(" LIMIT 1 FOR UPDATE ");

        let stmt = sea_orm::Statement::from_sql_and_values(sea_orm::DbBackend::MySql, sql, params);

        let query_res = txn
            .query_one_raw(stmt)
            .await
            .map_err(|e| BusError::Database(e.to_string()))?;

        let existing = if let Some(row) = query_res {
            let id: Vec<u8> = row
                .try_get("", "id")
                .map_err(|e| BusError::Database(e.to_string()))?;

            let state: String = row
                .try_get("", "state")
                .map_err(|e| BusError::Database(e.to_string()))?;

            Some((id, state))
        } else {
            None
        };

        if let Some((existing_id, _)) = existing {
            if !update_case.is_empty() {
                let mut sql_up = "UPDATE bus_jobs SET ".to_string();
                let mut params_up: Vec<sea_orm::Value> = vec![];

                let mut first_col = true;

                for (col_name, states) in update_case.iter() {
                    if !first_col {
                        sql_up.push_str(", ");
                    }
                    first_col = false;

                    let states_placeholders: String =
                        states.iter().map(|_| "?").collect::<Vec<_>>().join(", ");

                    sql_up.push_str(col_name);
                    sql_up.push_str(" = CASE WHEN state IN (");
                    sql_up.push_str(&states_placeholders);
                    sql_up.push_str(") THEN ");

                    for state in states {
                        params_up.push((*state).into());
                    }

                    match *col_name {
                        "priority" => {
                            sql_up.push_str(" ? ");
                            params_up.push(job.priority.into());
                        }
                        "queue" => {
                            sql_up.push_str(" ? ");
                            params_up.push(job.reg.queue.into());
                        }
                        "max_attempts" => {
                            sql_up.push_str(" ? ");
                            params_up.push(job.max_attempts.into());
                        }
                        "execution_timeout_sec" => {
                            sql_up.push_str(" ? ");
                            params_up.push(job.timeout_sec.into());
                        }
                        "tags" => {
                            sql_up.push_str(" ? ");
                            params_up.push(serde_json::to_string(&job.tags_json)?.into());
                        }
                        "payload" => {
                            sql_up.push_str(" ? ");
                            params_up.push(serde_json::to_string(&event_json)?.into());
                        }
                        "meta" => {
                            sql_up.push_str(" ? ");
                            params_up.push(serde_json::to_string(&metadata_json)?.into());
                        }
                        _ => {
                            sql_up.push_str("`");
                            sql_up.push_str(col_name);
                            sql_up.push_str("` ");
                        }
                    }

                    sql_up.push_str(" ELSE `");
                    sql_up.push_str(col_name);
                    sql_up.push_str("` END");
                }
                sql_up.push_str(" WHERE id = ? ");
                params_up.push(existing_id.into());

                let stmt_up = sea_orm::Statement::from_sql_and_values(
                    sea_orm::DbBackend::MySql,
                    sql_up,
                    params_up,
                );

                txn.execute_raw(stmt_up)
                    .await
                    .map_err(|e| sea_orm::DbErr::Custom(e.to_string()))?;
            }
        } else {
            let active_model = crate::models::bus_jobs::ActiveModel {
                id: sea_orm::Set(Uuid::now_v7()),
                hash_type_name: sea_orm::Set(hash_type_name),
                inserted_at: sea_orm::Set(now),
                scheduled_at: sea_orm::Set(job.schedule_in),
                attempted_at: sea_orm::Set(None),
                completed_at: sea_orm::Set(None),
                cancelled_at: sea_orm::Set(None),
                discarded_at: sea_orm::Set(None),
                state: sea_orm::Set(crate::models::sea_orm_active_enums::BusJobState::Available),
                priority: sea_orm::Set(job.priority),
                attempt: sea_orm::Set(0i32),
                max_attempts: sea_orm::Set(job.max_attempts),
                execution_timeout_sec: sea_orm::Set(job.timeout_sec),
                queue: sea_orm::Set(job.reg.queue.to_owned()),
                type_name_event: sea_orm::Set(job.reg.event_identity.to_owned()),
                type_name_handler: sea_orm::Set(job.reg.handler_identity.to_owned()),
                payload: sea_orm::Set(event_json.to_owned()),
                meta: sea_orm::Set(metadata_json.to_owned()),
                tags: sea_orm::Set(job.tags_json.to_owned()),
                errors: sea_orm::Set(serde_json::json!([])),
                attempted_by: sea_orm::Set(serde_json::json!([])),
            };

            active_model
                .insert(txn)
                .await
                .map_err(|e| BusError::Database(e.to_string()))?;
        }
    }

    Ok(())
}