use aws_config::BehaviorVersion;
use aws_sdk_lambda::{
types::{Environment, Architecture, Runtime, FunctionCode},
primitives::Blob,
Client as LambdaClient,
};
use aws_sdk_iam::Client as IamClient;
use aws_sdk_cloudwatch::Client as CloudWatchClient;
use aws_sdk_eventbridge::Client as EventBridgeClient;
use aws_sdk_cloudtrail::Client as CloudTrailClient;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_sns::Client as SnsClient;
use aws_types::SdkConfig;
use std::collections::HashMap;
use std::time::SystemTime;
use anyhow::{Result, anyhow};
use tracing::{info, error, warn};
use aws_credential_types::Credentials;
use aws_types::region::Region;
/// Builds an AWS [`SdkConfig`] from credentials found in the environment / `.env` file.
///
/// Loads `.env` (if present) via `dotenv`, then reads `AWS_ACCESS_KEY_ID` and
/// `AWS_SECRET_ACCESS_KEY`. When `AWS_SESSION_TOKEN` is set and non-empty it is passed along
/// too, so temporary credentials work as well as long-term access keys. The configuration
/// is pinned to `aws_region` and validated with an STS call before it is returned.
///
/// # Errors
/// Returns an error if either required variable is missing or the credentials fail validation.
async fn create_aws_config_from_env(aws_region: &str) -> Result<SdkConfig> {
    // A missing .env file is fine: the variables may already be exported in the environment.
    dotenv::dotenv().ok();
    let access_key_id = std::env::var("AWS_ACCESS_KEY_ID")
        .map_err(|_| anyhow!("❌ 错误: AWS_ACCESS_KEY_ID 必须在.env文件中设置"))?;
    let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY")
        .map_err(|_| anyhow!("❌ 错误: AWS_SECRET_ACCESS_KEY 必须在.env文件中设置"))?;
    // Temporary credentials are unusable without their session token; an empty value is
    // treated the same as an unset one.
    let session_token = std::env::var("AWS_SESSION_TOKEN")
        .ok()
        .filter(|token| !token.is_empty());
    let credentials = Credentials::new(
        access_key_id,
        secret_access_key,
        session_token,
        None,
        "env-file",
    );
    let config = aws_config::defaults(BehaviorVersion::latest())
        .region(Region::new(aws_region.to_string()))
        .credentials_provider(credentials)
        .load()
        .await;
    validate_credentials(&config).await?;
    info!("✅ AWS凭证已成功从.env文件加载");
    Ok(config)
}
async fn validate_credentials(config: &SdkConfig) -> Result<()> {
use aws_sdk_sts::Client as StsClient;
let sts_client = StsClient::new(config);
match sts_client.get_caller_identity().send().await {
Ok(response) => {
if let Some(account_id) = response.account() {
info!("✅ AWS凭证验证成功 - 账户ID: {}", account_id);
}
Ok(())
}
Err(e) => {
error!("❌ AWS凭证验证失败: {}", e);
Err(anyhow!("AWS凭证无效,请检查.env文件中的凭证配置"))
}
}
}
/// Bundles the AWS service clients used to deploy the Bedrock monitoring stack.
///
/// Every client is built from the same [`SdkConfig`] in `AwsManager::new`, so they all share
/// one region and one credential source.
#[derive(Debug)]
pub struct AwsManager {
/// Lambda client: deploys, configures, invokes and grants permissions on the monitor function.
lambda_client: LambdaClient,
/// IAM client: looks up or creates the function's execution role and its policies.
iam_client: IamClient,
/// CloudWatch client: creates metric alarms.
cloudwatch_client: CloudWatchClient,
/// EventBridge client: creates the Bedrock API-call rule and its Lambda target.
eventbridge_client: EventBridgeClient,
/// CloudTrail client: creates and configures the Bedrock monitoring trail.
cloudtrail_client: CloudTrailClient,
/// S3 client: creates the bucket that receives CloudTrail logs.
s3_client: S3Client,
/// SNS client: creates the alert topic and subscribes the function to it.
sns_client: SnsClient,
/// Region name passed to `new`; also used when formatting ARNs.
region: String,
/// Shared SDK configuration, kept so extra clients (e.g. STS in `get_account_id`) can be
/// created on demand.
sdk_config: SdkConfig,
}
impl AwsManager {
pub async fn new(aws_region: &str) -> Result<Self> {
let config = create_aws_config_from_env(aws_region).await?;
let lambda_client = LambdaClient::new(&config);
let iam_client = IamClient::new(&config);
let cloudwatch_client = CloudWatchClient::new(&config);
let eventbridge_client = EventBridgeClient::new(&config);
let cloudtrail_client = CloudTrailClient::new(&config);
let s3_client = S3Client::new(&config);
let sns_client = SnsClient::new(&config);
Ok(AwsManager {
lambda_client,
iam_client,
cloudwatch_client,
eventbridge_client,
cloudtrail_client,
s3_client,
sns_client,
region: aws_region.to_string(),
sdk_config: config,
})
}
/// Creates a Lambda execution role whose name carries a Unix-timestamp suffix.
///
/// The suffix has one-second resolution, so two calls within the same second would
/// request the same role name.
///
/// # Errors
/// Fails if the system clock reads earlier than the Unix epoch or if role creation fails.
pub async fn create_lambda_execution_role(&self) -> Result<String> {
    let unix_secs = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)?
        .as_secs();
    let role_name = format!("lambda-bedrock-monitor-role-{}", unix_secs);
    self.create_lambda_execution_role_with_name(&role_name).await
}
/// Creates an IAM role that Lambda may assume and attaches the monitor's permissions.
///
/// The role receives:
/// * the AWS-managed `AWSLambdaBasicExecutionRole` and `AWSLambdaVPCAccessExecutionRole`
///   policies;
/// * an inline CloudWatch Logs write policy;
/// * an inline policy allowing `iam:UpdateAccessKey`, `ListAccessKeys`,
///   `GetAccessKeyLastUsed` and `GetUser` on all resources;
/// * an inline policy allowing `sns:Publish` to topics named `bedrock-throttling-alerts`.
///
/// Failures to attach or put an individual policy are logged as warnings and do not abort
/// the call. Afterwards the function sleeps 10 seconds, presumably to let the new role
/// propagate before Lambda uses it (TODO confirm the delay is sufficient).
///
/// # Returns
/// The ARN of the newly created role.
///
/// # Errors
/// Returns an error if `CreateRole` fails or its response does not contain the role.
pub async fn create_lambda_execution_role_with_name(&self, role_name: &str) -> Result<String> {
    let trust_policy = serde_json::json!({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "lambda.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    });
    info!("创建IAM角色: {}", role_name);
    let create_role_output = self
        .iam_client
        .create_role()
        .role_name(role_name)
        .assume_role_policy_document(trust_policy.to_string())
        .send()
        .await
        .map_err(|e| {
            error!("IAM角色创建失败: {}", e);
            anyhow!("创建IAM角色失败: {}", e)
        })?;
    let role_arn = create_role_output
        .role()
        .ok_or_else(|| anyhow!("创建IAM角色失败: 响应中缺少角色信息 ({})", role_name))?
        .arn()
        .to_string();
    info!("IAM角色创建成功: {}", role_arn);

    for policy_arn in [
        "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
    ] {
        // Best effort: report a failed attachment instead of claiming success.
        match self
            .iam_client
            .attach_role_policy()
            .role_name(role_name)
            .policy_arn(policy_arn)
            .send()
            .await
        {
            Ok(_) => info!("附加策略: {}", policy_arn),
            Err(e) => warn!("附加策略失败 {}: {}", policy_arn, e),
        }
    }

    const CLOUDWATCH_LOGS_POLICY: &str = r#"{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogGroups",
                    "logs:DescribeLogStreams"
                ],
                "Resource": "arn:aws:logs:*:*:*"
            }
        ]
    }"#;
    // NOTE(review): these IAM actions are granted on every user in the account ("*");
    // consider scoping `Resource` to the monitored user(s).
    const CREDENTIAL_MANAGEMENT_POLICY: &str = r#"{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "iam:UpdateAccessKey",
                    "iam:ListAccessKeys",
                    "iam:GetAccessKeyLastUsed",
                    "iam:GetUser"
                ],
                "Resource": "*"
            }
        ]
    }"#;
    // The topic name must match the one used by `create_sns_topic`.
    const SNS_NOTIFICATION_POLICY: &str = r#"{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "sns:Publish"
                ],
                "Resource": "arn:aws:sns:*:*:bedrock-throttling-alerts"
            }
        ]
    }"#;

    self.put_role_inline_policy(
        role_name,
        "CloudWatchLogsWriteAccess",
        CLOUDWATCH_LOGS_POLICY,
        "CloudWatch Logs写权限策略添加成功",
        "CloudWatch Logs权限策略添加失败",
    )
    .await;
    self.put_role_inline_policy(
        role_name,
        "BedrockCredentialManagement",
        CREDENTIAL_MANAGEMENT_POLICY,
        "🔐 凭据管理权限策略添加成功",
        "⚠️ 凭据管理权限策略添加失败",
    )
    .await;
    self.put_role_inline_policy(
        role_name,
        "BedrockSNSNotifications",
        SNS_NOTIFICATION_POLICY,
        "📧 SNS通知权限策略添加成功",
        "⚠️ SNS通知权限策略添加失败",
    )
    .await;

    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
    Ok(role_arn)
}

/// Puts one inline policy on `role_name`. On success it logs `success_msg`; on failure it
/// logs `<failure_msg>: <error>` as a warning and does not propagate the error.
async fn put_role_inline_policy(
    &self,
    role_name: &str,
    policy_name: &str,
    policy_document: &str,
    success_msg: &str,
    failure_msg: &str,
) {
    match self
        .iam_client
        .put_role_policy()
        .role_name(role_name)
        .policy_name(policy_name)
        .policy_document(policy_document)
        .send()
        .await
    {
        Ok(_) => info!("{}", success_msg),
        Err(e) => warn!("{}: {}", failure_msg, e),
    }
}
pub async fn deploy_lambda_function(&self, zip_content: Vec<u8>) -> Result<String> {
let function_name = "bedrock-monitor-function";
info!("开始部署Lambda函数: {}", function_name);
info!("ZIP包大小: {} bytes", zip_content.len());
let function_exists = self.lambda_client.get_function()
.function_name(function_name)
.send()
.await.is_ok();
let role_arn = if !function_exists {
let role_name = "lambda-bedrock-monitor-role";
match self.iam_client.get_role().role_name(role_name).send().await {
Ok(role_result) => {
info!("使用现有IAM角色: {}", role_name);
role_result.role().unwrap().arn().to_string()
}
Err(_) => {
info!("创建新IAM角色: {}", role_name);
self.create_lambda_execution_role_with_name(role_name).await?
}
}
} else {
info!("函数已存在,获取现有角色ARN");
let function_config = self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await?;
function_config.role().unwrap().to_string()
};
let env_vars = HashMap::from([
("RUST_LOG".to_string(), "info".to_string()),
("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()),
]);
let environment = Environment::builder()
.set_variables(Some(env_vars))
.build();
let function_arn = if function_exists {
let function_config = self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await?;
let current_runtime = function_config.runtime().unwrap_or(&Runtime::Providedal2);
if !matches!(current_runtime, Runtime::Python39 | Runtime::Python38 | Runtime::Python37) {
info!("检测到运行时变更,重新创建Lambda函数");
let _ = self.lambda_client.delete_function()
.function_name(function_name)
.send()
.await;
info!("创建新的Python Lambda函数");
let create_function_result = self.lambda_client.create_function()
.function_name(function_name)
.runtime(Runtime::Python312)
.handler("lambda_function.lambda_handler")
.code(FunctionCode::builder().zip_file(Blob::new(zip_content)).build())
.role(role_arn)
.architectures(Architecture::X8664)
.timeout(30)
.memory_size(128)
.environment(environment)
.send()
.await;
match create_function_result {
Ok(result) => {
result.function_arn().unwrap().to_string()
}
Err(e) => {
error!("Lambda函数创建失败: {}", e);
error!("错误类型: {:?}", e);
error!("完整错误信息: {:?}", e);
return Err(anyhow!("创建Lambda函数失败: {}", e));
}
}
} else {
info!("更新现有Lambda函数");
let update_result = self.lambda_client.update_function_code()
.function_name(function_name)
.zip_file(Blob::new(zip_content))
.send()
.await;
match update_result {
Ok(result) => result.function_arn().unwrap().to_string(),
Err(e) => {
error!("Lambda函数更新失败: {}", e);
return Err(anyhow!("更新Lambda函数失败: {}", e));
}
}
}
} else {
info!("创建新Lambda函数");
let max_retries = 3;
for attempt in 1..=max_retries {
let create_function_result = self.lambda_client.create_function()
.function_name(function_name)
.runtime(Runtime::Python312)
.handler("lambda_function.lambda_handler")
.code(FunctionCode::builder().zip_file(Blob::new(zip_content.clone())).build())
.role(&role_arn)
.architectures(Architecture::X8664)
.timeout(30)
.memory_size(128)
.environment(environment.clone())
.send()
.await;
match create_function_result {
Ok(result) => {
let function_arn = result.function_arn().unwrap().to_string();
info!("✅ Lambda函数创建成功: {}", function_arn);
return Ok(function_arn);
}
Err(e) => {
let error_str = e.to_string();
let raw_error = format!("{:?}", e);
error!("❌ Lambda函数创建失败 (尝试 {}/{}):", attempt, max_retries);
error!("┌─ 错误信息: {}", e);
error!("├─ 错误字符串: {}", error_str);
error!("└─ 原始错误对象: {}", raw_error);
error!("┌─ 当前区域: {}", self.region);
error!("├─ 函数名称: {}", function_name);
error!("├─ IAM角色ARN: {}", role_arn);
error!("└─ 账户ID: {}", self.get_account_id().await.unwrap_or_else(|_| "unknown".to_string()));
if error_str.contains("ResourceConflictException") ||
error_str.contains("already exists") ||
error_str.contains("Function already exist") {
info!("ℹ️ Lambda函数已存在,获取函数ARN");
match self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await {
Ok(config) => {
let function_arn = config.function_arn().unwrap().to_string();
info!("✅ 获取已存在的Lambda函数: {}", function_arn);
return Ok(function_arn);
}
Err(get_e) => {
warn!("⚠️ 无法获取已存在的Lambda函数ARN: {}", get_e);
let account_id = self.get_account_id().await?;
let default_arn = format!("arn:aws:lambda:{}:{}:function:{}",
self.region, account_id, function_name);
return Ok(default_arn);
}
}
}
if attempt < max_retries {
let wait_time = attempt * 5; info!("等待 {} 秒后重试...", wait_time);
tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
} else {
error!("🔍 详细错误分析:");
if error_str.contains("service error") {
error!("┌─ 错误类型: AWS服务错误");
error!("├─ 可能原因:");
error!("│ • AWS服务暂时不可用");
error!("│ • 网络连接问题");
error!("│ • AWS服务延迟");
error!("│ • 区域服务暂时中断");
error!("├─ 建议解决方案:");
error!("│ • 检查网络连接");
error!("│ • 稍后重试");
error!("│ • 尝试使用其他AWS区域");
error!("│ • 检查AWS服务状态页面");
error!("└─ AWS状态页面: https://status.aws.amazon.com/");
} else if error_str.contains("AccessDeniedException") {
error!("┌─ 错误类型: 权限不足");
error!("├─ 可能原因:");
error!("│ • IAM角色无权限在{}区域创建Lambda函数", self.region);
error!("│ • IAM角色跨区域权限限制");
error!("├─ IAM角色: {}", role_arn);
error!("└─ 建议解决方案:");
error!(" • 检查IAM角色{}的跨区域权限", role_arn);
error!(" • 为IAM角色添加lambda:*权限");
error!(" • 或者使用IAM:CreateFunction权限");
} else if error_str.contains("InvalidParameterValueException") {
error!("┌─ 错误类型: 参数无效");
error!("├─ 可能原因:");
error!("│ • 函数名、角色ARN或配置参数格式错误");
error!("│ • 不支持的配置值");
error!("└─ 建议解决方案:");
error!(" • 检查函数名是否符合命名规范");
error!(" • 验证IAM角色ARN格式");
error!(" • 检查运行时和配置参数");
} else if error_str.contains("LimitExceededException") {
error!("┌─ 错误类型: 资源限制");
error!("├─ 可能原因:");
error!("│ • 账户Lambda函数数量超限");
error!("│ • 并发创建限制");
error!("└─ 建议解决方案:");
error!(" • 删除不需要的Lambda函数");
error!(" • 提高AWS服务配额");
} else {
error!("┌─ 错误类型: 未知错误");
error!("└─ 建议联系AWS技术支持");
}
return Err(anyhow!("创建Lambda函数失败: {}", e));
}
}
}
}
return Err(anyhow!("Lambda函数创建失败: 重试{}次后仍然失败", max_retries));
};
info!("Lambda函数部署成功: {}", function_arn);
Ok(function_arn)
}
pub async fn create_cloudwatch_alarm(&self, function_name: &str) -> Result<()> {
let alarm_name = format!("{}-error-alarm", function_name);
info!("创建CloudWatch告警: {}", alarm_name);
let _account_id = self.get_account_id().await?;
let dimension1 = aws_sdk_cloudwatch::types::Dimension::builder()
.name("FunctionName")
.value(function_name)
.build();
let _ = self.cloudwatch_client.put_metric_alarm()
.alarm_name(&alarm_name)
.alarm_description("Lambda函数错误率告警")
.namespace("AWS/Lambda")
.metric_name("Errors")
.dimensions(dimension1)
.statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
.period(300)
.evaluation_periods(1)
.threshold(1.0)
.comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanOrEqualToThreshold)
.send()
.await;
info!("CloudWatch告警创建成功");
Ok(())
}
pub async fn create_bedrock_eventbridge_rule(&self) -> Result<String> {
let rule_name = "bedrock-api-monitor-rule";
let max_retries = 3;
info!("创建EventBridge规则: {}", rule_name);
match self.eventbridge_client.list_rules()
.name_prefix(rule_name)
.send()
.await {
Ok(response) => {
let rules = response.rules();
for rule in rules {
if let Some(name) = rule.name() {
if name == rule_name {
let rule_arn = rule.arn().unwrap().to_string();
info!("✅ EventBridge规则已存在: {}", rule_arn);
return Ok(rule_arn);
}
}
}
}
Err(e) => {
warn!("⚠️ 无法检查EventBridge规则是否已存在: {},继续尝试创建", e);
}
}
let event_pattern = r#"{
"source": ["aws.bedrock"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["bedrock.amazonaws.com"],
"eventName": ["InvokeModel", "Converse", "InvokeModelWithResponseStream"]
}
}"#;
for attempt in 1..=max_retries {
let put_rule_result = self.eventbridge_client.put_rule()
.name(rule_name)
.event_pattern(event_pattern)
.state(aws_sdk_eventbridge::types::RuleState::Enabled)
.description("专门监控AWS Bedrock invoke-model API调用 - 记录200和429状态")
.send()
.await;
match put_rule_result {
Ok(result) => {
let rule_arn = result.rule_arn().unwrap().to_string();
info!("EventBridge规则创建成功: {}", rule_arn);
return Ok(rule_arn);
}
Err(e) => {
let error_str = e.to_string();
error!("EventBridge规则创建失败 (尝试 {}/{}): {}", attempt, max_retries, e);
if error_str.contains("ResourceConflictException") ||
error_str.contains("already exists") ||
error_str.contains("Rule already exists") {
info!("ℹ️ EventBridge规则已存在,跳过创建");
match self.eventbridge_client.list_rules()
.name_prefix(rule_name)
.send()
.await {
Ok(response) => {
let rules = response.rules();
for rule in rules {
if let Some(name) = rule.name() {
if name == rule_name {
let rule_arn = rule.arn().unwrap().to_string();
info!("✅ 获取已存在的EventBridge规则: {}", rule_arn);
return Ok(rule_arn);
}
}
}
}
Err(list_e) => {
warn!("⚠️ 无法获取已存在的EventBridge规则ARN: {}", list_e);
let account_id = self.get_account_id().await?;
let default_arn = format!("arn:aws:events:{}:{}:rule/{}",
self.region, account_id, rule_name);
return Ok(default_arn);
}
}
}
if attempt < max_retries {
let wait_time = attempt * 3; info!("等待 {} 秒后重试...", wait_time);
tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
} else {
if error_str.contains("dispatch failure") {
error!("❌ EventBridge服务暂时不可用,可能是网络问题或AWS服务延迟");
} else if error_str.contains("InternalFailure") {
error!("❌ EventBridge内部服务错误");
} else if error_str.contains("AccessDeniedException") {
error!("❌ 权限不足: 无权限创建EventBridge规则");
} else if error_str.contains("LimitExceededException") {
error!("❌ EventBridge规则数量超限");
} else {
error!("❌ 未知EventBridge错误: {}", error_str);
}
return Err(anyhow!("创建EventBridge规则失败: {}", e));
}
}
}
}
Err(anyhow!("EventBridge规则创建失败: 重试{}次后仍然失败", max_retries))
}
pub async fn add_lambda_target_to_eventbridge(&self) -> Result<()> {
let rule_name = "bedrock-api-monitor-rule";
let function_name = "bedrock-monitor-function";
info!("为EventBridge规则添加Lambda目标: {} -> {}", rule_name, function_name);
let function_config = self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await?;
let function_arn = function_config.function_arn().unwrap();
let target = aws_sdk_eventbridge::types::Target::builder()
.id("1")
.arn(function_arn)
.build()?;
let targets_result = self.eventbridge_client.put_targets()
.rule(rule_name)
.targets(target)
.send()
.await;
match targets_result {
Ok(_) => {
info!("Lambda目标添加成功");
let eventbridge_permission_exists = match self.lambda_client.get_policy()
.function_name(function_name)
.send()
.await {
Ok(policy_result) => {
if let Some(policy) = policy_result.policy() {
policy.contains("bedrock-eventbridge-invoke") ||
policy.contains("EventBridgeInvoke") ||
(policy.contains("events.amazonaws.com") &&
policy.contains(&format!("arn:aws:events:{}:{}:rule/{}",
self.region, self.get_account_id().await?, rule_name)))
} else {
false
}
}
Err(_) => false
};
let add_permission_result = if eventbridge_permission_exists {
None
} else {
info!("EventBridge权限不存在,开始添加...");
Some(self.lambda_client.add_permission()
.function_name(function_name)
.statement_id("bedrock-eventbridge-invoke")
.action("lambda:InvokeFunction")
.principal("events.amazonaws.com")
.source_arn(format!("arn:aws:events:{}:{}:rule/{}",
self.region,
self.get_account_id().await?,
rule_name
))
.send()
.await)
};
match add_permission_result {
Some(Ok(response)) => {
info!("Lambda EventBridge调用权限添加成功");
if let Some(statement) = response.statement() {
info!("权限声明: {}", statement);
}
Ok(())
}
Some(Err(e)) => {
let error_str = e.to_string();
let raw_error = format!("{:?}", e);
error!("Lambda EventBridge调用权限添加失败");
error!("错误信息: {}", error_str);
error!("原始错误: {}", raw_error);
if error_str.contains("ResourceConflictException") ||
error_str.contains("already exists") ||
error_str.contains("The resource-based policy") ||
error_str.contains("already has a statement") {
info!("Lambda EventBridge调用权限已存在,跳过添加");
Ok(())
} else if error_str.contains("InvalidParameterValueException") {
warn!("⚠️ EventBridge权限参数无效: 检查函数名和规则ARN格式,但继续部署");
Ok(()) } else if error_str.contains("AccessDeniedException") {
warn!("⚠️ 权限不足: 无权限为Lambda函数添加EventBridge调用权限,但继续部署");
Ok(()) } else if error_str.contains("ResourceNotFoundException") {
warn!("⚠️ 资源未找到: Lambda函数或EventBridge规则不存在,但继续部署");
Ok(()) } else if error_str.contains("service error") {
warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
Ok(()) } else {
warn!("Lambda EventBridge调用权限添加失败: {},但继续部署", e);
Ok(()) }
}
None => {
info!("EventBridge权限已存在,无需添加");
Ok(())
}
}
}
Err(e) => {
error!("Lambda目标添加失败: {}", e);
Err(anyhow!("添加Lambda目标失败: {}", e))
}
}
}
pub async fn test_lambda_function(&self, payload: &str) -> Result<String> {
info!("测试Lambda函数调用,payload: {}", payload);
match self.lambda_client.invoke()
.function_name("bedrock-monitor-function")
.payload(aws_sdk_lambda::primitives::Blob::new(payload.as_bytes()))
.send()
.await {
Ok(response) => {
info!("Lambda调用成功");
let mut result = String::new();
if let Some(payload) = response.payload() {
let payload_str = String::from_utf8(payload.as_ref().to_vec())?;
info!("响应: {}", payload_str);
result.push_str(&format!("Response: {}\n", payload_str));
}
if let Some(log_result) = response.log_result() {
info!("日志: {}", log_result);
result.push_str(&format!("Logs: {}\n", log_result));
}
if let Some(function_error) = response.function_error() {
let error_str = std::str::from_utf8(function_error.as_ref())
.unwrap_or("无法解析错误信息");
error!("函数错误: {}", error_str);
result.push_str(&format!("Error: {}\n", error_str));
}
Ok(result)
}
Err(e) => {
error!("Lambda调用失败: {}", e);
Err(anyhow!("Lambda调用失败: {}", e))
}
}
}
/// Creates a public (`AuthType::None`) Function URL for `bedrock-monitor-function`.
///
/// `lambda:InvokeFunctionUrl` is granted to all principals (`*`), restricted to the `NONE`
/// URL auth type. No `SourceArn` condition is attached, because requests to a public URL do
/// not originate from an AWS resource, so such a condition would not match.
///
/// # Returns
/// The URL from the `CreateFunctionUrlConfig` response, without its trailing `/`. Lambda
/// generates a unique URL id, so the URL cannot be derived from the function name.
///
/// # Errors
/// Returns an error if any of the Lambda API calls fails (including when the function has
/// not been deployed yet).
pub async fn create_lambda_function_url(&self) -> Result<String> {
    let function_name = "bedrock-monitor-function";
    info!("创建Lambda Function URL for: {}", function_name);
    // Fail early with a clear error if the function does not exist.
    self.lambda_client
        .get_function_configuration()
        .function_name(function_name)
        .send()
        .await?;
    let url_config = self
        .lambda_client
        .create_function_url_config()
        .function_name(function_name)
        .auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
        .invoke_mode(aws_sdk_lambda::types::InvokeMode::Buffered)
        .send()
        .await?;
    self.lambda_client
        .add_permission()
        .function_name(function_name)
        .statement_id("FunctionURLAllowPublicAccess")
        .action("lambda:InvokeFunctionUrl")
        .principal("*")
        .function_url_auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
        .send()
        .await?;
    let function_url = url_config.function_url().trim_end_matches('/').to_string();
    info!("Lambda Function URL创建成功: {}", function_url);
    Ok(function_url)
}
pub async fn setup_cloudtrail_for_bedrock(&self) -> Result<String> {
let trail_name = "bedrock-api-monitoring-trail";
let account_id = self.get_account_id().await?;
let s3_bucket_name = format!("bedrock-cloudtrail-logs-{}", account_id);
info!("设置CloudTrail以监控Bedrock API调用: {}", trail_name);
self.create_s3_bucket_if_not_exists(&s3_bucket_name).await?;
let create_trail_result = self.cloudtrail_client.create_trail()
.name(trail_name)
.s3_bucket_name(s3_bucket_name)
.include_global_service_events(true)
.is_multi_region_trail(true)
.enable_log_file_validation(true)
.send()
.await;
let trail_arn = match create_trail_result {
Ok(result) => {
let arn = result.trail_arn().unwrap().to_string();
info!("CloudTrail创建成功: {}", arn);
arn
}
Err(e) => {
error!("CloudTrail创建失败: {}", e);
return Err(anyhow!("创建CloudTrail失败: {}", e));
}
};
info!("配置CloudTrail数据事件...");
self.configure_cloudtrail_data_events(trail_name).await?;
let _ = self.cloudtrail_client.start_logging()
.name(trail_name)
.send()
.await?;
info!("CloudTrail启动成功");
Ok(trail_arn)
}
async fn create_s3_bucket_if_not_exists(&self, bucket_name: &str) -> Result<()> {
info!("检查S3存储桶: {}", bucket_name);
match self.s3_client.head_bucket().bucket(bucket_name).send().await {
Ok(_) => {
info!("S3存储桶已存在: {}", bucket_name);
return Ok(());
}
Err(_) => {
info!("创建新的S3存储桶: {}", bucket_name);
}
}
let create_bucket_result = self.s3_client.create_bucket()
.bucket(bucket_name)
.send()
.await;
match create_bucket_result {
Ok(_) => {
info!("S3存储桶创建成功: {}", bucket_name);
}
Err(e) => {
error!("S3存储桶创建失败: {}", e);
return Err(anyhow!("创建S3存储桶失败: {}", e));
}
}
Ok(())
}
/// Installs one event selector on `trail_name`.
///
/// The selector records all management events and read/write data events for resource
/// type `AWS::Bedrock::Model`, scoped to `arn:aws:bedrock:<region>:<account>:model/*`.
///
/// NOTE(review): basic `EventSelector` data resources may support only a few resource types;
/// Bedrock resources may need advanced event selectors instead — confirm this call records
/// what is intended.
///
/// # Errors
/// Fails if the account ID cannot be resolved or `PutEventSelectors` fails.
async fn configure_cloudtrail_data_events(&self, trail_name: &str) -> Result<()> {
    info!("配置CloudTrail数据事件以监控Bedrock");
    let model_arn_pattern = format!(
        "arn:aws:bedrock:{}:{}:model/*",
        self.region,
        self.get_account_id().await?
    );
    let selector = aws_sdk_cloudtrail::types::EventSelector::builder()
        .read_write_type(aws_sdk_cloudtrail::types::ReadWriteType::All)
        .include_management_events(true)
        .data_resources(
            aws_sdk_cloudtrail::types::DataResource::builder()
                .r#type("AWS::Bedrock::Model")
                .values(&model_arn_pattern)
                .build(),
        )
        .build();
    self.cloudtrail_client
        .put_event_selectors()
        .trail_name(trail_name)
        .event_selectors(selector)
        .send()
        .await?;
    info!("CloudTrail数据事件配置完成");
    Ok(())
}
/// Lets `events.amazonaws.com` invoke `bedrock-monitor-function`, limited to the
/// `bedrock-api-monitor-rule` rule in this account and region (statement id `EventBridgeInvoke`).
///
/// # Errors
/// Returns an error if the account ID cannot be resolved or `AddPermission` fails.
pub async fn add_eventbridge_lambda_permission(&self) -> Result<()> {
    const FUNCTION_NAME: &str = "bedrock-monitor-function";
    let source_arn = format!(
        "arn:aws:events:{}:{}:rule/bedrock-api-monitor-rule",
        self.region,
        self.get_account_id().await?
    );
    info!("添加EventBridge调用Lambda权限: {} -> {}", source_arn, FUNCTION_NAME);
    if let Err(e) = self
        .lambda_client
        .add_permission()
        .function_name(FUNCTION_NAME)
        .statement_id("EventBridgeInvoke")
        .action("lambda:InvokeFunction")
        .principal("events.amazonaws.com")
        .source_arn(&source_arn)
        .send()
        .await
    {
        error!("EventBridge Lambda权限添加失败: {}", e);
        return Err(anyhow!("添加Lambda EventBridge调用权限失败: {}", e));
    }
    info!("EventBridge Lambda权限添加成功");
    Ok(())
}
async fn get_account_id(&self) -> Result<String> {
let sts_client = aws_sdk_sts::Client::new(&self.sdk_config);
let identity = sts_client.get_caller_identity().send().await?;
Ok(identity.account().unwrap().to_string())
}
/// Creates the `bedrock-throttling-alerts` SNS topic and returns its ARN.
///
/// The topic name must stay in sync with the `sns:Publish` resource granted in the Lambda
/// execution role's inline policy.
///
/// # Errors
/// Returns an error if `CreateTopic` fails or its response lacks the topic ARN.
pub async fn create_sns_topic(&self) -> Result<String> {
    let topic_name = "bedrock-throttling-alerts";
    info!("创建SNS主题: {}", topic_name);
    let response = self
        .sns_client
        .create_topic()
        .name(topic_name)
        .send()
        .await
        .map_err(|e| {
            error!("SNS主题创建失败: {}", e);
            anyhow!("创建SNS主题失败: {}", e)
        })?;
    let topic_arn = response
        .topic_arn()
        .map(str::to_string)
        .ok_or_else(|| anyhow!("创建SNS主题失败: 响应中缺少主题ARN"))?;
    info!("SNS主题创建成功: {}", topic_arn);
    Ok(topic_arn)
}
/// Creates the eight Bedrock throttling and error alarms, all notifying `sns_topic_arn`.
///
/// Alarms are created in this order. Each fires when the metric's `Sum` exceeds the
/// threshold, because `create_single_alarm` compares with `GreaterThanThreshold`.
/// 1. `bedrock-InvocationThrottles-Immediate`: any throttle in a 10 s period (threshold 0).
/// 2. `bedrock-InvocationThrottles-HighFrequency`: more than 10 per minute for 2 periods.
/// 3. `bedrock-InvocationThrottles-General`: dimension `ModelId = "*"`, more than 5 per
///    minute for 3 periods.
/// 4–6. One alarm per Claude model, via `create_model_specific_alarm`.
/// 7. `bedrock-InvocationClientErrors`: more than 5 per minute for 2 periods.
/// 8. `bedrock-InvocationServerErrors`: more than 3 in one minute.
///
/// NOTE(review): CloudWatch matches dimension values literally, so `ModelId = "*"` probably
/// watches a metric that never receives data rather than "any model" — confirm.
///
/// # Errors
/// Stops at, and returns, the first alarm that fails to be created.
pub async fn create_bedrock_cloudwatch_alarms(&self, sns_topic_arn: &str) -> Result<()> {
    info!("开始创建Bedrock监控CloudWatch告警...");
    // The description promises a trigger on the first 429. With GreaterThanThreshold, a
    // threshold of 1.0 would need two throttles, so 0.0 is the threshold that matches.
    self.create_single_alarm(
        "bedrock-InvocationThrottles-Immediate",
        "立即响应告警 - 检测到1次429时就触发AK/SK关闭",
        "AWS/Bedrock",
        "InvocationThrottles",
        10,
        1,
        0.0,
        Some(sns_topic_arn),
        None,
    )
    .await?;
    self.create_single_alarm(
        "bedrock-InvocationThrottles-HighFrequency",
        "高频Bedrock API限流告警 - 当Bedrock API限流频率异常时触发",
        "AWS/Bedrock",
        "InvocationThrottles",
        60,
        2,
        10.0,
        Some(sns_topic_arn),
        None,
    )
    .await?;
    let general_dimension = aws_sdk_cloudwatch::types::Dimension::builder()
        .name("ModelId")
        .value("*")
        .build();
    self.create_single_alarm(
        "bedrock-InvocationThrottles-General",
        "通用Bedrock API限流告警 - 对任何模型限流超过3次时告警",
        "AWS/Bedrock",
        "InvocationThrottles",
        60,
        3,
        5.0,
        Some(sns_topic_arn),
        Some(vec![general_dimension]),
    )
    .await?;
    const MONITORED_CLAUDE_MODELS: [&str; 3] = [
        "anthropic.claude-3-5-sonnet-20240620-v1:0",
        "anthropic.claude-3-sonnet-20240229-v1:0",
        "anthropic.claude-3-5-haiku-20241022-v1:0",
    ];
    for model_id in MONITORED_CLAUDE_MODELS {
        self.create_model_specific_alarm(model_id, sns_topic_arn).await?;
    }
    self.create_single_alarm(
        "bedrock-InvocationClientErrors",
        "Bedrock客户端错误告警 - 当客户端错误超过5次时告警",
        "AWS/Bedrock",
        "InvocationClientErrors",
        60,
        2,
        5.0,
        Some(sns_topic_arn),
        None,
    )
    .await?;
    self.create_single_alarm(
        "bedrock-InvocationServerErrors",
        "Bedrock服务器错误告警 - 当服务器错误超过3次时告警",
        "AWS/Bedrock",
        "InvocationServerErrors",
        60,
        1,
        3.0,
        Some(sns_topic_arn),
        None,
    )
    .await?;
    info!("✅ 所有Bedrock监控CloudWatch告警创建完成");
    Ok(())
}
/// Sends one `PutMetricAlarm` request.
///
/// The alarm uses statistic `Sum` and fires when it is strictly greater than `threshold`.
///
/// # Arguments
/// * `period` - evaluation period in seconds; must fit in an `i32`.
/// * `evaluation_periods` - number of consecutive periods that must breach.
/// * `sns_topic_arn` - when present, registered as the alarm action.
/// * `dimensions` - when present, every dimension is added in order.
///
/// # Errors
/// Returns an error if `period` does not fit in an `i32` or the request fails.
async fn create_single_alarm(
    &self,
    alarm_name: &str,
    description: &str,
    namespace: &str,
    metric_name: &str,
    period: i64,
    evaluation_periods: i32,
    threshold: f64,
    sns_topic_arn: Option<&str>,
    dimensions: Option<Vec<aws_sdk_cloudwatch::types::Dimension>>,
) -> Result<()> {
    info!("创建CloudWatch告警: {}", alarm_name);
    let period_secs: i32 = period.try_into()?;
    let base_request = self
        .cloudwatch_client
        .put_metric_alarm()
        .alarm_name(alarm_name)
        .alarm_description(description)
        .namespace(namespace)
        .metric_name(metric_name)
        .statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
        .period(period_secs)
        .evaluation_periods(evaluation_periods)
        .threshold(threshold)
        .comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanThreshold);
    let with_actions = match sns_topic_arn {
        Some(topic_arn) => base_request.alarm_actions(topic_arn),
        None => base_request,
    };
    let request = dimensions
        .into_iter()
        .flatten()
        .fold(with_actions, |req, dimension| req.dimensions(dimension));
    if let Err(e) = request.send().await {
        error!("CloudWatch告警创建失败 {}: {}", alarm_name, e);
        return Err(anyhow!("创建CloudWatch告警失败: {}", e));
    }
    info!("CloudWatch告警创建成功: {}", alarm_name);
    Ok(())
}
/// Creates a throttling alarm scoped to the Bedrock model `model_id`.
///
/// The alarm is named `bedrock-InvocationThrottles-<model_id>`, with every `:` replaced by `-`.
/// It watches `InvocationThrottles` with dimension `ModelId = model_id`, fires when the sum
/// exceeds 3 within one 60-second period, and notifies `sns_topic_arn`.
///
/// # Errors
/// Propagates any failure from `create_single_alarm`.
async fn create_model_specific_alarm(&self, model_id: &str, sns_topic_arn: &str) -> Result<()> {
    let sanitized_model = model_id.replace(':', "-");
    let model_dimension = aws_sdk_cloudwatch::types::Dimension::builder()
        .name("ModelId")
        .value(model_id)
        .build();
    self.create_single_alarm(
        &format!("bedrock-InvocationThrottles-{}", sanitized_model),
        &format!("Claude模型 {} 限流告警 - 当模型1分钟内限流超过3次时告警", model_id),
        "AWS/Bedrock",
        "InvocationThrottles",
        60,
        1,
        3.0,
        Some(sns_topic_arn),
        Some(vec![model_dimension]),
    )
    .await
}
pub async fn configure_sns_lambda_trigger(&self, sns_topic_arn: &str) -> Result<()> {
let function_name = "bedrock-monitor-function";
info!("配置SNS到Lambda的触发器: {} -> {}", sns_topic_arn, function_name);
let function_config = self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await?;
let function_arn = function_config.function_arn().unwrap();
let result = self.sns_client.subscribe()
.topic_arn(sns_topic_arn)
.protocol("lambda")
.endpoint(function_arn)
.return_subscription_arn(true)
.send()
.await;
match result {
Ok(response) => {
let subscription_arn = response.subscription_arn().unwrap().to_string();
info!("SNS订阅创建成功: {}", subscription_arn);
match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
Ok(_) => info!("SNS Lambda权限添加成功"),
Err(e) => {
warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
}
}
Ok(())
}
Err(e) => {
let error_str = e.to_string();
error!("SNS订阅创建失败: {}", e);
if error_str.contains("already exists") ||
error_str.contains("ResourceConflictException") ||
error_str.contains("Duplicate subscription") ||
error_str.contains("already a subscription") {
info!("SNS订阅已存在,跳过创建");
match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
Ok(_) => info!("SNS Lambda权限添加成功"),
Err(e) => {
warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
}
}
Ok(())
} else if error_str.contains("dispatch failure") {
warn!("⚠️ SNS订阅可能已存在或网络问题,但继续部署");
match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
Ok(_) => info!("SNS Lambda权限添加成功"),
Err(e) => {
warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
}
}
Ok(())
} else if error_str.contains("InvalidParameterException") {
error!("❌ SNS订阅参数无效: 检查主题ARN或Lambda函数ARN格式");
Err(anyhow!("SNS订阅参数无效: {}", error_str))
} else if error_str.contains("AccessDeniedException") {
error!("❌ 权限不足: 无权限创建SNS订阅");
Err(anyhow!("权限不足: {}", error_str))
} else if error_str.contains("NotFoundException") {
error!("❌ 资源未找到: SNS主题或Lambda函数不存在");
Err(anyhow!("资源未找到: {}", error_str))
} else {
error!("❌ 未知SNS订阅错误: {}", error_str);
Err(anyhow!("创建SNS订阅失败: {}", e))
}
}
}
}
async fn add_sns_lambda_permission(&self, function_name: &str, sns_topic_arn: &str) -> Result<()> {
let statement_id = "sns-invoke-permission";
info!("添加SNS调用Lambda权限: {} -> {}", sns_topic_arn, function_name);
let sns_permission_exists = match self.lambda_client.get_policy()
.function_name(function_name)
.send()
.await {
Ok(policy_result) => {
if let Some(policy) = policy_result.policy() {
policy.contains("sns-invoke-permission") ||
(policy.contains("sns.amazonaws.com") && policy.contains(sns_topic_arn))
} else {
false
}
}
Err(_) => false
};
if sns_permission_exists {
info!("✅ SNS权限已存在,跳过添加");
return Ok(());
}
let result = self.lambda_client.add_permission()
.function_name(function_name)
.statement_id(statement_id)
.action("lambda:InvokeFunction")
.principal("sns.amazonaws.com")
.source_arn(sns_topic_arn)
.send()
.await;
match result {
Ok(response) => {
info!("SNS Lambda权限添加成功");
if let Some(statement) = response.statement() {
info!("权限声明: {}", statement);
}
Ok(())
}
Err(e) => {
let error_str = e.to_string();
let raw_error = format!("{:?}", e);
error!("SNS Lambda权限添加失败");
error!("错误信息: {}", error_str);
error!("原始错误: {}", raw_error);
match self.lambda_client.get_policy()
.function_name(function_name)
.send()
.await {
Ok(policy_result) => {
if let Some(policy) = policy_result.policy() {
info!("当前Lambda函数策略已存在");
if policy.contains("sns-invoke-permission") {
info!("✅ SNS权限已存在于策略中,无需重复添加");
return Ok(());
}
}
}
Err(_) => {
warn!("无法检查Lambda函数策略状态");
}
}
if error_str.contains("ResourceConflictException") ||
error_str.contains("already exists") ||
error_str.contains("The resource-based policy") ||
error_str.contains("already has a statement") {
info!("SNS Lambda权限已存在,跳过添加");
Ok(())
} else if error_str.contains("InvalidParameterValueException") {
error!("❌ SNS权限参数无效: 检查函数名和SNS主题ARN格式");
Err(anyhow!("SNS权限参数无效: {}", error_str))
} else if error_str.contains("AccessDeniedException") {
error!("❌ 权限不足: 无权限为Lambda函数添加SNS调用权限");
Err(anyhow!("权限不足: {}", error_str))
} else if error_str.contains("ResourceNotFoundException") {
error!("❌ 资源未找到: Lambda函数或SNS主题不存在");
Err(anyhow!("资源未找到: {}", error_str))
} else if error_str.contains("service error") {
warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
Ok(())
} else {
error!("❌ 未知SNS权限错误: {}", error_str);
Err(anyhow!("添加SNS Lambda权限失败: {}", e))
}
}
}
}
/// Deploys the full Bedrock monitoring stack, in this order:
/// 1. the Lambda function (`deploy_lambda_function`);
/// 2. its environment variables (`update_lambda_environment`), tried up to 3 times with
///    5 s / 10 s waits — a final failure is logged but does not abort the deployment;
/// 3. the SNS alert topic;
/// 4. the Bedrock CloudWatch alarms;
/// 5. the EventBridge rule and its Lambda target;
/// 6. the SNS → Lambda subscription and permission;
/// 7. the Lambda error alarm.
///
/// # Errors
/// Returns the first error from any step except the environment-variable update.
pub async fn deploy_complete_monitoring_stack(&self, zip_content: Vec<u8>) -> Result<()> {
    info!("🚀 开始部署完整的Bedrock监控基础设施...");
    let function_arn = self.deploy_lambda_function(zip_content).await?;
    info!("✅ Lambda函数部署成功: {}", function_arn);

    const ENV_UPDATE_ATTEMPTS: u64 = 3;
    let mut attempt: u64 = 1;
    loop {
        match self.update_lambda_environment().await {
            Ok(()) => {
                info!("✅ Lambda环境变量更新成功 (尝试 {})", attempt);
                break;
            }
            Err(e) => {
                warn!(
                    "⚠️ Lambda环境变量更新失败 (尝试 {}/{}): {}",
                    attempt, ENV_UPDATE_ATTEMPTS, e
                );
                if attempt >= ENV_UPDATE_ATTEMPTS {
                    error!("❌ Lambda环境变量更新最终失败,但继续部署流程");
                    break;
                }
                let delay_secs = attempt * 5;
                info!("等待 {} 秒后重试...", delay_secs);
                tokio::time::sleep(tokio::time::Duration::from_secs(delay_secs)).await;
                attempt += 1;
            }
        }
    }

    let sns_topic_arn = self.create_sns_topic().await?;
    info!("✅ SNS主题创建成功: {}", sns_topic_arn);
    self.create_bedrock_cloudwatch_alarms(&sns_topic_arn).await?;
    info!("✅ CloudWatch告警创建成功");
    let rule_arn = self.create_bedrock_eventbridge_rule().await?;
    info!("✅ EventBridge规则创建成功: {}", rule_arn);
    self.add_lambda_target_to_eventbridge().await?;
    info!("✅ EventBridge到Lambda触发器配置成功");
    self.configure_sns_lambda_trigger(&sns_topic_arn).await?;
    info!("✅ SNS到Lambda触发器配置成功");
    self.create_cloudwatch_alarm("bedrock-monitor-function").await?;
    info!("✅ Lambda错误告警创建成功");

    info!("🎉 完整的Bedrock监控基础设施部署成功!");
    info!("📊 监控功能包括:");
    info!(" - 429错误检测和AK/SK自动关闭");
    info!(" - 8个CloudWatch告警");
    info!(" - EventBridge API调用监控");
    info!(" - SNS告警通知");
    info!(" - 实时CloudWatch日志记录");
    Ok(())
}
pub async fn update_lambda_environment(&self) -> Result<()> {
let function_name = "bedrock-monitor-function";
info!("更新Lambda函数环境变量: {}", function_name);
match self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await {
Ok(function_config) => {
info!("✅ Lambda函数存在,状态: {:?}", function_config.state());
match function_config.state() {
Some(aws_sdk_lambda::types::State::Pending) => {
warn!("⚠️ Lambda函数正在更新中,等待完成...");
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
Some(aws_sdk_lambda::types::State::Active) => {
info!("✅ Lambda函数状态正常");
}
Some(aws_sdk_lambda::types::State::Failed) | Some(aws_sdk_lambda::types::State::Inactive) => {
return Err(anyhow!("❌ Lambda函数状态异常: {:?}", function_config.state()));
}
None => {
warn!("⚠️ 无法获取Lambda函数状态,继续尝试更新");
}
_ => {
warn!("⚠️ Lambda函数处于未知状态: {:?}, 继续尝试更新", function_config.state());
}
}
}
Err(e) => {
error!("❌ 无法获取Lambda函数配置: {}", e);
return Err(anyhow!("Lambda函数不存在或无权限访问: {}", e));
}
}
let env_vars = HashMap::from([
("RUST_LOG".to_string(), "info".to_string()),
("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()), ]);
let total_size: usize = env_vars.iter()
.map(|(k, v)| k.len() + v.len() + 2) .sum();
info!("环境变量总大小: {} bytes (限制: 4096 bytes)", total_size);
if total_size > 4096 {
return Err(anyhow!("环境变量总大小超过AWS Lambda限制(4KB): {} bytes", total_size));
}
let environment = Environment::builder()
.set_variables(Some(env_vars))
.build();
info!("正在发送更新请求...");
match self.lambda_client.update_function_configuration()
.function_name(function_name)
.environment(environment)
.send()
.await {
Ok(response) => {
info!("✅ Lambda环境变量更新请求已发送");
info!("函数状态: {:?}", response.state());
info!("最后更新时间: {:?}", response.last_modified());
info!("等待配置更新生效...");
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
match self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await {
Ok(updated_config) => {
if let Some(env) = updated_config.environment() {
if let Some(vars) = env.variables() {
info!("✅ 环境变量验证成功:");
for (key, value) in vars {
info!(" {} = {}", key, value);
}
}
}
info!("✅ Lambda环境变量更新完成");
}
Err(e) => {
warn!("⚠️ 无法验证环境变量更新: {}", e);
}
}
Ok(())
}
Err(e) => {
let error_str = e.to_string();
let raw_error = format!("{:?}", e);
match self.lambda_client.get_function_configuration()
.function_name(function_name)
.send()
.await {
Ok(config) => {
if let Some(env) = config.environment() {
if let Some(vars) = env.variables() {
let has_correct_vars = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string()) &&
vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string()) &&
vars.get("RUST_LOG") == Some(&"info".to_string());
if has_correct_vars {
info!("✅ Lambda环境变量已正确设置,无需重复更新");
if error_str.contains("ResourceConflictException") {
info!("ℹ️ 检测到Lambda函数正在更新,但环境变量已正确配置");
} else if error_str.contains("service error") {
info!("ℹ️ 检测到AWS服务错误,但环境变量已正确配置");
}
return Ok(());
}
}
}
}
Err(_) => {
warn!("无法验证当前环境变量状态");
}
}
error!("❌ Lambda环境变量更新失败");
error!("错误信息: {}", error_str);
error!("原始错误: {}", raw_error);
if error_str.contains("ResourceConflictException") {
error!("❌ 资源冲突: Lambda函数正在被其他操作更新");
} else if error_str.contains("InvalidParameterValueException") {
error!("❌ 参数无效: 环境变量格式或内容有问题");
} else if error_str.contains("TooManyRequestsException") {
error!("❌ 请求过多: AWS API调用频率超限");
} else if error_str.contains("AccessDeniedException") {
error!("❌ 权限不足: 无权限更新Lambda函数配置");
} else if error_str.contains("ResourceNotFoundException") {
error!("❌ 资源未找到: Lambda函数不存在");
} else if error_str.contains("service error") {
warn!("⚠️ AWS服务错误,可能环境变量已更新,将进行验证");
return Err(anyhow!("更新Lambda环境变量失败: {}", e));
} else {
error!("❌ 未知错误类型: {}", error_str);
}
Err(anyhow!("更新Lambda环境变量失败: {}", e))
}
}
}
pub async fn update_iam_credentials_policy(&self) -> Result<()> {
let role_name = "lambda-bedrock-monitor-role";
info!("更新IAM角色AK/SK管理权限: {}", role_name);
let credential_management_policy = r#"{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:UpdateAccessKey",
"iam:ListAccessKeys",
"iam:GetAccessKeyLastUsed",
"iam:GetUser"
],
"Resource": "*"
}
]
}"#;
match self.iam_client.put_role_policy()
.role_name(role_name)
.policy_name("BedrockCredentialManagement")
.policy_document(credential_management_policy.to_string())
.send()
.await {
Ok(_) => {
info!("✅ IAM角色AK/SK管理权限更新成功");
Ok(())
}
Err(e) => {
error!("❌ IAM角色AK/SK管理权限更新失败: {}", e);
Err(anyhow!("更新IAM角色权限失败: {}", e))
}
}
}
/// Re-applies the two settings the AK/SK auto-disable feature relies on:
/// 1. the IAM inline policy (`update_iam_credentials_policy`);
/// 2. the Lambda environment variables (`update_lambda_environment`).
///
/// # Errors
/// Returns the first error from either step. The environment step is not attempted
/// when the IAM step fails.
pub async fn fix_ak_sk_auto_disable(&self) -> Result<()> {
    info!("🔧 开始修复AK/SK自动关闭功能...");

    self.update_iam_credentials_policy().await?;
    info!("✅ IAM权限已更新支持多区域");

    self.update_lambda_environment().await?;
    info!("✅ Lambda环境变量已更新(关闭试运行模式)");

    [
        "🎉 AK/SK自动关闭功能修复完成!",
        "📊 修复内容:",
        " - IAM权限现在支持所有区域",
        " - BEDROCK_DRY_RUN_MODE=false(正式运行模式)",
        " - BEDROCK_AUTO_CREDENTIAL_DISABLE=true(启用自动禁用)",
    ]
    .iter()
    .for_each(|line| info!("{}", line));

    Ok(())
}
}
/// Deploys the Bedrock monitoring stack to, and checks its status in, a list of AWS
/// regions.
pub struct MultiRegionDeployer {
/// Target AWS regions, processed in list order. `from_env` fills this from the
/// comma-separated `AWS_REGION` variable and rejects an empty list.
regions: Vec<String>,
}
impl MultiRegionDeployer {
pub fn from_env() -> Result<Self> {
dotenv::dotenv().ok();
let regions_str = std::env::var("AWS_REGION")
.map_err(|_| anyhow!("❌ 错误: AWS_REGION 必须在.env文件中设置"))?;
let regions: Vec<String> = regions_str
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
if regions.is_empty() {
return Err(anyhow!("❌ 错误: AWS_REGION 不能为空"));
}
info!("📍 配置的AWS区域: {}", regions.join(", "));
info!("🌍 总计 {} 个区域", regions.len());
Ok(MultiRegionDeployer { regions })
}
pub async fn deploy_to_all_regions(&self) -> Result<Vec<RegionDeployResult>> {
info!("🚀 开始在所有 {} 个区域部署Bedrock监控系统", self.regions.len());
let zip_content = create_lambda_zip()?;
info!("✅ Lambda函数包创建完成,大小: {} bytes", zip_content.len());
let mut results = Vec::new();
let mut success_count = 0;
let mut failed_regions = Vec::new();
for (index, region) in self.regions.iter().enumerate() {
info!("\n📍 [{}/{}] 部署到区域: {}", index + 1, self.regions.len(), region);
info!("{}", "=".repeat(60));
let start_time = std::time::Instant::now();
let result = self.deploy_to_single_region(region, &zip_content).await;
let duration = start_time.elapsed();
match result {
Ok(deploy_result) => {
success_count += 1;
info!("✅ 区域 {} 部署成功 (耗时: {:?})", region, duration);
results.push(RegionDeployResult {
region: region.clone(),
status: DeployStatus::Success,
duration,
details: deploy_result,
});
}
Err(e) => {
failed_regions.push((region.clone(), anyhow::anyhow!("{}", e)));
error!("❌ 区域 {} 部署失败 (耗时: {:?}): {}", region, duration, e);
results.push(RegionDeployResult {
region: region.clone(),
status: DeployStatus::Failed(e),
duration,
details: String::new(),
});
}
}
if index < self.regions.len() - 1 {
info!("⏳ 等待 3 秒后部署下一个区域...");
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
}
}
self.print_deployment_summary(&results, success_count, &failed_regions).await;
if success_count == 0 {
return Err(anyhow!("❌ 所有区域部署失败"));
} else if failed_regions.len() > 0 {
warn!("⚠️ {} 个区域部署失败,请检查错误信息", failed_regions.len());
}
Ok(results)
}
async fn deploy_to_single_region(&self, region: &str, zip_content: &[u8]) -> Result<String> {
let aws_manager = AwsManager::new(region).await?;
aws_manager.deploy_complete_monitoring_stack(zip_content.to_vec()).await?;
aws_manager.fix_ak_sk_auto_disable().await?;
Ok(format!("区域 {} 部署完成", region))
}
async fn print_deployment_summary(&self, results: &[RegionDeployResult], success_count: usize, failed_regions: &[(String, anyhow::Error)]) {
info!("\n{}", "=".repeat(80));
info!("🎉 多区域部署完成总结");
info!("{}", "=".repeat(80).as_str());
info!("📊 部署统计:");
info!(" ✅ 成功区域: {}/{}", success_count, self.regions.len());
info!(" ❌ 失败区域: {}/{}", failed_regions.len(), self.regions.len());
info!(" ⏱️ 总耗时: {:?}",
results.iter().map(|r| r.duration).sum::<std::time::Duration>());
if success_count > 0 {
info!("\n✅ 成功部署的区域:");
for result in results.iter().filter(|r| matches!(r.status, DeployStatus::Success)) {
info!(" 🌍 {} (耗时: {:?})", result.region, result.duration);
}
}
if !failed_regions.is_empty() {
info!("\n❌ 失败的区域:");
for (region, error) in failed_regions {
info!(" 🌍 {}: {}", region, error);
}
}
if success_count > 0 {
info!("\n🚨 429检测和AK/SK自动关闭机制已激活:");
info!(" 🔍 监控范围: {} 个AWS区域", success_count);
info!(" ⚡ 检测速度: 实时检测429错误");
info!(" 🔒 自动响应: 立即禁用相关AK/SK");
info!(" 📊 告警覆盖: 每个区域8个CloudWatch告警");
info!("\n📋 每个区域部署的组件:");
info!(" ✅ Lambda函数: bedrock-monitor-function");
info!(" ✅ SNS主题: bedrock-throttling-alerts");
info!(" ✅ CloudWatch告警: 8个专业告警");
info!(" ✅ EventBridge规则: API调用监控");
info!(" ✅ 触发器配置: SNS+EventBridge");
info!(" ✅ 日志系统: 标准日志+详细日志");
}
info!("\n🔍 验证建议:");
info!(" 1. 检查各区域的CloudWatch告警状态");
info!(" 2. 查看Lambda函数日志输出");
info!(" 3. 监控Bedrock API调用情况");
info!(" 4. 测试429错误响应机制");
info!(" 5. 验证SNS告警通知系统");
}
}
/// Outcome of deploying the monitoring stack to one region, as collected by
/// `MultiRegionDeployer::deploy_to_all_regions`.
#[derive(Debug)]
pub struct RegionDeployResult {
/// Region the deployment targeted.
pub region: String,
/// Whether the region's deployment succeeded; carries the error on failure.
pub status: DeployStatus,
/// Wall-clock time spent deploying this region (the pause between regions is not included).
pub duration: std::time::Duration,
/// Completion message returned by the per-region deployment; empty on failure.
pub details: String,
}
/// Final state of one region's deployment.
#[derive(Debug)]
pub enum DeployStatus {
/// The region's deployment returned `Ok`.
Success,
/// The region's deployment stopped with this error.
Failed(anyhow::Error),
}
impl MultiRegionDeployer {
pub async fn check_deployment_status(&self) -> Result<Vec<RegionStatus>> {
info!("🔍 开始检查所有 {} 个区域的部署状态", self.regions.len());
let mut region_statuses = Vec::new();
for (index, region) in self.regions.iter().enumerate() {
info!("\n📍 [{}/{}] 检查区域: {}", index + 1, self.regions.len(), region);
let status = self.check_single_region_status(region).await;
region_statuses.push(status);
if index < self.regions.len() - 1 {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
}
self.print_status_summary(®ion_statuses).await;
Ok(region_statuses)
}
async fn check_single_region_status(&self, region: &str) -> RegionStatus {
let start_time = std::time::Instant::now();
let mut status = RegionStatus {
region: region.to_string(),
lambda_function: false,
sns_topic: false,
eventbridge_rule: false,
cloudwatch_alarms: 0,
lambda_environment: false,
iam_permissions: false,
total_duration: std::time::Duration::default(),
is_healthy: false,
};
if let Ok(aws_manager) = AwsManager::new(region).await {
status.lambda_function = self.check_lambda_function(&aws_manager).await;
status.sns_topic = self.check_sns_topic(&aws_manager).await;
status.eventbridge_rule = self.check_eventbridge_rule(&aws_manager).await;
status.cloudwatch_alarms = self.count_cloudwatch_alarms(&aws_manager).await;
if status.lambda_function {
status.lambda_environment = self.check_lambda_environment(&aws_manager).await;
}
status.iam_permissions = self.check_iam_permissions(&aws_manager).await;
status.is_healthy = status.lambda_function &&
status.sns_topic &&
status.eventbridge_rule &&
status.cloudwatch_alarms >= 5 && status.lambda_environment &&
status.iam_permissions;
}
status.total_duration = start_time.elapsed();
self.print_region_status(&status).await;
status
}
async fn check_lambda_function(&self, aws_manager: &AwsManager) -> bool {
match aws_manager.lambda_client.get_function_configuration()
.function_name("bedrock-monitor-function")
.send()
.await {
Ok(config) => {
let is_active = matches!(config.state(), Some(aws_sdk_lambda::types::State::Active));
info!(" ✅ Lambda函数: 存在且状态: {:?}", config.state());
is_active
}
Err(_) => {
info!(" ❌ Lambda函数: 不存在或无法访问");
false
}
}
}
async fn check_sns_topic(&self, aws_manager: &AwsManager) -> bool {
match aws_manager.sns_client.list_topics().send().await {
Ok(response) => {
let exists = response.topics()
.iter()
.any(|topic| topic.topic_arn()
.unwrap_or("")
.contains("bedrock-throttling-alerts"));
if exists {
info!(" ✅ SNS主题: bedrock-throttling-alerts 存在");
} else {
info!(" ❌ SNS主题: bedrock-throttling-alerts 不存在");
}
exists
}
Err(_) => {
info!(" ❌ SNS主题: 无法访问");
false
}
}
}
async fn check_eventbridge_rule(&self, aws_manager: &AwsManager) -> bool {
match aws_manager.eventbridge_client.list_rules()
.name_prefix("bedrock-api-monitor-rule")
.send()
.await {
Ok(response) => {
let exists = response.rules()
.iter()
.any(|rule| rule.name() == Some("bedrock-api-monitor-rule"));
if exists {
info!(" ✅ EventBridge规则: bedrock-api-monitor-rule 存在");
} else {
info!(" ❌ EventBridge规则: bedrock-api-monitor-rule 不存在");
}
exists
}
Err(_) => {
info!(" ❌ EventBridge规则: 无法访问");
false
}
}
}
async fn count_cloudwatch_alarms(&self, aws_manager: &AwsManager) -> i32 {
match aws_manager.cloudwatch_client.describe_alarms()
.alarm_name_prefix("bedrock")
.send()
.await {
Ok(response) => {
let count = response.metric_alarms().len() as i32;
info!(" 📊 CloudWatch告警: {} 个", count);
count
}
Err(_) => {
info!(" ❌ CloudWatch告警: 无法访问");
0
}
}
}
async fn check_lambda_environment(&self, aws_manager: &AwsManager) -> bool {
match aws_manager.lambda_client.get_function_configuration()
.function_name("bedrock-monitor-function")
.send()
.await {
Ok(config) => {
if let Some(env) = config.environment() {
if let Some(vars) = env.variables() {
let has_dry_run_false = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string());
let has_auto_disable_true = vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string());
if has_dry_run_false && has_auto_disable_true {
info!(" ✅ Lambda环境变量: 正确配置 (429检测已启用)");
return true;
} else {
info!(" ⚠️ Lambda环境变量: 配置不完整 (DRY_RUN={}, AUTO_DISABLE={})",
vars.get("BEDROCK_DRY_RUN_MODE").unwrap_or(&"未设置".to_string()),
vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE").unwrap_or(&"未设置".to_string()));
}
}
}
info!(" ❌ Lambda环境变量: 未正确配置");
false
}
Err(_) => {
info!(" ❌ Lambda环境变量: 无法检查");
false
}
}
}
async fn check_iam_permissions(&self, aws_manager: &AwsManager) -> bool {
match aws_manager.iam_client.get_role()
.role_name("lambda-bedrock-monitor-role")
.send()
.await {
Ok(_) => {
info!(" ✅ IAM权限: lambda-bedrock-monitor-role 存在");
true
}
Err(_) => {
info!(" ❌ IAM权限: lambda-bedrock-monitor-role 不存在");
false
}
}
}
async fn print_region_status(&self, status: &RegionStatus) {
let health_emoji = if status.is_healthy { "✅" } else { "❌" };
info!("{} 区域 {} (耗时: {:?})", health_emoji, status.region, status.total_duration);
info!(" - Lambda函数: {}", if status.lambda_function { "✅" } else { "❌" });
info!(" - SNS主题: {}", if status.sns_topic { "✅" } else { "❌" });
info!(" - EventBridge规则: {}", if status.eventbridge_rule { "✅" } else { "❌" });
info!(" - CloudWatch告警: {} 个 {}", if status.cloudwatch_alarms >= 5 { "✅" } else { "⚠️" }, status.cloudwatch_alarms);
info!(" - 环境变量: {}", if status.lambda_environment { "✅" } else { "❌" });
info!(" - IAM权限: {}", if status.iam_permissions { "✅" } else { "❌" });
if status.is_healthy {
info!(" 🚨 429 throttling限制: ✅ 已激活");
} else {
info!(" 🚨 429 throttling限制: ❌ 未激活");
}
}
async fn print_status_summary(&self, region_statuses: &[RegionStatus]) {
info!("\n{}", "=".repeat(80));
info!("🎉 多区域部署状态检查完成");
info!("{}", "=".repeat(80));
let healthy_count = region_statuses.iter().filter(|r| r.is_healthy).count();
let total_count = region_statuses.len();
info!("📊 部署状态统计:");
info!(" ✅ 完全健康的区域: {}/{}", healthy_count, total_count);
info!(" ❌ 有问题的区域: {}/{}", total_count - healthy_count, total_count);
let lambda_count = region_statuses.iter().filter(|r| r.lambda_function).count();
let sns_count = region_statuses.iter().filter(|r| r.sns_topic).count();
let eventbridge_count = region_statuses.iter().filter(|r| r.eventbridge_rule).count();
let env_count = region_statuses.iter().filter(|r| r.lambda_environment).count();
let iam_count = region_statuses.iter().filter(|r| r.iam_permissions).count();
info!("\n📋 组件部署统计:");
info!(" ✅ Lambda函数: {}/{} 区域", lambda_count, total_count);
info!(" ✅ SNS主题: {}/{} 区域", sns_count, total_count);
info!(" ✅ EventBridge规则: {}/{} 区域", eventbridge_count, total_count);
info!(" ✅ Lambda环境变量: {}/{} 区域", env_count, total_count);
info!(" ✅ IAM权限: {}/{} 区域", iam_count, total_count);
if healthy_count > 0 {
info!("\n✅ 完全部署成功的区域:");
for status in region_statuses.iter().filter(|r| r.is_healthy) {
info!(" 🌍 {} (耗时: {:?})", status.region, status.total_duration);
}
}
if healthy_count < total_count {
info!("\n❌ 需要修复的区域:");
for status in region_statuses.iter().filter(|r| !r.is_healthy) {
let issues = vec![
(!status.lambda_function, "Lambda函数"),
(!status.sns_topic, "SNS主题"),
(!status.eventbridge_rule, "EventBridge规则"),
(!status.lambda_environment, "环境变量"),
(!status.iam_permissions, "IAM权限"),
].into_iter()
.filter_map(|(missing, component)| if missing { Some(component) } else { None })
.collect::<Vec<_>>();
info!(" 🌍 {}: 缺少 {}", status.region, issues.join(", "));
}
}
if healthy_count > 0 {
info!("\n🚨 429 throttling限制状态:");
info!(" ✅ 已激活区域: {}/{}", healthy_count, total_count);
info!(" ⚡ 检测机制: 实时检测429错误");
info!(" 🔒 自动响应: 立即禁用相关AK/SK");
info!("\n💡 验证建议:");
info!(" 1. 在成功部署的区域测试Bedrock API调用");
info!(" 2. 检查CloudWatch告警是否正常触发");
info!(" 3. 验证SNS告警通知系统");
info!(" 4. 测试429错误AK/SK自动禁用功能");
}
}
}
/// Snapshot of the monitoring components found in one region by
/// `MultiRegionDeployer::check_deployment_status`.
#[derive(Debug)]
pub struct RegionStatus {
/// Region that was checked.
pub region: String,
/// `bedrock-monitor-function` exists and its state is `Active`.
pub lambda_function: bool,
/// An SNS topic whose ARN contains `bedrock-throttling-alerts` was found.
pub sns_topic: bool,
/// An EventBridge rule named `bedrock-api-monitor-rule` exists.
pub eventbridge_rule: bool,
/// Number of CloudWatch metric alarms whose names start with `bedrock`.
pub cloudwatch_alarms: i32,
/// `BEDROCK_DRY_RUN_MODE=false` and `BEDROCK_AUTO_CREDENTIAL_DISABLE=true` are set on the
/// function; only inspected when `lambda_function` is true.
pub lambda_environment: bool,
/// The IAM role `lambda-bedrock-monitor-role` exists (its policies are not inspected).
pub iam_permissions: bool,
/// Time spent checking this region.
pub total_duration: std::time::Duration,
/// Every check passed and at least 5 `bedrock` alarms were counted.
pub is_healthy: bool,
}
pub fn create_lambda_zip() -> Result<Vec<u8>> {
use std::io::{Cursor, Write};
use zip::write::FileOptions;
let mut buffer = Vec::new();
{
let mut zip = zip::ZipWriter::new(Cursor::new(&mut buffer));
let options = FileOptions::default()
.compression_method(zip::CompressionMethod::Deflated);
let python_content = r#"import json
import os
import time
import re
import hashlib
from datetime import datetime
# Redis connection setup (已移除,现在使用CloudWatch日志)
# REDIS_HOST = os.environ.get('REDIS_HOST', 'localhost')
# REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379))
# REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD', None)
# CloudWatch Logs setup
CLOUDWATCH_LOG_GROUP = '/aws/lambda/bedrock-monitor-function/detailed'
CLOUDWATCH_LOG_STREAM = f"detailed-logs-{int(time.time())}"
# CloudWatch Logs client
try:
import boto3
cloudwatch_logs = boto3.client('logs')
CLOUDWATCH_AVAILABLE = True
except ImportError:
CLOUDWATCH_AVAILABLE = False
print("Warning: boto3 not available, CloudWatch logging disabled")
def get_log_stream_name():
"""Generate unique log stream name"""
return CLOUDWATCH_LOG_STREAM
def ensure_log_group_and_stream():
"""Ensure log group and stream exist"""
if not CLOUDWATCH_AVAILABLE:
return
try:
# Create log group if it doesn't exist
try:
cloudwatch_logs.create_log_group(logGroupName=CLOUDWATCH_LOG_GROUP)
print(f"Created CloudWatch log group: {CLOUDWATCH_LOG_GROUP}")
except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
pass # Log group already exists
# Create log stream
log_stream_name = get_log_stream_name()
try:
cloudwatch_logs.create_log_stream(
logGroupName=CLOUDWATCH_LOG_GROUP,
logStreamName=log_stream_name
)
print(f"Created CloudWatch log stream: {log_stream_name}")
except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
pass # Log stream already exists
except Exception as e:
print(f"Error setting up CloudWatch Logs: {e}")
def sanitize_data(data, level='partial'):
"""Sanitize sensitive data from request/response content"""
if not isinstance(data, (dict, str)):
return data
if isinstance(data, str):
# Remove potential API keys, tokens, and sensitive patterns
sanitized = data
# Remove AWS access keys
sanitized = re.sub(r'AKIA[0-9A-Z]{16}', 'AKIAXXXXXXXXXXXXXXXX', sanitized)
# Remove potential tokens and secrets
sanitized = re.sub(r'[Bb]earer\s+[A-Za-z0-9\-._~+\/]+=*', 'bearer XXXXXXXX', sanitized)
sanitized = re.sub(r'[Tt]oken\s*[:=]\s*[A-Za-z0-9\-._~+\/]+=*', 'token: XXXXXXXX', sanitized)
# Remove email addresses
sanitized = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 'email@domain.com', sanitized)
# Remove phone numbers (basic pattern)
sanitized = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 'XXX-XXX-XXXX', sanitized)
# Remove credit card numbers (basic pattern)
sanitized = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', 'XXXX-XXXX-XXXX-XXXX', sanitized)
# Remove IP addresses for privacy (optional, based on level)
if level == 'full':
sanitized = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', 'X.X.X.X', sanitized)
return sanitized
if isinstance(data, dict):
sanitized_dict = {}
for key, value in data.items():
# Check for sensitive key patterns
if any(pattern in key.lower() for pattern in
['password', 'token', 'key', 'secret', 'auth', 'credential', 'session']):
sanitized_dict[key] = 'XXXXXXXX'
elif any(pattern in key.lower() for pattern in ['email', 'mail']):
sanitized_dict[key] = 'email@domain.com'
elif key.lower() in ['client_ip', 'source_ip'] and level == 'full':
sanitized_dict[key] = 'X.X.X.X'
else:
sanitized_dict[key] = sanitize_data(value, level)
return sanitized_dict
return data
def should_log_detailed_content(status_code, event_type='unknown'):
"""Determine if we should log detailed content based on status and event type"""
# Always log errors and throttling in detail
if status_code in [429, 500, 502, 503, 504]:
return 'full'
# Log SNS alarms in detail
if event_type == 'CloudWatchAlarm':
return 'full'
# For successful requests, log summary only
if status_code == 200:
return 'summary'
# Default to partial for other cases
return 'partial'
def log_to_cloudwatch_detailed(log_entry, log_level='INFO'):
"""Send detailed log entry to CloudWatch Logs"""
if not CLOUDWATCH_AVAILABLE:
print(f"CloudWatch unavailable, logging to console: {log_level} - {json.dumps(log_entry)}")
return
try:
timestamp = int(time.time() * 1000)
log_event = {
'timestamp': timestamp,
'message': json.dumps(log_entry, default=str)
}
cloudwatch_logs.put_log_events(
logGroupName=CLOUDWATCH_LOG_GROUP,
logStreamName=get_log_stream_name(),
logEvents=[log_event]
)
except Exception as e:
print(f"Failed to send to CloudWatch Logs: {e}")
# Fallback to console logging
print(f"{log_level} - {json.dumps(log_entry, default=str)}")
def create_detailed_log_entry(event_data, request_data, response_data, status_code, log_level='summary'):
"""Create a detailed log entry with appropriate level of detail"""
timestamp = datetime.now().isoformat()
# Base log entry with always-included fields
log_entry = {
'timestamp': timestamp,
'status_code': status_code,
'log_level': log_level,
'request_id': request_data.get('request_id', 'unknown'),
'model': request_data.get('model', 'unknown'),
'client_ip': request_data.get('client_ip', 'unknown'),
'aws_region': request_data.get('aws_region', 'unknown'),
'event_type': event_data.get('event_type', 'unknown')
}
# Add detailed information based on log level
if log_level == 'full':
# Include complete sanitized data
log_entry.update({
'full_request': sanitize_data(request_data, 'partial'),
'full_response': sanitize_data(response_data, 'partial'),
'error_details': request_data.get('error_details', {}),
'user_agent': request_data.get('user_agent', 'unknown'),
'caller_identity': request_data.get('caller_identity', {}),
'request_parameters': sanitize_data(request_data.get('request_parameters', {}), 'partial'),
'response_elements': sanitize_data(response_data.get('response_elements', {}), 'partial')
})
elif log_level == 'summary':
# Include summary information only
log_entry.update({
'api_action': request_data.get('api_action', 'unknown'),
'processing_time_ms': request_data.get('processing_time_ms', 0),
'content_type': response_data.get('content_type', 'unknown'),
'error_code': request_data.get('error_code', None)
})
else: # partial
# Include some detail but sanitize heavily
log_entry.update({
'api_action': request_data.get('api_action', 'unknown'),
'partial_request': sanitize_data(request_data, 'full'),
'error_code': request_data.get('error_code', None)
})
return log_entry
def handle_throttle_event(request_data):
"""处理429错误事件,立即禁用AK/SK(简化版,无Redis依赖)"""
try:
import boto3
import os
# 检查是否为试运行模式
dry_run_mode = os.environ.get('BEDROCK_DRY_RUN_MODE', 'false').lower() == 'true'
# 提取关键信息
timestamp = request_data.get('timestamp', int(datetime.now().timestamp()))
request_id = request_data.get('request_id', 'unknown')
model_id = request_data.get('model', 'unknown')
client_ip = request_data.get('client_ip', 'unknown')
aws_region = request_data.get('aws_region', 'unknown')
# 从调用者身份中提取AK/SK信息
caller_identity = request_data.get('caller_identity', {})
username = caller_identity.get('userName', 'unknown')
access_key_id = caller_identity.get('accessKeyId', 'unknown')
# 如果没有找到AK/SK,尝试从事件详情中获取
if not access_key_id or access_key_id == 'unknown':
event_details = request_data.get('event_details', {})
access_key_id = event_details.get('accessKeyId', 'unknown')
if not access_key_id or access_key_id == 'unknown':
username = event_details.get('userName', 'unknown')
access_key_id = event_details.get('accessKeyId', 'unknown')
print(f"🚨 429错误检测:")
print(f" - 时间: {datetime.fromtimestamp(timestamp).isoformat()}")
print(f" - 用户: {username}")
print(f" - AK/SK: {access_key_id}")
print(f" - 模型: {model_id}")
print(f" - 源IP: {client_ip}")
print(f" - 区域: {aws_region}")
print(f" - 模式: {'试运行' if dry_run_mode else '正式运行'}")
# 创建详细的事件记录
disable_record = {
"timestamp": datetime.fromtimestamp(timestamp).isoformat(),
"username": username,
"access_key_id": access_key_id,
"model_id": model_id,
"source_ip": client_ip,
"request_id": request_id,
"aws_region": aws_region,
"event_type": "throttle_immediate_disable",
"dry_run": dry_run_mode,
"action_reason": "429_rate_limit_error"
}
# 记录到CloudWatch日志
print(f"📝 429错误记录: {json.dumps(disable_record)}")
# 如果无法获取必要信息,跳过禁用操作
if access_key_id == 'unknown' or username == 'unknown':
print(f"⚠️ 无法获取有效的用户或AK/SK信息,跳过禁用操作")
return False
# 立即禁用AK/SK(无需Redis检查)
if not dry_run_mode:
try:
# 初始化IAM客户端
iam_client = boto3.client('iam')
# 调用IAM禁用访问密钥
response = iam_client.update_access_key(
UserName=username,
AccessKeyId=access_key_id,
Status='Inactive'
)
print(f"✅ 成功禁用AK/SK: {access_key_id}")
print(f"🔒 用户 {username} 的访问密钥已被禁用")
# 更新记录
disable_record.update({
"action": "access_key_disabled",
"status": "success",
"iam_response": str(response)
})
except Exception as iam_error:
print(f"❌ IAM禁用AK/SK失败: {iam_error}")
# 记录失败信息
disable_record.update({
"action": "access_key_disable_failed",
"status": "error",
"error": str(iam_error)
})
return False
else:
print(f"🧪 试运行模式:AK/SK {access_key_id} 将被禁用(未实际执行)")
disable_record["action"] = "dry_run_would_disable"
# 最终记录到CloudWatch
print(f"📋 最终操作记录: {json.dumps(disable_record)}")
return True
except Exception as e:
print(f"❌ 处理429错误时发生异常: {e}")
return False
def record_detailed_request(request_data):
"""记录详细的请求信息到CloudWatch日志(简化版,无Redis依赖)"""
try:
timestamp = request_data.get('timestamp', int(datetime.now().timestamp()))
status = request_data.get('status_code', 0)
model = request_data.get('model', 'unknown')
client_ip = request_data.get('client_ip', 'unknown')
request_id = request_data.get('request_id', 'unknown')
# 创建日志记录
log_entry = {
"timestamp": datetime.fromtimestamp(timestamp).isoformat(),
"status_code": status,
"model": model,
"client_ip": client_ip,
"request_id": request_id,
"api_action": request_data.get('api_action', 'unknown'),
"aws_region": request_data.get('aws_region', 'unknown'),
"user_agent": request_data.get('user_agent', 'unknown'),
"log_type": "bedrock_api_call"
}
# 记录到CloudWatch日志
print(f"📊 Bedrock API调用记录: {json.dumps(log_entry)}")
# 如果是429错误,立即处理AK/SK禁用
if status == 429:
handle_throttle_event(request_data)
return True
except Exception as e:
print(f"❌ 记录请求信息失败: {e}")
return False
def record_basic_status(status_code, timestamp):
"""记录基本状态到CloudWatch日志(替代Redis记录)"""
try:
# 记录基本计数到CloudWatch日志
log_entry = {
"timestamp": datetime.fromtimestamp(timestamp).isoformat(),
"status_code": status_code,
"log_type": "bedrock_status_count"
}
print(f"📈 Bedrock状态计数: {json.dumps(log_entry)}")
return True
except Exception as e:
print(f"❌ 记录状态失败: {e}")
return False
def extract_bedrock_details_from_event(event):
"""从EventBridge CloudTrail事件中提取详细的请求和响应信息"""
try:
if 'detail' not in event:
return None
detail = event['detail']
# 提取请求信息
request_params = detail.get('requestParameters', {})
response_elements = detail.get('responseElements', {})
user_identity = detail.get('userIdentity', {})
# 增强的429错误检测逻辑 - 处理responseElements可能为None的情况
status_code = response_elements.get('httpStatusCode', 0) if response_elements else 0
error_code = detail.get('errorCode', '')
response_error = response_elements.get('error', '') if response_elements else ''
# 检查多个字段判断是否为429错误
is_throttled = (
status_code == 429 or
error_code in ['ThrottlingException', 'ServiceQuotaExceededException'] or
(isinstance(response_error, dict) and response_error.get('code') in ['ThrottlingException', 'ServiceQuotaExceededException']) or
(isinstance(response_error, str) and ('ThrottlingException' in response_error or 'ServiceQuotaExceededException' in response_error))
)
# 如果检测到429相关错误,强制设置status_code为429
if is_throttled:
status_code = 429
print(f"🚨 检测到429/限流错误: status={status_code}, errorCode={error_code}, responseError={response_error}")
request_data = {
'timestamp': int(datetime.now().timestamp()),
'event_time': detail.get('eventTime', ''),
'request_id': response_elements.get('requestId', detail.get('requestID', 'unknown')) if response_elements else detail.get('requestID', 'unknown'),
'model': request_params.get('modelId', 'unknown'),
'api_action': detail.get('eventName', 'unknown'),
'aws_region': detail.get('awsRegion', ''),
'status_code': status_code, # 使用增强检测后的状态码
'client_ip': detail.get('sourceIPAddress', 'unknown'),
'user_agent': detail.get('userAgent', 'unknown'),
'error_code': error_code,
'content_type': response_elements.get('contentType', None) if response_elements else None,
'caller_identity': {
'account_id': user_identity.get('accountId', ''),
'principal_id': user_identity.get('principalId', ''),
'type': user_identity.get('type', ''),
'arn': user_identity.get('arn', ''),
'userName': user_identity.get('userName', ''),
'accessKeyId': user_identity.get('accessKeyId', '')
},
'request_parameters': {
'model_id': request_params.get('modelId', ''),
# 注意:实际请求内容不会记录在CloudTrail中,只记录参数
},
'response_elements': {
'http_status': response_elements.get('httpStatusCode', 0) if response_elements else 0,
'content_type': response_elements.get('contentType', '') if response_elements else '',
'request_id': response_elements.get('requestId', '') if response_elements else '',
'error': response_elements.get('error', None) if response_elements else None
},
'raw_error_code': error_code,
'raw_response_error': response_error,
'is_throttled': is_throttled
}
print(f"📋 提取的请求数据: status_code={status_code}, is_throttled={is_throttled}")
print(f"📊 调试信息: errorCode={error_code}, responseError={response_error}")
return request_data
except Exception as e:
print(f"❌ 从EventBridge事件提取详情时出错: {e}")
return None
def extract_bedrock_status_from_event(event):
"""Extract status code from EventBridge CloudTrail event (backwards compatibility)"""
try:
request_data = extract_bedrock_details_from_event(event)
if request_data:
return request_data['status_code']
return None
except Exception as e:
print(f"Error extracting status from EventBridge event: {e}")
return 200
def handle_sns_alarm(event):
    """Handle CloudWatch alarm notifications delivered through SNS.

    The 'Message' string of every SNS record is decoded as a CloudWatch alarm
    payload, and its key fields are printed. An alarm is forwarded to
    handle_sns_triggered_429() when both of these hold:
      * its name looks throttling-related: it contains '429', 'throttle'
        (case-insensitive), 'InvocationThrottles' or 'Bedrock';
      * its new state is 'ALARM'.

    Some records are logged and skipped instead of aborting the rest of the
    batch:
      * records that are not dicts or lack an 'Sns' entry;
      * records whose 'Message' is missing, is not valid JSON, or does not
        decode to a JSON object.

    Args:
        event: Lambda event; expected to contain a 'Records' list of SNS
            records.

    Returns:
        None if the event has no 'Records' key. Otherwise a dict with
        'status' ('success' or 'error'), 'message' and 'recorded'. On success
        it also contains 'alarms_processed', the number of alarm messages
        that were decoded.
    """
    try:
        print(f"🚨 处理CloudWatch SNS告警事件: {json.dumps(event)}")
        if 'Records' not in event:
            print("❌ SNS事件中没有找到记录")
            return None
        alarm_count = 0
        for record in event['Records'] or []:
            if not isinstance(record, dict) or 'Sns' not in record:
                continue
            sns_payload = record['Sns']
            raw_message = sns_payload.get('Message') if isinstance(sns_payload, dict) else None
            try:
                sns_message = json.loads(raw_message)
            except (TypeError, ValueError) as e:
                # TypeError: Message missing or not a string.
                # ValueError: covers json.JSONDecodeError.
                print(f"⚠️ 跳过无法解析的SNS消息: {e}")
                continue
            if not isinstance(sns_message, dict):
                print(f"⚠️ 跳过非对象格式的SNS消息: {raw_message}")
                continue
            print(f"📋 SNS消息: {json.dumps(sns_message)}")

            # Extract CloudWatch alarm information.
            # `or 'unknown'` also guards against an explicit null name, which
            # would otherwise break the substring checks below.
            alarm_name = str(sns_message.get('AlarmName') or 'unknown')
            alarm_arn = sns_message.get('AlarmArn', 'unknown')
            state_value = sns_message.get('NewStateValue', 'unknown')
            state_reason = sns_message.get('NewStateReason', 'unknown')

            # Name-based heuristic for "is this a 429/throttling alarm".
            is_429_alarm = (
                '429' in alarm_name or
                'throttle' in alarm_name.lower() or
                'InvocationThrottles' in alarm_name or
                'Bedrock' in alarm_name
            )

            # Alarm configuration; a missing or non-object Trigger becomes {}.
            trigger_info = sns_message.get('Trigger')
            if not isinstance(trigger_info, dict):
                trigger_info = {}
            metric_name = trigger_info.get('MetricName', 'unknown')
            namespace = trigger_info.get('Namespace', 'unknown')
            threshold = trigger_info.get('Threshold', 0)

            print("🔔 CloudWatch告警详情:")
            print(f" - 名称: {alarm_name}")
            print(f" - 状态: {state_value}")
            print(f" - 原因: {state_reason}")
            print(f" - 指标: {metric_name}")
            print(f" - 命名空间: {namespace}")
            print(f" - 阈值: {threshold}")
            print(f" - 是否429相关: {is_429_alarm}")

            # Structured record printed to the Lambda log.
            alarm_data = {
                'timestamp': datetime.now().isoformat(),
                'alarm_name': alarm_name,
                'alarm_arn': alarm_arn,
                'state_value': state_value,
                'state_reason': state_reason,
                'region': sns_message.get('Region', os.environ.get('AWS_REGION', '')),
                'event_type': 'CloudWatchAlarm',
                'is_429_related': is_429_alarm,
                'alarm_configuration': {
                    'metric': metric_name,
                    'namespace': namespace,
                    'threshold': threshold,
                    'comparison_operator': trigger_info.get('ComparisonOperator', 'unknown'),
                    'evaluation_periods': trigger_info.get('EvaluationPeriods', 0)
                },
                'dimensions': trigger_info.get('Dimensions', []),
                'raw_sns_message': sns_message
            }
            print(f"📝 记录CloudWatch告警: {json.dumps(alarm_data)}")

            # A throttling-related alarm in ALARM state is escalated.
            if is_429_alarm and state_value == 'ALARM':
                print("🚨 检测到429相关告警触发!")
                print(f" - 告警名称: {alarm_name}")
                print(f" - 阈值: {threshold} 次")
                print(f" - 状态: {state_value}")
                # The observed datapoint is not extracted from the payload,
                # so the threshold stands in for metric_value.
                mock_429_event = {
                    "timestamp": datetime.now().isoformat(),
                    "event_type": "SNS_Alarm_429_Detection",
                    "alarm_name": alarm_name,
                    "metric_value": threshold,
                    "threshold": threshold,
                    "state_reason": state_reason,
                    "trigger_source": "CloudWatch_Metric_Alarm"
                }
                handle_sns_triggered_429(mock_429_event)
            alarm_count += 1
        print(f"✅ 成功处理了 {alarm_count} 个SNS告警事件")
        return {
            'status': 'success',
            'message': f'SNS告警处理成功,共处理 {alarm_count} 个告警',
            'alarms_processed': alarm_count,
            'recorded': True
        }
    except Exception as e:
        print(f"❌ 处理SNS告警时出错: {e}")
        return {
            'status': 'error',
            'message': f'处理SNS告警失败: {str(e)}',
            'recorded': False
        }
def handle_sns_triggered_429(alarm_event):
    """Log a 429 signal that originated from a CloudWatch alarm delivered via SNS.

    The alarm payload carries no caller identity, so no credential can be
    disabled here. The function only prints a structured log entry, followed
    by follow-up guidance.

    Args:
        alarm_event: dict describing the alarm. Keys read: alarm_name,
            metric_value, threshold, state_reason, trigger_source.

    Returns:
        True once the entry has been printed, False if anything raised.
    """
    try:
        print(f"🔥 处理SNS告警触发的429事件: {json.dumps(alarm_event)}")
        lookup = alarm_event.get
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': 'SNS_Alarm_429_Response',
            'alarm_name': lookup('alarm_name', 'unknown'),
            'metric_value': lookup('metric_value', 0),
            'threshold': lookup('threshold', 0),
            'state_reason': lookup('state_reason', 'CloudWatch alarm triggered'),
            'trigger_source': lookup('trigger_source', 'CloudWatch'),
            'action_taken': 'Alarm logged and monitored',
            'recommendation': 'Check recent Bedrock API usage and consider rate limiting',
        }
        print(f"📝 SNS触发的429日志: {json.dumps(log_entry)}")
        # With no access key in the payload there is nothing specific to
        # revoke, so the operator is pointed at CloudTrail instead.
        guidance = (
            "💡 注意: SNS告警不包含具体的AK/SK信息",
            " - 建议检查CloudTrail日志获取具体的用户信息",
            " - 考虑实施账户级别的限流策略",
        )
        for line in guidance:
            print(line)
        return True
    except Exception as exc:
        print(f"❌ 处理SNS触发的429事件时出错: {exc}")
        return False
def lambda_handler(event, context):
    """Lambda entry point: record a Bedrock API call and react to throttling.

    Accepted inputs:
      * SNS notifications carrying CloudWatch alarms, delegated to
        handle_sns_alarm();
      * EventBridge CloudTrail events, parsed by
        extract_bedrock_details_from_event();
      * direct invocations whose payload supplies fields such as
        'status_code', 'model_id' and 'client_ip'.

    Recording and throttle handling for non-SNS requests:
      * every request is counted via record_basic_status();
      * 200 and 429 requests are also passed to record_detailed_request();
      * a 429 status, or a ThrottlingException /
        ServiceQuotaExceededException error code (even alongside a 200
        status), is handed to handle_throttle_event(), which is intended to
        disable the offending AK/SK.

    Returns:
        A dict with 'statusCode' and a JSON 'body'. Events that are processed
        always get statusCode 200, and the outcome is described in the body.
        An unexpected exception yields statusCode 500.
    """
    try:
        print(f"Received event: {json.dumps(event)}")
        # Initialize CloudWatch Logs if available
        if CLOUDWATCH_AVAILABLE:
            ensure_log_group_and_stream()

        # SNS deliveries (CloudWatch alarms) take a separate path.
        # Missing, null or empty 'Records' falls through instead of raising.
        records = event.get('Records')
        if (isinstance(records, list) and records
                and isinstance(records[0], dict) and 'Sns' in records[0]):
            print("Detected SNS event, processing as CloudWatch alarm")
            sns_result = handle_sns_alarm(event)
            return {
                'statusCode': 200,
                'body': json.dumps(sns_result),
                'headers': {'Content-Type': 'application/json'}
            }

        # Try EventBridge CloudTrail extraction first.
        request_data = extract_bedrock_details_from_event(event)
        if request_data:
            status_code = request_data['status_code']
            print(f"Extracted from EventBridge - Status: {status_code}, Model: {request_data['model']}, Client IP: {request_data['client_ip']}")
        else:
            # Not an EventBridge event: build the record from the direct-invocation payload.
            status_code = event.get('status_code', 200)
            request_data = {
                'timestamp': int(datetime.now().timestamp()),
                'request_id': event.get('request_id', context.aws_request_id if context else 'unknown'),
                'model': event.get('model_id', 'unknown'),
                'api_action': 'DirectInvoke',
                'aws_region': os.environ.get('AWS_REGION', ''),
                'status_code': status_code,
                'client_ip': event.get('client_ip', 'lambda-direct'),
                'user_agent': event.get('user_agent', 'lambda-function'),
                'error_code': event.get('error', None),
                'content_type': None,
                'caller_identity': {
                    'account_id': '',
                    'principal_id': 'lambda',
                    'type': 'Lambda',
                    'arn': context.invoked_function_arn if context else ''
                },
                'request_parameters': event.get('request_parameters', {}),
                'response_elements': event.get('response_elements', {})
            }
            print(f"Using direct call data - Status: {status_code}, Model: {request_data['model']}")

        # Log the status (stdout goes to CloudWatch)
        print(f"Bedrock API Status: {status_code}")

        # SNS events have already returned, so only EventBridge or direct
        # invocations reach this point.
        event_type = "EventBridge" if 'detail' in event else "Direct"
        log_level = should_log_detailed_content(status_code, event_type)

        response_data = {
            'content_type': request_data.get('content_type'),
            'response_elements': request_data.get('response_elements', {}),
            'processing_time_ms': request_data.get('processing_time_ms', 0)
        }

        print(f"Recording information for status {status_code} with log level: {log_level}")
        # Detailed records only for 200/429; every status gets a basic count.
        if status_code in [200, 429]:
            record_detailed_request(request_data)
        record_basic_status(status_code, int(datetime.now().timestamp()))

        # Throttling detection covers a 429 status as well as a throttling
        # error code reported together with any other status (e.g. 200).
        throttle_error_codes = ('ThrottlingException', 'ServiceQuotaExceededException')
        error_code = request_data.get('error_code')
        if status_code == 429 or error_code in throttle_error_codes:
            print(f"🚨 检测到429/Throttling错误!状态码: {status_code}, 错误码: {error_code}")
            try:
                handle_throttle_event(request_data)
            except Exception as e:
                print(f"❌ 处理429事件失败: {e}")

        # Log to CloudWatch Logs with appropriate detail level (best effort).
        try:
            detailed_log_entry = create_detailed_log_entry(
                event_data={'event_type': event_type, 'raw_event': sanitize_data(event, 'full')},
                request_data=request_data,
                response_data=response_data,
                status_code=status_code,
                log_level=log_level
            )
            log_to_cloudwatch_detailed(detailed_log_entry, 'INFO' if status_code == 200 else 'ERROR')
            print(f"Logged to CloudWatch with level: {log_level}")
        except Exception as e:
            print(f"Failed to log to CloudWatch: {e}")

        # Build the response body according to the status code.
        if status_code == 200:
            response_body = {
                "status": "success",
                "message": "Bedrock request completed successfully",
                "recorded": True,
                "event_type": event_type,
                "request_id": request_data['request_id'],
                "model": request_data['model'],
                "client_ip": request_data['client_ip']
            }
        elif status_code == 429:
            response_body = {
                "status": "rate_limit",
                "message": "Bedrock rate limit exceeded",
                "recorded": True,
                "event_type": event_type,
                "request_id": request_data['request_id'],
                "model": request_data['model'],
                "client_ip": request_data['client_ip']
            }
        elif status_code == 400:
            response_body = {
                "status": "bad_request",
                "message": "Bedrock bad request",
                "recorded": False,  # Not recorded per requirement
                "event_type": event_type
            }
        elif status_code == 500:
            response_body = {
                "status": "server_error",
                "message": "Bedrock server error",
                "recorded": False,  # Not recorded per requirement
                "event_type": event_type
            }
        else:
            response_body = {
                "status": "processed",
                "message": f"Bedrock status {status_code} received",
                "recorded": False,
                "event_type": event_type
            }
        return {
            'statusCode': 200,  # Always return 200 for EventBridge
            'body': json.dumps(response_body)
        }
    except Exception as e:
        print(f"Error processing Lambda function: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'status': 'error',
                'message': f'Internal server error: {str(e)}',
                'recorded': False
            })
        }
"#;
zip.start_file("lambda_function.py", options)?;
zip.write_all(python_content.as_bytes())?;
zip.finish()?;
}
Ok(buffer)
}