use std::error::Error;
use std::fmt;
use std::io;
#[allow(warnings)]
use futures::future;
use futures::Future;
use rusoto_core::region;
use rusoto_core::request::{BufferedHttpResponse, DispatchSignedRequest};
use rusoto_core::{Client, RusotoFuture};
use rusoto_core::credential::{CredentialsError, ProvideAwsCredentials};
use rusoto_core::request::HttpDispatchError;
use rusoto_core::signature::SignedRequest;
use serde_json;
use serde_json::from_slice;
use serde_json::Value as SerdeJsonValue;
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct AttributeValue {
#[serde(rename = "B")]
#[serde(
deserialize_with = "::rusoto_core::serialization::SerdeBlob::deserialize_blob",
serialize_with = "::rusoto_core::serialization::SerdeBlob::serialize_blob",
default
)]
#[serde(skip_serializing_if = "Option::is_none")]
pub b: Option<Vec<u8>>,
#[serde(rename = "BOOL")]
#[serde(skip_serializing_if = "Option::is_none")]
pub bool: Option<bool>,
#[serde(rename = "BS")]
#[serde(skip_serializing_if = "Option::is_none")]
pub bs: Option<Vec<Vec<u8>>>,
#[serde(rename = "L")]
#[serde(skip_serializing_if = "Option::is_none")]
pub l: Option<Vec<AttributeValue>>,
#[serde(rename = "M")]
#[serde(skip_serializing_if = "Option::is_none")]
pub m: Option<::std::collections::HashMap<String, AttributeValue>>,
#[serde(rename = "N")]
#[serde(skip_serializing_if = "Option::is_none")]
pub n: Option<String>,
#[serde(rename = "NS")]
#[serde(skip_serializing_if = "Option::is_none")]
pub ns: Option<Vec<String>>,
#[serde(rename = "NULL")]
#[serde(skip_serializing_if = "Option::is_none")]
pub null: Option<bool>,
#[serde(rename = "S")]
#[serde(skip_serializing_if = "Option::is_none")]
pub s: Option<String>,
#[serde(rename = "SS")]
#[serde(skip_serializing_if = "Option::is_none")]
pub ss: Option<Vec<String>>,
}
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct DescribeStreamInput {
#[serde(rename = "ExclusiveStartShardId")]
#[serde(skip_serializing_if = "Option::is_none")]
pub exclusive_start_shard_id: Option<String>,
#[serde(rename = "Limit")]
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<i64>,
#[serde(rename = "StreamArn")]
pub stream_arn: String,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct DescribeStreamOutput {
#[serde(rename = "StreamDescription")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_description: Option<StreamDescription>,
}
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct GetRecordsInput {
#[serde(rename = "Limit")]
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<i64>,
#[serde(rename = "ShardIterator")]
pub shard_iterator: String,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct GetRecordsOutput {
#[serde(rename = "NextShardIterator")]
#[serde(skip_serializing_if = "Option::is_none")]
pub next_shard_iterator: Option<String>,
#[serde(rename = "Records")]
#[serde(skip_serializing_if = "Option::is_none")]
pub records: Option<Vec<Record>>,
}
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct GetShardIteratorInput {
#[serde(rename = "SequenceNumber")]
#[serde(skip_serializing_if = "Option::is_none")]
pub sequence_number: Option<String>,
#[serde(rename = "ShardId")]
pub shard_id: String,
#[serde(rename = "ShardIteratorType")]
pub shard_iterator_type: String,
#[serde(rename = "StreamArn")]
pub stream_arn: String,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct GetShardIteratorOutput {
#[serde(rename = "ShardIterator")]
#[serde(skip_serializing_if = "Option::is_none")]
pub shard_iterator: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct Identity {
#[serde(rename = "PrincipalId")]
#[serde(skip_serializing_if = "Option::is_none")]
pub principal_id: Option<String>,
#[serde(rename = "Type")]
#[serde(skip_serializing_if = "Option::is_none")]
pub type_: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct KeySchemaElement {
#[serde(rename = "AttributeName")]
pub attribute_name: String,
#[serde(rename = "KeyType")]
pub key_type: String,
}
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct ListStreamsInput {
#[serde(rename = "ExclusiveStartStreamArn")]
#[serde(skip_serializing_if = "Option::is_none")]
pub exclusive_start_stream_arn: Option<String>,
#[serde(rename = "Limit")]
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<i64>,
#[serde(rename = "TableName")]
#[serde(skip_serializing_if = "Option::is_none")]
pub table_name: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct ListStreamsOutput {
#[serde(rename = "LastEvaluatedStreamArn")]
#[serde(skip_serializing_if = "Option::is_none")]
pub last_evaluated_stream_arn: Option<String>,
#[serde(rename = "Streams")]
#[serde(skip_serializing_if = "Option::is_none")]
pub streams: Option<Vec<Stream>>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct Record {
#[serde(rename = "awsRegion")]
#[serde(skip_serializing_if = "Option::is_none")]
pub aws_region: Option<String>,
#[serde(rename = "dynamodb")]
#[serde(skip_serializing_if = "Option::is_none")]
pub dynamodb: Option<StreamRecord>,
#[serde(rename = "eventID")]
#[serde(skip_serializing_if = "Option::is_none")]
pub event_id: Option<String>,
#[serde(rename = "eventName")]
#[serde(skip_serializing_if = "Option::is_none")]
pub event_name: Option<String>,
#[serde(rename = "eventSource")]
#[serde(skip_serializing_if = "Option::is_none")]
pub event_source: Option<String>,
#[serde(rename = "eventVersion")]
#[serde(skip_serializing_if = "Option::is_none")]
pub event_version: Option<String>,
#[serde(rename = "userIdentity")]
#[serde(skip_serializing_if = "Option::is_none")]
pub user_identity: Option<Identity>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct SequenceNumberRange {
#[serde(rename = "EndingSequenceNumber")]
#[serde(skip_serializing_if = "Option::is_none")]
pub ending_sequence_number: Option<String>,
#[serde(rename = "StartingSequenceNumber")]
#[serde(skip_serializing_if = "Option::is_none")]
pub starting_sequence_number: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct Shard {
#[serde(rename = "ParentShardId")]
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_shard_id: Option<String>,
#[serde(rename = "SequenceNumberRange")]
#[serde(skip_serializing_if = "Option::is_none")]
pub sequence_number_range: Option<SequenceNumberRange>,
#[serde(rename = "ShardId")]
#[serde(skip_serializing_if = "Option::is_none")]
pub shard_id: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct Stream {
#[serde(rename = "StreamArn")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_arn: Option<String>,
#[serde(rename = "StreamLabel")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_label: Option<String>,
#[serde(rename = "TableName")]
#[serde(skip_serializing_if = "Option::is_none")]
pub table_name: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct StreamDescription {
#[serde(rename = "CreationRequestDateTime")]
#[serde(skip_serializing_if = "Option::is_none")]
pub creation_request_date_time: Option<f64>,
#[serde(rename = "KeySchema")]
#[serde(skip_serializing_if = "Option::is_none")]
pub key_schema: Option<Vec<KeySchemaElement>>,
#[serde(rename = "LastEvaluatedShardId")]
#[serde(skip_serializing_if = "Option::is_none")]
pub last_evaluated_shard_id: Option<String>,
#[serde(rename = "Shards")]
#[serde(skip_serializing_if = "Option::is_none")]
pub shards: Option<Vec<Shard>>,
#[serde(rename = "StreamArn")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_arn: Option<String>,
#[serde(rename = "StreamLabel")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_label: Option<String>,
#[serde(rename = "StreamStatus")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_status: Option<String>,
#[serde(rename = "StreamViewType")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_view_type: Option<String>,
#[serde(rename = "TableName")]
#[serde(skip_serializing_if = "Option::is_none")]
pub table_name: Option<String>,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct StreamRecord {
#[serde(rename = "ApproximateCreationDateTime")]
#[serde(skip_serializing_if = "Option::is_none")]
pub approximate_creation_date_time: Option<f64>,
#[serde(rename = "Keys")]
#[serde(skip_serializing_if = "Option::is_none")]
pub keys: Option<::std::collections::HashMap<String, AttributeValue>>,
#[serde(rename = "NewImage")]
#[serde(skip_serializing_if = "Option::is_none")]
pub new_image: Option<::std::collections::HashMap<String, AttributeValue>>,
#[serde(rename = "OldImage")]
#[serde(skip_serializing_if = "Option::is_none")]
pub old_image: Option<::std::collections::HashMap<String, AttributeValue>>,
#[serde(rename = "SequenceNumber")]
#[serde(skip_serializing_if = "Option::is_none")]
pub sequence_number: Option<String>,
#[serde(rename = "SizeBytes")]
#[serde(skip_serializing_if = "Option::is_none")]
pub size_bytes: Option<i64>,
#[serde(rename = "StreamViewType")]
#[serde(skip_serializing_if = "Option::is_none")]
pub stream_view_type: Option<String>,
}
#[derive(Debug, PartialEq)]
pub enum DescribeStreamError {
InternalServerError(String),
ResourceNotFound(String),
HttpDispatch(HttpDispatchError),
Credentials(CredentialsError),
Validation(String),
ParseError(String),
Unknown(BufferedHttpResponse),
}
impl DescribeStreamError {
pub fn from_response(res: BufferedHttpResponse) -> DescribeStreamError {
if let Ok(json) = from_slice::<SerdeJsonValue>(&res.body) {
let raw_error_type = json
.get("__type")
.and_then(|e| e.as_str())
.unwrap_or("Unknown");
let error_message = json.get("message").and_then(|m| m.as_str()).unwrap_or("");
let pieces: Vec<&str> = raw_error_type.split("#").collect();
let error_type = pieces.last().expect("Expected error type");
match *error_type {
"InternalServerError" => {
return DescribeStreamError::InternalServerError(String::from(error_message));
}
"ResourceNotFoundException" => {
return DescribeStreamError::ResourceNotFound(String::from(error_message));
}
"ValidationException" => {
return DescribeStreamError::Validation(error_message.to_string());
}
_ => {}
}
}
return DescribeStreamError::Unknown(res);
}
}
impl From<serde_json::error::Error> for DescribeStreamError {
fn from(err: serde_json::error::Error) -> DescribeStreamError {
DescribeStreamError::ParseError(err.description().to_string())
}
}
impl From<CredentialsError> for DescribeStreamError {
fn from(err: CredentialsError) -> DescribeStreamError {
DescribeStreamError::Credentials(err)
}
}
impl From<HttpDispatchError> for DescribeStreamError {
fn from(err: HttpDispatchError) -> DescribeStreamError {
DescribeStreamError::HttpDispatch(err)
}
}
impl From<io::Error> for DescribeStreamError {
fn from(err: io::Error) -> DescribeStreamError {
DescribeStreamError::HttpDispatch(HttpDispatchError::from(err))
}
}
impl fmt::Display for DescribeStreamError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for DescribeStreamError {
fn description(&self) -> &str {
match *self {
DescribeStreamError::InternalServerError(ref cause) => cause,
DescribeStreamError::ResourceNotFound(ref cause) => cause,
DescribeStreamError::Validation(ref cause) => cause,
DescribeStreamError::Credentials(ref err) => err.description(),
DescribeStreamError::HttpDispatch(ref dispatch_error) => dispatch_error.description(),
DescribeStreamError::ParseError(ref cause) => cause,
DescribeStreamError::Unknown(_) => "unknown error",
}
}
}
#[derive(Debug, PartialEq)]
pub enum GetRecordsError {
ExpiredIterator(String),
InternalServerError(String),
LimitExceeded(String),
ResourceNotFound(String),
TrimmedDataAccess(String),
HttpDispatch(HttpDispatchError),
Credentials(CredentialsError),
Validation(String),
ParseError(String),
Unknown(BufferedHttpResponse),
}
impl GetRecordsError {
pub fn from_response(res: BufferedHttpResponse) -> GetRecordsError {
if let Ok(json) = from_slice::<SerdeJsonValue>(&res.body) {
let raw_error_type = json
.get("__type")
.and_then(|e| e.as_str())
.unwrap_or("Unknown");
let error_message = json.get("message").and_then(|m| m.as_str()).unwrap_or("");
let pieces: Vec<&str> = raw_error_type.split("#").collect();
let error_type = pieces.last().expect("Expected error type");
match *error_type {
"ExpiredIteratorException" => {
return GetRecordsError::ExpiredIterator(String::from(error_message));
}
"InternalServerError" => {
return GetRecordsError::InternalServerError(String::from(error_message));
}
"LimitExceededException" => {
return GetRecordsError::LimitExceeded(String::from(error_message));
}
"ResourceNotFoundException" => {
return GetRecordsError::ResourceNotFound(String::from(error_message));
}
"TrimmedDataAccessException" => {
return GetRecordsError::TrimmedDataAccess(String::from(error_message));
}
"ValidationException" => {
return GetRecordsError::Validation(error_message.to_string());
}
_ => {}
}
}
return GetRecordsError::Unknown(res);
}
}
impl From<serde_json::error::Error> for GetRecordsError {
fn from(err: serde_json::error::Error) -> GetRecordsError {
GetRecordsError::ParseError(err.description().to_string())
}
}
impl From<CredentialsError> for GetRecordsError {
fn from(err: CredentialsError) -> GetRecordsError {
GetRecordsError::Credentials(err)
}
}
impl From<HttpDispatchError> for GetRecordsError {
fn from(err: HttpDispatchError) -> GetRecordsError {
GetRecordsError::HttpDispatch(err)
}
}
impl From<io::Error> for GetRecordsError {
fn from(err: io::Error) -> GetRecordsError {
GetRecordsError::HttpDispatch(HttpDispatchError::from(err))
}
}
impl fmt::Display for GetRecordsError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for GetRecordsError {
fn description(&self) -> &str {
match *self {
GetRecordsError::ExpiredIterator(ref cause) => cause,
GetRecordsError::InternalServerError(ref cause) => cause,
GetRecordsError::LimitExceeded(ref cause) => cause,
GetRecordsError::ResourceNotFound(ref cause) => cause,
GetRecordsError::TrimmedDataAccess(ref cause) => cause,
GetRecordsError::Validation(ref cause) => cause,
GetRecordsError::Credentials(ref err) => err.description(),
GetRecordsError::HttpDispatch(ref dispatch_error) => dispatch_error.description(),
GetRecordsError::ParseError(ref cause) => cause,
GetRecordsError::Unknown(_) => "unknown error",
}
}
}
#[derive(Debug, PartialEq)]
pub enum GetShardIteratorError {
InternalServerError(String),
ResourceNotFound(String),
TrimmedDataAccess(String),
HttpDispatch(HttpDispatchError),
Credentials(CredentialsError),
Validation(String),
ParseError(String),
Unknown(BufferedHttpResponse),
}
impl GetShardIteratorError {
pub fn from_response(res: BufferedHttpResponse) -> GetShardIteratorError {
if let Ok(json) = from_slice::<SerdeJsonValue>(&res.body) {
let raw_error_type = json
.get("__type")
.and_then(|e| e.as_str())
.unwrap_or("Unknown");
let error_message = json.get("message").and_then(|m| m.as_str()).unwrap_or("");
let pieces: Vec<&str> = raw_error_type.split("#").collect();
let error_type = pieces.last().expect("Expected error type");
match *error_type {
"InternalServerError" => {
return GetShardIteratorError::InternalServerError(String::from(error_message));
}
"ResourceNotFoundException" => {
return GetShardIteratorError::ResourceNotFound(String::from(error_message));
}
"TrimmedDataAccessException" => {
return GetShardIteratorError::TrimmedDataAccess(String::from(error_message));
}
"ValidationException" => {
return GetShardIteratorError::Validation(error_message.to_string());
}
_ => {}
}
}
return GetShardIteratorError::Unknown(res);
}
}
impl From<serde_json::error::Error> for GetShardIteratorError {
fn from(err: serde_json::error::Error) -> GetShardIteratorError {
GetShardIteratorError::ParseError(err.description().to_string())
}
}
impl From<CredentialsError> for GetShardIteratorError {
fn from(err: CredentialsError) -> GetShardIteratorError {
GetShardIteratorError::Credentials(err)
}
}
impl From<HttpDispatchError> for GetShardIteratorError {
fn from(err: HttpDispatchError) -> GetShardIteratorError {
GetShardIteratorError::HttpDispatch(err)
}
}
impl From<io::Error> for GetShardIteratorError {
fn from(err: io::Error) -> GetShardIteratorError {
GetShardIteratorError::HttpDispatch(HttpDispatchError::from(err))
}
}
impl fmt::Display for GetShardIteratorError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for GetShardIteratorError {
fn description(&self) -> &str {
match *self {
GetShardIteratorError::InternalServerError(ref cause) => cause,
GetShardIteratorError::ResourceNotFound(ref cause) => cause,
GetShardIteratorError::TrimmedDataAccess(ref cause) => cause,
GetShardIteratorError::Validation(ref cause) => cause,
GetShardIteratorError::Credentials(ref err) => err.description(),
GetShardIteratorError::HttpDispatch(ref dispatch_error) => dispatch_error.description(),
GetShardIteratorError::ParseError(ref cause) => cause,
GetShardIteratorError::Unknown(_) => "unknown error",
}
}
}
#[derive(Debug, PartialEq)]
pub enum ListStreamsError {
InternalServerError(String),
ResourceNotFound(String),
HttpDispatch(HttpDispatchError),
Credentials(CredentialsError),
Validation(String),
ParseError(String),
Unknown(BufferedHttpResponse),
}
impl ListStreamsError {
pub fn from_response(res: BufferedHttpResponse) -> ListStreamsError {
if let Ok(json) = from_slice::<SerdeJsonValue>(&res.body) {
let raw_error_type = json
.get("__type")
.and_then(|e| e.as_str())
.unwrap_or("Unknown");
let error_message = json.get("message").and_then(|m| m.as_str()).unwrap_or("");
let pieces: Vec<&str> = raw_error_type.split("#").collect();
let error_type = pieces.last().expect("Expected error type");
match *error_type {
"InternalServerError" => {
return ListStreamsError::InternalServerError(String::from(error_message));
}
"ResourceNotFoundException" => {
return ListStreamsError::ResourceNotFound(String::from(error_message));
}
"ValidationException" => {
return ListStreamsError::Validation(error_message.to_string());
}
_ => {}
}
}
return ListStreamsError::Unknown(res);
}
}
impl From<serde_json::error::Error> for ListStreamsError {
fn from(err: serde_json::error::Error) -> ListStreamsError {
ListStreamsError::ParseError(err.description().to_string())
}
}
impl From<CredentialsError> for ListStreamsError {
fn from(err: CredentialsError) -> ListStreamsError {
ListStreamsError::Credentials(err)
}
}
impl From<HttpDispatchError> for ListStreamsError {
fn from(err: HttpDispatchError) -> ListStreamsError {
ListStreamsError::HttpDispatch(err)
}
}
impl From<io::Error> for ListStreamsError {
fn from(err: io::Error) -> ListStreamsError {
ListStreamsError::HttpDispatch(HttpDispatchError::from(err))
}
}
impl fmt::Display for ListStreamsError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for ListStreamsError {
fn description(&self) -> &str {
match *self {
ListStreamsError::InternalServerError(ref cause) => cause,
ListStreamsError::ResourceNotFound(ref cause) => cause,
ListStreamsError::Validation(ref cause) => cause,
ListStreamsError::Credentials(ref err) => err.description(),
ListStreamsError::HttpDispatch(ref dispatch_error) => dispatch_error.description(),
ListStreamsError::ParseError(ref cause) => cause,
ListStreamsError::Unknown(_) => "unknown error",
}
}
}
pub trait DynamoDbStreams {
fn describe_stream(
&self,
input: DescribeStreamInput,
) -> RusotoFuture<DescribeStreamOutput, DescribeStreamError>;
fn get_records(
&self,
input: GetRecordsInput,
) -> RusotoFuture<GetRecordsOutput, GetRecordsError>;
fn get_shard_iterator(
&self,
input: GetShardIteratorInput,
) -> RusotoFuture<GetShardIteratorOutput, GetShardIteratorError>;
fn list_streams(
&self,
input: ListStreamsInput,
) -> RusotoFuture<ListStreamsOutput, ListStreamsError>;
}
#[derive(Clone)]
pub struct DynamoDbStreamsClient {
client: Client,
region: region::Region,
}
impl DynamoDbStreamsClient {
pub fn new(region: region::Region) -> DynamoDbStreamsClient {
DynamoDbStreamsClient {
client: Client::shared(),
region: region,
}
}
pub fn new_with<P, D>(
request_dispatcher: D,
credentials_provider: P,
region: region::Region,
) -> DynamoDbStreamsClient
where
P: ProvideAwsCredentials + Send + Sync + 'static,
P::Future: Send,
D: DispatchSignedRequest + Send + Sync + 'static,
D::Future: Send,
{
DynamoDbStreamsClient {
client: Client::new_with(credentials_provider, request_dispatcher),
region: region,
}
}
}
impl DynamoDbStreams for DynamoDbStreamsClient {
fn describe_stream(
&self,
input: DescribeStreamInput,
) -> RusotoFuture<DescribeStreamOutput, DescribeStreamError> {
let mut request = SignedRequest::new("POST", "dynamodb", &self.region, "/");
request.set_endpoint_prefix("streams.dynamodb".to_string());
request.set_content_type("application/x-amz-json-1.0".to_owned());
request.add_header("x-amz-target", "DynamoDBStreams_20120810.DescribeStream");
let encoded = serde_json::to_string(&input).unwrap();
request.set_payload(Some(encoded.into_bytes()));
self.client.sign_and_dispatch(request, |response| {
if response.status.is_success() {
Box::new(response.buffer().from_err().map(|response| {
let mut body = response.body;
if body.is_empty() || body == b"null" {
body = b"{}".to_vec();
}
serde_json::from_str::<DescribeStreamOutput>(
String::from_utf8_lossy(body.as_ref()).as_ref(),
)
.unwrap()
}))
} else {
Box::new(
response
.buffer()
.from_err()
.and_then(|response| Err(DescribeStreamError::from_response(response))),
)
}
})
}
fn get_records(
&self,
input: GetRecordsInput,
) -> RusotoFuture<GetRecordsOutput, GetRecordsError> {
let mut request = SignedRequest::new("POST", "dynamodb", &self.region, "/");
request.set_endpoint_prefix("streams.dynamodb".to_string());
request.set_content_type("application/x-amz-json-1.0".to_owned());
request.add_header("x-amz-target", "DynamoDBStreams_20120810.GetRecords");
let encoded = serde_json::to_string(&input).unwrap();
request.set_payload(Some(encoded.into_bytes()));
self.client.sign_and_dispatch(request, |response| {
if response.status.is_success() {
Box::new(response.buffer().from_err().map(|response| {
let mut body = response.body;
if body.is_empty() || body == b"null" {
body = b"{}".to_vec();
}
serde_json::from_str::<GetRecordsOutput>(
String::from_utf8_lossy(body.as_ref()).as_ref(),
)
.unwrap()
}))
} else {
Box::new(
response
.buffer()
.from_err()
.and_then(|response| Err(GetRecordsError::from_response(response))),
)
}
})
}
fn get_shard_iterator(
&self,
input: GetShardIteratorInput,
) -> RusotoFuture<GetShardIteratorOutput, GetShardIteratorError> {
let mut request = SignedRequest::new("POST", "dynamodb", &self.region, "/");
request.set_endpoint_prefix("streams.dynamodb".to_string());
request.set_content_type("application/x-amz-json-1.0".to_owned());
request.add_header("x-amz-target", "DynamoDBStreams_20120810.GetShardIterator");
let encoded = serde_json::to_string(&input).unwrap();
request.set_payload(Some(encoded.into_bytes()));
self.client.sign_and_dispatch(request, |response| {
if response.status.is_success() {
Box::new(response.buffer().from_err().map(|response| {
let mut body = response.body;
if body.is_empty() || body == b"null" {
body = b"{}".to_vec();
}
serde_json::from_str::<GetShardIteratorOutput>(
String::from_utf8_lossy(body.as_ref()).as_ref(),
)
.unwrap()
}))
} else {
Box::new(
response
.buffer()
.from_err()
.and_then(|response| Err(GetShardIteratorError::from_response(response))),
)
}
})
}
fn list_streams(
&self,
input: ListStreamsInput,
) -> RusotoFuture<ListStreamsOutput, ListStreamsError> {
let mut request = SignedRequest::new("POST", "dynamodb", &self.region, "/");
request.set_endpoint_prefix("streams.dynamodb".to_string());
request.set_content_type("application/x-amz-json-1.0".to_owned());
request.add_header("x-amz-target", "DynamoDBStreams_20120810.ListStreams");
let encoded = serde_json::to_string(&input).unwrap();
request.set_payload(Some(encoded.into_bytes()));
self.client.sign_and_dispatch(request, |response| {
if response.status.is_success() {
Box::new(response.buffer().from_err().map(|response| {
let mut body = response.body;
if body.is_empty() || body == b"null" {
body = b"{}".to_vec();
}
serde_json::from_str::<ListStreamsOutput>(
String::from_utf8_lossy(body.as_ref()).as_ref(),
)
.unwrap()
}))
} else {
Box::new(
response
.buffer()
.from_err()
.and_then(|response| Err(ListStreamsError::from_response(response))),
)
}
})
}
}
#[cfg(test)]
mod protocol_tests {}