rxqlite 0.1.15

A secured distributed sqlite database built upon `openraft`, `sqlx` and `sqlite`.
Documentation
use std::sync::Arc;

use warp::reply;

use crate::app::App;
use crate::Node;
use crate::TypeConfig;
use openraft::LeaderId;
use openraft::LogId;
use rxqlite_common::{Message,MessageResponse};
use crate::convert::RXQliteInto;

pub async fn sql_consistent_or_fast(
    message: Message,
    app: Arc<App>,
    consistent: bool,
) -> Result::<
            openraft::raft::ClientWriteResponse<TypeConfig>,
            openraft::error::RaftError<u64, openraft::error::ClientWriteError<u64, Node>>,
        > {
    let sql = message.sql();

    let is_write = rxqlite_sqlx_common::is_query_write(sql);
    if let Err(err) = &is_write {
      let response_message = MessageResponse::Error(format!("{}",err));
      let client_write_response = openraft::raft::ClientWriteResponse::<TypeConfig> {
            log_id: LogId {
                leader_id: LeaderId {
                    term: u64::MAX,
                    node_id: u64::MAX,
                },
                index: u64::MAX,
            },
            data: Some(response_message),
            membership: None,
        };
        let res = Result::<
            openraft::raft::ClientWriteResponse<TypeConfig>,
            openraft::error::RaftError<u64, openraft::error::ClientWriteError<u64, Node>>,
        >::Ok(client_write_response);
        return res;
    }
    let is_write=is_write.unwrap();
    if is_write {
        let res: Result<openraft::raft::ClientWriteResponse<TypeConfig>, _> =
            app.raft.client_write(message).await;
        res
    } else {
        let do_it_locally = if consistent {
            if let Ok(_read_log_id) = app.raft.ensure_linearizable().await {
                true
            } else {
                false
            }
        } else {
            true
        };
        if do_it_locally {
            let sqlite_and_path = app.sqlite_and_path.read().await;
            let response_message = rxqlite_sqlx_common::do_sql(&sqlite_and_path, message).await;
            let client_write_response = openraft::raft::ClientWriteResponse::<TypeConfig> {
                log_id: LogId {
                    leader_id: LeaderId {
                        term: u64::MAX,
                        node_id: u64::MAX,
                    },
                    index: u64::MAX,
                },
                data: Some(response_message),
                membership: None,
            };
            let res = Result::<
                openraft::raft::ClientWriteResponse<TypeConfig>,
                openraft::error::RaftError<u64, openraft::error::ClientWriteError<u64, Node>>,
            >::Ok(client_write_response);
            res
        } else {
            let server_metrics = app.raft.server_metrics().borrow().clone();
            if let Some(leader_id) = server_metrics.current_leader {
                let membership_config = server_metrics.membership_config;
                let leader_node = membership_config.nodes().find_map(|(node_id, node)| {
                    if node_id == &leader_id {
                        Some(node.clone())
                    } else {
                        None
                    }
                });
                let res = Result::<
                    openraft::raft::ClientWriteResponse<TypeConfig>,
                    openraft::error::RaftError<u64, openraft::error::ClientWriteError<u64, Node>>,
                >::Err(openraft::error::RaftError::APIError(
                    openraft::error::ClientWriteError::ForwardToLeader(
                        openraft::error::ForwardToLeader {
                            leader_id: Some(leader_id),
                            leader_node: leader_node,
                        },
                    ),
                ));
                res
            } else {
                let res = Result::<
                    openraft::raft::ClientWriteResponse<TypeConfig>,
                    openraft::error::RaftError<u64, openraft::error::ClientWriteError<u64, Node>>,
                >::Err(openraft::error::RaftError::APIError(
                    openraft::error::ClientWriteError::ForwardToLeader(
                        openraft::error::ForwardToLeader {
                            leader_id: None,
                            leader_node: None,
                        },
                    ),
                ));
                res
            }
        }
    }
}

/// Warp handler: runs `message` through [`sql_consistent_or_fast`] with
/// `consistent = false` and replies with the whole `Result` serialized as JSON.
///
/// The handler itself never fails (`Infallible`); execution errors are part of
/// the JSON body.
pub async fn sql(
    message: Message,
    app: Arc<App>,
) -> Result<impl warp::Reply, std::convert::Infallible> {
    let outcome = sql_consistent_or_fast(message, app, false).await;
    let body = reply::json(&outcome);
    Ok(body)
}

/// Warp handler: runs `message` through [`sql_consistent_or_fast`] with
/// `consistent = true` and replies with the whole `Result` serialized as JSON.
///
/// The handler itself never fails (`Infallible`); execution errors are part of
/// the JSON body.
pub async fn sql_consistent(
    message: Message,
    app: Arc<App>,
) -> Result<impl warp::Reply, std::convert::Infallible> {
    let outcome = sql_consistent_or_fast(message, app, true).await;
    let body = reply::json(&outcome);
    Ok(body)
}

/// Warp handler: runs `message` through [`sql_consistent_or_fast`] with
/// `consistent = false`, converts the outcome with `into_lite()` and replies
/// with the converted value serialized as JSON.
///
/// The handler itself never fails (`Infallible`).
pub async fn lite_sql(
    message: Message,
    app: Arc<App>,
) -> Result<impl warp::Reply, std::convert::Infallible> {
    let outcome = sql_consistent_or_fast(message, app, false).await;
    let lite = outcome.into_lite();
    Ok(reply::json(&lite))
}

/// Warp handler: runs `message` through [`sql_consistent_or_fast`] with
/// `consistent = true`, converts the outcome with `into_lite()` and replies
/// with the converted value serialized as JSON.
///
/// The handler itself never fails (`Infallible`).
pub async fn lite_sql_consistent(
    message: Message,
    app: Arc<App>,
) -> Result<impl warp::Reply, std::convert::Infallible> {
    let outcome = sql_consistent_or_fast(message, app, true).await;
    let lite = outcome.into_lite();
    Ok(reply::json(&lite))
}