1#![deny(unsafe_code)]
49#![warn(missing_debug_implementations)]
50
51pub mod connect;
52pub mod connector;
53pub mod error;
54pub mod params;
55pub mod topology;
56pub mod types;
57
58#[cfg(feature = "embedded")]
59pub mod embedded;
60
61#[cfg(feature = "grpc")]
62pub mod grpc;
63
64#[cfg(feature = "grpc")]
65pub mod router;
66
67#[cfg(feature = "redwire")]
68pub mod redwire;
69
70#[cfg(feature = "http")]
71pub mod http;
72
73pub use error::{ClientError, ErrorCode, Result};
74pub use params::{IntoParams, IntoValue, Value};
75pub use types::{
76 BulkInsertResult, DeleteResult, DocumentItem, ExistsResult, InsertResult, JsonValue, KvItem,
77 KvWatchEvent, ListOptions, ListResult, QueryResult, Row, ValueOut,
78};
79
80pub use connector::{
85 repl, BulkCreateStatus, CreatedEntity, HealthStatus, OperationStatus, QueryResponse,
86 RedDBClient,
87};
88
89use connect::Target;
90
91#[derive(Debug)]
93pub enum Reddb {
94 #[cfg(feature = "embedded")]
95 Embedded(embedded::EmbeddedClient),
96 #[cfg(feature = "grpc")]
97 Grpc(grpc::GrpcClient),
98 #[cfg(feature = "http")]
99 Http(http::HttpClient),
100 Unavailable(&'static str),
106}
107
108impl Reddb {
109 pub async fn connect(uri: &str) -> Result<Self> {
111 let target = connect::parse(uri)?;
112 match target {
113 Target::Memory => {
114 #[cfg(feature = "embedded")]
115 {
116 embedded::EmbeddedClient::in_memory().map(Reddb::Embedded)
117 }
118 #[cfg(not(feature = "embedded"))]
119 {
120 Err(ClientError::feature_disabled("embedded"))
121 }
122 }
123 Target::File { path } => {
124 #[cfg(feature = "embedded")]
125 {
126 embedded::EmbeddedClient::open(path).map(Reddb::Embedded)
127 }
128 #[cfg(not(feature = "embedded"))]
129 {
130 let _ = path;
131 Err(ClientError::feature_disabled("embedded"))
132 }
133 }
134 Target::Grpc { endpoint } => {
135 #[cfg(feature = "grpc")]
136 {
137 grpc::GrpcClient::connect(endpoint).await.map(Reddb::Grpc)
138 }
139 #[cfg(not(feature = "grpc"))]
140 {
141 let _ = endpoint;
142 Err(ClientError::feature_disabled("grpc"))
143 }
144 }
145 Target::GrpcCluster {
146 primary,
147 replicas,
148 force_primary,
149 } => {
150 #[cfg(feature = "grpc")]
151 {
152 grpc::GrpcClient::connect_cluster(primary, replicas, force_primary)
153 .await
154 .map(Reddb::Grpc)
155 }
156 #[cfg(not(feature = "grpc"))]
157 {
158 let _ = (primary, replicas, force_primary);
159 Err(ClientError::feature_disabled("grpc"))
160 }
161 }
162 Target::Http { base_url } => {
163 #[cfg(feature = "http")]
164 {
165 http::HttpClient::connect(http::HttpOptions::new(base_url))
166 .await
167 .map(Reddb::Http)
168 }
169 #[cfg(not(feature = "http"))]
170 {
171 let _ = base_url;
172 Err(ClientError::feature_disabled("http"))
173 }
174 }
175 }
176 }
177
178 pub async fn query(&self, sql: &str) -> Result<QueryResult> {
179 match self {
180 #[cfg(feature = "embedded")]
181 Reddb::Embedded(c) => c.query(sql),
182 #[cfg(feature = "grpc")]
183 Reddb::Grpc(c) => c.query(sql).await,
184 #[cfg(feature = "http")]
185 Reddb::Http(c) => c.query(sql).await,
186 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
187 }
188 }
189
190 pub async fn query_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
211 let values = params.into_params();
212 match self {
213 #[cfg(feature = "embedded")]
214 Reddb::Embedded(c) => c.query_with(sql, values),
215 #[cfg(feature = "grpc")]
216 Reddb::Grpc(c) => c.query_with(sql, &values).await,
217 #[cfg(feature = "http")]
218 Reddb::Http(c) => c.query_with(sql, &values).await,
219 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
220 }
221 }
222
223 pub async fn execute_with<P: IntoParams>(&self, sql: &str, params: P) -> Result<QueryResult> {
227 self.query_with(sql, params).await
228 }
229
230 pub async fn insert(&self, collection: &str, payload: &JsonValue) -> Result<InsertResult> {
231 match self {
232 #[cfg(feature = "embedded")]
233 Reddb::Embedded(c) => c.insert(collection, payload),
234 #[cfg(feature = "grpc")]
235 Reddb::Grpc(c) => c.insert(collection, payload).await,
236 #[cfg(feature = "http")]
237 Reddb::Http(c) => c.insert(collection, payload).await,
238 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
239 }
240 }
241
242 pub async fn bulk_insert(
243 &self,
244 collection: &str,
245 payloads: &[JsonValue],
246 ) -> Result<BulkInsertResult> {
247 match self {
248 #[cfg(feature = "embedded")]
249 Reddb::Embedded(c) => c.bulk_insert(collection, payloads),
250 #[cfg(feature = "grpc")]
251 Reddb::Grpc(c) => c.bulk_insert(collection, payloads).await,
252 #[cfg(feature = "http")]
253 Reddb::Http(c) => c.bulk_insert(collection, payloads).await,
254 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
255 }
256 }
257
258 pub async fn delete(&self, collection: &str, rid: &str) -> Result<u64> {
259 match self {
260 #[cfg(feature = "embedded")]
261 Reddb::Embedded(c) => c.delete(collection, rid),
262 #[cfg(feature = "grpc")]
263 Reddb::Grpc(c) => c.delete(collection, rid).await,
264 #[cfg(feature = "http")]
265 Reddb::Http(c) => c.delete(collection, rid).await,
266 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
267 }
268 }
269
270 pub fn documents(&self) -> DocumentClient<'_> {
271 DocumentClient { db: self }
272 }
273
274 pub fn queue(&self) -> QueueClient<'_> {
275 QueueClient { db: self }
276 }
277
278 pub fn kv_collection<'a>(&'a self, collection: &'a str) -> KvClient<'a> {
279 KvClient {
280 db: self,
281 collection,
282 }
283 }
284
285 pub async fn begin(&self) -> Result<QueryResult> {
286 self.query("BEGIN").await
287 }
288
289 pub async fn commit(&self) -> Result<QueryResult> {
290 self.query("COMMIT").await
291 }
292
293 pub async fn rollback(&self) -> Result<QueryResult> {
294 self.query("ROLLBACK").await
295 }
296
297 pub async fn close(&self) -> Result<()> {
298 match self {
299 #[cfg(feature = "embedded")]
300 Reddb::Embedded(c) => c.close(),
301 #[cfg(feature = "grpc")]
302 Reddb::Grpc(c) => c.close().await,
303 #[cfg(feature = "http")]
304 Reddb::Http(c) => c.close().await,
305 Reddb::Unavailable(_) => Ok(()),
306 }
307 }
308
309 pub fn kv(&self) -> KvClient<'_> {
310 KvClient {
311 db: self,
312 collection: "kv_default",
313 }
314 }
315
316 pub fn config(&self) -> ConfigClient<'_> {
317 self.config_collection("red.config")
318 }
319
320 pub fn vault(&self) -> VaultClient<'_> {
321 self.vault_collection("red.vault")
322 }
323
324 pub fn config_collection<'a>(&'a self, collection: &'a str) -> ConfigClient<'a> {
325 ConfigClient {
326 db: self,
327 collection,
328 }
329 }
330
331 pub fn vault_collection<'a>(&'a self, collection: &'a str) -> VaultClient<'a> {
332 VaultClient {
333 db: self,
334 collection,
335 }
336 }
337}
338
339#[derive(Debug)]
340pub struct DocumentClient<'a> {
341 db: &'a Reddb,
342}
343
344impl<'a> DocumentClient<'a> {
345 pub async fn insert(&self, collection: &str, body: &JsonValue) -> Result<DocumentItem> {
346 ensure_json_object("document body", body)?;
347 let collection = sql_identifier(collection)?;
348 self.db
349 .query(&format!("CREATE DOCUMENT IF NOT EXISTS {collection}"))
350 .await?;
351 let result = self
352 .db
353 .query(&format!(
354 "INSERT INTO {collection} DOCUMENT (body) VALUES ({}) RETURNING *",
355 json_text_literal(body)
356 ))
357 .await?;
358 document_item_from_first_row(result)
359 }
360
361 pub async fn get(&self, collection: &str, rid: &str) -> Result<DocumentItem> {
362 let collection = sql_identifier(collection)?;
363 let result = self
364 .db
365 .query(&format!(
366 "SELECT * FROM {collection} WHERE rid = {} LIMIT 1",
367 sql_string_literal(rid)
368 ))
369 .await?;
370 document_item_from_first_row(result)
371 }
372
373 pub async fn list(&self, collection: &str, options: ListOptions<'_>) -> Result<ListResult> {
374 let collection = sql_identifier(collection)?;
375 let result = self
376 .db
377 .query(&select_sql(&collection, "*", &options))
378 .await?;
379 Ok(ListResult {
380 affected: result.affected,
381 items: result.rows,
382 })
383 }
384
385 pub async fn filter(&self, collection: &str, filter: &str) -> Result<ListResult> {
386 self.list(collection, ListOptions::new().filter(filter))
387 .await
388 }
389
390 pub async fn patch(
391 &self,
392 collection: &str,
393 rid: &str,
394 patch: &JsonValue,
395 ) -> Result<DocumentItem> {
396 let entries = patch.as_object().ok_or_else(|| {
397 ClientError::new(
398 ErrorCode::InvalidArgument,
399 "document patch must be a JSON object",
400 )
401 })?;
402 if entries.is_empty() {
403 return Err(ClientError::new(
404 ErrorCode::InvalidArgument,
405 "document patch must contain at least one field",
406 ));
407 }
408 let collection = sql_identifier(collection)?;
409 let assignments = entries
410 .iter()
411 .map(|(field, value)| {
412 Ok(format!(
413 "{} = {}",
414 sql_identifier(field)?,
415 json_value_literal(value)
416 ))
417 })
418 .collect::<Result<Vec<_>>>()?;
419 let result = self
420 .db
421 .query(&format!(
422 "UPDATE {collection} DOCUMENTS SET {} WHERE rid = {} RETURNING *",
423 assignments.join(", "),
424 sql_string_literal(rid)
425 ))
426 .await?;
427 document_item_from_first_row(result)
428 }
429
430 pub async fn delete(&self, collection: &str, rid: &str) -> Result<DeleteResult> {
431 let collection = sql_identifier(collection)?;
432 let result = self
433 .db
434 .query(&format!(
435 "DELETE FROM {collection} WHERE rid = {}",
436 sql_string_literal(rid)
437 ))
438 .await?;
439 Ok(DeleteResult {
440 affected: result.affected,
441 deleted: result.affected > 0,
442 })
443 }
444}
445
446#[derive(Debug)]
447pub struct QueueClient<'a> {
448 db: &'a Reddb,
449}
450
451impl<'a> QueueClient<'a> {
452 pub async fn create(&self, queue: &str) -> Result<QueryResult> {
453 self.db
454 .query(&format!(
455 "CREATE QUEUE IF NOT EXISTS {}",
456 sql_identifier(queue)?
457 ))
458 .await
459 }
460
461 pub async fn push(&self, queue: &str, value: &JsonValue) -> Result<QueryResult> {
462 self.db
463 .query(&format!(
464 "QUEUE PUSH {} {}",
465 sql_identifier(queue)?,
466 json_value_literal(value)
467 ))
468 .await
469 }
470
471 pub async fn peek(&self, queue: &str, limit: Option<u64>) -> Result<ListResult> {
472 let mut sql = format!("QUEUE PEEK {}", sql_identifier(queue)?);
473 if let Some(limit) = limit {
474 sql.push_str(&format!(" {limit}"));
475 }
476 let result = self.db.query(&sql).await?;
477 Ok(ListResult {
478 affected: result.affected,
479 items: result.rows,
480 })
481 }
482
483 pub async fn pop(&self, queue: &str) -> Result<ListResult> {
484 let result = self
485 .db
486 .query(&format!("QUEUE POP {}", sql_identifier(queue)?))
487 .await?;
488 Ok(ListResult {
489 affected: result.affected,
490 items: result.rows,
491 })
492 }
493
494 pub async fn len(&self, queue: &str) -> Result<u64> {
495 let result = self
496 .db
497 .query(&format!("QUEUE LEN {}", sql_identifier(queue)?))
498 .await?;
499 row_value(&result.rows, "len")
500 .and_then(value_as_u64)
501 .ok_or_else(|| ClientError::new(ErrorCode::InvalidResponse, "QUEUE LEN missing len"))
502 }
503
504 pub async fn purge(&self, queue: &str) -> Result<DeleteResult> {
505 let result = self
506 .db
507 .query(&format!("QUEUE PURGE {}", sql_identifier(queue)?))
508 .await?;
509 Ok(DeleteResult {
510 affected: result.affected,
511 deleted: result.affected > 0,
512 })
513 }
514}
515
516#[derive(Debug)]
517pub struct KvClient<'a> {
518 db: &'a Reddb,
519 collection: &'a str,
520}
521
522impl<'a> KvClient<'a> {
523 pub async fn set(&self, key: &str, value: JsonValue) -> Result<QueryResult> {
524 self.put(key, value, &[]).await
525 }
526
527 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
528 let tag_clause = if tags.is_empty() {
529 String::new()
530 } else {
531 format!(
532 " TAGS [{}]",
533 tags.iter()
534 .map(|tag| kv_tag_literal(tag))
535 .collect::<Vec<_>>()
536 .join(", ")
537 )
538 };
539 self.db
540 .query(&format!(
541 "KV PUT {}.{} = {}{}",
542 kv_collection_identifier(self.collection)?,
543 kv_path_segment(key),
544 json_value_literal(&value),
545 tag_clause
546 ))
547 .await
548 }
549
550 pub async fn get(&self, key: &str) -> Result<Option<KvItem>> {
551 let result = self
552 .db
553 .query(&format!(
554 "KV GET {}.{}",
555 kv_collection_identifier(self.collection)?,
556 kv_path_segment(key)
557 ))
558 .await?;
559 Ok(kv_item_from_rows(&result.rows))
560 }
561
562 pub async fn exists(&self, key: &str) -> Result<ExistsResult> {
563 Ok(ExistsResult {
564 exists: self.get(key).await?.is_some(),
565 })
566 }
567
568 pub async fn delete(&self, key: &str) -> Result<DeleteResult> {
569 let result = self
570 .db
571 .query(&format!(
572 "KV DELETE {}.{}",
573 kv_collection_identifier(self.collection)?,
574 kv_path_segment(key)
575 ))
576 .await?;
577 Ok(DeleteResult {
578 affected: result.affected,
579 deleted: result.affected > 0,
580 })
581 }
582
583 pub async fn list(&self, options: ListOptions<'_>) -> Result<ListResult> {
584 let collection = sql_identifier(self.collection)?;
585 let result = self
586 .db
587 .query(&select_sql(&collection, "key, value", &options))
588 .await?;
589 Ok(ListResult {
590 affected: result.affected,
591 items: result.rows,
592 })
593 }
594
595 pub async fn invalidate_tags(&self, tags: &[&str]) -> Result<u64> {
596 let result = self
597 .db
598 .query(&format!(
599 "INVALIDATE TAGS [{}] FROM {}",
600 tags.iter()
601 .map(|tag| kv_tag_literal(tag))
602 .collect::<Vec<_>>()
603 .join(", "),
604 kv_collection_identifier(self.collection)?
605 ))
606 .await?;
607 Ok(result.affected)
608 }
609
610 pub async fn watch(&self, key: &str) -> Result<Vec<KvWatchEvent>> {
611 self.watch_from_lsn(key, None).await
612 }
613
614 pub async fn watch_from_lsn(
615 &self,
616 key: &str,
617 from_lsn: Option<u64>,
618 ) -> Result<Vec<KvWatchEvent>> {
619 #[cfg(not(feature = "http"))]
620 {
621 let _ = key;
622 let _ = from_lsn;
623 let _ = self.collection;
624 }
625 match self.db {
626 #[cfg(feature = "http")]
627 Reddb::Http(c) => c.watch_kv(self.collection, key, from_lsn, None).await,
628 #[cfg(feature = "embedded")]
629 Reddb::Embedded(_) => Err(ClientError::feature_disabled("kv.watch embedded")),
630 #[cfg(feature = "grpc")]
631 Reddb::Grpc(_) => Err(ClientError::feature_disabled("kv.watch grpc")),
632 Reddb::Unavailable(name) => Err(ClientError::feature_disabled(name)),
633 }
634 }
635
636 pub async fn watch_prefix(&self, prefix: &str) -> Result<Vec<KvWatchEvent>> {
637 self.watch_prefix_from_lsn(prefix, None).await
638 }
639
640 pub async fn watch_prefix_from_lsn(
641 &self,
642 prefix: &str,
643 from_lsn: Option<u64>,
644 ) -> Result<Vec<KvWatchEvent>> {
645 let key = format!("{prefix}.*");
646 self.watch_from_lsn(&key, from_lsn).await
647 }
648}
649
650#[derive(Debug)]
651pub struct ConfigClient<'a> {
652 db: &'a Reddb,
653 collection: &'a str,
654}
655
656impl<'a> ConfigClient<'a> {
657 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
658 let mut sql = format!(
659 "PUT CONFIG {} {} = {}",
660 kv_collection_identifier(self.collection)?,
661 kv_path_segment(key),
662 json_value_literal(&value)
663 );
664 append_tag_clause(&mut sql, tags);
665 self.db.query(&sql).await
666 }
667
668 pub async fn put_secret_ref(
669 &self,
670 key: &str,
671 vault_collection: &str,
672 vault_key: &str,
673 tags: &[&str],
674 ) -> Result<QueryResult> {
675 let mut sql = format!(
676 "PUT CONFIG {} {} = SECRET_REF(vault, {}.{})",
677 kv_collection_identifier(self.collection)?,
678 kv_path_segment(key),
679 kv_collection_identifier(vault_collection)?,
680 kv_path_segment(vault_key)
681 );
682 append_tag_clause(&mut sql, tags);
683 self.db.query(&sql).await
684 }
685
686 pub async fn get(&self, key: &str) -> Result<QueryResult> {
687 self.db
688 .query(&format!(
689 "GET CONFIG {} {}",
690 kv_collection_identifier(self.collection)?,
691 kv_path_segment(key)
692 ))
693 .await
694 }
695
696 pub async fn resolve(&self, key: &str) -> Result<QueryResult> {
697 self.db
698 .query(&format!(
699 "RESOLVE CONFIG {} {}",
700 kv_collection_identifier(self.collection)?,
701 kv_path_segment(key)
702 ))
703 .await
704 }
705}
706
707#[derive(Debug)]
708pub struct VaultClient<'a> {
709 db: &'a Reddb,
710 collection: &'a str,
711}
712
713impl<'a> VaultClient<'a> {
714 pub async fn put(&self, key: &str, value: JsonValue, tags: &[&str]) -> Result<QueryResult> {
715 let mut sql = format!(
716 "VAULT PUT {}.{} = {}",
717 kv_collection_identifier(self.collection)?,
718 kv_path_segment(key),
719 json_value_literal(&value)
720 );
721 append_tag_clause(&mut sql, tags);
722 self.db.query(&sql).await
723 }
724
725 pub async fn get(&self, key: &str) -> Result<QueryResult> {
726 self.db
727 .query(&format!(
728 "VAULT GET {}.{}",
729 kv_collection_identifier(self.collection)?,
730 kv_path_segment(key)
731 ))
732 .await
733 }
734
735 pub async fn unseal(&self, key: &str) -> Result<QueryResult> {
736 self.db
737 .query(&format!(
738 "UNSEAL VAULT {}.{}",
739 kv_collection_identifier(self.collection)?,
740 kv_path_segment(key)
741 ))
742 .await
743 }
744}
745
746fn append_tag_clause(sql: &mut String, tags: &[&str]) {
747 if tags.is_empty() {
748 return;
749 }
750 sql.push_str(" TAGS [");
751 sql.push_str(
752 &tags
753 .iter()
754 .map(|tag| kv_tag_literal(tag))
755 .collect::<Vec<_>>()
756 .join(", "),
757 );
758 sql.push(']');
759}
760
761fn sql_identifier(value: &str) -> Result<String> {
762 let mut chars = value.chars();
763 let Some(first) = chars.next() else {
764 return Err(ClientError::new(
765 ErrorCode::InvalidArgument,
766 "identifier must not be empty",
767 ));
768 };
769 if !(first.is_ascii_alphabetic() || first == '_') {
770 return Err(ClientError::new(
771 ErrorCode::InvalidArgument,
772 format!("invalid identifier `{value}`"),
773 ));
774 }
775 if chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') {
776 Ok(value.to_string())
777 } else {
778 Err(ClientError::new(
779 ErrorCode::InvalidArgument,
780 format!("invalid identifier `{value}`"),
781 ))
782 }
783}
784
785fn kv_collection_identifier(value: &str) -> Result<String> {
786 let mut parts = Vec::new();
787 for part in value.split('.') {
788 parts.push(sql_identifier(part)?);
789 }
790 Ok(parts.join("."))
791}
792
793fn kv_path_segment(value: &str) -> String {
794 if is_plain_path_segment(value) {
795 value.to_string()
796 } else {
797 sql_string_literal(value)
798 }
799}
800
/// Returns `true` when `value` can be written into a statement without
/// quoting: it is non-empty, starts with an ASCII letter or `_`, and continues
/// with ASCII letters, digits, `_` or `-` only.
fn is_plain_path_segment(value: &str) -> bool {
    // Every accepted character is ASCII, so checking bytes is equivalent to
    // checking chars: any byte of a multi-byte character fails the tests below.
    let Some((&head, tail)) = value.as_bytes().split_first() else {
        return false;
    };
    let head_ok = head == b'_' || head.is_ascii_alphabetic();
    head_ok
        && tail
            .iter()
            .all(|&byte| byte.is_ascii_alphanumeric() || byte == b'_' || byte == b'-')
}
809
810fn json_value_literal(value: &JsonValue) -> String {
811 match value {
812 JsonValue::Null => "NULL".to_string(),
813 JsonValue::Bool(value) => value.to_string(),
814 JsonValue::Number(value) => value.to_string(),
815 JsonValue::String(value) => sql_string_literal(value),
816 JsonValue::Array(_) | JsonValue::Object(_) => value.to_json_string(),
817 }
818}
819
820fn json_text_literal(value: &JsonValue) -> String {
821 sql_string_literal(&value.to_json_string())
822}
823
824fn kv_tag_literal(value: &str) -> String {
825 sql_string_literal(value)
826}
827
/// Wraps `value` in single quotes, doubling every embedded single quote.
/// No other character is altered.
fn sql_string_literal(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // A quote inside the literal is written twice.
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
831
832fn ensure_json_object(name: &str, value: &JsonValue) -> Result<()> {
833 if value.as_object().is_some() {
834 Ok(())
835 } else {
836 Err(ClientError::new(
837 ErrorCode::InvalidArgument,
838 format!("{name} must be a JSON object"),
839 ))
840 }
841}
842
843fn select_sql(collection: &str, columns: &str, options: &ListOptions<'_>) -> String {
844 let mut sql = format!("SELECT {columns} FROM {collection}");
845 if let Some(filter) = options.filter {
846 sql.push_str(" WHERE ");
847 sql.push_str(filter);
848 }
849 if let Some(order_by) = options.order_by {
850 sql.push_str(" ORDER BY ");
851 sql.push_str(order_by);
852 }
853 if let Some(limit) = options.limit {
854 sql.push_str(&format!(" LIMIT {limit}"));
855 }
856 sql
857}
858
859fn document_item_from_first_row(result: QueryResult) -> Result<DocumentItem> {
860 let Some(row) = result.rows.into_iter().next() else {
861 return Err(ClientError::new(ErrorCode::NotFound, "document not found"));
862 };
863 let rid = row
864 .iter()
865 .find(|(column, _)| column == "rid")
866 .and_then(|(_, value)| value_as_string(value))
867 .ok_or_else(|| ClientError::new(ErrorCode::InvalidResponse, "document row missing rid"))?;
868 Ok(DocumentItem { rid, fields: row })
869}
870
871fn kv_item_from_rows(rows: &[Row]) -> Option<KvItem> {
872 let row = rows.first()?;
873 let value = row
874 .iter()
875 .find(|(column, _)| column == "value")
876 .map(|(_, value)| value.clone())?;
877 let rid = row
878 .iter()
879 .find(|(column, _)| column == "rid")
880 .map(|(_, value)| value);
881 if matches!(rid, Some(ValueOut::Null)) && value == ValueOut::Null {
882 return None;
883 }
884 let collection = row
885 .iter()
886 .find(|(column, _)| column == "collection")
887 .and_then(|(_, value)| value_as_string(value))
888 .unwrap_or_default();
889 let key = row
890 .iter()
891 .find(|(column, _)| column == "key")
892 .and_then(|(_, value)| value_as_string(value))
893 .unwrap_or_default();
894 Some(KvItem {
895 collection,
896 key,
897 value,
898 })
899}
900
901fn row_value<'a>(rows: &'a [Row], column: &str) -> Option<&'a ValueOut> {
902 rows.first()?
903 .iter()
904 .find(|(name, _)| name == column)
905 .map(|(_, value)| value)
906}
907
908fn value_as_string(value: &ValueOut) -> Option<String> {
909 match value {
910 ValueOut::String(value) => Some(value.clone()),
911 ValueOut::Integer(value) => Some(value.to_string()),
912 _ => None,
913 }
914}
915
916fn value_as_u64(value: &ValueOut) -> Option<u64> {
917 match value {
918 ValueOut::Integer(value) => (*value).try_into().ok(),
919 ValueOut::Float(value) if *value >= 0.0 => Some(*value as u64),
920 ValueOut::String(value) => value.parse().ok(),
921 _ => None,
922 }
923}
924
925pub fn version() -> &'static str {
927 env!("CARGO_PKG_VERSION")
928}