use crate::clients::tracing_client::OperationContext;
use crate::collection::Collection;
use crate::options::collection_ds_options::{
CouchbaseListOptions, CouchbaseMapOptions, CouchbaseQueueOptions, CouchbaseSetOptions,
};
use crate::options::kv_options::{MutateInOptions, StoreSemantics};
use crate::subdoc::lookup_in_specs::LookupInSpec;
use crate::subdoc::mutate_in_specs::MutateInSpec;
use crate::tracing::SERVICE_VALUE_KV;
use couchbase_core::create_span;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use tracing::Instrument;
#[derive(Clone)]
pub struct CouchbaseList<'a> {
collection: &'a Collection,
id: String,
options: CouchbaseListOptions,
}
impl Collection {
pub fn list(
&self,
id: impl Into<String>,
options: impl Into<Option<CouchbaseListOptions>>,
) -> CouchbaseList<'_> {
CouchbaseList {
collection: self,
id: id.into(),
options: options.into().unwrap_or_default(),
}
}
pub fn map(
&self,
id: impl Into<String>,
options: impl Into<Option<CouchbaseMapOptions>>,
) -> CouchbaseMap<'_> {
CouchbaseMap {
collection: self,
id: id.into(),
options: options.into().unwrap_or_default(),
}
}
pub fn set(
&self,
id: impl Into<String>,
options: impl Into<Option<CouchbaseSetOptions>>,
) -> CouchbaseSet<'_> {
CouchbaseSet {
collection: self,
id: id.into(),
options: options.into().unwrap_or_default(),
}
}
pub fn queue(
&self,
id: impl Into<String>,
options: impl Into<Option<CouchbaseQueueOptions>>,
) -> CouchbaseQueue<'_> {
CouchbaseQueue {
collection: self,
id: id.into(),
options: options.into().unwrap_or_default(),
}
}
}
impl CouchbaseList<'_> {
pub async fn iter<T: DeserializeOwned>(&self) -> crate::error::Result<impl Iterator<Item = T>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_iter"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let list_contents: Vec<T> = res.content_as()?;
Ok(list_contents.into_iter())
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn get<V: DeserializeOwned>(&self, index: usize) -> crate::error::Result<V> {
let collection = self.collection;
let id = &self.id;
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_get"),
)
.await;
let result = collection
.lookup_in(id, &[LookupInSpec::get(format!("[{index}]"), None)], None)
.instrument(ctx.span().clone())
.await
.and_then(|lookup_res| lookup_res.content_as(0));
ctx.end_operation(result.as_ref().err());
result
}
pub async fn remove(&self, index: usize) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_remove"),
)
.await;
let result = self
.collection
.mutate_in(
&self.id,
&[MutateInSpec::remove(format!("[{index}]"), None)],
None,
)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
pub async fn append<V: Serialize>(&self, value: V) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_append"),
)
.await;
let spec = match MutateInSpec::array_append("", &[value], None) {
Ok(spec) => spec,
Err(e) => {
ctx.end_operation(Some(&e));
return Err(e);
}
};
let result = self
.collection
.mutate_in(
&self.id,
&[spec],
MutateInOptions::new().store_semantics(StoreSemantics::Upsert),
)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
pub async fn prepend<V: Serialize>(&self, value: V) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_prepend"),
)
.await;
let spec = match MutateInSpec::array_prepend("", &[value], None) {
Ok(spec) => spec,
Err(e) => {
ctx.end_operation(Some(&e));
return Err(e);
}
};
let result = self
.collection
.mutate_in(
&self.id,
&[spec],
MutateInOptions::new().store_semantics(StoreSemantics::Upsert),
)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
pub async fn position<V: PartialEq + DeserializeOwned>(
&self,
value: &V,
) -> crate::error::Result<Option<usize>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_position"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|get_res| {
let list_contents: Vec<V> = get_res.content_as()?;
Ok(list_contents.iter().position(|item| item == value))
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn len(&self) -> crate::error::Result<usize> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_len"),
)
.await;
let result = self
.collection
.lookup_in(&self.id, &[LookupInSpec::count("", None)], None)
.instrument(ctx.span().clone())
.await
.and_then(|res| res.content_as(0));
ctx.end_operation(result.as_ref().err());
result
}
pub async fn is_empty(&self) -> crate::error::Result<bool> {
Ok(self.len().await? == 0)
}
pub async fn clear(&self) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("list_clear"),
)
.await;
let result = self
.collection
.remove(&self.id, None)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
}
#[derive(Clone)]
pub struct CouchbaseMap<'a> {
collection: &'a Collection,
id: String,
options: CouchbaseMapOptions,
}
impl CouchbaseMap<'_> {
pub async fn iter<T: DeserializeOwned>(
&self,
) -> crate::error::Result<impl Iterator<Item = (String, T)>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_iter"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let list_contents: HashMap<String, T> = res.content_as()?;
Ok(list_contents.into_iter())
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn get<V: DeserializeOwned>(&self, id: impl Into<String>) -> crate::error::Result<V> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_get"),
)
.await;
let id = id.into();
let result = self
.collection
.lookup_in(&self.id, &[LookupInSpec::get(&id, None)], None)
.instrument(ctx.span().clone())
.await
.and_then(|res| res.content_as(0));
ctx.end_operation(result.as_ref().err());
result
}
pub async fn insert<V: Serialize>(
&self,
id: impl Into<String>,
value: V,
) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_insert"),
)
.await;
let id = id.into();
let spec = match MutateInSpec::upsert(&id, value, None) {
Ok(spec) => spec,
Err(e) => {
ctx.end_operation(Some(&e));
return Err(e);
}
};
let result = self
.collection
.mutate_in(
&self.id,
&[spec],
MutateInOptions::new().store_semantics(StoreSemantics::Upsert),
)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
pub async fn remove(&self, id: impl Into<String>) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_remove"),
)
.await;
let id = id.into();
let result = self
.collection
.mutate_in(&self.id, &[MutateInSpec::remove(&id, None)], None)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
pub async fn contains_key(&self, id: impl Into<String>) -> crate::error::Result<bool> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_contains_key"),
)
.await;
let id = id.into();
let result = self
.collection
.lookup_in(&self.id, &[LookupInSpec::exists(&id, None)], None)
.instrument(ctx.span().clone())
.await
.and_then(|res| res.exists(0));
ctx.end_operation(result.as_ref().err());
result
}
pub async fn len(&self) -> crate::error::Result<usize> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_len"),
)
.await;
let result = self
.collection
.lookup_in(&self.id, &[LookupInSpec::count("", None)], None)
.instrument(ctx.span().clone())
.await
.and_then(|res| res.content_as(0));
ctx.end_operation(result.as_ref().err());
result
}
pub async fn is_empty(&self) -> crate::error::Result<bool> {
Ok(self.len().await? == 0)
}
pub async fn keys(&self) -> crate::error::Result<Vec<String>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_keys"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let map_contents: HashMap<String, serde_json::Value> = res.content_as()?;
Ok(map_contents.keys().cloned().collect())
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn values<T: DeserializeOwned>(&self) -> crate::error::Result<Vec<T>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_values"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let map_contents: HashMap<String, T> = res.content_as()?;
Ok(map_contents.into_values().collect())
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn clear(&self) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("map_clear"),
)
.await;
let result = self
.collection
.remove(&self.id, None)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
}
#[derive(Clone)]
pub struct CouchbaseSet<'a> {
collection: &'a Collection,
id: String,
options: CouchbaseSetOptions,
}
impl CouchbaseSet<'_> {
pub async fn iter<T: DeserializeOwned>(&self) -> crate::error::Result<impl Iterator<Item = T>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("set_iter"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let list_contents: Vec<T> = res.content_as()?;
Ok(list_contents.into_iter())
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn insert<V: Serialize>(&self, value: V) -> crate::error::Result<bool> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("set_insert"),
)
.await;
let spec = match MutateInSpec::array_add_unique("", value, None) {
Ok(spec) => spec,
Err(e) => {
ctx.end_operation(Some(&e));
return Err(e);
}
};
let res = self
.collection
.mutate_in(
&self.id,
&[spec],
MutateInOptions::new().store_semantics(StoreSemantics::Upsert),
)
.instrument(ctx.span().clone())
.await;
let result = match res {
Ok(_) => Ok(true),
Err(e) => match e.kind() {
crate::error::ErrorKind::PathExists => Ok(false),
_ => Err(e),
},
};
ctx.end_operation(result.as_ref().err());
result
}
pub async fn remove<T: DeserializeOwned + PartialEq>(
&self,
value: T,
) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("set_remove"),
)
.await;
let result = self.remove_loop(&ctx, value).await;
ctx.end_operation(result.as_ref().err());
result
}
async fn remove_loop<T: DeserializeOwned + PartialEq>(
&self,
ctx: &OperationContext<'_>,
value: T,
) -> crate::error::Result<()> {
for _ in 0..16 {
let items = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await?;
let cas = items.cas();
let set_contents: Vec<T> = items.content_as()?;
let mut index_to_remove: Option<usize> = None;
for (i, item) in set_contents.iter().enumerate() {
if *item == value {
index_to_remove = Some(i);
}
}
if let Some(index) = index_to_remove {
let res = self
.collection
.mutate_in(
&self.id,
&[MutateInSpec::remove(format!("[{index}]"), None)],
MutateInOptions::new().cas(cas),
)
.instrument(ctx.span().clone())
.await;
if let Err(e) = res {
match e.kind() {
crate::error::ErrorKind::DocumentExists => continue,
crate::error::ErrorKind::CasMismatch => continue,
_ => return Err(e),
};
}
}
return Ok(());
}
Err(crate::error::Error::other_failure(
"failed to perform operation after 16 retries",
))
}
pub async fn values<T: DeserializeOwned>(&self) -> crate::error::Result<Vec<T>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("set_values"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let set_contents: Vec<T> = res.content_as()?;
Ok(set_contents)
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn contains<T: PartialEq + DeserializeOwned>(
&self,
value: &T,
) -> crate::error::Result<bool> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("set_contains"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let set_contents: Vec<T> = res.content_as()?;
Ok(set_contents.iter().any(|item| item == value))
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn len(&self) -> crate::error::Result<usize> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("set_len"),
)
.await;
let result = self
.collection
.lookup_in(&self.id, &[LookupInSpec::count("", None)], None)
.instrument(ctx.span().clone())
.await
.and_then(|res| res.content_as(0));
ctx.end_operation(result.as_ref().err());
result
}
pub async fn is_empty(&self) -> crate::error::Result<bool> {
Ok(self.len().await? == 0)
}
pub async fn clear(&self) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("set_clear"),
)
.await;
let result = self
.collection
.remove(&self.id, None)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
}
#[derive(Clone)]
pub struct CouchbaseQueue<'a> {
collection: &'a Collection,
id: String,
options: CouchbaseQueueOptions,
}
impl CouchbaseQueue<'_> {
pub async fn iter<T: DeserializeOwned>(&self) -> crate::error::Result<impl Iterator<Item = T>> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("queue_iter"),
)
.await;
let result = self
.collection
.get(&self.id, None)
.instrument(ctx.span().clone())
.await
.and_then(|res| {
let mut list_contents: Vec<T> = res.content_as()?;
list_contents.reverse();
Ok(list_contents.into_iter())
});
ctx.end_operation(result.as_ref().err());
result
}
pub async fn push<V: Serialize>(&self, value: V) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("queue_push"),
)
.await;
let spec = match MutateInSpec::array_prepend("", &[value], None) {
Ok(spec) => spec,
Err(e) => {
ctx.end_operation(Some(&e));
return Err(e);
}
};
let result = self
.collection
.mutate_in(
&self.id,
&[spec],
MutateInOptions::new().store_semantics(StoreSemantics::Upsert),
)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
pub async fn pop<T: DeserializeOwned>(&self) -> crate::error::Result<T> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("queue_pop"),
)
.await;
let result = self.pop_loop(&ctx).await;
ctx.end_operation(result.as_ref().err());
result
}
async fn pop_loop<T: DeserializeOwned>(
&self,
ctx: &OperationContext<'_>,
) -> crate::error::Result<T> {
for _ in 0..16 {
let res = self
.collection
.lookup_in(&self.id, &[LookupInSpec::get("[-1]", None)], None)
.instrument(ctx.span().clone())
.await?;
let cas = res.cas();
let value: T = res.content_as(0)?;
let res = self
.collection
.mutate_in(
&self.id,
&[MutateInSpec::remove("[-1]", None)],
MutateInOptions::new().cas(cas),
)
.instrument(ctx.span().clone())
.await;
if let Err(e) = res {
match e.kind() {
crate::error::ErrorKind::DocumentExists => continue,
crate::error::ErrorKind::CasMismatch => continue,
_ => return Err(e),
};
}
return Ok(value);
}
Err(crate::error::Error::other_failure(
"failed to perform operation after 16 retries",
))
}
pub async fn len(&self) -> crate::error::Result<usize> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("queue_len"),
)
.await;
let result = self
.collection
.lookup_in(&self.id, &[LookupInSpec::count("", None)], None)
.instrument(ctx.span().clone())
.await
.and_then(|res| res.content_as(0));
ctx.end_operation(result.as_ref().err());
result
}
pub async fn is_empty(&self) -> crate::error::Result<bool> {
Ok(self.len().await? == 0)
}
pub async fn clear(&self) -> crate::error::Result<()> {
let ctx = self
.collection
.tracing_client
.begin_operation(
Some(SERVICE_VALUE_KV),
self.collection.keyspace(),
create_span!("queue_clear"),
)
.await;
let result = self
.collection
.remove(&self.id, None)
.instrument(ctx.span().clone())
.await
.map(|_| ());
ctx.end_operation(result.as_ref().err());
result
}
}