1#![deny(unsafe_code)]
49#![warn(missing_debug_implementations)]
50
51pub mod connect;
52pub mod connector;
53pub mod error;
54pub mod topology;
55pub mod types;
56
57#[cfg(feature = "embedded")]
58pub mod embedded;
59
60#[cfg(feature = "grpc")]
61pub mod grpc;
62
63#[cfg(feature = "grpc")]
64pub mod router;
65
66#[cfg(feature = "redwire")]
67pub mod redwire;
68
69#[cfg(feature = "http")]
70pub mod http;
71
72pub use error::{ClientError, ErrorCode, Result};
73pub use types::{InsertResult, JsonValue, KvWatchEvent, QueryResult, ValueOut};
74
75pub use connector::{
80 repl, BulkCreateStatus, CreatedEntity, HealthStatus, OperationStatus, QueryResponse,
81 RedDBClient,
82};
83
84use connect::Target;
85
86#[derive(Debug)]
88pub enum Reddb {
89 #[cfg(feature = "embedded")]
90 Embedded(embedded::EmbeddedClient),
91 #[cfg(feature = "grpc")]
92 Grpc(grpc::GrpcClient),
93 #[cfg(feature = "http")]
94 Http(http::HttpClient),
95 Unavailable(&'static str),
101}
102
103impl Reddb {
104 pub async fn connect(uri: &str) -> Result<Self> {
106 let target = connect::parse(uri)?;
107 match target {
108 Target::Memory => {
109 #[cfg(feature = "embedded")]
110 {
111 embedded::EmbeddedClient::in_memory().map(Reddb::Embedded)
112 }
113 #[cfg(not(feature = "embedded"))]
114 {
115 Err(ClientError::feature_disabled("embedded"))
116 }
117 }
118 Target::File { path } => {
119 #[cfg(feature = "embedded")]
120 {
121 embedded::EmbeddedClient::open(path).map(Reddb::Embedded)
122 }
123 #[cfg(not(feature = "embedded"))]
124 {
125 let _ = path;
126 Err(ClientError::feature_disabled("embedded"))
127 }
128 }
129 Target::Grpc { endpoint } => {
130 #[cfg(feature = "grpc")]
131 {
132 grpc::GrpcClient::connect(endpoint).await.map(Reddb::Grpc)
133 }
134 #[cfg(not(feature = "grpc"))]
135 {
136 let _ = endpoint;
137 Err(ClientError::feature_disabled("grpc"))
138 }
139 }
140 Target::GrpcCluster {
141 primary,
142 replicas,
143 force_primary,
144 } => {
145 #[cfg(feature = "grpc")]
146 {
147 grpc::GrpcClient::connect_cluster(primary, replicas, force_primary)
148 .await
149 .map(Reddb::Grpc)
150 }
151 #[cfg(not(feature = "grpc"))]
152 {
153 let _ = (primary, replicas, force_primary);
154 Err(ClientError::feature_disabled("grpc"))
155 }
156 }
157 Target::Http { base_url } => {
158 #[cfg(feature = "http")]
159 {
160 http::HttpClient::connect(http::HttpOptions::new(base_url))
161 .await
162 .map(Reddb::Http)
163 }
164 #[cfg(not(feature = "http"))]
165 {
166 let _ = base_url;
167 Err(ClientError::feature_disabled("http"))
168 }
169 }
170 }
171 }
172
173 pub async fn query(&self, sql: &str) -> Result<QueryResult> {
174 match self {
175 #[cfg(feature = "embedded")]
176 Reddb::Embedded(c) => c.query(sql),
177 #[cfg(feature = "grpc")]
178 Reddb::Grpc(c) => c.query(sql).await,
179 #[cfg(feature = "http")]
180 Reddb::Http(c) => c.query(sql).await,
181 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
182 }
183 }
184
185 pub async fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
186 match self {
187 #[cfg(feature = "embedded")]
188 Reddb::Embedded(c) => c.insert(collection, payload),
189 #[cfg(feature = "grpc")]
190 Reddb::Grpc(c) => c.insert(collection, payload).await,
191 #[cfg(feature = "http")]
192 Reddb::Http(c) => c.insert(collection, payload).await,
193 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
194 }
195 }
196
197 pub async fn bulk_insert(&self, collection: &str, payloads: &[JsonValue]) -> Result<u64> {
198 match self {
199 #[cfg(feature = "embedded")]
200 Reddb::Embedded(c) => c.bulk_insert(collection, payloads),
201 #[cfg(feature = "grpc")]
202 Reddb::Grpc(c) => c.bulk_insert(collection, payloads).await,
203 #[cfg(feature = "http")]
204 Reddb::Http(c) => c.bulk_insert(collection, payloads).await,
205 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
206 }
207 }
208
209 pub async fn delete(&self, collection: &str, id: &str) -> Result<u64> {
210 match self {
211 #[cfg(feature = "embedded")]
212 Reddb::Embedded(c) => c.delete(collection, id),
213 #[cfg(feature = "grpc")]
214 Reddb::Grpc(c) => c.delete(collection, id).await,
215 #[cfg(feature = "http")]
216 Reddb::Http(c) => c.delete(collection, id).await,
217 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
218 }
219 }
220
221 pub async fn close(&self) -> Result<()> {
222 match self {
223 #[cfg(feature = "embedded")]
224 Reddb::Embedded(c) => c.close(),
225 #[cfg(feature = "grpc")]
226 Reddb::Grpc(c) => c.close().await,
227 #[cfg(feature = "http")]
228 Reddb::Http(c) => c.close().await,
229 Reddb::Unavailable(_) => Ok(()),
230 }
231 }
232
233 pub fn kv(&self) -> KvClient<'_> {
234 KvClient {
235 db: self,
236 collection: "kv_default",
237 }
238 }
239
240 pub fn config(&self) -> ConfigClient<'_> {
241 self.config_collection("red.config")
242 }
243
244 pub fn vault(&self) -> VaultClient<'_> {
245 self.vault_collection("red.vault")
246 }
247
248 pub fn config_collection<'a>(&'a self, collection: &'a str) -> ConfigClient<'a> {
249 ConfigClient {
250 db: self,
251 collection,
252 }
253 }
254
255 pub fn vault_collection<'a>(&'a self, collection: &'a str) -> VaultClient<'a> {
256 VaultClient {
257 db: self,
258 collection,
259 }
260 }
261}
262
263#[derive(Debug)]
264pub struct KvClient<'a> {
265 db: &'a Reddb,
266 collection: &'static str,
267}
268
269impl<'a> KvClient<'a> {
270 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
271 let tag_clause = if tags.is_empty() {
272 String::new()
273 } else {
274 format!(
275 " TAGS [{}]",
276 tags.iter()
277 .map(|tag| kv_tag_literal(tag))
278 .collect::<Vec<_>>()
279 .join(", ")
280 )
281 };
282 self.db
283 .query(&format!(
284 "KV PUT {}.{} = {}{}",
285 kv_identifier(self.collection),
286 kv_identifier(key),
287 kv_value_literal(&value),
288 tag_clause
289 ))
290 .await
291 }
292
293 pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<u64> {
294 let result = self
295 .db
296 .query(&format!(
297 "INVALIDATE TAGS [{}] FROM {}",
298 tags.iter()
299 .map(|tag| kv_tag_literal(tag))
300 .collect::<Vec<_>>()
301 .join(", "),
302 kv_identifier(self.collection)
303 ))
304 .await?;
305 Ok(result.affected)
306 }
307
308 pub async fn watch(&self, key: &str) -> Result<Vec<KvWatchEvent>> {
309 self.watch_from_lsn(key, None).await
310 }
311
312 pub async fn watch_from_lsn(
313 &self,
314 key: &str,
315 from_lsn: Option<u64>,
316 ) -> Result<Vec<KvWatchEvent>> {
317 #[cfg(not(feature = "http"))]
318 {
319 let _ = key;
320 let _ = from_lsn;
321 let _ = self.collection;
322 }
323 match self.db {
324 #[cfg(feature = "http")]
325 Reddb::Http(c) => c.watch_kv(self.collection, key, from_lsn, None).await,
326 #[cfg(feature = "embedded")]
327 Reddb::Embedded(_) => Err(ClientError::feature_disabled("kv.watch embedded")),
328 #[cfg(feature = "grpc")]
329 Reddb::Grpc(_) => Err(ClientError::feature_disabled("kv.watch grpc")),
330 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
331 }
332 }
333
334 pub async fn watch_prefix(&self, prefix: &str) -> Result<Vec<KvWatchEvent>> {
335 self.watch_prefix_from_lsn(prefix, None).await
336 }
337
338 pub async fn watch_prefix_from_lsn(
339 &self,
340 prefix: &str,
341 from_lsn: Option<u64>,
342 ) -> Result<Vec<KvWatchEvent>> {
343 let key = format!("{prefix}.*");
344 self.watch_from_lsn(&key, from_lsn).await
345 }
346}
347
348#[derive(Debug)]
349pub struct ConfigClient<'a> {
350 db: &'a Reddb,
351 collection: &'a str,
352}
353
354impl<'a> ConfigClient<'a> {
355 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
356 let mut sql = format!(
357 "PUT CONFIG {} {} = {}",
358 kv_identifier(self.collection),
359 kv_identifier(key),
360 kv_value_literal(&value)
361 );
362 append_tag_clause(&mut sql, tags);
363 self.db.query(&sql).await
364 }
365
366 pub async fn put_secret_ref(
367 &self,
368 key: &str,
369 vault_collection: &str,
370 vault_key: &str,
371 tags: &[&str],
372 ) -> Result<QueryResult> {
373 let mut sql = format!(
374 "PUT CONFIG {} {} = SECRET_REF(vault, {}.{})",
375 kv_identifier(self.collection),
376 kv_identifier(key),
377 kv_identifier(vault_collection),
378 kv_identifier(vault_key)
379 );
380 append_tag_clause(&mut sql, tags);
381 self.db.query(&sql).await
382 }
383
384 pub async fn get(&self, key: &str) -> Result<QueryResult> {
385 self.db
386 .query(&format!(
387 "GET CONFIG {} {}",
388 kv_identifier(self.collection),
389 kv_identifier(key)
390 ))
391 .await
392 }
393
394 pub async fn resolve(&self, key: &str) -> Result<QueryResult> {
395 self.db
396 .query(&format!(
397 "RESOLVE CONFIG {} {}",
398 kv_identifier(self.collection),
399 kv_identifier(key)
400 ))
401 .await
402 }
403}
404
405#[derive(Debug)]
406pub struct VaultClient<'a> {
407 db: &'a Reddb,
408 collection: &'a str,
409}
410
411impl<'a> VaultClient<'a> {
412 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
413 let mut sql = format!(
414 "VAULT PUT {}.{} = {}",
415 kv_identifier(self.collection),
416 kv_identifier(key),
417 kv_value_literal(&value)
418 );
419 append_tag_clause(&mut sql, tags);
420 self.db.query(&sql).await
421 }
422
423 pub async fn get(&self, key: &str) -> Result<QueryResult> {
424 self.db
425 .query(&format!(
426 "VAULT GET {}.{}",
427 kv_identifier(self.collection),
428 kv_identifier(key)
429 ))
430 .await
431 }
432
433 pub async fn unseal(&self, key: &str) -> Result<QueryResult> {
434 self.db
435 .query(&format!(
436 "UNSEAL VAULT {}.{}",
437 kv_identifier(self.collection),
438 kv_identifier(key)
439 ))
440 .await
441 }
442}
443
444fn append_tag_clause(sql: &mut String, tags: &[&str]) {
445 if tags.is_empty() {
446 return;
447 }
448 sql.push_str(" TAGS [");
449 sql.push_str(
450 &tags
451 .iter()
452 .map(|tag| kv_tag_literal(tag))
453 .collect::<Vec<_>>()
454 .join(", "),
455 );
456 sql.push(']');
457}
458
/// Returns `value` with every character other than ASCII letters, ASCII digits, `_` and
/// `.` replaced by `_`.
///
/// Replacement is per character, so one multi-byte character becomes a single `_`.
fn kv_identifier(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.');
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}
471
472fn kv_value_literal(value: &JsonValue) -> String {
473 match value {
474 JsonValue::Null => "NULL".to_string(),
475 JsonValue::Bool(value) => value.to_string(),
476 JsonValue::Number(value) => value.to_string(),
477 JsonValue::String(value) => format!("'{}'", value.replace('\'', "''")),
478 JsonValue::Array(_) | JsonValue::Object(_) => {
479 format!("'{}'", value.to_json_string().replace('\'', "''"))
480 }
481 }
482}
483
/// Wraps `value` in single quotes, doubling every single quote it contains.
fn kv_tag_literal(value: &str) -> String {
    // Splitting on the quote and re-joining with a doubled quote doubles each occurrence.
    let escaped = value.split('\'').collect::<Vec<_>>().join("''");
    format!("'{}'", escaped)
}
487
488pub fn version() -> &'static str {
490 env!("CARGO_PKG_VERSION")
491}