#![allow(clippy::too_many_arguments)]
use chrono::Utc;
use http::StatusCode;
use serde_json::{json, Value};
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
use super::*;
impl LambdaService {
pub(super) fn create_event_source_mapping(
&self,
req: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body: Value = serde_json::from_slice(&req.body).unwrap_or_default();
let event_source_arn = body["EventSourceArn"]
.as_str()
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValueException",
"EventSourceArn is required",
)
})?
.to_string();
let function_name = body["FunctionName"]
.as_str()
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValueException",
"FunctionName is required",
)
})?
.to_string();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&req.account_id);
let function_arn = if function_name.starts_with("arn:") {
function_name.clone()
} else {
let func = state.functions.get(&function_name).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!(
"Function not found: arn:aws:lambda:{}:{}:function:{}",
state.region, state.account_id, function_name
),
)
})?;
func.function_arn.clone()
};
let batch_size = body["BatchSize"].as_i64().unwrap_or(10);
let enabled = body["Enabled"].as_bool().unwrap_or(true);
let mapping_uuid = uuid::Uuid::new_v4().to_string();
let now = Utc::now();
let filter_patterns: Vec<String> = match body.get("FilterCriteria") {
None | Some(Value::Null) => Vec::new(),
Some(Value::Object(_)) => {
match body.get("FilterCriteria").and_then(|v| v.get("Filters")) {
None => Vec::new(),
Some(Value::Array(arr)) => {
let mut out = Vec::with_capacity(arr.len());
for f in arr {
match f.get("Pattern") {
Some(Value::String(s)) => out.push(s.clone()),
_ => {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValueException",
"FilterCriteria.Filters[].Pattern must be a string",
));
}
}
}
out
}
Some(_) => {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValueException",
"FilterCriteria.Filters must be an array",
));
}
}
}
Some(_) => {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValueException",
"FilterCriteria must be an object",
));
}
};
if let Err(err) = crate::filter::FilterSet::validate(filter_patterns.iter()) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValueException",
err,
));
}
let function_response_types: Vec<String> = body
.get("FunctionResponseTypes")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let starting_position = body
.get("StartingPosition")
.and_then(|v| v.as_str())
.map(String::from);
let starting_position_timestamp = body
.get("StartingPositionTimestamp")
.and_then(|v| v.as_f64());
let parallelization_factor = body.get("ParallelizationFactor").and_then(|v| v.as_i64());
let maximum_batching_window_in_seconds = body
.get("MaximumBatchingWindowInSeconds")
.and_then(|v| v.as_i64());
let kms_key_arn = body
.get("KMSKeyArn")
.and_then(|v| v.as_str())
.map(String::from);
let metrics_config = body.get("MetricsConfig").cloned();
let destination_config = body.get("DestinationConfig").cloned();
let maximum_retry_attempts = body.get("MaximumRetryAttempts").and_then(|v| v.as_i64());
let maximum_record_age_in_seconds = body
.get("MaximumRecordAgeInSeconds")
.and_then(|v| v.as_i64());
let bisect_batch_on_function_error = body
.get("BisectBatchOnFunctionError")
.and_then(|v| v.as_bool());
let tumbling_window_in_seconds =
body.get("TumblingWindowInSeconds").and_then(|v| v.as_i64());
let topics: Vec<String> = body
.get("Topics")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let queues: Vec<String> = body
.get("Queues")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let source_access_configurations: Vec<Value> = body
.get("SourceAccessConfigurations")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let mapping = EventSourceMapping {
uuid: mapping_uuid.clone(),
function_arn: function_arn.clone(),
event_source_arn: event_source_arn.clone(),
batch_size,
enabled,
state: if enabled {
"Enabled".to_string()
} else {
"Disabled".to_string()
},
last_modified: now,
filter_patterns,
maximum_batching_window_in_seconds,
starting_position,
starting_position_timestamp,
parallelization_factor,
function_response_types,
kms_key_arn,
metrics_config,
destination_config,
maximum_retry_attempts,
maximum_record_age_in_seconds,
bisect_batch_on_function_error,
tumbling_window_in_seconds,
topics,
queues,
source_access_configurations,
};
let response = self.event_source_mapping_json(&mapping);
state.event_source_mappings.insert(mapping_uuid, mapping);
Ok(AwsResponse::json(
StatusCode::ACCEPTED,
response.to_string(),
))
}
pub(super) fn list_event_source_mappings(
&self,
account_id: &str,
) -> Result<AwsResponse, AwsServiceError> {
let accounts = self.state.read();
let empty = LambdaState::new(account_id, "");
let state = accounts.get(account_id).unwrap_or(&empty);
let mappings: Vec<Value> = state
.event_source_mappings
.values()
.map(|m| self.event_source_mapping_json(m))
.collect();
let response = json!({
"EventSourceMappings": mappings,
});
Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
}
pub(super) fn get_event_source_mapping(
&self,
uuid: &str,
account_id: &str,
) -> Result<AwsResponse, AwsServiceError> {
let accounts = self.state.read();
let empty = LambdaState::new(account_id, "");
let state = accounts.get(account_id).unwrap_or(&empty);
let mapping = state.event_source_mappings.get(uuid).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
)
})?;
let response = self.event_source_mapping_json(mapping);
Ok(AwsResponse::json(StatusCode::OK, response.to_string()))
}
pub(super) fn delete_event_source_mapping(
&self,
uuid: &str,
account_id: &str,
) -> Result<AwsResponse, AwsServiceError> {
let mut accounts = self.state.write();
let state = accounts.get_or_create(account_id);
let mapping = state.event_source_mappings.remove(uuid).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::NOT_FOUND,
"ResourceNotFoundException",
format!("The resource you requested does not exist. (Service: Lambda, Status Code: 404, Request ID: {uuid})"),
)
})?;
let mut response = self.event_source_mapping_json(&mapping);
response["State"] = json!("Deleting");
Ok(AwsResponse::json(
StatusCode::ACCEPTED,
response.to_string(),
))
}
pub(crate) fn event_source_mapping_json(&self, mapping: &EventSourceMapping) -> Value {
let esm_arn = format!(
"{}:event-source-mapping/{}",
mapping.function_arn, mapping.uuid
);
let mut out = json!({
"UUID": mapping.uuid,
"FunctionArn": mapping.function_arn,
"EventSourceArn": mapping.event_source_arn,
"BatchSize": mapping.batch_size,
"State": mapping.state,
"LastModified": mapping.last_modified.timestamp_millis() as f64 / 1000.0,
"EventSourceMappingArn": esm_arn,
"MaximumRetryAttempts": mapping.maximum_retry_attempts.unwrap_or(-1),
"MaximumRecordAgeInSeconds": mapping.maximum_record_age_in_seconds.unwrap_or(-1),
"BisectBatchOnFunctionError": mapping.bisect_batch_on_function_error.unwrap_or(false),
"TumblingWindowInSeconds": mapping.tumbling_window_in_seconds.unwrap_or(0),
"Topics": mapping.topics,
"Queues": mapping.queues,
"SourceAccessConfigurations": mapping.source_access_configurations,
"LastProcessingResult": "No records processed",
"StateTransitionReason": "User action",
});
let obj = out.as_object_mut().expect("json! built object");
if !mapping.filter_patterns.is_empty() {
obj.insert(
"FilterCriteria".into(),
json!({
"Filters": mapping.filter_patterns.iter().map(|p| json!({"Pattern": p})).collect::<Vec<_>>(),
}),
);
}
if !mapping.function_response_types.is_empty() {
obj.insert(
"FunctionResponseTypes".into(),
json!(mapping.function_response_types),
);
}
if let Some(sp) = &mapping.starting_position {
obj.insert("StartingPosition".into(), json!(sp));
}
if let Some(ts) = mapping.starting_position_timestamp {
obj.insert("StartingPositionTimestamp".into(), json!(ts));
}
if let Some(pf) = mapping.parallelization_factor {
obj.insert("ParallelizationFactor".into(), json!(pf));
}
if let Some(w) = mapping.maximum_batching_window_in_seconds {
obj.insert("MaximumBatchingWindowInSeconds".into(), json!(w));
}
if let Some(ref kms) = mapping.kms_key_arn {
obj.insert("KMSKeyArn".into(), json!(kms));
}
if let Some(ref mc) = mapping.metrics_config {
obj.insert("MetricsConfig".into(), mc.clone());
}
if let Some(ref dc) = mapping.destination_config {
obj.insert("DestinationConfig".into(), dc.clone());
}
out
}
}