use lazy_static::lazy_static;
pub mod consumer;
pub mod model2;
pub mod models;
use sqlx::{MySql, Pool, QueryBuilder};
use crate::core::elasticsearch::EsClient;
use crate::services::kline_service;
use crate::commons::parse_utc_string_to_millis;
use crate::services::kline_service::model2::KlineDataCreator;
use crate::services::kline_service::models::FetchKline;
use crate::services::kline_service::models::KlineDataVo;
use crate::core::error2::Error;
use crate::core::error2::Result;
lazy_static! {
pub static ref KLINE_CHANNEL: &'static str = "binance_crypto_stream_kline_channel";
pub static ref KLINE_INDEX: &'static str = "crypto_kline";
pub static ref BIND_LIMIT: usize = 65535;
}
pub async fn query_kline_data(
es_client: &EsClient,
input: kline_service::models::FetchKline,
) -> Result<Vec<models::KlineDataVo>> {
let index_name = format!("{}-*", *KLINE_INDEX);
let render_dsl = es_client
.handlebars
.render("kline_data_dsl", &input)
.map_err(|e| {
log::error!("render_dsl-error: error={:?}", e);
anyhow::anyhow!(e)
})?;
match es_client
.select_after::<models::KlineData>(&index_name, &render_dsl)
.await
{
Ok((result, _)) => Ok(result.into_iter().map(models::KlineDataVo::from).collect()),
Err(e) => Err(Error::throw("", Some(e))),
}
}
pub async fn save_kline_bulk(pool: &Pool<MySql>, data: Vec<KlineDataCreator>) -> Result<u64> {
let mut insert_result_int = 0;
for chunk in data.chunks(5000) {
let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
r#"INSERT INTO t_kline (f_symbol, f_period, f_open, f_close, f_high, f_low, f_time, f_vendor) "#,
);
query_builder.push_values(chunk.iter().take(*BIND_LIMIT / 8), |mut b, record| {
b.push_bind(record.s.to_owned())
.push_bind(record.period.to_owned())
.push_bind(record.o.to_string())
.push_bind(record.c.to_string())
.push_bind(record.h.to_string())
.push_bind(record.l.to_string())
.push_bind(parse_utc_string_to_millis(&record.t).unwrap())
.push_bind(record.vr.to_owned());
});
insert_result_int += query_builder
.build()
.execute(pool)
.await
.map_err(|e| anyhow::anyhow!(e))?
.rows_affected();
}
Ok(insert_result_int)
}
pub async fn select_kline_data(pool: &Pool<MySql>, filter: FetchKline) -> Result<Vec<KlineDataVo>> {
let list = filter
.build()
.build()
.fetch_all(pool)
.await
.map_err(|e| anyhow::anyhow!(e))?
.into_iter()
.map(FetchKline::from_row)
.collect();
match list {
Ok(data) => Ok(data),
Err(err) => Err(Error::throw("", Some(err))),
}
}