zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use futures::stream::StreamExt;

use crate::commons::parse_json_string;
use crate::core::elasticsearch::EsClient;
// use crate::core::redis::ReidsPubSubAsync;
use crate::core::serde::f64_format::F64;

#[derive(serde::Deserialize, serde::Serialize)]
pub struct KlineData {
    pub t: i64,
    pub s: String,
    pub o: F64,
    pub c: F64,
    pub h: F64,
    pub l: F64,
    pub vr: String,
}

// pub fn consumers1(redis_sub: ReidsPubSubAsync, _es_client: EsClient) {
//     let redis_sub1 = redis_sub.clone();
//     let redis_sub2 = redis_sub.clone();

//     tokio::spawn(async move {
//         redis_sub1.subscribe3("one_topic", &|channel, msg| {
//             log::info!("consumers1: channel={}, msg={}", channel, msg);
//         })
//     });

//     tokio::spawn(async move {
//         let _rx = redis_sub2.subscribe(*super::KLINE_CHANNEL).await.unwrap();
//         process_messages(_rx, &_es_client).await;
//     });
// }

pub async fn process_messages(
    mut rx: futures::channel::mpsc::UnboundedReceiver<String>,
    _es_client: &EsClient,
) {
    while let Some(message) = rx.next().await {
        if let Ok(kline_data) = parse_json_string::<KlineData>(&message) {
            if _es_client
                .save(
                    &format!("{}-{}", *super::KLINE_INDEX, crate::commons::curr_date()),
                    &kline_data,
                )
                .await
                .is_ok()
            {
                log::info!("Message received: {}", message);
            } else {
                log::error!("Failed to save data to Elasticsearch");
            }
        } else {
            log::error!("Failed to parse JSON: {}", message);
        }
    }
}