use awsim_core::{AwsError, RequestContext};
use serde_json::{Value, json};
use tracing::info;
use uuid::Uuid;
use crate::state::{InsightsQuery, LogsState, MetricFilter, QueryDefinition, SubscriptionFilter};
fn now_millis() -> u64 {
crate::state::now_millis()
}
fn new_id() -> String {
Uuid::new_v4().to_string()
}
fn require_log_group<'a>(
state: &'a LogsState,
name: &str,
) -> Result<dashmap::mapref::one::Ref<'a, String, crate::state::LogGroup>, AwsError> {
state.log_groups.get(name).ok_or_else(|| {
AwsError::not_found(
"ResourceNotFoundException",
format!("Log group not found: {name}"),
)
})
}
fn log_group_from_arn(arn: &str) -> &str {
if let Some(rest) = arn.strip_prefix("arn:aws:logs:") {
if let Some(pos) = rest.find(":log-group:") {
return &rest[pos + ":log-group:".len()..];
}
}
arn
}
pub fn tag_resource(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let resource_arn = input["resourceArn"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "resourceArn is required")
})?;
let tags = input["tags"]
.as_object()
.ok_or_else(|| AwsError::bad_request("InvalidParameterException", "tags is required"))?;
let name = log_group_from_arn(resource_arn);
let mut group = state.log_groups.get_mut(name).ok_or_else(|| {
AwsError::not_found(
"ResourceNotFoundException",
format!("Log group not found: {name}"),
)
})?;
for (k, v) in tags {
if let Some(s) = v.as_str() {
group.tags.insert(k.clone(), s.to_string());
}
}
Ok(json!({}))
}
pub fn untag_resource(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let resource_arn = input["resourceArn"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "resourceArn is required")
})?;
let tag_keys = input["tagKeys"]
.as_array()
.ok_or_else(|| AwsError::bad_request("InvalidParameterException", "tagKeys is required"))?;
let name = log_group_from_arn(resource_arn);
let mut group = state.log_groups.get_mut(name).ok_or_else(|| {
AwsError::not_found(
"ResourceNotFoundException",
format!("Log group not found: {name}"),
)
})?;
for key in tag_keys {
if let Some(k) = key.as_str() {
group.tags.remove(k);
}
}
Ok(json!({}))
}
pub fn list_tags_for_resource(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let resource_arn = input["resourceArn"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "resourceArn is required")
})?;
let name = log_group_from_arn(resource_arn);
let group = state.log_groups.get(name).ok_or_else(|| {
AwsError::not_found(
"ResourceNotFoundException",
format!("Log group not found: {name}"),
)
})?;
let tags: serde_json::Map<String, Value> = group
.tags
.iter()
.map(|(k, v)| (k.clone(), Value::String(v.clone())))
.collect();
Ok(json!({ "tags": tags }))
}
pub fn put_subscription_filter(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let log_group_name = input["logGroupName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "logGroupName is required")
})?;
let filter_name = input["filterName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "filterName is required")
})?;
let filter_pattern = input["filterPattern"].as_str().unwrap_or("").to_string();
let destination_arn = input["destinationArn"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "destinationArn is required")
})?;
require_log_group(state, log_group_name)?;
let filter = SubscriptionFilter {
filter_name: filter_name.to_string(),
log_group_name: log_group_name.to_string(),
filter_pattern,
destination_arn: destination_arn.to_string(),
creation_time: now_millis(),
};
info!(
log_group = log_group_name,
filter_name, "PutSubscriptionFilter"
);
state.subscription_filters.insert(
(log_group_name.to_string(), filter_name.to_string()),
filter,
);
Ok(json!({}))
}
pub fn describe_subscription_filters(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let log_group_name = input["logGroupName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "logGroupName is required")
})?;
let filter_name_prefix = input["filterNamePrefix"].as_str().unwrap_or("");
let limit = input["limit"].as_u64().unwrap_or(50) as usize;
let mut filters: Vec<Value> = state
.subscription_filters
.iter()
.filter(|e| e.key().0 == log_group_name && e.filter_name.starts_with(filter_name_prefix))
.map(|e| {
json!({
"filterName": e.filter_name,
"logGroupName": e.log_group_name,
"filterPattern": e.filter_pattern,
"destinationArn": e.destination_arn,
"creationTime": e.creation_time,
})
})
.take(limit)
.collect();
filters.sort_by(|a, b| {
a["filterName"]
.as_str()
.unwrap_or("")
.cmp(b["filterName"].as_str().unwrap_or(""))
});
Ok(json!({ "subscriptionFilters": filters }))
}
pub fn delete_subscription_filter(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let log_group_name = input["logGroupName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "logGroupName is required")
})?;
let filter_name = input["filterName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "filterName is required")
})?;
state
.subscription_filters
.remove(&(log_group_name.to_string(), filter_name.to_string()))
.ok_or_else(|| {
AwsError::not_found(
"ResourceNotFoundException",
format!(
"Subscription filter {filter_name} not found for log group {log_group_name}"
),
)
})?;
Ok(json!({}))
}
pub fn put_metric_filter(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let log_group_name = input["logGroupName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "logGroupName is required")
})?;
let filter_name = input["filterName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "filterName is required")
})?;
let filter_pattern = input["filterPattern"].as_str().unwrap_or("").to_string();
let metric_transformations = input["metricTransformations"]
.as_array()
.cloned()
.unwrap_or_default();
require_log_group(state, log_group_name)?;
let filter = MetricFilter {
filter_name: filter_name.to_string(),
log_group_name: log_group_name.to_string(),
filter_pattern,
metric_transformations,
creation_time: now_millis(),
};
info!(log_group = log_group_name, filter_name, "PutMetricFilter");
state.metric_filters.insert(
(log_group_name.to_string(), filter_name.to_string()),
filter,
);
Ok(json!({}))
}
pub fn describe_metric_filters(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let log_group_name = input["logGroupName"].as_str();
let filter_name_prefix = input["filterNamePrefix"].as_str().unwrap_or("");
let limit = input["limit"].as_u64().unwrap_or(50) as usize;
let mut filters: Vec<Value> = state
.metric_filters
.iter()
.filter(|e| {
log_group_name.is_none_or(|lg| e.key().0 == lg)
&& e.filter_name.starts_with(filter_name_prefix)
})
.map(|e| {
json!({
"filterName": e.filter_name,
"logGroupName": e.log_group_name,
"filterPattern": e.filter_pattern,
"metricTransformations": e.metric_transformations,
"creationTime": e.creation_time,
})
})
.take(limit)
.collect();
filters.sort_by(|a, b| {
a["filterName"]
.as_str()
.unwrap_or("")
.cmp(b["filterName"].as_str().unwrap_or(""))
});
Ok(json!({ "metricFilters": filters }))
}
pub fn delete_metric_filter(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let log_group_name = input["logGroupName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "logGroupName is required")
})?;
let filter_name = input["filterName"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "filterName is required")
})?;
state
.metric_filters
.remove(&(log_group_name.to_string(), filter_name.to_string()))
.ok_or_else(|| {
AwsError::not_found(
"ResourceNotFoundException",
format!("Metric filter {filter_name} not found for log group {log_group_name}"),
)
})?;
Ok(json!({}))
}
pub fn put_query_definition(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let name = input["name"]
.as_str()
.ok_or_else(|| AwsError::bad_request("InvalidParameterException", "name is required"))?;
let query_string = input["queryString"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "queryString is required")
})?;
let log_group_names: Vec<String> = input["logGroupNames"]
.as_array()
.unwrap_or(&vec![])
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
let query_definition_id = if let Some(existing_id) = input["queryDefinitionId"].as_str() {
existing_id.to_string()
} else {
new_id()
};
let def = QueryDefinition {
query_definition_id: query_definition_id.clone(),
name: name.to_string(),
query_string: query_string.to_string(),
log_group_names,
};
state
.query_definitions
.insert(query_definition_id.clone(), def);
Ok(json!({ "queryDefinitionId": query_definition_id }))
}
pub fn describe_query_definitions(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let name_prefix = input["queryDefinitionNamePrefix"].as_str().unwrap_or("");
let max_results = input["maxResults"].as_u64().unwrap_or(50) as usize;
let mut defs: Vec<Value> = state
.query_definitions
.iter()
.filter(|e| e.name.starts_with(name_prefix))
.map(|e| {
json!({
"queryDefinitionId": e.query_definition_id,
"name": e.name,
"queryString": e.query_string,
"logGroupNames": e.log_group_names,
})
})
.take(max_results)
.collect();
defs.sort_by(|a, b| {
a["name"]
.as_str()
.unwrap_or("")
.cmp(b["name"].as_str().unwrap_or(""))
});
Ok(json!({ "queryDefinitions": defs }))
}
pub fn delete_query_definition(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let query_definition_id = input["queryDefinitionId"].as_str().ok_or_else(|| {
AwsError::bad_request("InvalidParameterException", "queryDefinitionId is required")
})?;
let existed = state
.query_definitions
.remove(query_definition_id)
.is_some();
Ok(json!({ "success": existed }))
}
pub fn start_query(
state: &LogsState,
_input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let query_id = new_id();
let query = InsightsQuery {
query_id: query_id.clone(),
status: "Complete".to_string(),
};
state.insights_queries.insert(query_id.clone(), query);
Ok(json!({ "queryId": query_id }))
}
pub fn get_query_results(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let query_id = input["queryId"]
.as_str()
.ok_or_else(|| AwsError::bad_request("InvalidParameterException", "queryId is required"))?;
let query = state.insights_queries.get(query_id).ok_or_else(|| {
AwsError::not_found(
"ResourceNotFoundException",
format!("Query {query_id} not found"),
)
})?;
Ok(json!({
"queryId": query.query_id,
"status": query.status,
"results": [],
"statistics": {
"recordsMatched": 0.0,
"recordsScanned": 0.0,
"bytesScanned": 0.0,
},
}))
}
pub fn stop_query(
state: &LogsState,
input: &Value,
_ctx: &RequestContext,
) -> Result<Value, AwsError> {
let query_id = input["queryId"]
.as_str()
.ok_or_else(|| AwsError::bad_request("InvalidParameterException", "queryId is required"))?;
if let Some(mut q) = state.insights_queries.get_mut(query_id) {
q.status = "Cancelled".to_string();
}
Ok(json!({ "success": true }))
}