xkv 0.1.58

Static global Redis client manager for Fred and Kvrocks / 全局静态 Redis 客户端管理器,基于 Fred 与 Kvrocks 构建
Documentation
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::{env, path::PathBuf};

use aok::Result;
use err_exit::err_exit;
pub use fred;
use fred::{
  interfaces::ClientLike,
  prelude::{Client, Config, ReconnectPolicy, Server as FredServer, ServerConfig},
  types::RespVersion,
};

#[cfg(feature = "macro")]
mod r#macro;

#[cfg(feature = "macro")]
mod macro_pub_use {
  pub use log;
  pub use tokio;
  pub use xboot;
}

#[cfg(feature = "macro")]
pub use macro_pub_use::*;

#[cfg(feature = "r")]
mod r;

#[cfg(feature = "r")]
pub use r::R;

pub struct Server;

impl Server {
  pub fn unix_sock(path: impl Into<PathBuf>) -> ServerConfig {
    ServerConfig::Unix { path: path.into() }
  }

  pub fn cluster(hosts: Vec<FredServer>) -> ServerConfig {
    ServerConfig::Clustered {
      hosts,
      policy: Default::default(),
    }
  }

  pub fn sentinel(
    service_name: impl Into<String>,
    hosts: Vec<FredServer>,
    username: Option<String>,
    password: Option<String>,
  ) -> ServerConfig {
    ServerConfig::Sentinel {
      service_name: service_name.into(),
      hosts,
      username: Some(username.unwrap_or_else(|| "default".to_string())),
      password,
    }
  }

  pub fn centralized(server: FredServer) -> ServerConfig {
    ServerConfig::Centralized { server }
  }
}

const USER: &str = "USER";
const NODE: &str = "NODE";
const PASSWORD: &str = "PASSWORD";
const DB: &str = "DB";
const SENTINEL_NAME: &str = "SENTINEL_NAME";
const SENTINEL_PASSWORD: &str = "SENTINEL_PASSWORD";
const SENTINEL_USER: &str = "SENTINEL_USER";
const SENTINEL_PORT: &str = "SENTINEL_PORT";
const RESP: &str = "RESP";

const DEFAULT_REDIS_PORT: u16 = 6379;
const DEFAULT_SENTINEL_PORT: u16 = 26379;
const RECONNECT_MAX_ATTEMPTS: u32 = u32::MAX;
const RECONNECT_MAX_DELAY_SEC: u32 = 8;
const RECONNECT_INITIAL_DELAY_SEC: u32 = 1;

pub fn server_li(host_port: impl AsRef<str>, default_port: u16) -> Vec<FredServer> {
  host_port
    .as_ref()
    .split_whitespace()
    .map(|i| {
      if let Some((host, port_str)) = i.rsplit_once(':') {
        let port = port_str.parse::<u16>().unwrap_or(default_port);
        FredServer::new(host, port)
      } else {
        FredServer::new(i, default_port)
      }
    })
    .collect()
}

pub async fn conn(prefix: impl AsRef<str>) -> Result<Client> {
  let prefix = prefix.as_ref();

  let mut key_buf = String::with_capacity(prefix.len() + 1 + 32);
  let mut get_env = |name: &str| -> Option<String> {
    key_buf.clear();
    key_buf.push_str(prefix);
    key_buf.push('_');
    key_buf.push_str(name);
    env::var(&key_buf)
      .ok()
      .map(|s| s.trim().to_owned())
      .filter(|s| !s.is_empty())
  };

  let host_port = get_env(NODE).unwrap_or_else(|| err_exit!("xkv : miss env {}_{}", prefix, NODE));

  let server = if let Some(sentinel_name) = get_env(SENTINEL_NAME) {
    let sentinel_port = get_env(SENTINEL_PORT)
      .map(|i| {
        i.parse::<u16>()
          .unwrap_or_else(|_| err_exit!("xkv : invalid SENTINEL_PORT '{}' for prefix {}", i, prefix))
      })
      .unwrap_or(DEFAULT_SENTINEL_PORT);

    Server::sentinel(
      sentinel_name,
      server_li(host_port, sentinel_port),
      get_env(SENTINEL_USER),
      get_env(SENTINEL_PASSWORD),
    )
  } else if host_port.starts_with('/') {
    Server::unix_sock(host_port)
  } else {
    let mut hosts = server_li(host_port, DEFAULT_REDIS_PORT);
    if hosts.len() == 1 {
      Server::centralized(hosts.pop().unwrap())
    } else {
      Server::cluster(hosts)
    }
  };

  let database = get_env(DB).map(|s| {
    s.parse::<u8>()
      .unwrap_or_else(|_| err_exit!("xkv : invalid DB value '{}' for prefix {}", s, prefix))
  });
  let user = get_env(USER);
  let password = get_env(PASSWORD);
  let resp = get_env(RESP);

  connect(&server, user, password, database, resp.as_deref()).await
}

pub async fn connect(
  server: &ServerConfig,
  username: Option<String>,
  password: Option<String>,
  database: Option<u8>,
  resp: Option<&str>,
) -> Result<Client> {
  let mut conf = Config {
    version: if resp == Some("2") {
      RespVersion::RESP2
    } else {
      RespVersion::RESP3
    },
    ..Default::default()
  };
  conf.server = server.clone();
  conf.username = username;
  conf.password = password;
  conf.database = database;

  let policy = ReconnectPolicy::new_linear(
    RECONNECT_MAX_ATTEMPTS,
    RECONNECT_MAX_DELAY_SEC,
    RECONNECT_INITIAL_DELAY_SEC,
  );
  let client = Client::new(conf, None, None, Some(policy));
  client.init().await?;
  Ok(client)
}