rxqlite 0.1.24

A secure, distributed SQLite database built upon `openraft`, `sqlx` and `sqlite`.
Documentation
#![deny(warnings)]

use clap::Parser;
use rxqlite::{init_rxqlite, start_rxqlite,InstanceParams};
//use tracing_subscriber::EnvFilter;
use rxqlite::RSQliteNodeTlsConfig;
use rxqlite::key_providers::{KeyProvider,FileSystemKeyProvider,InsecureRemovableMediaKeyProvider };
use serde::{Serialize, Deserialize};
use std::path::PathBuf;

#[derive(Parser, Clone, Debug)]
#[derive(Serialize, Deserialize)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
    #[clap(long)]
    pub id: u64,

    #[clap(long)]
    pub http_addr: Option<String>,

    #[clap(long)]
    pub rpc_addr: Option<String>,

    #[clap(long,action = clap::ArgAction::SetTrue)]
    leader: Option<bool>,

    #[clap(long, action = clap::ArgAction::Append)]
    member: Vec<String>, // id;http_addr;rpc_addr

    #[clap(long)]
    key_path: Option<String>,

    #[clap(long)]
    cert_path: Option<String>,

    #[clap(long,action = clap::ArgAction::SetTrue)]
    accept_invalid_certificates: Option<bool>,

    #[clap(long,action = clap::ArgAction::SetTrue)]
    no_database_encryption: Option<bool>,

    #[clap(long)]
    notifications_addr: Option<String>,
    
    #[clap(long,action = clap::ArgAction::SetTrue)]
    test_node: Option<bool>,
    
    #[clap(long,action = clap::ArgAction::SetTrue)]
    insecure_use_removable_media: Option<bool>,
    
    #[clap(long)]
    data_path: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    
    // Setup the logger
    let subscriber = tracing_subscriber::fmt()
        // Use a more compact, abbreviated log format
        .compact()
        // Display source code file paths
        .with_file(true)
        // Display source code line numbers
        .with_line_number(true)
        // Display the thread ID an event was recorded on
        .with_thread_ids(true)
        // Don't display the event's target (module path)
        .with_target(true)
        //use RUST_LOG environment variable
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .finish();
    tracing::subscriber::set_global_default(subscriber)?;

    // Parse the parameters passed by arguments.
    let mut options = Opt::parse();
    
    let base_path = if let Some(data_path) = options.data_path.as_ref() {
      data_path.join(format!("data-{}", options.id))
    } else {
      PathBuf::from(format!("data-{}", options.id))
    };
    let start_parameters_path= base_path.join("instance_params.json");
    let has_data_path=base_path.is_dir();
    if has_data_path && start_parameters_path.is_file() {
      let tls_instance_params_json =
        tokio::fs::read_to_string(&start_parameters_path).await?;
      options = serde_json::from_str(&tls_instance_params_json)?;
    } else {
      std::fs::create_dir_all(&base_path)?;
      let instance_params_json = serde_json::to_string(&options)?;
      tokio::fs::write(
          &start_parameters_path,
          instance_params_json.as_bytes(),
      )
      .await?;
    }
    let is_test_node = options.test_node.unwrap_or(false);
    
    let insecure_use_removable_media = options.insecure_use_removable_media.unwrap_or(false);
    
    let mut key_provider: Option<Box<dyn KeyProvider>> = None;
    if options.key_path.is_some() && options.cert_path.is_some() {
      if insecure_use_removable_media {
        key_provider = Some(InsecureRemovableMediaKeyProvider ::new(
          options.key_path.as_ref().unwrap(),
          options.cert_path.as_ref().unwrap(),
        ).await?);
      } else {
        key_provider = Some(FileSystemKeyProvider::new(
          options.key_path.as_ref().unwrap(),
          options.cert_path.as_ref().unwrap(),
        )?);
      }
    }
    
    let tls_config = if options.key_path.is_some() && options.cert_path.is_some() {
        Some(RSQliteNodeTlsConfig {
            /*
            key_path: options.key_path.unwrap(),
            cert_path: options.cert_path.unwrap(),
            */
            accept_invalid_certificates: options.accept_invalid_certificates.unwrap_or(false),
        })
    } else {
        None
    };
    let instance_params = InstanceParams {
      node_id: options.id,
      http_addr: options.http_addr.ok_or(anyhow::anyhow!("missing node http-addr"))?,
      rpc_addr: options.rpc_addr.ok_or(anyhow::anyhow!("missing node rpc-addr"))?,
      notifications_addr: options.notifications_addr.ok_or(anyhow::anyhow!("missing node notifications-addr"))?,
      tls_config,
      key_provider,
      no_database_encryption: options.no_database_encryption.unwrap_or(false),
      is_test_node,
    };
    if has_data_path {    
        start_rxqlite(
          base_path,
          instance_params,
        )
        .await?;
        Ok(())
    } else {
        
    
        let leader = options.leader.unwrap_or(false);
        if !leader && options.member.len() > 0 {
            return Err(anyhow::anyhow!(
                "members can be specified on the leader node only"
            ));
        }
        let mut members = vec![];
        for member in options.member.into_iter() {
            let mut elements = member.split(";");
            let node_id = if let Some(node_id_str) = elements.next() {
                match node_id_str.parse::<u64>() {
                    Ok(node_id) => node_id,
                    Err(r) => {
                        return Err(anyhow::anyhow!(format!(
                            "couldn't parse member id from: {}({})",
                            node_id_str, r
                        )));
                    }
                }
            } else {
                return Err(anyhow::anyhow!(
                    "member must be provided in the form 'node_id;http_addr;rpc_addr'"
                ));
            };
            let http_addr = if let Some(http_addr_str) = elements.next() {
                http_addr_str.to_string()
            } else {
                return Err(anyhow::anyhow!(
                    "member must be provided in the form 'node_id;http_addr;rpc_addr'"
                ));
            };
            let rpc_addr = if let Some(http_addr_str) = elements.next() {
                http_addr_str.to_string()
            } else {
                return Err(anyhow::anyhow!(
                    "member must be provided in the form 'node_id;http_addr;rpc_addr'"
                ));
            };
            if elements.next().is_some() {
                return Err(anyhow::anyhow!(
                    "member must be provided in the form 'node_id;http_addr;rpc_addr'"
                ));
            }
            members.push((node_id, http_addr, rpc_addr));
        }
        init_rxqlite(
            base_path,
            instance_params,
            leader,
            members,
        )
        .await
    }
}