historyprovider 2.10.1

historyprovider-rs
Documentation
use std::{error::Error, marker::{Send, Sync}};

use futures::{StreamExt, channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded}};
use historyprovider::{AppTasks, HpConfig, make_client};
use historyprovider::init_logger;
use log::debug;
use shvclient::{ConnectionCommand, ConnectionEvent};
use shvproto::RpcValue;
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use tokio::task::JoinHandle;

pub async fn mock_run(hp_config: &HpConfig, conn_evt_rx: futures::channel::mpsc::UnboundedReceiver<shvclient::ConnectionEvent>) -> shvrpc::Result<()> {
    let mut tasks = AppTasks::default();
    let (app_state, client, init_function) = make_client(hp_config, &mut tasks)?;
    client
        .mock_run_with_init(init_function, conn_evt_rx)
        .await?;

    tasks.wait_until_finished(app_state.as_ref()).await;
    Ok(())
}

async fn request(conn_evt_tx: &mut UnboundedSender<ConnectionEvent>, path: &str, method: &str, param: RpcValue) {
    conn_evt_tx.unbounded_send(ConnectionEvent::RpcFrameReceived(RpcMessage::new_request(path, method).with_param(param).to_frame().unwrap())).unwrap();
}

struct PendingRequest(RpcMessage);
impl PendingRequest {
    async fn respond(mut self, conn_evt_tx: &mut UnboundedSender<ConnectionEvent>, result: &str) {
        let result = RpcValue::from_cpon(result).unwrap_or_else(|err| panic!("Invalid CPON '{result}': {err}"));
        debug!(target: "test-driver", "==> {result:?}");
        self.0.set_result(result);
        conn_evt_tx.unbounded_send(ConnectionEvent::RpcFrameReceived(self.0.to_frame().expect("to_frame() must work"))).expect("sending ConnectionEvent must work");
    }
}

async fn await_request(conn_cmd_rx: &mut UnboundedReceiver<ConnectionCommand>, expected_path: &str, expected_method: &str, expected_param: RpcValue) -> PendingRequest {
    let Some(command) = conn_cmd_rx.next().await else {
        panic!("Expected a {expected_path}:{expected_method} - {expected_param}, but no call accepted");
    };

    match command {
        ConnectionCommand::SendMessage(rpc_message) => {
            let shv_path = rpc_message.shv_path().expect("msg must have a path");
            let method = rpc_message.method().expect("msg must have a method");
            let param = rpc_message.param().cloned().unwrap_or_else(RpcValue::null);
            debug!(target: "test-driver", "<== {shv_path}:{method}, param: {param}");
            assert_eq!(shv_path, expected_path);
            assert_eq!(method, expected_method);
            assert_eq!(param, expected_param);

            PendingRequest(rpc_message.prepare_response().expect("prepare_response must work"))
        },
    }
}

struct TestApp {
    app: JoinHandle<Result<(), Box<dyn Error + Send + Sync>>>,
    conn_evt_tx: UnboundedSender<ConnectionEvent>,
    conn_cmd_rx: UnboundedReceiver<ConnectionCommand>,
}

impl TestApp {
    fn new() -> Self {
        let (conn_evt_tx, conn_evt_rx) = futures::channel::mpsc::unbounded::<ConnectionEvent>();
        let config = Default::default();
        let app = tokio::spawn(async move {
            mock_run(&config, conn_evt_rx).await
        });

        let (conn_cmd_tx, conn_cmd_rx) = unbounded();
        conn_evt_tx.unbounded_send(ConnectionEvent::Connected(conn_cmd_tx)).expect("Events must work");
        Self {
            app,
            conn_evt_tx,
            conn_cmd_rx,
        }
    }
}

#[tokio::test]
async fn test_start_and_end() -> shvrpc::Result<()> {
    init_logger();
    let TestApp {app, mut conn_evt_tx, mut conn_cmd_rx} = TestApp::new();
    await_request(&mut conn_cmd_rx, ".broker", "ls", RpcValue::null()).await
        .respond(&mut conn_evt_tx, r#"["client"]"#).await;
    await_request(&mut conn_cmd_rx, "sites", "getSites", RpcValue::null()).await
        .respond(&mut conn_evt_tx, r#"{
            "_meta":{
                "HP3":{"type":"HP3"},
            }
        },"#).await;

    request(&mut conn_evt_tx, "", "syncLog", RpcValue::null()).await;
    conn_evt_tx.unbounded_send(ConnectionEvent::Disconnected)?;
    drop(conn_evt_tx);
    app.await?
}