zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use lazy_static::lazy_static;

pub mod consumer;
pub mod model2;
pub mod models;

use sqlx::{MySql, Pool, QueryBuilder};

use crate::core::elasticsearch::EsClient;
use crate::services::kline_service;

use crate::commons::parse_utc_string_to_millis;

use crate::services::kline_service::model2::KlineDataCreator;
use crate::services::kline_service::models::FetchKline;
use crate::services::kline_service::models::KlineDataVo;

use crate::core::error2::Error;
use crate::core::error2::Result;

lazy_static! {
    pub static ref KLINE_CHANNEL: &'static str = "binance_crypto_stream_kline_channel";
    pub static ref KLINE_INDEX: &'static str = "crypto_kline";
    pub static ref BIND_LIMIT: usize = 65535;
}

pub async fn query_kline_data(
    es_client: &EsClient,
    input: kline_service::models::FetchKline,
) -> Result<Vec<models::KlineDataVo>> {
    let index_name = format!("{}-*", *KLINE_INDEX);

    let render_dsl = es_client
        .handlebars
        .render("kline_data_dsl", &input)
        .map_err(|e| {
            log::error!("render_dsl-error: error={:?}", e);
            anyhow::anyhow!(e)
        })?;

    match es_client
        .select_after::<models::KlineData>(&index_name, &render_dsl)
        .await
    {
        Ok((result, _)) => Ok(result.into_iter().map(models::KlineDataVo::from).collect()),
        Err(e) => Err(Error::throw("", Some(e))),
    }
}

pub async fn save_kline_bulk(pool: &Pool<MySql>, data: Vec<KlineDataCreator>) -> Result<u64> {
    let mut insert_result_int = 0;

    for chunk in data.chunks(5000) {
        let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
            r#"INSERT INTO t_kline (f_symbol, f_period, f_open, f_close, f_high, f_low, f_time, f_vendor) "#,
        );

        query_builder.push_values(chunk.iter().take(*BIND_LIMIT / 8), |mut b, record| {
            b.push_bind(record.s.to_owned())
                .push_bind(record.period.to_owned())
                .push_bind(record.o.to_string())
                .push_bind(record.c.to_string())
                .push_bind(record.h.to_string())
                .push_bind(record.l.to_string())
                .push_bind(parse_utc_string_to_millis(&record.t).unwrap())
                .push_bind(record.vr.to_owned());
        });

        insert_result_int += query_builder
            .build()
            .execute(pool)
            .await
            .map_err(|e| anyhow::anyhow!(e))?
            .rows_affected();
    }

    Ok(insert_result_int)
}

pub async fn select_kline_data(pool: &Pool<MySql>, filter: FetchKline) -> Result<Vec<KlineDataVo>> {
    let list = filter
        .build()
        .build()
        .fetch_all(pool)
        .await
        .map_err(|e| anyhow::anyhow!(e))?
        .into_iter()
        .map(FetchKline::from_row)
        .collect();

    match list {
        Ok(data) => Ok(data),
        Err(err) => Err(Error::throw("", Some(err))),
    }
}