#![deny(unsafe_code)]
#![warn(missing_debug_implementations)]
pub mod connect;
pub mod connector;
pub mod error;
pub mod params;
pub mod topology;
pub mod types;
#[cfg(feature = "embedded")]
pub mod embedded;
#[cfg(feature = "grpc")]
pub mod grpc;
#[cfg(feature = "grpc")]
pub mod router;
#[cfg(feature = "redwire")]
pub mod redwire;
#[cfg(feature = "http")]
pub mod http;
pub use error::{ClientError, ErrorCode, Result};
pub use params::{IntoParams, IntoValue, Value};
pub use types::{BulkInsertResult, InsertResult, JsonValue, KvWatchEvent, QueryResult, ValueOut};
pub use connector::{
repl, BulkCreateStatus, CreatedEntity, HealthStatus, OperationStatus, QueryResponse,
RedDBClient,
};
use connect::Target;
#[derive(Debug)]
pub enum Reddb {
#[cfg(feature = "embedded")]
Embedded(embedded::EmbeddedClient),
#[cfg(feature = "grpc")]
Grpc(grpc::GrpcClient),
#[cfg(feature = "http")]
Http(http::HttpClient),
Unavailable(&'static str),
}
impl Reddb {
pub async fn connect(uri: &str) -> Result<Self> {
let target = connect::parse(uri)?;
match target {
Target::Memory => {
#[cfg(feature = "embedded")]
{
embedded::EmbeddedClient::in_memory().map(Reddb::Embedded)
}
#[cfg(not(feature = "embedded"))]
{
Err(ClientError::feature_disabled("embedded"))
}
}
Target::File { path } => {
#[cfg(feature = "embedded")]
{
embedded::EmbeddedClient::open(path).map(Reddb::Embedded)
}
#[cfg(not(feature = "embedded"))]
{
let _ = path;
Err(ClientError::feature_disabled("embedded"))
}
}
Target::Grpc { endpoint } => {
#[cfg(feature = "grpc")]
{
grpc::GrpcClient::connect(endpoint).await.map(Reddb::Grpc)
}
#[cfg(not(feature = "grpc"))]
{
let _ = endpoint;
Err(ClientError::feature_disabled("grpc"))
}
}
Target::GrpcCluster {
primary,
replicas,
force_primary,
} => {
#[cfg(feature = "grpc")]
{
grpc::GrpcClient::connect_cluster(primary, replicas, force_primary)
.await
.map(Reddb::Grpc)
}
#[cfg(not(feature = "grpc"))]
{
let _ = (primary, replicas, force_primary);
Err(ClientError::feature_disabled("grpc"))
}
}
Target::Http { base_url } => {
#[cfg(feature = "http")]
{
http::HttpClient::connect(http::HttpOptions::new(base_url))
.await
.map(Reddb::Http)
}
#[cfg(not(feature = "http"))]
{
let _ = base_url;
Err(ClientError::feature_disabled("http"))
}
}
}
}
pub async fn query(&self, sql: &str) -> Result<QueryResult> {
match self {
#[cfg(feature = "embedded")]
Reddb::Embedded(c) => c.query(sql),
#[cfg(feature = "grpc")]
Reddb::Grpc(c) => c.query(sql).await,
#[cfg(feature = "http")]
Reddb::Http(c) => c.query(sql).await,
Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
}
}
pub async fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
let values = params.into_params();
match self {
#[cfg(feature = "embedded")]
Reddb::Embedded(c) => c.query_with(sql, values),
#[cfg(feature = "grpc")]
Reddb::Grpc(c) => c.query_with(sql, &values).await,
#[cfg(feature = "http")]
Reddb::Http(c) => c.query_with(sql, &values).await,
Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
}
}
pub async fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
self.query_with(sql, params).await
}
pub async fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
match self {
#[cfg(feature = "embedded")]
Reddb::Embedded(c) => c.insert(collection, payload),
#[cfg(feature = "grpc")]
Reddb::Grpc(c) => c.insert(collection, payload).await,
#[cfg(feature = "http")]
Reddb::Http(c) => c.insert(collection, payload).await,
Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
}
}
pub async fn bulk_insert(
&self,
collection: &str,
payloads: &[JsonValue],
) -> Result<BulkInsertResult> {
match self {
#[cfg(feature = "embedded")]
Reddb::Embedded(c) => c.bulk_insert(collection, payloads),
#[cfg(feature = "grpc")]
Reddb::Grpc(c) => c.bulk_insert(collection, payloads).await,
#[cfg(feature = "http")]
Reddb::Http(c) => c.bulk_insert(collection, payloads).await,
Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
}
}
pub async fn delete(&self, collection: &str, id: &str) -> Result<u64> {
match self {
#[cfg(feature = "embedded")]
Reddb::Embedded(c) => c.delete(collection, id),
#[cfg(feature = "grpc")]
Reddb::Grpc(c) => c.delete(collection, id).await,
#[cfg(feature = "http")]
Reddb::Http(c) => c.delete(collection, id).await,
Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
}
}
pub async fn close(&self) -> Result<()> {
match self {
#[cfg(feature = "embedded")]
Reddb::Embedded(c) => c.close(),
#[cfg(feature = "grpc")]
Reddb::Grpc(c) => c.close().await,
#[cfg(feature = "http")]
Reddb::Http(c) => c.close().await,
Reddb::Unavailable(_) => Ok(()),
}
}
pub fn kv(&self) -> KvClient<'_> {
KvClient {
db: self,
collection: "kv_default",
}
}
pub fn config(&self) -> ConfigClient<'_> {
self.config_collection("red.config")
}
pub fn vault(&self) -> VaultClient<'_> {
self.vault_collection("red.vault")
}
pub fn config_collection<'a>(&'a self, collection: &'a str) -> ConfigClient<'a> {
ConfigClient {
db: self,
collection,
}
}
pub fn vault_collection<'a>(&'a self, collection: &'a str) -> VaultClient<'a> {
VaultClient {
db: self,
collection,
}
}
}
#[derive(Debug)]
pub struct KvClient<'a> {
db: &'a Reddb,
collection: &'static str,
}
impl<'a> KvClient<'a> {
pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
let tag_clause = if tags.is_empty() {
String::new()
} else {
format!(
" TAGS [{}]",
tags.iter()
.map(|tag| kv_tag_literal(tag))
.collect::<Vec<_>>()
.join(", ")
)
};
self.db
.query(&format!(
"KV PUT {}.{} = {}{}",
kv_identifier(self.collection),
kv_identifier(key),
kv_value_literal(&value),
tag_clause
))
.await
}
pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<u64> {
let result = self
.db
.query(&format!(
"INVALIDATE TAGS [{}] FROM {}",
tags.iter()
.map(|tag| kv_tag_literal(tag))
.collect::<Vec<_>>()
.join(", "),
kv_identifier(self.collection)
))
.await?;
Ok(result.affected)
}
pub async fn watch(&self, key: &str) -> Result<Vec<KvWatchEvent>> {
self.watch_from_lsn(key, None).await
}
pub async fn watch_from_lsn(
&self,
key: &str,
from_lsn: Option<u64>,
) -> Result<Vec<KvWatchEvent>> {
#[cfg(not(feature = "http"))]
{
let _ = key;
let _ = from_lsn;
let _ = self.collection;
}
match self.db {
#[cfg(feature = "http")]
Reddb::Http(c) => c.watch_kv(self.collection, key, from_lsn, None).await,
#[cfg(feature = "embedded")]
Reddb::Embedded(_) => Err(ClientError::feature_disabled("kv.watch embedded")),
#[cfg(feature = "grpc")]
Reddb::Grpc(_) => Err(ClientError::feature_disabled("kv.watch grpc")),
Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
}
}
pub async fn watch_prefix(&self, prefix: &str) -> Result<Vec<KvWatchEvent>> {
self.watch_prefix_from_lsn(prefix, None).await
}
pub async fn watch_prefix_from_lsn(
&self,
prefix: &str,
from_lsn: Option<u64>,
) -> Result<Vec<KvWatchEvent>> {
let key = format!("{prefix}.*");
self.watch_from_lsn(&key, from_lsn).await
}
}
#[derive(Debug)]
pub struct ConfigClient<'a> {
db: &'a Reddb,
collection: &'a str,
}
impl<'a> ConfigClient<'a> {
pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
let mut sql = format!(
"PUT CONFIG {} {} = {}",
kv_identifier(self.collection),
kv_identifier(key),
kv_value_literal(&value)
);
append_tag_clause(&mut sql, tags);
self.db.query(&sql).await
}
pub async fn put_secret_ref(
&self,
key: &str,
vault_collection: &str,
vault_key: &str,
tags: &[&str],
) -> Result<QueryResult> {
let mut sql = format!(
"PUT CONFIG {} {} = SECRET_REF(vault, {}.{})",
kv_identifier(self.collection),
kv_identifier(key),
kv_identifier(vault_collection),
kv_identifier(vault_key)
);
append_tag_clause(&mut sql, tags);
self.db.query(&sql).await
}
pub async fn get(&self, key: &str) -> Result<QueryResult> {
self.db
.query(&format!(
"GET CONFIG {} {}",
kv_identifier(self.collection),
kv_identifier(key)
))
.await
}
pub async fn resolve(&self, key: &str) -> Result<QueryResult> {
self.db
.query(&format!(
"RESOLVE CONFIG {} {}",
kv_identifier(self.collection),
kv_identifier(key)
))
.await
}
}
#[derive(Debug)]
pub struct VaultClient<'a> {
db: &'a Reddb,
collection: &'a str,
}
impl<'a> VaultClient<'a> {
pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
let mut sql = format!(
"VAULT PUT {}.{} = {}",
kv_identifier(self.collection),
kv_identifier(key),
kv_value_literal(&value)
);
append_tag_clause(&mut sql, tags);
self.db.query(&sql).await
}
pub async fn get(&self, key: &str) -> Result<QueryResult> {
self.db
.query(&format!(
"VAULT GET {}.{}",
kv_identifier(self.collection),
kv_identifier(key)
))
.await
}
pub async fn unseal(&self, key: &str) -> Result<QueryResult> {
self.db
.query(&format!(
"UNSEAL VAULT {}.{}",
kv_identifier(self.collection),
kv_identifier(key)
))
.await
}
}
fn append_tag_clause(sql: &mut String, tags: &[&str]) {
if tags.is_empty() {
return;
}
sql.push_str(" TAGS [");
sql.push_str(
&tags
.iter()
.map(|tag| kv_tag_literal(tag))
.collect::<Vec<_>>()
.join(", "),
);
sql.push(']');
}
fn kv_identifier(value: &str) -> String {
value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '_' || ch == '.' {
ch
} else {
'_'
}
})
.collect()
}
fn kv_value_literal(value: &JsonValue) -> String {
match value {
JsonValue::Null => "NULL".to_string(),
JsonValue::Bool(value) => value.to_string(),
JsonValue::Number(value) => value.to_string(),
JsonValue::String(value) => format!("'{}'", value.replace('\'', "''")),
JsonValue::Array(_) | JsonValue::Object(_) => {
format!("'{}'", value.to_json_string().replace('\'', "''"))
}
}
}
fn kv_tag_literal(value: &str) -> String {
format!("'{}'", value.replace('\'', "''"))
}
pub fn version() -> &'static str {
env!("CARGO_PKG_VERSION")
}