use futures::stream::StreamExt;
use crate::commons::parse_json_string;
use crate::core::elasticsearch::EsClient;
use crate::core::serde::f64_format::F64;
#[derive(serde::Deserialize, serde::Serialize)]
pub struct KlineData {
pub t: i64,
pub s: String,
pub o: F64,
pub c: F64,
pub h: F64,
pub l: F64,
pub vr: String,
}
pub async fn process_messages(
mut rx: futures::channel::mpsc::UnboundedReceiver<String>,
_es_client: &EsClient,
) {
while let Some(message) = rx.next().await {
if let Ok(kline_data) = parse_json_string::<KlineData>(&message) {
if _es_client
.save(
&format!("{}-{}", *super::KLINE_INDEX, crate::commons::curr_date()),
&kline_data,
)
.await
.is_ok()
{
log::info!("Message received: {}", message);
} else {
log::error!("Failed to save data to Elasticsearch");
}
} else {
log::error!("Failed to parse JSON: {}", message);
}
}
}