#![allow(clippy::too_many_arguments)]
use chrono::Utc;
use http::StatusCode;
use serde_json::{json, Value};
use fakecloud_core::pagination::paginate;
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
use super::*;
impl EventBridgeService {
pub(super) fn create_partner_event_source(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("Name", &body["Name"])?;
let name = body["Name"]
.as_str()
.ok_or_else(|| missing("Name"))?
.to_string();
validate_string_length("name", &name, 1, 256)?;
validate_required("Account", &body["Account"])?;
let account = body["Account"]
.as_str()
.ok_or_else(|| missing("Account"))?
.to_string();
validate_string_length("account", &account, 12, 12)?;
let mut accounts = self.state.write();
let state = accounts.get_or_create(&req.account_id);
if state.partner_event_sources.contains_key(&name) {
return Err(AwsServiceError::aws_error(
StatusCode::CONFLICT,
"ResourceAlreadyExistsException",
format!("Partner event source {name} already exists."),
));
}
let arn = format!(
"arn:aws:events:{}::event-source/aws.partner/{}",
state.region, name
);
let now = Utc::now();
let ps = PartnerEventSource {
name: name.clone(),
arn: arn.clone(),
account,
creation_time: now,
expiration_time: None,
state: "ACTIVE".to_string(),
};
state.partner_event_sources.insert(name.clone(), ps);
Ok(AwsResponse::ok_json(json!({ "EventSourceArn": arn })))
}
pub(super) fn delete_partner_event_source(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("Name", &body["Name"])?;
let name = body["Name"]
.as_str()
.ok_or_else(|| missing("Name"))?
.to_string();
validate_required("Account", &body["Account"])?;
let account = body["Account"]
.as_str()
.ok_or_else(|| missing("Account"))?
.to_string();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&req.account_id);
match state.partner_event_sources.get(&name) {
Some(ps) if ps.account == account => {
state.partner_event_sources.remove(&name);
}
Some(_) => {
return Err(AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("Partner event source {name} does not exist for account {account}."),
));
}
None => {
return Err(AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("Partner event source {name} does not exist."),
));
}
}
Ok(AwsResponse::ok_json(json!({})))
}
pub(super) fn describe_partner_event_source(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("Name", &body["Name"])?;
let name = body["Name"]
.as_str()
.ok_or_else(|| missing("Name"))?
.to_string();
validate_string_length("name", &name, 1, 256)?;
let accounts = self.state.read();
let empty = EventBridgeState::new(&req.account_id, &req.region);
let state = accounts.get(&req.account_id).unwrap_or(&empty);
let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("Partner event source {name} does not exist."),
)
})?;
Ok(AwsResponse::ok_json(json!({
"Arn": ps.arn,
"Name": ps.name,
})))
}
pub(super) fn list_partner_event_sources(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("namePrefix", &body["NamePrefix"])?;
let name_prefix = body["NamePrefix"]
.as_str()
.ok_or_else(|| missing("NamePrefix"))?;
validate_string_length("namePrefix", name_prefix, 1, 256)?;
validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
let accounts = self.state.read();
let empty = EventBridgeState::new(&req.account_id, &req.region);
let state = accounts.get(&req.account_id).unwrap_or(&empty);
let all: Vec<Value> = state
.partner_event_sources
.values()
.filter(|ps| ps.name.starts_with(name_prefix))
.map(|ps| {
json!({
"Arn": ps.arn,
"Name": ps.name,
})
})
.collect();
let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
let mut resp = json!({ "PartnerEventSources": sources });
if let Some(token) = next_token {
resp["NextToken"] = json!(token);
}
Ok(AwsResponse::ok_json(resp))
}
pub(super) fn list_partner_event_source_accounts(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("EventSourceName", &body["EventSourceName"])?;
let event_source_name = body["EventSourceName"]
.as_str()
.ok_or_else(|| missing("EventSourceName"))?;
validate_string_length("eventSourceName", event_source_name, 1, 256)?;
validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
let accounts = self.state.read();
let empty = EventBridgeState::new(&req.account_id, &req.region);
let state = accounts.get(&req.account_id).unwrap_or(&empty);
let accounts: Vec<Value> = state
.partner_event_sources
.values()
.filter(|ps| ps.name == event_source_name)
.map(|ps| json!({ "Account": ps.account }))
.collect();
Ok(AwsResponse::ok_json(json!({
"PartnerEventSourceAccounts": accounts
})))
}
pub(super) fn activate_event_source(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("Name", &body["Name"])?;
let name = body["Name"]
.as_str()
.ok_or_else(|| missing("Name"))?
.to_string();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&req.account_id);
let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("Event source {name} does not exist."),
)
})?;
ps.state = "ACTIVE".to_string();
Ok(AwsResponse::ok_json(json!({})))
}
pub(super) fn deactivate_event_source(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("Name", &body["Name"])?;
let name = body["Name"]
.as_str()
.ok_or_else(|| missing("Name"))?
.to_string();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&req.account_id);
let ps = state.partner_event_sources.get_mut(&name).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("Event source {name} does not exist."),
)
})?;
ps.state = "INACTIVE".to_string();
Ok(AwsResponse::ok_json(json!({})))
}
pub(super) fn describe_event_source(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("Name", &body["Name"])?;
let name = body["Name"]
.as_str()
.ok_or_else(|| missing("Name"))?
.to_string();
let accounts = self.state.read();
let empty = EventBridgeState::new(&req.account_id, &req.region);
let state = accounts.get(&req.account_id).unwrap_or(&empty);
let ps = state.partner_event_sources.get(&name).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("Event source {name} does not exist."),
)
})?;
Ok(AwsResponse::ok_json(json!({
"Arn": ps.arn,
"Name": ps.name,
"CreatedBy": ps.account,
"CreationTime": ps.creation_time.timestamp() as f64,
"State": ps.state,
})))
}
pub(super) fn list_event_sources(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_optional_string_length("namePrefix", body["NamePrefix"].as_str(), 1, 256)?;
validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
validate_optional_string_length("nextToken", body["NextToken"].as_str(), 1, 2048)?;
let name_prefix = body["NamePrefix"].as_str();
let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
let accounts = self.state.read();
let empty = EventBridgeState::new(&req.account_id, &req.region);
let state = accounts.get(&req.account_id).unwrap_or(&empty);
let all: Vec<Value> = state
.partner_event_sources
.values()
.filter(|ps| match name_prefix {
Some(prefix) => ps.name.starts_with(prefix),
None => true,
})
.map(|ps| {
json!({
"Arn": ps.arn,
"Name": ps.name,
"CreatedBy": ps.account,
"CreationTime": ps.creation_time.timestamp() as f64,
"State": ps.state,
})
})
.collect();
let (sources, next_token) = paginate(&all, body["NextToken"].as_str(), limit);
let mut resp = json!({ "EventSources": sources });
if let Some(token) = next_token {
resp["NextToken"] = json!(token);
}
Ok(AwsResponse::ok_json(resp))
}
pub(super) fn put_partner_events(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = req.json_body();
validate_required("Entries", &body["Entries"])?;
let entries = body["Entries"]
.as_array()
.ok_or_else(|| missing("Entries"))?;
let mut result_entries = Vec::new();
for _entry in entries {
let event_id = uuid::Uuid::new_v4().to_string();
result_entries.push(json!({ "EventId": event_id }));
}
Ok(AwsResponse::ok_json(json!({
"FailedEntryCount": 0,
"Entries": result_entries,
})))
}
}