1#![deny(unsafe_code)]
49#![warn(missing_debug_implementations)]
50
51pub mod connect;
52pub mod connector;
53pub mod error;
54pub mod params;
55pub mod topology;
56pub mod types;
57
58#[cfg(feature = "embedded")]
59pub mod embedded;
60
61#[cfg(feature = "grpc")]
62pub mod grpc;
63
64#[cfg(feature = "grpc")]
65pub mod router;
66
67#[cfg(feature = "redwire")]
68pub mod redwire;
69
70#[cfg(feature = "http")]
71pub mod http;
72
73pub use error::{ClientError, ErrorCode, Result};
74pub use params::{IntoParams, IntoValue, Value};
75pub use types::{BulkInsertResult, InsertResult, JsonValue, KvWatchEvent, QueryResult, ValueOut};
76
77pub use connector::{
82 repl, BulkCreateStatus, CreatedEntity, HealthStatus, OperationStatus, QueryResponse,
83 RedDBClient,
84};
85
86use connect::Target;
87
/// Connection handle that forwards every call to one concrete transport client.
///
/// The transport variants are compiled in only when the matching Cargo feature
/// (`embedded`, `grpc`, `http`) is enabled.
#[derive(Debug)]
pub enum Reddb {
    /// Wraps `embedded::EmbeddedClient`; present only with the `embedded` feature.
    #[cfg(feature = "embedded")]
    Embedded(embedded::EmbeddedClient),
    /// Wraps `grpc::GrpcClient`; present only with the `grpc` feature.
    #[cfg(feature = "grpc")]
    Grpc(grpc::GrpcClient),
    /// Wraps `http::HttpClient`; present only with the `http` feature.
    #[cfg(feature = "http")]
    Http(http::HttpClient),
    /// Always-present variant holding a feature name.
    ///
    /// Operations on it answer with `ClientError::feature_disabled(name)`, except
    /// `close`, which succeeds. `Reddb::connect` in this file never builds it.
    Unavailable(&'static str),
}
104
105impl Reddb {
106 pub async fn connect(uri: &str) -> Result<Self> {
108 let target = connect::parse(uri)?;
109 match target {
110 Target::Memory => {
111 #[cfg(feature = "embedded")]
112 {
113 embedded::EmbeddedClient::in_memory().map(Reddb::Embedded)
114 }
115 #[cfg(not(feature = "embedded"))]
116 {
117 Err(ClientError::feature_disabled("embedded"))
118 }
119 }
120 Target::File { path } => {
121 #[cfg(feature = "embedded")]
122 {
123 embedded::EmbeddedClient::open(path).map(Reddb::Embedded)
124 }
125 #[cfg(not(feature = "embedded"))]
126 {
127 let _ = path;
128 Err(ClientError::feature_disabled("embedded"))
129 }
130 }
131 Target::Grpc { endpoint } => {
132 #[cfg(feature = "grpc")]
133 {
134 grpc::GrpcClient::connect(endpoint).await.map(Reddb::Grpc)
135 }
136 #[cfg(not(feature = "grpc"))]
137 {
138 let _ = endpoint;
139 Err(ClientError::feature_disabled("grpc"))
140 }
141 }
142 Target::GrpcCluster {
143 primary,
144 replicas,
145 force_primary,
146 } => {
147 #[cfg(feature = "grpc")]
148 {
149 grpc::GrpcClient::connect_cluster(primary, replicas, force_primary)
150 .await
151 .map(Reddb::Grpc)
152 }
153 #[cfg(not(feature = "grpc"))]
154 {
155 let _ = (primary, replicas, force_primary);
156 Err(ClientError::feature_disabled("grpc"))
157 }
158 }
159 Target::Http { base_url } => {
160 #[cfg(feature = "http")]
161 {
162 http::HttpClient::connect(http::HttpOptions::new(base_url))
163 .await
164 .map(Reddb::Http)
165 }
166 #[cfg(not(feature = "http"))]
167 {
168 let _ = base_url;
169 Err(ClientError::feature_disabled("http"))
170 }
171 }
172 }
173 }
174
175 pub async fn query(&self, sql: &str) -> Result<QueryResult> {
176 match self {
177 #[cfg(feature = "embedded")]
178 Reddb::Embedded(c) => c.query(sql),
179 #[cfg(feature = "grpc")]
180 Reddb::Grpc(c) => c.query(sql).await,
181 #[cfg(feature = "http")]
182 Reddb::Http(c) => c.query(sql).await,
183 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
184 }
185 }
186
187 pub async fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
208 let values = params.into_params();
209 match self {
210 #[cfg(feature = "embedded")]
211 Reddb::Embedded(c) => c.query_with(sql, values),
212 #[cfg(feature = "grpc")]
213 Reddb::Grpc(c) => c.query_with(sql, &values).await,
214 #[cfg(feature = "http")]
215 Reddb::Http(c) => c.query_with(sql, &values).await,
216 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
217 }
218 }
219
220 pub async fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
224 self.query_with(sql, params).await
225 }
226
227 pub async fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
228 match self {
229 #[cfg(feature = "embedded")]
230 Reddb::Embedded(c) => c.insert(collection, payload),
231 #[cfg(feature = "grpc")]
232 Reddb::Grpc(c) => c.insert(collection, payload).await,
233 #[cfg(feature = "http")]
234 Reddb::Http(c) => c.insert(collection, payload).await,
235 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
236 }
237 }
238
239 pub async fn bulk_insert(
240 &self,
241 collection: &str,
242 payloads: &[JsonValue],
243 ) -> Result<BulkInsertResult> {
244 match self {
245 #[cfg(feature = "embedded")]
246 Reddb::Embedded(c) => c.bulk_insert(collection, payloads),
247 #[cfg(feature = "grpc")]
248 Reddb::Grpc(c) => c.bulk_insert(collection, payloads).await,
249 #[cfg(feature = "http")]
250 Reddb::Http(c) => c.bulk_insert(collection, payloads).await,
251 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
252 }
253 }
254
255 pub async fn delete(&self, collection: &str, id: &str) -> Result<u64> {
256 match self {
257 #[cfg(feature = "embedded")]
258 Reddb::Embedded(c) => c.delete(collection, id),
259 #[cfg(feature = "grpc")]
260 Reddb::Grpc(c) => c.delete(collection, id).await,
261 #[cfg(feature = "http")]
262 Reddb::Http(c) => c.delete(collection, id).await,
263 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
264 }
265 }
266
267 pub async fn close(&self) -> Result<()> {
268 match self {
269 #[cfg(feature = "embedded")]
270 Reddb::Embedded(c) => c.close(),
271 #[cfg(feature = "grpc")]
272 Reddb::Grpc(c) => c.close().await,
273 #[cfg(feature = "http")]
274 Reddb::Http(c) => c.close().await,
275 Reddb::Unavailable(_) => Ok(()),
276 }
277 }
278
279 pub fn kv(&self) -> KvClient<'_> {
280 KvClient {
281 db: self,
282 collection: "kv_default",
283 }
284 }
285
286 pub fn config(&self) -> ConfigClient<'_> {
287 self.config_collection("red.config")
288 }
289
290 pub fn vault(&self) -> VaultClient<'_> {
291 self.vault_collection("red.vault")
292 }
293
294 pub fn config_collection<'a>(&'a self, collection: &'a str) -> ConfigClient<'a> {
295 ConfigClient {
296 db: self,
297 collection,
298 }
299 }
300
301 pub fn vault_collection<'a>(&'a self, collection: &'a str) -> VaultClient<'a> {
302 VaultClient {
303 db: self,
304 collection,
305 }
306 }
307}
308
309#[derive(Debug)]
310pub struct KvClient<'a> {
311 db: &'a Reddb,
312 collection: &'static str,
313}
314
315impl<'a> KvClient<'a> {
316 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
317 let tag_clause = if tags.is_empty() {
318 String::new()
319 } else {
320 format!(
321 " TAGS [{}]",
322 tags.iter()
323 .map(|tag| kv_tag_literal(tag))
324 .collect::<Vec<_>>()
325 .join(", ")
326 )
327 };
328 self.db
329 .query(&format!(
330 "KV PUT {}.{} = {}{}",
331 kv_identifier(self.collection),
332 kv_identifier(key),
333 kv_value_literal(&value),
334 tag_clause
335 ))
336 .await
337 }
338
339 pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<u64> {
340 let result = self
341 .db
342 .query(&format!(
343 "INVALIDATE TAGS [{}] FROM {}",
344 tags.iter()
345 .map(|tag| kv_tag_literal(tag))
346 .collect::<Vec<_>>()
347 .join(", "),
348 kv_identifier(self.collection)
349 ))
350 .await?;
351 Ok(result.affected)
352 }
353
354 pub async fn watch(&self, key: &str) -> Result<Vec<KvWatchEvent>> {
355 self.watch_from_lsn(key, None).await
356 }
357
358 pub async fn watch_from_lsn(
359 &self,
360 key: &str,
361 from_lsn: Option<u64>,
362 ) -> Result<Vec<KvWatchEvent>> {
363 #[cfg(not(feature = "http"))]
364 {
365 let _ = key;
366 let _ = from_lsn;
367 let _ = self.collection;
368 }
369 match self.db {
370 #[cfg(feature = "http")]
371 Reddb::Http(c) => c.watch_kv(self.collection, key, from_lsn, None).await,
372 #[cfg(feature = "embedded")]
373 Reddb::Embedded(_) => Err(ClientError::feature_disabled("kv.watch embedded")),
374 #[cfg(feature = "grpc")]
375 Reddb::Grpc(_) => Err(ClientError::feature_disabled("kv.watch grpc")),
376 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
377 }
378 }
379
380 pub async fn watch_prefix(&self, prefix: &str) -> Result<Vec<KvWatchEvent>> {
381 self.watch_prefix_from_lsn(prefix, None).await
382 }
383
384 pub async fn watch_prefix_from_lsn(
385 &self,
386 prefix: &str,
387 from_lsn: Option<u64>,
388 ) -> Result<Vec<KvWatchEvent>> {
389 let key = format!("{prefix}.*");
390 self.watch_from_lsn(&key, from_lsn).await
391 }
392}
393
394#[derive(Debug)]
395pub struct ConfigClient<'a> {
396 db: &'a Reddb,
397 collection: &'a str,
398}
399
400impl<'a> ConfigClient<'a> {
401 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
402 let mut sql = format!(
403 "PUT CONFIG {} {} = {}",
404 kv_identifier(self.collection),
405 kv_identifier(key),
406 kv_value_literal(&value)
407 );
408 append_tag_clause(&mut sql, tags);
409 self.db.query(&sql).await
410 }
411
412 pub async fn put_secret_ref(
413 &self,
414 key: &str,
415 vault_collection: &str,
416 vault_key: &str,
417 tags: &[&str],
418 ) -> Result<QueryResult> {
419 let mut sql = format!(
420 "PUT CONFIG {} {} = SECRET_REF(vault, {}.{})",
421 kv_identifier(self.collection),
422 kv_identifier(key),
423 kv_identifier(vault_collection),
424 kv_identifier(vault_key)
425 );
426 append_tag_clause(&mut sql, tags);
427 self.db.query(&sql).await
428 }
429
430 pub async fn get(&self, key: &str) -> Result<QueryResult> {
431 self.db
432 .query(&format!(
433 "GET CONFIG {} {}",
434 kv_identifier(self.collection),
435 kv_identifier(key)
436 ))
437 .await
438 }
439
440 pub async fn resolve(&self, key: &str) -> Result<QueryResult> {
441 self.db
442 .query(&format!(
443 "RESOLVE CONFIG {} {}",
444 kv_identifier(self.collection),
445 kv_identifier(key)
446 ))
447 .await
448 }
449}
450
451#[derive(Debug)]
452pub struct VaultClient<'a> {
453 db: &'a Reddb,
454 collection: &'a str,
455}
456
457impl<'a> VaultClient<'a> {
458 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
459 let mut sql = format!(
460 "VAULT PUT {}.{} = {}",
461 kv_identifier(self.collection),
462 kv_identifier(key),
463 kv_value_literal(&value)
464 );
465 append_tag_clause(&mut sql, tags);
466 self.db.query(&sql).await
467 }
468
469 pub async fn get(&self, key: &str) -> Result<QueryResult> {
470 self.db
471 .query(&format!(
472 "VAULT GET {}.{}",
473 kv_identifier(self.collection),
474 kv_identifier(key)
475 ))
476 .await
477 }
478
479 pub async fn unseal(&self, key: &str) -> Result<QueryResult> {
480 self.db
481 .query(&format!(
482 "UNSEAL VAULT {}.{}",
483 kv_identifier(self.collection),
484 kv_identifier(key)
485 ))
486 .await
487 }
488}
489
490fn append_tag_clause(sql: &mut String, tags: &[&str]) {
491 if tags.is_empty() {
492 return;
493 }
494 sql.push_str(" TAGS [");
495 sql.push_str(
496 &tags
497 .iter()
498 .map(|tag| kv_tag_literal(tag))
499 .collect::<Vec<_>>()
500 .join(", "),
501 );
502 sql.push(']');
503}
504
/// Sanitizes `value` for use as a bare identifier in a generated statement.
///
/// ASCII letters, ASCII digits, `_` and `.` are kept; every other character
/// (including each non-ASCII character) becomes a single `_`. The mapping is
/// lossy, so distinct inputs such as `a-b` and `a b` yield the same identifier.
fn kv_identifier(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.');
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}
517
518fn kv_value_literal(value: &JsonValue) -> String {
519 match value {
520 JsonValue::Null => "NULL".to_string(),
521 JsonValue::Bool(value) => value.to_string(),
522 JsonValue::Number(value) => value.to_string(),
523 JsonValue::String(value) => format!("'{}'", value.replace('\'', "''")),
524 JsonValue::Array(_) | JsonValue::Object(_) => {
525 format!("'{}'", value.to_json_string().replace('\'', "''"))
526 }
527 }
528}
529
/// Wraps `value` in single quotes, doubling every embedded single quote.
fn kv_tag_literal(value: &str) -> String {
    // Two extra bytes for the surrounding quotes; escapes may grow it further.
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
533
534pub fn version() -> &'static str {
536 env!("CARGO_PKG_VERSION")
537}