use aws_sdk_dynamodb::model::AttributeValue;
use aws_sdk_dynamodb::{Client, Config};
use aws_smithy_client::test_connection::{capture_request, TestConnection};
use aws_smithy_http::body::SdkBody;
use aws_smithy_protocol_test::{assert_ok, validate_body, MediaType};
use aws_types::region::Region;
use aws_types::Credentials;
use std::collections::HashMap;
use std::iter::FromIterator;
use tokio_stream::StreamExt;
fn stub_config() -> Config {
Config::builder()
.region(Region::new("us-east-1"))
.credentials_provider(Credentials::new("akid", "secret", None, None, "test"))
.build()
}
#[tokio::test]
async fn paginators_pass_args() {
let (conn, request) = capture_request(None);
let client = Client::from_conf_conn(stub_config(), conn);
let mut paginator = client
.scan()
.table_name("test-table")
.into_paginator()
.page_size(32)
.send();
let _ = paginator.next().await;
let request = request.expect_request();
let body = request.body().bytes().expect("data is loaded");
assert_ok(validate_body(
body,
r#"{"TableName":"test-table","Limit":32}"#,
MediaType::Json,
));
}
fn mk_request(body: &'static str) -> http::Request<SdkBody> {
http::Request::builder()
.uri("https://dynamodb.us-east-1.amazonaws.com/")
.body(SdkBody::from(body))
.unwrap()
}
fn mk_response(body: &'static str) -> http::Response<SdkBody> {
http::Response::builder().body(SdkBody::from(body)).unwrap()
}
#[tokio::test(flavor = "current_thread")]
async fn paginators_loop_until_completion() {
let conn = TestConnection::new(vec![
(
mk_request(r#"{"TableName":"test-table","Limit":32}"#),
mk_response(
r#"{
"Count": 1,
"Items": [{
"PostedBy": {
"S": "joe@example.com"
}
}],
"LastEvaluatedKey": {
"PostedBy": { "S": "joe@example.com" }
}
}"#,
),
),
(
mk_request(
r#"{"TableName":"test-table","Limit":32,"ExclusiveStartKey":{"PostedBy":{"S":"joe@example.com"}}}"#,
),
mk_response(
r#"{
"Count": 1,
"Items": [{
"PostedBy": {
"S": "jack@example.com"
}
}]
}"#,
),
),
]);
let client = Client::from_conf_conn(stub_config(), conn.clone());
let mut paginator = client
.scan()
.table_name("test-table")
.into_paginator()
.page_size(32)
.send();
assert_eq!(conn.requests().len(), 0);
let first_page = paginator
.try_next()
.await
.expect("success")
.expect("page exists");
assert_eq!(
first_page.items.unwrap_or_default(),
vec![HashMap::from_iter([(
"PostedBy".to_string(),
AttributeValue::S("joe@example.com".to_string())
)])]
);
assert_eq!(conn.requests().len(), 1);
let second_page = paginator
.try_next()
.await
.expect("success")
.expect("page exists");
assert_eq!(
second_page.items.unwrap_or_default(),
vec![HashMap::from_iter([(
"PostedBy".to_string(),
AttributeValue::S("jack@example.com".to_string())
)])]
);
assert_eq!(conn.requests().len(), 2);
assert!(
paginator.next().await.is_none(),
"no more pages should exist"
);
assert_eq!(conn.requests().len(), 2);
conn.assert_requests_match(&[]);
}
#[tokio::test]
async fn paginators_handle_errors() {
let conn = TestConnection::new(vec![(
mk_request(r#"{"TableName":"test-table","Limit":32}"#),
mk_response(
r#"{
"Count": 1,
"Items": [{
"PostedBy": {
"S": "joe@example.com"
}
}],
"LastEvaluatedKey": {
"PostedBy": { "S": "joe@example.com" }
}
}"#,
),
)]);
let client = Client::from_conf_conn(stub_config(), conn.clone());
let mut rows = client
.scan()
.table_name("test-table")
.into_paginator()
.page_size(32)
.items()
.send();
assert_eq!(
rows.try_next()
.await
.expect("no error")
.expect("not EOS")
.get("PostedBy"),
Some(&AttributeValue::S("joe@example.com".to_string()))
);
rows.try_next().await.expect_err("failure");
assert_eq!(rows.try_next().await.expect("ok"), None);
}
#[tokio::test]
async fn paginators_error_on_repeated_token() {
let response = r#"{
"Count": 1,
"Items": [{
"PostedBy": {
"S": "joe@example.com"
}
}],
"LastEvaluatedKey": {
"PostedBy": { "S": "joe@example.com" }
}
}"#;
let conn = TestConnection::new(vec![
(
mk_request(r#"{"TableName":"test-table","Limit":32}"#),
mk_response(response),
),
(
mk_request(
r#"{"TableName":"test-table","Limit":32,"ExclusiveStartKey":{"PostedBy":{"S":"joe@example.com"}}}"#,
),
mk_response(response),
),
]);
let client = Client::from_conf_conn(stub_config(), conn.clone());
let mut rows = client
.scan()
.table_name("test-table")
.into_paginator()
.page_size(32)
.items()
.send();
assert_eq!(
rows.try_next()
.await
.expect("no error")
.expect("not EOS")
.get("PostedBy"),
Some(&AttributeValue::S("joe@example.com".to_string()))
);
let err = rows.try_next().await.expect_err("failure");
assert!(
format!("{}", err).contains("next token did not change"),
"{}",
err
);
assert_eq!(rows.try_next().await.expect("ok"), None);
}