use std::error::Error;
use std::fmt;
#[allow(warnings)]
use futures::future;
use futures::Future;
use rusoto_core::credential::ProvideAwsCredentials;
use rusoto_core::region;
use rusoto_core::request::{BufferedHttpResponse, DispatchSignedRequest};
use rusoto_core::{Client, RusotoError, RusotoFuture};
use rusoto_core::param::{Params, ServiceParams};
use rusoto_core::proto;
use rusoto_core::signature::SignedRequest;
use serde_json;
/// Broker node group settings with explicitly renamed JSON keys.
///
/// Derives both `Serialize` and `Deserialize`: the type is embedded in
/// `CreateClusterRequest` (serialized) and in `ClusterInfo` (deserialized).
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct BrokerNodeGroupInfo {
    /// JSON key `BrokerAZDistribution`; skipped during serialization when `None`.
    #[serde(rename = "BrokerAZDistribution", skip_serializing_if = "Option::is_none")]
    pub broker_az_distribution: Option<String>,
    /// JSON key `ClientSubnets`; always present in the serialized form.
    #[serde(rename = "ClientSubnets")]
    pub client_subnets: Vec<String>,
    /// JSON key `InstanceType`; always present in the serialized form.
    #[serde(rename = "InstanceType")]
    pub instance_type: String,
    /// JSON key `SecurityGroups`; skipped during serialization when `None`.
    #[serde(rename = "SecurityGroups", skip_serializing_if = "Option::is_none")]
    pub security_groups: Option<Vec<String>>,
    /// JSON key `StorageInfo`; skipped during serialization when `None`.
    #[serde(rename = "StorageInfo", skip_serializing_if = "Option::is_none")]
    pub storage_info: Option<StorageInfo>,
}
/// Per-broker details, embedded in `NodeInfo`.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct BrokerNodeInfo {
    /// JSON key `AttachedENIId`; skipped during serialization when `None`.
    #[serde(rename = "AttachedENIId", skip_serializing_if = "Option::is_none")]
    pub attached_eni_id: Option<String>,
    /// JSON key `BrokerId`; skipped during serialization when `None`.
    #[serde(rename = "BrokerId", skip_serializing_if = "Option::is_none")]
    pub broker_id: Option<f64>,
    /// JSON key `ClientSubnet`; skipped during serialization when `None`.
    #[serde(rename = "ClientSubnet", skip_serializing_if = "Option::is_none")]
    pub client_subnet: Option<String>,
    /// JSON key `ClientVpcIpAddress`; skipped during serialization when `None`.
    #[serde(rename = "ClientVpcIpAddress", skip_serializing_if = "Option::is_none")]
    pub client_vpc_ip_address: Option<String>,
    /// JSON key `CurrentBrokerSoftwareInfo`; skipped during serialization when `None`.
    #[serde(
        rename = "CurrentBrokerSoftwareInfo",
        skip_serializing_if = "Option::is_none"
    )]
    pub current_broker_software_info: Option<BrokerSoftwareInfo>,
}
/// Software details of a broker, embedded in `BrokerNodeInfo` and `ClusterInfo`.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct BrokerSoftwareInfo {
    /// JSON key `ConfigurationArn`; skipped during serialization when `None`.
    #[serde(rename = "ConfigurationArn", skip_serializing_if = "Option::is_none")]
    pub configuration_arn: Option<String>,
    /// JSON key `ConfigurationRevision`; skipped during serialization when `None`.
    #[serde(
        rename = "ConfigurationRevision",
        skip_serializing_if = "Option::is_none"
    )]
    pub configuration_revision: Option<String>,
    /// JSON key `KafkaVersion`; skipped during serialization when `None`.
    #[serde(rename = "KafkaVersion", skip_serializing_if = "Option::is_none")]
    pub kafka_version: Option<String>,
}
/// Description of a cluster, as carried by `DescribeClusterResponse` and
/// `ListClustersResponse`.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`. Every field is optional.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct ClusterInfo {
    /// JSON key `BrokerNodeGroupInfo`; skipped during serialization when `None`.
    #[serde(rename = "BrokerNodeGroupInfo", skip_serializing_if = "Option::is_none")]
    pub broker_node_group_info: Option<BrokerNodeGroupInfo>,
    /// JSON key `ClusterArn`; skipped during serialization when `None`.
    #[serde(rename = "ClusterArn", skip_serializing_if = "Option::is_none")]
    pub cluster_arn: Option<String>,
    /// JSON key `ClusterName`; skipped during serialization when `None`.
    #[serde(rename = "ClusterName", skip_serializing_if = "Option::is_none")]
    pub cluster_name: Option<String>,
    /// JSON key `CreationTime`; skipped during serialization when `None`.
    #[serde(rename = "CreationTime", skip_serializing_if = "Option::is_none")]
    pub creation_time: Option<f64>,
    /// JSON key `CurrentBrokerSoftwareInfo`; skipped during serialization when `None`.
    #[serde(
        rename = "CurrentBrokerSoftwareInfo",
        skip_serializing_if = "Option::is_none"
    )]
    pub current_broker_software_info: Option<BrokerSoftwareInfo>,
    /// JSON key `CurrentVersion`; skipped during serialization when `None`.
    #[serde(rename = "CurrentVersion", skip_serializing_if = "Option::is_none")]
    pub current_version: Option<String>,
    /// JSON key `EncryptionInfo`; skipped during serialization when `None`.
    #[serde(rename = "EncryptionInfo", skip_serializing_if = "Option::is_none")]
    pub encryption_info: Option<EncryptionInfo>,
    /// JSON key `EnhancedMonitoring`; skipped during serialization when `None`.
    #[serde(rename = "EnhancedMonitoring", skip_serializing_if = "Option::is_none")]
    pub enhanced_monitoring: Option<String>,
    /// JSON key `NumberOfBrokerNodes`; skipped during serialization when `None`.
    #[serde(rename = "NumberOfBrokerNodes", skip_serializing_if = "Option::is_none")]
    pub number_of_broker_nodes: Option<i64>,
    /// JSON key `State`; skipped during serialization when `None`.
    #[serde(rename = "State", skip_serializing_if = "Option::is_none")]
    pub state: Option<String>,
    /// JSON key `ZookeeperConnectString`; skipped during serialization when `None`.
    #[serde(
        rename = "ZookeeperConnectString",
        skip_serializing_if = "Option::is_none"
    )]
    pub zookeeper_connect_string: Option<String>,
}
/// Input of `Kafka::create_cluster`.
///
/// `KafkaClient` serializes the whole value to JSON and sends it as the
/// request payload. Only `Serialize` is derived.
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct CreateClusterRequest {
    /// JSON key `BrokerNodeGroupInfo`; always present in the serialized form.
    #[serde(rename = "BrokerNodeGroupInfo")]
    pub broker_node_group_info: BrokerNodeGroupInfo,
    /// JSON key `ClusterName`; always present in the serialized form.
    #[serde(rename = "ClusterName")]
    pub cluster_name: String,
    /// JSON key `EncryptionInfo`; skipped during serialization when `None`.
    #[serde(rename = "EncryptionInfo", skip_serializing_if = "Option::is_none")]
    pub encryption_info: Option<EncryptionInfo>,
    /// JSON key `EnhancedMonitoring`; skipped during serialization when `None`.
    #[serde(rename = "EnhancedMonitoring", skip_serializing_if = "Option::is_none")]
    pub enhanced_monitoring: Option<String>,
    /// JSON key `KafkaVersion`; always present in the serialized form.
    #[serde(rename = "KafkaVersion")]
    pub kafka_version: String,
    /// JSON key `NumberOfBrokerNodes`; always present in the serialized form.
    #[serde(rename = "NumberOfBrokerNodes")]
    pub number_of_broker_nodes: i64,
}
/// Output of `Kafka::create_cluster`, deserialized from a 200 response.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct CreateClusterResponse {
    /// JSON key `ClusterArn`; skipped during serialization when `None`.
    #[serde(rename = "ClusterArn", skip_serializing_if = "Option::is_none")]
    pub cluster_arn: Option<String>,
    /// JSON key `ClusterName`; skipped during serialization when `None`.
    #[serde(rename = "ClusterName", skip_serializing_if = "Option::is_none")]
    pub cluster_name: Option<String>,
    /// JSON key `State`; skipped during serialization when `None`.
    #[serde(rename = "State", skip_serializing_if = "Option::is_none")]
    pub state: Option<String>,
}
/// Input of `Kafka::delete_cluster`.
///
/// `KafkaClient` does not send this value as a body: `cluster_arn` is
/// interpolated into the request path and `current_version`, when set, is
/// sent as the `currentVersion` query parameter.
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct DeleteClusterRequest {
    /// JSON key `ClusterArn`; always present in the serialized form.
    #[serde(rename = "ClusterArn")]
    pub cluster_arn: String,
    /// JSON key `CurrentVersion`; skipped during serialization when `None`.
    #[serde(rename = "CurrentVersion", skip_serializing_if = "Option::is_none")]
    pub current_version: Option<String>,
}
/// Output of `Kafka::delete_cluster`, deserialized from a 200 response.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct DeleteClusterResponse {
    /// JSON key `ClusterArn`; skipped during serialization when `None`.
    #[serde(rename = "ClusterArn", skip_serializing_if = "Option::is_none")]
    pub cluster_arn: Option<String>,
    /// JSON key `State`; skipped during serialization when `None`.
    #[serde(rename = "State", skip_serializing_if = "Option::is_none")]
    pub state: Option<String>,
}
/// Input of `Kafka::describe_cluster`.
///
/// `KafkaClient` interpolates `cluster_arn` into the request path; no body
/// is sent for this operation.
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct DescribeClusterRequest {
    /// JSON key `ClusterArn`; always present in the serialized form.
    #[serde(rename = "ClusterArn")]
    pub cluster_arn: String,
}
/// Output of `Kafka::describe_cluster`, deserialized from a 200 response.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct DescribeClusterResponse {
    /// JSON key `ClusterInfo`; skipped during serialization when `None`.
    #[serde(rename = "ClusterInfo", skip_serializing_if = "Option::is_none")]
    pub cluster_info: Option<ClusterInfo>,
}
/// EBS storage settings, embedded in `StorageInfo`.
///
/// Derives both `Serialize` and `Deserialize`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EBSStorageInfo {
    /// JSON key `VolumeSize`; skipped during serialization when `None`.
    #[serde(rename = "VolumeSize", skip_serializing_if = "Option::is_none")]
    pub volume_size: Option<i64>,
}
/// Encryption-at-rest settings, embedded in `EncryptionInfo`.
///
/// Derives both `Serialize` and `Deserialize`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EncryptionAtRest {
    /// JSON key `DataVolumeKMSKeyId`; always present in the serialized form.
    #[serde(rename = "DataVolumeKMSKeyId")]
    pub data_volume_kms_key_id: String,
}
/// Encryption settings, embedded in `CreateClusterRequest` and `ClusterInfo`.
///
/// Derives both `Serialize` and `Deserialize`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct EncryptionInfo {
    /// JSON key `EncryptionAtRest`; skipped during serialization when `None`.
    #[serde(rename = "EncryptionAtRest", skip_serializing_if = "Option::is_none")]
    pub encryption_at_rest: Option<EncryptionAtRest>,
}
/// Plain holder for two optional strings describing an error.
///
/// It derives neither `Serialize` nor `Deserialize`, and no other item
/// visible in this file refers to it.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct KafkaError {
    /// Optional name of an invalid parameter; `None` by default.
    pub invalid_parameter: Option<String>,
    /// Optional error message; `None` by default.
    pub message: Option<String>,
}
/// Input of `Kafka::get_bootstrap_brokers`.
///
/// `KafkaClient` interpolates `cluster_arn` into the request path; no body
/// is sent for this operation.
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct GetBootstrapBrokersRequest {
    /// JSON key `ClusterArn`; always present in the serialized form.
    #[serde(rename = "ClusterArn")]
    pub cluster_arn: String,
}
/// Output of `Kafka::get_bootstrap_brokers`, deserialized from a 200 response.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct GetBootstrapBrokersResponse {
    /// JSON key `BootstrapBrokerString`; skipped during serialization when `None`.
    #[serde(
        rename = "BootstrapBrokerString",
        skip_serializing_if = "Option::is_none"
    )]
    pub bootstrap_broker_string: Option<String>,
}
/// Input of `Kafka::list_clusters`.
///
/// `KafkaClient` sends each field that is `Some` as a query parameter
/// (`clusterNameFilter`, `maxResults`, `nextToken`) rather than as a body.
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct ListClustersRequest {
    /// JSON key `ClusterNameFilter`; skipped during serialization when `None`.
    #[serde(rename = "ClusterNameFilter", skip_serializing_if = "Option::is_none")]
    pub cluster_name_filter: Option<String>,
    /// JSON key `MaxResults`; skipped during serialization when `None`.
    #[serde(rename = "MaxResults", skip_serializing_if = "Option::is_none")]
    pub max_results: Option<i64>,
    /// JSON key `NextToken`; skipped during serialization when `None`.
    #[serde(rename = "NextToken", skip_serializing_if = "Option::is_none")]
    pub next_token: Option<String>,
}
/// Output of `Kafka::list_clusters`, deserialized from a 200 response.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct ListClustersResponse {
    /// JSON key `ClusterInfoList`; skipped during serialization when `None`.
    #[serde(rename = "ClusterInfoList", skip_serializing_if = "Option::is_none")]
    pub cluster_info_list: Option<Vec<ClusterInfo>>,
    /// JSON key `NextToken`; skipped during serialization when `None`.
    #[serde(rename = "NextToken", skip_serializing_if = "Option::is_none")]
    pub next_token: Option<String>,
}
/// Input of `Kafka::list_nodes`.
///
/// `KafkaClient` interpolates `cluster_arn` into the request path and sends
/// `max_results` / `next_token`, when set, as the `maxResults` / `nextToken`
/// query parameters.
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
pub struct ListNodesRequest {
    /// JSON key `ClusterArn`; always present in the serialized form.
    #[serde(rename = "ClusterArn")]
    pub cluster_arn: String,
    /// JSON key `MaxResults`; skipped during serialization when `None`.
    #[serde(rename = "MaxResults", skip_serializing_if = "Option::is_none")]
    pub max_results: Option<i64>,
    /// JSON key `NextToken`; skipped during serialization when `None`.
    #[serde(rename = "NextToken", skip_serializing_if = "Option::is_none")]
    pub next_token: Option<String>,
}
/// Output of `Kafka::list_nodes`, deserialized from a 200 response.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct ListNodesResponse {
    /// JSON key `NextToken`; skipped during serialization when `None`.
    #[serde(rename = "NextToken", skip_serializing_if = "Option::is_none")]
    pub next_token: Option<String>,
    /// JSON key `NodeInfoList`; skipped during serialization when `None`.
    #[serde(rename = "NodeInfoList", skip_serializing_if = "Option::is_none")]
    pub node_info_list: Option<Vec<NodeInfo>>,
}
/// Description of a single node, as listed in `ListNodesResponse`.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`. Every field is optional.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct NodeInfo {
    /// JSON key `AddedToClusterTime`; skipped during serialization when `None`.
    #[serde(rename = "AddedToClusterTime", skip_serializing_if = "Option::is_none")]
    pub added_to_cluster_time: Option<String>,
    /// JSON key `BrokerNodeInfo`; skipped during serialization when `None`.
    #[serde(rename = "BrokerNodeInfo", skip_serializing_if = "Option::is_none")]
    pub broker_node_info: Option<BrokerNodeInfo>,
    /// JSON key `InstanceType`; skipped during serialization when `None`.
    #[serde(rename = "InstanceType", skip_serializing_if = "Option::is_none")]
    pub instance_type: Option<String>,
    /// JSON key `NodeARN`; skipped during serialization when `None`.
    #[serde(rename = "NodeARN", skip_serializing_if = "Option::is_none")]
    pub node_arn: Option<String>,
    /// JSON key `NodeType`; skipped during serialization when `None`.
    #[serde(rename = "NodeType", skip_serializing_if = "Option::is_none")]
    pub node_type: Option<String>,
    /// JSON key `ZookeeperNodeInfo`; skipped during serialization when `None`.
    #[serde(rename = "ZookeeperNodeInfo", skip_serializing_if = "Option::is_none")]
    pub zookeeper_node_info: Option<ZookeeperNodeInfo>,
}
/// Storage settings, embedded in `BrokerNodeGroupInfo`.
///
/// Derives both `Serialize` and `Deserialize`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StorageInfo {
    /// JSON key `EbsStorageInfo`; skipped during serialization when `None`.
    #[serde(rename = "EbsStorageInfo", skip_serializing_if = "Option::is_none")]
    pub ebs_storage_info: Option<EBSStorageInfo>,
}
/// Zookeeper node details, embedded in `NodeInfo`.
///
/// Only `Deserialize` is derived in normal builds; `Serialize` is added
/// under `cfg(test)`.
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[cfg_attr(test, derive(Serialize))]
pub struct ZookeeperNodeInfo {
    /// JSON key `AttachedENIId`; skipped during serialization when `None`.
    #[serde(rename = "AttachedENIId", skip_serializing_if = "Option::is_none")]
    pub attached_eni_id: Option<String>,
    /// JSON key `ClientVpcIpAddress`; skipped during serialization when `None`.
    #[serde(rename = "ClientVpcIpAddress", skip_serializing_if = "Option::is_none")]
    pub client_vpc_ip_address: Option<String>,
    /// JSON key `ZookeeperId`; skipped during serialization when `None`.
    #[serde(rename = "ZookeeperId", skip_serializing_if = "Option::is_none")]
    pub zookeeper_id: Option<f64>,
    /// JSON key `ZookeeperVersion`; skipped during serialization when `None`.
    #[serde(rename = "ZookeeperVersion", skip_serializing_if = "Option::is_none")]
    pub zookeeper_version: Option<String>,
}
/// Service errors that `create_cluster` can report.
///
/// Each variant carries the message string parsed from the error response.
#[derive(Debug, PartialEq)]
pub enum CreateClusterError {
    /// Produced for the error type `BadRequestException`.
    BadRequest(String),
    /// Produced for the error type `ConflictException`.
    Conflict(String),
    /// Produced for the error type `ForbiddenException`.
    Forbidden(String),
    /// Produced for the error type `InternalServerErrorException`.
    InternalServerError(String),
    /// Produced for the error type `ServiceUnavailableException`.
    ServiceUnavailable(String),
    /// Produced for the error type `TooManyRequestsException`.
    TooManyRequests(String),
    /// Produced for the error type `UnauthorizedException`.
    Unauthorized(String),
}
impl CreateClusterError {
pub fn from_response(res: BufferedHttpResponse) -> RusotoError<CreateClusterError> {
if let Some(err) = proto::json::Error::parse_rest(&res) {
match err.typ.as_str() {
"BadRequestException" => {
return RusotoError::Service(CreateClusterError::BadRequest(err.msg))
}
"ConflictException" => {
return RusotoError::Service(CreateClusterError::Conflict(err.msg))
}
"ForbiddenException" => {
return RusotoError::Service(CreateClusterError::Forbidden(err.msg))
}
"InternalServerErrorException" => {
return RusotoError::Service(CreateClusterError::InternalServerError(err.msg))
}
"ServiceUnavailableException" => {
return RusotoError::Service(CreateClusterError::ServiceUnavailable(err.msg))
}
"TooManyRequestsException" => {
return RusotoError::Service(CreateClusterError::TooManyRequests(err.msg))
}
"UnauthorizedException" => {
return RusotoError::Service(CreateClusterError::Unauthorized(err.msg))
}
"ValidationException" => return RusotoError::Validation(err.msg),
_ => {}
}
}
return RusotoError::Unknown(res);
}
}
impl fmt::Display for CreateClusterError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for CreateClusterError {
    /// Returns the message string carried by the variant.
    fn description(&self) -> &str {
        match self {
            CreateClusterError::BadRequest(text)
            | CreateClusterError::Conflict(text)
            | CreateClusterError::Forbidden(text)
            | CreateClusterError::InternalServerError(text)
            | CreateClusterError::ServiceUnavailable(text)
            | CreateClusterError::TooManyRequests(text)
            | CreateClusterError::Unauthorized(text) => text,
        }
    }
}
/// Service errors that `delete_cluster` can report.
///
/// Each variant carries the message string parsed from the error response.
#[derive(Debug, PartialEq)]
pub enum DeleteClusterError {
    /// Produced for the error type `BadRequestException`.
    BadRequest(String),
    /// Produced for the error type `ForbiddenException`.
    Forbidden(String),
    /// Produced for the error type `InternalServerErrorException`.
    InternalServerError(String),
    /// Produced for the error type `NotFoundException`.
    NotFound(String),
}
impl DeleteClusterError {
pub fn from_response(res: BufferedHttpResponse) -> RusotoError<DeleteClusterError> {
if let Some(err) = proto::json::Error::parse_rest(&res) {
match err.typ.as_str() {
"BadRequestException" => {
return RusotoError::Service(DeleteClusterError::BadRequest(err.msg))
}
"ForbiddenException" => {
return RusotoError::Service(DeleteClusterError::Forbidden(err.msg))
}
"InternalServerErrorException" => {
return RusotoError::Service(DeleteClusterError::InternalServerError(err.msg))
}
"NotFoundException" => {
return RusotoError::Service(DeleteClusterError::NotFound(err.msg))
}
"ValidationException" => return RusotoError::Validation(err.msg),
_ => {}
}
}
return RusotoError::Unknown(res);
}
}
impl fmt::Display for DeleteClusterError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for DeleteClusterError {
    /// Returns the message string carried by the variant.
    fn description(&self) -> &str {
        match self {
            DeleteClusterError::BadRequest(text)
            | DeleteClusterError::Forbidden(text)
            | DeleteClusterError::InternalServerError(text)
            | DeleteClusterError::NotFound(text) => text,
        }
    }
}
/// Service errors that `describe_cluster` can report.
///
/// Each variant carries the message string parsed from the error response.
#[derive(Debug, PartialEq)]
pub enum DescribeClusterError {
    /// Produced for the error type `BadRequestException`.
    BadRequest(String),
    /// Produced for the error type `ForbiddenException`.
    Forbidden(String),
    /// Produced for the error type `InternalServerErrorException`.
    InternalServerError(String),
    /// Produced for the error type `NotFoundException`.
    NotFound(String),
    /// Produced for the error type `UnauthorizedException`.
    Unauthorized(String),
}
impl DescribeClusterError {
pub fn from_response(res: BufferedHttpResponse) -> RusotoError<DescribeClusterError> {
if let Some(err) = proto::json::Error::parse_rest(&res) {
match err.typ.as_str() {
"BadRequestException" => {
return RusotoError::Service(DescribeClusterError::BadRequest(err.msg))
}
"ForbiddenException" => {
return RusotoError::Service(DescribeClusterError::Forbidden(err.msg))
}
"InternalServerErrorException" => {
return RusotoError::Service(DescribeClusterError::InternalServerError(err.msg))
}
"NotFoundException" => {
return RusotoError::Service(DescribeClusterError::NotFound(err.msg))
}
"UnauthorizedException" => {
return RusotoError::Service(DescribeClusterError::Unauthorized(err.msg))
}
"ValidationException" => return RusotoError::Validation(err.msg),
_ => {}
}
}
return RusotoError::Unknown(res);
}
}
impl fmt::Display for DescribeClusterError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for DescribeClusterError {
    /// Returns the message string carried by the variant.
    fn description(&self) -> &str {
        match self {
            DescribeClusterError::BadRequest(text)
            | DescribeClusterError::Forbidden(text)
            | DescribeClusterError::InternalServerError(text)
            | DescribeClusterError::NotFound(text)
            | DescribeClusterError::Unauthorized(text) => text,
        }
    }
}
/// Service errors that `get_bootstrap_brokers` can report.
///
/// Each variant carries the message string parsed from the error response.
#[derive(Debug, PartialEq)]
pub enum GetBootstrapBrokersError {
    /// Produced for the error type `BadRequestException`.
    BadRequest(String),
    /// Produced for the error type `ConflictException`.
    Conflict(String),
    /// Produced for the error type `ForbiddenException`.
    Forbidden(String),
    /// Produced for the error type `InternalServerErrorException`.
    InternalServerError(String),
    /// Produced for the error type `UnauthorizedException`.
    Unauthorized(String),
}
impl GetBootstrapBrokersError {
pub fn from_response(res: BufferedHttpResponse) -> RusotoError<GetBootstrapBrokersError> {
if let Some(err) = proto::json::Error::parse_rest(&res) {
match err.typ.as_str() {
"BadRequestException" => {
return RusotoError::Service(GetBootstrapBrokersError::BadRequest(err.msg))
}
"ConflictException" => {
return RusotoError::Service(GetBootstrapBrokersError::Conflict(err.msg))
}
"ForbiddenException" => {
return RusotoError::Service(GetBootstrapBrokersError::Forbidden(err.msg))
}
"InternalServerErrorException" => {
return RusotoError::Service(GetBootstrapBrokersError::InternalServerError(
err.msg,
))
}
"UnauthorizedException" => {
return RusotoError::Service(GetBootstrapBrokersError::Unauthorized(err.msg))
}
"ValidationException" => return RusotoError::Validation(err.msg),
_ => {}
}
}
return RusotoError::Unknown(res);
}
}
impl fmt::Display for GetBootstrapBrokersError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for GetBootstrapBrokersError {
    /// Returns the message string carried by the variant.
    fn description(&self) -> &str {
        match self {
            GetBootstrapBrokersError::BadRequest(text)
            | GetBootstrapBrokersError::Conflict(text)
            | GetBootstrapBrokersError::Forbidden(text)
            | GetBootstrapBrokersError::InternalServerError(text)
            | GetBootstrapBrokersError::Unauthorized(text) => text,
        }
    }
}
/// Service errors that `list_clusters` can report.
///
/// Each variant carries the message string parsed from the error response.
#[derive(Debug, PartialEq)]
pub enum ListClustersError {
    /// Produced for the error type `BadRequestException`.
    BadRequest(String),
    /// Produced for the error type `ForbiddenException`.
    Forbidden(String),
    /// Produced for the error type `InternalServerErrorException`.
    InternalServerError(String),
    /// Produced for the error type `UnauthorizedException`.
    Unauthorized(String),
}
impl ListClustersError {
pub fn from_response(res: BufferedHttpResponse) -> RusotoError<ListClustersError> {
if let Some(err) = proto::json::Error::parse_rest(&res) {
match err.typ.as_str() {
"BadRequestException" => {
return RusotoError::Service(ListClustersError::BadRequest(err.msg))
}
"ForbiddenException" => {
return RusotoError::Service(ListClustersError::Forbidden(err.msg))
}
"InternalServerErrorException" => {
return RusotoError::Service(ListClustersError::InternalServerError(err.msg))
}
"UnauthorizedException" => {
return RusotoError::Service(ListClustersError::Unauthorized(err.msg))
}
"ValidationException" => return RusotoError::Validation(err.msg),
_ => {}
}
}
return RusotoError::Unknown(res);
}
}
impl fmt::Display for ListClustersError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for ListClustersError {
    /// Returns the message string carried by the variant.
    fn description(&self) -> &str {
        match self {
            ListClustersError::BadRequest(text)
            | ListClustersError::Forbidden(text)
            | ListClustersError::InternalServerError(text)
            | ListClustersError::Unauthorized(text) => text,
        }
    }
}
/// Service errors that `list_nodes` can report.
///
/// Each variant carries the message string parsed from the error response.
#[derive(Debug, PartialEq)]
pub enum ListNodesError {
    /// Produced for the error type `BadRequestException`.
    BadRequest(String),
    /// Produced for the error type `ForbiddenException`.
    Forbidden(String),
    /// Produced for the error type `InternalServerErrorException`.
    InternalServerError(String),
    /// Produced for the error type `NotFoundException`.
    NotFound(String),
}
impl ListNodesError {
pub fn from_response(res: BufferedHttpResponse) -> RusotoError<ListNodesError> {
if let Some(err) = proto::json::Error::parse_rest(&res) {
match err.typ.as_str() {
"BadRequestException" => {
return RusotoError::Service(ListNodesError::BadRequest(err.msg))
}
"ForbiddenException" => {
return RusotoError::Service(ListNodesError::Forbidden(err.msg))
}
"InternalServerErrorException" => {
return RusotoError::Service(ListNodesError::InternalServerError(err.msg))
}
"NotFoundException" => {
return RusotoError::Service(ListNodesError::NotFound(err.msg))
}
"ValidationException" => return RusotoError::Validation(err.msg),
_ => {}
}
}
return RusotoError::Unknown(res);
}
}
impl fmt::Display for ListNodesError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl Error for ListNodesError {
    /// Returns the message string carried by the variant.
    fn description(&self) -> &str {
        match self {
            ListNodesError::BadRequest(text)
            | ListNodesError::Forbidden(text)
            | ListNodesError::InternalServerError(text)
            | ListNodesError::NotFound(text) => text,
        }
    }
}
/// Operations of the `kafka` service.
///
/// Every method takes its request struct by value and returns a
/// `RusotoFuture` pairing the operation's response type with its error enum.
/// `KafkaClient` is the implementation provided in this file.
pub trait Kafka {
/// Create-cluster operation; `KafkaClient` issues it as `POST /v1/clusters`.
fn create_cluster(
&self,
input: CreateClusterRequest,
) -> RusotoFuture<CreateClusterResponse, CreateClusterError>;
/// Delete-cluster operation; `KafkaClient` issues it as
/// `DELETE /v1/clusters/{cluster_arn}`.
fn delete_cluster(
&self,
input: DeleteClusterRequest,
) -> RusotoFuture<DeleteClusterResponse, DeleteClusterError>;
/// Describe-cluster operation; `KafkaClient` issues it as
/// `GET /v1/clusters/{cluster_arn}`.
fn describe_cluster(
&self,
input: DescribeClusterRequest,
) -> RusotoFuture<DescribeClusterResponse, DescribeClusterError>;
/// Get-bootstrap-brokers operation; `KafkaClient` issues it as
/// `GET /v1/clusters/{cluster_arn}/bootstrap-brokers`.
fn get_bootstrap_brokers(
&self,
input: GetBootstrapBrokersRequest,
) -> RusotoFuture<GetBootstrapBrokersResponse, GetBootstrapBrokersError>;
/// List-clusters operation; `KafkaClient` issues it as `GET /v1/clusters`.
fn list_clusters(
&self,
input: ListClustersRequest,
) -> RusotoFuture<ListClustersResponse, ListClustersError>;
/// List-nodes operation; `KafkaClient` issues it as
/// `GET /v1/clusters/{cluster_arn}/nodes`.
fn list_nodes(
&self,
input: ListNodesRequest,
) -> RusotoFuture<ListNodesResponse, ListNodesError>;
}
/// Client type that implements the `Kafka` trait.
///
/// Construct it with `KafkaClient::new` or `KafkaClient::new_with`.
#[derive(Clone)]
pub struct KafkaClient {
// Used by every `Kafka` method to sign and dispatch the request it built.
client: Client,
// Passed to `SignedRequest::new` for every request.
region: region::Region,
}
impl KafkaClient {
    /// Creates a client for `region` that uses `Client::shared()`.
    pub fn new(region: region::Region) -> KafkaClient {
        let client = Client::shared();
        KafkaClient { client, region }
    }

    /// Creates a client for `region` from a caller-supplied request
    /// dispatcher and credentials provider.
    ///
    /// Both are forwarded to `Client::new_with` (credentials provider first,
    /// dispatcher second).
    pub fn new_with<P, D>(
        request_dispatcher: D,
        credentials_provider: P,
        region: region::Region,
    ) -> KafkaClient
    where
        P: ProvideAwsCredentials + Send + Sync + 'static,
        P::Future: Send,
        D: DispatchSignedRequest + Send + Sync + 'static,
        D::Future: Send,
    {
        let client = Client::new_with(credentials_provider, request_dispatcher);
        KafkaClient { client, region }
    }
}
impl Kafka for KafkaClient {
    /// Issues `POST /v1/clusters` with `input` serialized to JSON as the payload.
    ///
    /// A 200 response is deserialized into `CreateClusterResponse`; any other
    /// status is buffered and converted by `CreateClusterError::from_response`.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json::to_vec` fails to serialize `input`.
    fn create_cluster(
        &self,
        input: CreateClusterRequest,
    ) -> RusotoFuture<CreateClusterResponse, CreateClusterError> {
        let request_uri = "/v1/clusters";

        let mut request = SignedRequest::new("POST", "kafka", &self.region, request_uri);
        request.set_content_type("application/x-amz-json-1.1".to_owned());
        let body = serde_json::to_vec(&input).unwrap();
        request.set_payload(Some(body));

        self.client.sign_and_dispatch(request, |response| {
            if response.status.as_u16() != 200 {
                Box::new(
                    response
                        .buffer()
                        .from_err()
                        .and_then(|buffered| Err(CreateClusterError::from_response(buffered))),
                )
            } else {
                Box::new(response.buffer().from_err().and_then(|buffered| {
                    let parsed = proto::json::ResponsePayload::new(&buffered)
                        .deserialize::<CreateClusterResponse, _>()?;
                    Ok(parsed)
                }))
            }
        })
    }

    /// Issues `DELETE /v1/clusters/{cluster_arn}`.
    ///
    /// `input.current_version`, when set, is sent as the `currentVersion`
    /// query parameter. A 200 response is deserialized into
    /// `DeleteClusterResponse`; any other status is buffered and converted by
    /// `DeleteClusterError::from_response`.
    fn delete_cluster(
        &self,
        input: DeleteClusterRequest,
    ) -> RusotoFuture<DeleteClusterResponse, DeleteClusterError> {
        let request_uri = format!(
            "/v1/clusters/{cluster_arn}",
            cluster_arn = input.cluster_arn
        );

        let mut request = SignedRequest::new("DELETE", "kafka", &self.region, &request_uri);
        request.set_content_type("application/x-amz-json-1.1".to_owned());

        let mut query = Params::new();
        if let Some(version) = input.current_version.as_ref() {
            query.put("currentVersion", version);
        }
        request.set_params(query);

        self.client.sign_and_dispatch(request, |response| {
            if response.status.as_u16() != 200 {
                Box::new(
                    response
                        .buffer()
                        .from_err()
                        .and_then(|buffered| Err(DeleteClusterError::from_response(buffered))),
                )
            } else {
                Box::new(response.buffer().from_err().and_then(|buffered| {
                    let parsed = proto::json::ResponsePayload::new(&buffered)
                        .deserialize::<DeleteClusterResponse, _>()?;
                    Ok(parsed)
                }))
            }
        })
    }

    /// Issues `GET /v1/clusters/{cluster_arn}`.
    ///
    /// A 200 response is deserialized into `DescribeClusterResponse`; any
    /// other status is buffered and converted by
    /// `DescribeClusterError::from_response`.
    fn describe_cluster(
        &self,
        input: DescribeClusterRequest,
    ) -> RusotoFuture<DescribeClusterResponse, DescribeClusterError> {
        let request_uri = format!(
            "/v1/clusters/{cluster_arn}",
            cluster_arn = input.cluster_arn
        );

        let mut request = SignedRequest::new("GET", "kafka", &self.region, &request_uri);
        request.set_content_type("application/x-amz-json-1.1".to_owned());

        self.client.sign_and_dispatch(request, |response| {
            if response.status.as_u16() != 200 {
                Box::new(
                    response
                        .buffer()
                        .from_err()
                        .and_then(|buffered| Err(DescribeClusterError::from_response(buffered))),
                )
            } else {
                Box::new(response.buffer().from_err().and_then(|buffered| {
                    let parsed = proto::json::ResponsePayload::new(&buffered)
                        .deserialize::<DescribeClusterResponse, _>()?;
                    Ok(parsed)
                }))
            }
        })
    }

    /// Issues `GET /v1/clusters/{cluster_arn}/bootstrap-brokers`.
    ///
    /// A 200 response is deserialized into `GetBootstrapBrokersResponse`; any
    /// other status is buffered and converted by
    /// `GetBootstrapBrokersError::from_response`.
    fn get_bootstrap_brokers(
        &self,
        input: GetBootstrapBrokersRequest,
    ) -> RusotoFuture<GetBootstrapBrokersResponse, GetBootstrapBrokersError> {
        let request_uri = format!(
            "/v1/clusters/{cluster_arn}/bootstrap-brokers",
            cluster_arn = input.cluster_arn
        );

        let mut request = SignedRequest::new("GET", "kafka", &self.region, &request_uri);
        request.set_content_type("application/x-amz-json-1.1".to_owned());

        self.client.sign_and_dispatch(request, |response| {
            if response.status.as_u16() != 200 {
                Box::new(response.buffer().from_err().and_then(|buffered| {
                    Err(GetBootstrapBrokersError::from_response(buffered))
                }))
            } else {
                Box::new(response.buffer().from_err().and_then(|buffered| {
                    let parsed = proto::json::ResponsePayload::new(&buffered)
                        .deserialize::<GetBootstrapBrokersResponse, _>()?;
                    Ok(parsed)
                }))
            }
        })
    }

    /// Issues `GET /v1/clusters`.
    ///
    /// Each of `cluster_name_filter`, `max_results` and `next_token` that is
    /// set is sent as the query parameter `clusterNameFilter`, `maxResults`
    /// or `nextToken`. A 200 response is deserialized into
    /// `ListClustersResponse`; any other status is buffered and converted by
    /// `ListClustersError::from_response`.
    fn list_clusters(
        &self,
        input: ListClustersRequest,
    ) -> RusotoFuture<ListClustersResponse, ListClustersError> {
        let request_uri = "/v1/clusters";

        let mut request = SignedRequest::new("GET", "kafka", &self.region, request_uri);
        request.set_content_type("application/x-amz-json-1.1".to_owned());

        let mut query = Params::new();
        if let Some(name_filter) = input.cluster_name_filter.as_ref() {
            query.put("clusterNameFilter", name_filter);
        }
        if let Some(limit) = input.max_results.as_ref() {
            query.put("maxResults", limit);
        }
        if let Some(token) = input.next_token.as_ref() {
            query.put("nextToken", token);
        }
        request.set_params(query);

        self.client.sign_and_dispatch(request, |response| {
            if response.status.as_u16() != 200 {
                Box::new(
                    response
                        .buffer()
                        .from_err()
                        .and_then(|buffered| Err(ListClustersError::from_response(buffered))),
                )
            } else {
                Box::new(response.buffer().from_err().and_then(|buffered| {
                    let parsed = proto::json::ResponsePayload::new(&buffered)
                        .deserialize::<ListClustersResponse, _>()?;
                    Ok(parsed)
                }))
            }
        })
    }

    /// Issues `GET /v1/clusters/{cluster_arn}/nodes`.
    ///
    /// `max_results` and `next_token`, when set, are sent as the query
    /// parameters `maxResults` and `nextToken`. A 200 response is
    /// deserialized into `ListNodesResponse`; any other status is buffered
    /// and converted by `ListNodesError::from_response`.
    fn list_nodes(
        &self,
        input: ListNodesRequest,
    ) -> RusotoFuture<ListNodesResponse, ListNodesError> {
        let request_uri = format!(
            "/v1/clusters/{cluster_arn}/nodes",
            cluster_arn = input.cluster_arn
        );

        let mut request = SignedRequest::new("GET", "kafka", &self.region, &request_uri);
        request.set_content_type("application/x-amz-json-1.1".to_owned());

        let mut query = Params::new();
        if let Some(limit) = input.max_results.as_ref() {
            query.put("maxResults", limit);
        }
        if let Some(token) = input.next_token.as_ref() {
            query.put("nextToken", token);
        }
        request.set_params(query);

        self.client.sign_and_dispatch(request, |response| {
            if response.status.as_u16() != 200 {
                Box::new(
                    response
                        .buffer()
                        .from_err()
                        .and_then(|buffered| Err(ListNodesError::from_response(buffered))),
                )
            } else {
                Box::new(response.buffer().from_err().and_then(|buffered| {
                    let parsed = proto::json::ResponsePayload::new(&buffered)
                        .deserialize::<ListNodesResponse, _>()?;
                    Ok(parsed)
                }))
            }
        })
    }
}
// Test-only module, compiled under `cfg(test)`; it currently contains no tests.
#[cfg(test)]
mod protocol_tests {}