use crate::key::KeySet;
use crate::model::DirectedReadOptions;
use crate::model::read_request::{LockHint, OrderBy};
use crate::model::request_options::Priority;
use google_cloud_gax::backoff_policy::BackoffPolicyArg;
use google_cloud_gax::options::RequestOptions as GaxRequestOptions;
use google_cloud_gax::retry_policy::RetryPolicyArg;
use std::time::Duration;
#[derive(Clone, Debug, PartialEq)]
pub struct ReadRequestBuilder {
table: String,
columns: Vec<String>,
}
impl ReadRequestBuilder {
pub fn with_keys(self, keys: impl Into<KeySet>) -> ConfiguredReadRequestBuilder {
ConfiguredReadRequestBuilder {
table: self.table,
index: None,
keys: keys.into(),
columns: self.columns,
limit: None,
request_options: None,
directed_read_options: None,
order_by: None,
lock_hint: None,
gax_options: GaxRequestOptions::default(),
}
}
pub fn with_index(
self,
index: impl Into<String>,
keys: impl Into<KeySet>,
) -> ConfiguredReadRequestBuilder {
ConfiguredReadRequestBuilder {
table: self.table,
index: Some(index.into()),
keys: keys.into(),
columns: self.columns,
limit: None,
request_options: None,
directed_read_options: None,
order_by: None,
lock_hint: None,
gax_options: GaxRequestOptions::default(),
}
}
}
#[derive(Clone, Debug)]
pub struct ConfiguredReadRequestBuilder {
table: String,
index: Option<String>,
keys: KeySet,
columns: Vec<String>,
limit: Option<i64>,
request_options: Option<crate::model::RequestOptions>,
directed_read_options: Option<DirectedReadOptions>,
order_by: Option<OrderBy>,
lock_hint: Option<LockHint>,
gax_options: GaxRequestOptions,
}
impl ConfiguredReadRequestBuilder {
pub fn set_limit(mut self, limit: i64) -> Self {
self.limit = Some(limit);
self
}
pub fn set_request_tag(mut self, tag: impl Into<String>) -> Self {
self.request_options
.get_or_insert_with(crate::model::RequestOptions::default)
.request_tag = tag.into();
self
}
pub fn set_priority(mut self, priority: Priority) -> Self {
self.request_options
.get_or_insert_with(crate::model::RequestOptions::default)
.priority = priority;
self
}
pub fn set_directed_read_options(mut self, options: DirectedReadOptions) -> Self {
self.directed_read_options = Some(options);
self
}
pub fn set_order_by(mut self, order_by: OrderBy) -> Self {
self.order_by = Some(order_by);
self
}
pub fn set_lock_hint(mut self, lock_hint: LockHint) -> Self {
self.lock_hint = Some(lock_hint);
self
}
pub fn with_attempt_timeout(mut self, timeout: Duration) -> Self {
self.gax_options.set_attempt_timeout(timeout);
self
}
pub fn with_retry_policy(mut self, policy: impl Into<RetryPolicyArg>) -> Self {
self.gax_options.set_retry_policy(policy);
self
}
pub fn with_backoff_policy(mut self, policy: impl Into<BackoffPolicyArg>) -> Self {
self.gax_options.set_backoff_policy(policy);
self
}
pub fn build(self) -> ReadRequest {
ReadRequest {
table: self.table,
index: self.index,
keys: self.keys,
columns: self.columns,
limit: self.limit,
request_options: self.request_options,
directed_read_options: self.directed_read_options,
order_by: self.order_by,
lock_hint: self.lock_hint,
gax_options: self.gax_options,
}
}
}
#[derive(Clone, Debug)]
pub struct ReadRequest {
pub(crate) table: String,
pub(crate) index: Option<String>,
pub(crate) keys: KeySet,
pub(crate) columns: Vec<String>,
pub(crate) limit: Option<i64>,
pub(crate) request_options: Option<crate::model::RequestOptions>,
pub(crate) directed_read_options: Option<DirectedReadOptions>,
pub(crate) order_by: Option<OrderBy>,
pub(crate) lock_hint: Option<LockHint>,
pub(crate) gax_options: GaxRequestOptions,
}
impl ReadRequest {
pub fn builder(
table: impl Into<String>,
columns: impl IntoIterator<Item = impl Into<String>>,
) -> ReadRequestBuilder {
ReadRequestBuilder {
table: table.into(),
columns: columns.into_iter().map(|s| s.into()).collect(),
}
}
pub(crate) fn into_request(self) -> crate::model::ReadRequest {
crate::model::ReadRequest::default()
.set_table(self.table)
.set_columns(self.columns)
.set_key_set(self.keys.into_proto())
.set_index(self.index.unwrap_or_default())
.set_limit(self.limit.unwrap_or_default())
.set_or_clear_request_options(self.request_options)
.set_or_clear_directed_read_options(self.directed_read_options)
.set_order_by(self.order_by.unwrap_or_default())
.set_lock_hint(self.lock_hint.unwrap_or_default())
}
pub(crate) fn into_partition_read_request(self) -> crate::model::PartitionReadRequest {
crate::model::PartitionReadRequest::default()
.set_table(self.table)
.set_columns(self.columns)
.set_key_set(self.keys.into_proto())
.set_index(self.index.unwrap_or_default())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn auto_traits() {
static_assertions::assert_impl_all!(ReadRequestBuilder: Send, Sync, Clone, std::fmt::Debug);
static_assertions::assert_impl_all!(ConfiguredReadRequestBuilder: Send, Sync, Clone, std::fmt::Debug);
static_assertions::assert_impl_all!(ReadRequest: Send, Sync, Clone, std::fmt::Debug);
}
#[test]
fn read_with_keys() {
let keys = KeySet::all();
let req = ReadRequest::builder("MyTable", vec!["col1", "col2"])
.with_keys(keys.clone())
.build();
assert_eq!(req.table, "MyTable");
assert_eq!(req.index, None);
assert_eq!(req.keys, keys);
assert_eq!(req.columns, vec!["col1", "col2"]);
assert_eq!(req.limit, None);
}
#[test]
fn read_with_index() {
let keys = KeySet::all();
let req = ReadRequest::builder("MyTable", vec!["col1", "col2"])
.with_index("MyIndex", keys.clone())
.build();
assert_eq!(req.table, "MyTable");
assert_eq!(req.index, Some("MyIndex".to_string()));
assert_eq!(req.keys, keys);
assert_eq!(req.columns, vec!["col1", "col2"]);
assert_eq!(req.limit, None);
}
#[test]
fn with_limit() {
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.set_limit(42)
.build();
assert_eq!(req.limit, Some(42));
}
#[test]
fn with_request_tag() {
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.set_request_tag("tag1")
.build();
assert_eq!(
req.request_options
.expect("request options missing")
.request_tag,
"tag1"
);
}
#[test]
fn with_priority() {
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.set_priority(Priority::High)
.build();
assert_eq!(
req.request_options
.expect("request options missing")
.priority,
Priority::High
);
}
#[test]
fn with_directed_read_options() {
let dro = DirectedReadOptions::default();
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.set_directed_read_options(dro.clone())
.build();
assert_eq!(req.directed_read_options, Some(dro));
}
#[test]
fn with_order_by() {
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.set_order_by(OrderBy::PrimaryKey)
.build();
assert_eq!(req.order_by, Some(OrderBy::PrimaryKey));
}
#[test]
fn with_lock_hint() {
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.set_lock_hint(LockHint::Exclusive)
.build();
assert_eq!(req.lock_hint, Some(LockHint::Exclusive));
}
#[test]
fn with_gax_options() -> anyhow::Result<()> {
use google_cloud_gax::exponential_backoff::ExponentialBackoff;
use google_cloud_gax::retry_policy::NeverRetry;
use std::time::Duration;
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.with_attempt_timeout(Duration::from_secs(10))
.with_retry_policy(NeverRetry)
.with_backoff_policy(ExponentialBackoff::default())
.build();
assert_eq!(
req.gax_options.attempt_timeout(),
&Some(Duration::from_secs(10))
);
assert!(req.gax_options.retry_policy().is_some());
assert!(req.gax_options.backoff_policy().is_some());
Ok(())
}
}