burncloud_aws/
aws_deploy.rs

1use aws_config::BehaviorVersion;
2use aws_sdk_lambda::{
3    types::{Environment, Architecture, Runtime, FunctionCode},
4    primitives::Blob,
5    Client as LambdaClient,
6};
7use aws_sdk_iam::Client as IamClient;
8use aws_sdk_cloudwatch::Client as CloudWatchClient;
9use aws_sdk_eventbridge::Client as EventBridgeClient;
10use aws_sdk_cloudtrail::Client as CloudTrailClient;
11use aws_sdk_s3::Client as S3Client;
12use aws_sdk_sns::Client as SnsClient;
13use aws_types::SdkConfig;
14use std::collections::HashMap;
15use std::time::SystemTime;
16use anyhow::{Result, anyhow};
17use tracing::{info, error, warn};
18use aws_credential_types::Credentials;
19use aws_types::region::Region;
20
21/// 强制只从.env文件读取AWS凭证,严禁从系统环境变量读取
22async fn create_aws_config_from_env(aws_region: &str) -> Result<SdkConfig> {
23    // 确保只从.env文件加载
24    dotenv::dotenv().ok();
25
26    // 从环境变量读取AWS凭证(此时应该只来自.env文件)
27    let access_key_id = std::env::var("AWS_ACCESS_KEY_ID")
28        .map_err(|_| anyhow!("❌ 错误: AWS_ACCESS_KEY_ID 必须在.env文件中设置"))?;
29    let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY")
30        .map_err(|_| anyhow!("❌ 错误: AWS_SECRET_ACCESS_KEY 必须在.env文件中设置"))?;
31
32    // 创建静态凭证
33    let credentials = Credentials::new(access_key_id, secret_access_key, None, None, "env-file");
34
35    // 强制禁用从其他来源读取凭证
36    let config = aws_config::defaults(BehaviorVersion::latest())
37        .region(Region::new(aws_region.to_string()))
38        .credentials_provider(credentials)
39        .load()
40        .await;
41
42    // 验证凭证是否有效
43    validate_credentials(&config).await?;
44
45    info!("✅ AWS凭证已成功从.env文件加载");
46    Ok(config)
47}
48
49/// 验证AWS凭证的有效性
50async fn validate_credentials(config: &SdkConfig) -> Result<()> {
51    use aws_sdk_sts::Client as StsClient;
52
53    let sts_client = StsClient::new(config);
54
55    match sts_client.get_caller_identity().send().await {
56        Ok(response) => {
57            if let Some(account_id) = response.account() {
58                info!("✅ AWS凭证验证成功 - 账户ID: {}", account_id);
59            }
60            Ok(())
61        }
62        Err(e) => {
63            error!("❌ AWS凭证验证失败: {}", e);
64            Err(anyhow!("AWS凭证无效,请检查.env文件中的凭证配置"))
65        }
66    }
67}
68
69#[derive(Debug)]
70pub struct AwsManager {
71    lambda_client: LambdaClient,
72    iam_client: IamClient,
73    cloudwatch_client: CloudWatchClient,
74    eventbridge_client: EventBridgeClient,
75    cloudtrail_client: CloudTrailClient,
76    s3_client: S3Client,
77    sns_client: SnsClient,
78    region: String,
79    sdk_config: SdkConfig,
80}
81
82impl AwsManager {
83    pub async fn new(aws_region: &str) -> Result<Self> {
84        // 强制只从.env文件读取凭证,严禁从系统环境变量读取
85        let config = create_aws_config_from_env(aws_region).await?;
86
87        let lambda_client = LambdaClient::new(&config);
88        let iam_client = IamClient::new(&config);
89        let cloudwatch_client = CloudWatchClient::new(&config);
90        let eventbridge_client = EventBridgeClient::new(&config);
91        let cloudtrail_client = CloudTrailClient::new(&config);
92        let s3_client = S3Client::new(&config);
93        let sns_client = SnsClient::new(&config);
94
95        Ok(AwsManager {
96            lambda_client,
97            iam_client,
98            cloudwatch_client,
99            eventbridge_client,
100            cloudtrail_client,
101            s3_client,
102            sns_client,
103            region: aws_region.to_string(),
104            sdk_config: config,
105        })
106    }
107
108    pub async fn create_lambda_execution_role(&self) -> Result<String> {
109        let role_name = format!("lambda-bedrock-monitor-role-{}",
110            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs());
111        self.create_lambda_execution_role_with_name(&role_name).await
112    }
113
114    pub async fn create_lambda_execution_role_with_name(&self, role_name: &str) -> Result<String> {
115        let trust_policy = serde_json::json!({
116            "Version": "2012-10-17",
117            "Statement": [
118                {
119                    "Effect": "Allow",
120                    "Principal": {
121                        "Service": "lambda.amazonaws.com"
122                    },
123                    "Action": "sts:AssumeRole"
124                }
125            ]
126        });
127
128        info!("创建IAM角色: {}", role_name);
129
130        let create_role_result = self.iam_client.create_role()
131            .role_name(role_name)
132            .assume_role_policy_document(trust_policy.to_string())
133            .send()
134            .await;
135
136        match create_role_result {
137            Ok(result) => {
138                let role_arn = result.role().unwrap().arn().to_string();
139                info!("IAM角色创建成功: {}", role_arn);
140
141                let managed_policy_arns = vec![
142                    "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
143                    "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
144                ];
145
146                for policy_arn in managed_policy_arns {
147                    let _ = self.iam_client.attach_role_policy()
148                        .role_name(role_name)
149                        .policy_arn(policy_arn)
150                        .send()
151                        .await;
152                    info!("附加策略: {}", policy_arn);
153                }
154
155                // Add inline policy for CloudWatch Logs write access
156                let cloudwatch_logs_policy = r#"{
157                    "Version": "2012-10-17",
158                    "Statement": [
159                        {
160                            "Effect": "Allow",
161                            "Action": [
162                                "logs:CreateLogGroup",
163                                "logs:CreateLogStream",
164                                "logs:PutLogEvents",
165                                "logs:DescribeLogGroups",
166                                "logs:DescribeLogStreams"
167                            ],
168                            "Resource": "arn:aws:logs:*:*:*"
169                        }
170                    ]
171                }"#;
172
173                match self.iam_client.put_role_policy()
174                    .role_name(role_name)
175                    .policy_name("CloudWatchLogsWriteAccess")
176                    .policy_document(cloudwatch_logs_policy.to_string())
177                    .send()
178                    .await {
179                    Ok(_) => info!("CloudWatch Logs写权限策略添加成功"),
180                    Err(e) => warn!("CloudWatch Logs权限策略添加失败: {}", e)
181                }
182
183                // Add inline policy for AK/SK management (security critical) - 支持所有区域
184                let credential_management_policy = r#"{
185                    "Version": "2012-10-17",
186                    "Statement": [
187                        {
188                            "Effect": "Allow",
189                            "Action": [
190                                "iam:UpdateAccessKey",
191                                "iam:ListAccessKeys",
192                                "iam:GetAccessKeyLastUsed",
193                                "iam:GetUser"
194                            ],
195                            "Resource": "*"
196                        }
197                    ]
198                }"#;
199
200                match self.iam_client.put_role_policy()
201                    .role_name(role_name)
202                    .policy_name("BedrockCredentialManagement")
203                    .policy_document(credential_management_policy.to_string())
204                    .send()
205                    .await {
206                    Ok(_) => info!("🔐 凭据管理权限策略添加成功"),
207                    Err(e) => warn!("⚠️ 凭据管理权限策略添加失败: {}", e)
208                }
209
210                // Add inline policy for SNS notifications
211                let sns_notification_policy = r#"{
212                    "Version": "2012-10-17",
213                    "Statement": [
214                        {
215                            "Effect": "Allow",
216                            "Action": [
217                                "sns:Publish"
218                            ],
219                            "Resource": "arn:aws:sns:*:*:bedrock-throttling-alerts"
220                        }
221                    ]
222                }"#;
223
224                match self.iam_client.put_role_policy()
225                    .role_name(role_name)
226                    .policy_name("BedrockSNSNotifications")
227                    .policy_document(sns_notification_policy.to_string())
228                    .send()
229                    .await {
230                    Ok(_) => info!("📧 SNS通知权限策略添加成功"),
231                    Err(e) => warn!("⚠️ SNS通知权限策略添加失败: {}", e)
232                }
233
234                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
235                Ok(role_arn)
236            }
237            Err(e) => {
238                error!("IAM角色创建失败: {}", e);
239                Err(anyhow!("创建IAM角色失败: {}", e))
240            }
241        }
242    }
243
244    pub async fn deploy_lambda_function(&self, zip_content: Vec<u8>) -> Result<String> {
245        let function_name = "bedrock-monitor-function";
246
247        info!("开始部署Lambda函数: {}", function_name);
248        info!("ZIP包大小: {} bytes", zip_content.len());
249
250        // 先检查函数是否存在
251        let function_exists = self.lambda_client.get_function()
252            .function_name(function_name)
253            .send()
254            .await.is_ok();
255
256        // 尝试获取现有角色或创建新角色
257        let role_arn = if !function_exists {
258            // 先尝试使用已存在的角色名称模式
259            let role_name = "lambda-bedrock-monitor-role";
260            match self.iam_client.get_role().role_name(role_name).send().await {
261                Ok(role_result) => {
262                    info!("使用现有IAM角色: {}", role_name);
263                    role_result.role().unwrap().arn().to_string()
264                }
265                Err(_) => {
266                    // 如果现有角色不存在,创建新角色
267                    info!("创建新IAM角色: {}", role_name);
268                    self.create_lambda_execution_role_with_name(role_name).await?
269                }
270            }
271        } else {
272            info!("函数已存在,获取现有角色ARN");
273            // 获取现有函数的角色ARN
274            let function_config = self.lambda_client.get_function_configuration()
275                .function_name(function_name)
276                .send()
277                .await?;
278            function_config.role().unwrap().to_string()
279        };
280
281        let env_vars = HashMap::from([
282            ("RUST_LOG".to_string(), "info".to_string()),
283            ("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
284            ("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()),
285            // 注意:AWS_REGION是Lambda保留变量,不能设置
286        ]);
287
288        let environment = Environment::builder()
289            .set_variables(Some(env_vars))
290            .build();
291
292        let function_arn = if function_exists {
293            // Check if we need to recreate the function due to runtime change
294            let function_config = self.lambda_client.get_function_configuration()
295                .function_name(function_name)
296                .send()
297                .await?;
298
299            let current_runtime = function_config.runtime().unwrap_or(&Runtime::Providedal2);
300
301            // If runtime is different, delete and recreate
302            if !matches!(current_runtime, Runtime::Python39 | Runtime::Python38 | Runtime::Python37) {
303                info!("检测到运行时变更,重新创建Lambda函数");
304
305                // Delete the existing function
306                let _ = self.lambda_client.delete_function()
307                    .function_name(function_name)
308                    .send()
309                    .await;
310
311                // Create new function with Python runtime
312                info!("创建新的Python Lambda函数");
313                let create_function_result = self.lambda_client.create_function()
314                    .function_name(function_name)
315                    .runtime(Runtime::Python312)
316                    .handler("lambda_function.lambda_handler")
317                    .code(FunctionCode::builder().zip_file(Blob::new(zip_content)).build())
318                    .role(role_arn)
319                    .architectures(Architecture::X8664)
320                    .timeout(30)
321                    .memory_size(128)
322                    .environment(environment)
323                    .send()
324                    .await;
325
326                match create_function_result {
327                    Ok(result) => {
328                        result.function_arn().unwrap().to_string()
329                    }
330                    Err(e) => {
331                        error!("Lambda函数创建失败: {}", e);
332                        error!("错误类型: {:?}", e);
333                        error!("完整错误信息: {:?}", e);
334                        return Err(anyhow!("创建Lambda函数失败: {}", e));
335                    }
336                }
337            } else {
338                // Same runtime, just update the code
339                info!("更新现有Lambda函数");
340                let update_result = self.lambda_client.update_function_code()
341                    .function_name(function_name)
342                    .zip_file(Blob::new(zip_content))
343                    .send()
344                    .await;
345
346                match update_result {
347                    Ok(result) => result.function_arn().unwrap().to_string(),
348                    Err(e) => {
349                        error!("Lambda函数更新失败: {}", e);
350                        return Err(anyhow!("更新Lambda函数失败: {}", e));
351                    }
352                }
353            }
354        } else {
355            info!("创建新Lambda函数");
356            let max_retries = 3;
357
358            for attempt in 1..=max_retries {
359                let create_function_result = self.lambda_client.create_function()
360                    .function_name(function_name)
361                    .runtime(Runtime::Python312)
362                    .handler("lambda_function.lambda_handler")
363                    .code(FunctionCode::builder().zip_file(Blob::new(zip_content.clone())).build())
364                    .role(&role_arn)
365                    .architectures(Architecture::X8664)
366                    .timeout(30)
367                    .memory_size(128)
368                    .environment(environment.clone())
369                    .send()
370                    .await;
371
372                match create_function_result {
373                    Ok(result) => {
374                        let function_arn = result.function_arn().unwrap().to_string();
375                        info!("✅ Lambda函数创建成功: {}", function_arn);
376                        return Ok(function_arn);
377                    }
378                    Err(e) => {
379                        let error_str = e.to_string();
380                        let raw_error = format!("{:?}", e);
381
382                        error!("❌ Lambda函数创建失败 (尝试 {}/{}):", attempt, max_retries);
383                        error!("┌─ 错误信息: {}", e);
384                        error!("├─ 错误字符串: {}", error_str);
385                        error!("└─ 原始错误对象: {}", raw_error);
386                        error!("┌─ 当前区域: {}", self.region);
387                        error!("├─ 函数名称: {}", function_name);
388                        error!("├─ IAM角色ARN: {}", role_arn);
389                        error!("└─ 账户ID: {}", self.get_account_id().await.unwrap_or_else(|_| "unknown".to_string()));
390
391                        // 检查是否是函数已存在的错误
392                        if error_str.contains("ResourceConflictException") ||
393                           error_str.contains("already exists") ||
394                           error_str.contains("Function already exist") {
395                            info!("ℹ️ Lambda函数已存在,获取函数ARN");
396                            // 尝试获取现有函数的ARN
397                            match self.lambda_client.get_function_configuration()
398                                .function_name(function_name)
399                                .send()
400                                .await {
401                                Ok(config) => {
402                                    let function_arn = config.function_arn().unwrap().to_string();
403                                    info!("✅ 获取已存在的Lambda函数: {}", function_arn);
404                                    return Ok(function_arn);
405                                }
406                                Err(get_e) => {
407                                    warn!("⚠️ 无法获取已存在的Lambda函数ARN: {}", get_e);
408                                    // 返回一个默认的ARN格式
409                                    let account_id = self.get_account_id().await?;
410                                    let default_arn = format!("arn:aws:lambda:{}:{}:function:{}",
411                                        self.region, account_id, function_name);
412                                    return Ok(default_arn);
413                                }
414                            }
415                        }
416
417                        if attempt < max_retries {
418                            let wait_time = attempt * 5; // 5, 10, 15 seconds
419                            info!("等待 {} 秒后重试...", wait_time);
420                            tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
421                        } else {
422                            error!("🔍 详细错误分析:");
423
424                            if error_str.contains("service error") {
425                                error!("┌─ 错误类型: AWS服务错误");
426                                error!("├─ 可能原因:");
427                                error!("│  • AWS服务暂时不可用");
428                                error!("│  • 网络连接问题");
429                                error!("│  • AWS服务延迟");
430                                error!("│  • 区域服务暂时中断");
431                                error!("├─ 建议解决方案:");
432                                error!("│  • 检查网络连接");
433                                error!("│  • 稍后重试");
434                                error!("│  • 尝试使用其他AWS区域");
435                                error!("│  • 检查AWS服务状态页面");
436                                error!("└─ AWS状态页面: https://status.aws.amazon.com/");
437                            } else if error_str.contains("AccessDeniedException") {
438                                error!("┌─ 错误类型: 权限不足");
439                                error!("├─ 可能原因:");
440                                error!("│  • IAM角色无权限在{}区域创建Lambda函数", self.region);
441                                error!("│  • IAM角色跨区域权限限制");
442                                error!("├─ IAM角色: {}", role_arn);
443                                error!("└─ 建议解决方案:");
444                                error!("   • 检查IAM角色{}的跨区域权限", role_arn);
445                                error!("   • 为IAM角色添加lambda:*权限");
446                                error!("   • 或者使用IAM:CreateFunction权限");
447                            } else if error_str.contains("InvalidParameterValueException") {
448                                error!("┌─ 错误类型: 参数无效");
449                                error!("├─ 可能原因:");
450                                error!("│  • 函数名、角色ARN或配置参数格式错误");
451                                error!("│  • 不支持的配置值");
452                                error!("└─ 建议解决方案:");
453                                error!("   • 检查函数名是否符合命名规范");
454                                error!("   • 验证IAM角色ARN格式");
455                                error!("   • 检查运行时和配置参数");
456                            } else if error_str.contains("LimitExceededException") {
457                                error!("┌─ 错误类型: 资源限制");
458                                error!("├─ 可能原因:");
459                                error!("│  • 账户Lambda函数数量超限");
460                                error!("│  • 并发创建限制");
461                                error!("└─ 建议解决方案:");
462                                error!("   • 删除不需要的Lambda函数");
463                                error!("   • 提高AWS服务配额");
464                            } else {
465                                error!("┌─ 错误类型: 未知错误");
466                                error!("└─ 建议联系AWS技术支持");
467                            }
468
469                            return Err(anyhow!("创建Lambda函数失败: {}", e));
470                        }
471                    }
472                }
473            }
474
475            return Err(anyhow!("Lambda函数创建失败: 重试{}次后仍然失败", max_retries));
476        };
477
478        info!("Lambda函数部署成功: {}", function_arn);
479        Ok(function_arn)
480    }
481
482    pub async fn create_cloudwatch_alarm(&self, function_name: &str) -> Result<()> {
483        let alarm_name = format!("{}-error-alarm", function_name);
484
485        info!("创建CloudWatch告警: {}", alarm_name);
486
487        let _account_id = self.get_account_id().await?;
488
489        let dimension1 = aws_sdk_cloudwatch::types::Dimension::builder()
490            .name("FunctionName")
491            .value(function_name)
492            .build();
493
494        let _ = self.cloudwatch_client.put_metric_alarm()
495            .alarm_name(&alarm_name)
496            .alarm_description("Lambda函数错误率告警")
497            .namespace("AWS/Lambda")
498            .metric_name("Errors")
499            .dimensions(dimension1)
500            .statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
501            .period(300)
502            .evaluation_periods(1)
503            .threshold(1.0)
504            .comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanOrEqualToThreshold)
505            .send()
506            .await;
507
508        info!("CloudWatch告警创建成功");
509        Ok(())
510    }
511
512    pub async fn create_bedrock_eventbridge_rule(&self) -> Result<String> {
513        let rule_name = "bedrock-api-monitor-rule";
514        let max_retries = 3;
515
516        info!("创建EventBridge规则: {}", rule_name);
517
518        // 首先检查规则是否已存在
519        match self.eventbridge_client.list_rules()
520            .name_prefix(rule_name)
521            .send()
522            .await {
523            Ok(response) => {
524                let rules = response.rules();
525                for rule in rules {
526                    if let Some(name) = rule.name() {
527                        if name == rule_name {
528                            let rule_arn = rule.arn().unwrap().to_string();
529                            info!("✅ EventBridge规则已存在: {}", rule_arn);
530                            return Ok(rule_arn);
531                        }
532                    }
533                }
534            }
535            Err(e) => {
536                warn!("⚠️ 无法检查EventBridge规则是否已存在: {},继续尝试创建", e);
537            }
538        }
539
540        // 创建EventBridge规则来监控Bedrock invoke-model API调用,特别关注429错误
541        let event_pattern = r#"{
542            "source": ["aws.bedrock"],
543            "detail-type": ["AWS API Call via CloudTrail"],
544            "detail": {
545                "eventSource": ["bedrock.amazonaws.com"],
546                "eventName": ["InvokeModel", "Converse", "InvokeModelWithResponseStream"]
547            }
548        }"#;
549
550        for attempt in 1..=max_retries {
551            let put_rule_result = self.eventbridge_client.put_rule()
552                .name(rule_name)
553                .event_pattern(event_pattern)
554                .state(aws_sdk_eventbridge::types::RuleState::Enabled)
555                .description("专门监控AWS Bedrock invoke-model API调用 - 记录200和429状态")
556                .send()
557                .await;
558
559            match put_rule_result {
560                Ok(result) => {
561                    let rule_arn = result.rule_arn().unwrap().to_string();
562                    info!("EventBridge规则创建成功: {}", rule_arn);
563                    return Ok(rule_arn);
564                }
565                Err(e) => {
566                    let error_str = e.to_string();
567                    error!("EventBridge规则创建失败 (尝试 {}/{}): {}", attempt, max_retries, e);
568
569                    // 检查是否是规则已存在的错误
570                    if error_str.contains("ResourceConflictException") ||
571                       error_str.contains("already exists") ||
572                       error_str.contains("Rule already exists") {
573                        info!("ℹ️ EventBridge规则已存在,跳过创建");
574                        // 尝试获取规则ARN
575                        match self.eventbridge_client.list_rules()
576                            .name_prefix(rule_name)
577                            .send()
578                            .await {
579                            Ok(response) => {
580                                let rules = response.rules();
581                                for rule in rules {
582                                    if let Some(name) = rule.name() {
583                                        if name == rule_name {
584                                            let rule_arn = rule.arn().unwrap().to_string();
585                                            info!("✅ 获取已存在的EventBridge规则: {}", rule_arn);
586                                            return Ok(rule_arn);
587                                        }
588                                    }
589                                }
590                            }
591                            Err(list_e) => {
592                                warn!("⚠️ 无法获取已存在的EventBridge规则ARN: {}", list_e);
593                                // 返回一个默认的ARN格式
594                                let account_id = self.get_account_id().await?;
595                                let default_arn = format!("arn:aws:events:{}:{}:rule/{}",
596                                    self.region, account_id, rule_name);
597                                return Ok(default_arn);
598                            }
599                        }
600                    }
601
602                    if attempt < max_retries {
603                        let wait_time = attempt * 3; // 3, 6, 9 seconds
604                        info!("等待 {} 秒后重试...", wait_time);
605                        tokio::time::sleep(tokio::time::Duration::from_secs(wait_time as u64)).await;
606                    } else {
607                        // 提供更详细的错误分析
608                        if error_str.contains("dispatch failure") {
609                            error!("❌ EventBridge服务暂时不可用,可能是网络问题或AWS服务延迟");
610                        } else if error_str.contains("InternalFailure") {
611                            error!("❌ EventBridge内部服务错误");
612                        } else if error_str.contains("AccessDeniedException") {
613                            error!("❌ 权限不足: 无权限创建EventBridge规则");
614                        } else if error_str.contains("LimitExceededException") {
615                            error!("❌ EventBridge规则数量超限");
616                        } else {
617                            error!("❌ 未知EventBridge错误: {}", error_str);
618                        }
619
620                        return Err(anyhow!("创建EventBridge规则失败: {}", e));
621                    }
622                }
623            }
624        }
625
626        Err(anyhow!("EventBridge规则创建失败: 重试{}次后仍然失败", max_retries))
627    }
628
629    pub async fn add_lambda_target_to_eventbridge(&self) -> Result<()> {
630        let rule_name = "bedrock-api-monitor-rule";
631        let function_name = "bedrock-monitor-function";
632
633        info!("为EventBridge规则添加Lambda目标: {} -> {}", rule_name, function_name);
634
635        // 获取Lambda函数的ARN
636        let function_config = self.lambda_client.get_function_configuration()
637            .function_name(function_name)
638            .send()
639            .await?;
640
641        let function_arn = function_config.function_arn().unwrap();
642
643        // 添加Lambda函数作为EventBridge目标
644        let target = aws_sdk_eventbridge::types::Target::builder()
645            .id("1")
646            .arn(function_arn)
647            .build()?;
648
649        let targets_result = self.eventbridge_client.put_targets()
650            .rule(rule_name)
651            .targets(target)
652            .send()
653            .await;
654
655        match targets_result {
656            Ok(_) => {
657                info!("Lambda目标添加成功");
658
659                // 首先检查EventBridge权限是否已存在
660                let eventbridge_permission_exists = match self.lambda_client.get_policy()
661                    .function_name(function_name)
662                    .send()
663                    .await {
664                    Ok(policy_result) => {
665                        if let Some(policy) = policy_result.policy() {
666                            policy.contains("bedrock-eventbridge-invoke") ||
667                            policy.contains("EventBridgeInvoke") ||
668                            (policy.contains("events.amazonaws.com") &&
669                             policy.contains(&format!("arn:aws:events:{}:{}:rule/{}",
670                                self.region, self.get_account_id().await?, rule_name)))
671                        } else {
672                            false
673                        }
674                    }
675                    Err(_) => false
676                };
677
678                let add_permission_result = if eventbridge_permission_exists {
679                    None
680                } else {
681                    info!("EventBridge权限不存在,开始添加...");
682                    Some(self.lambda_client.add_permission()
683                        .function_name(function_name)
684                        .statement_id("bedrock-eventbridge-invoke")
685                        .action("lambda:InvokeFunction")
686                        .principal("events.amazonaws.com")
687                        .source_arn(format!("arn:aws:events:{}:{}:rule/{}",
688                            self.region,
689                            self.get_account_id().await?,
690                            rule_name
691                        ))
692                        .send()
693                        .await)
694                };
695
696                match add_permission_result {
697                    Some(Ok(response)) => {
698                        info!("Lambda EventBridge调用权限添加成功");
699                        if let Some(statement) = response.statement() {
700                            info!("权限声明: {}", statement);
701                        }
702                        Ok(())
703                    }
704                    Some(Err(e)) => {
705                        let error_str = e.to_string();
706                        let raw_error = format!("{:?}", e);
707                        error!("Lambda EventBridge调用权限添加失败");
708                        error!("错误信息: {}", error_str);
709                        error!("原始错误: {}", raw_error);
710
711                        // 提供更详细的错误分析
712                        if error_str.contains("ResourceConflictException") ||
713                           error_str.contains("already exists") ||
714                           error_str.contains("The resource-based policy") ||
715                           error_str.contains("already has a statement") {
716                            info!("Lambda EventBridge调用权限已存在,跳过添加");
717                            Ok(())
718                        } else if error_str.contains("InvalidParameterValueException") {
719                            warn!("⚠️ EventBridge权限参数无效: 检查函数名和规则ARN格式,但继续部署");
720                            Ok(()) // 继续执行而不是失败
721                        } else if error_str.contains("AccessDeniedException") {
722                            warn!("⚠️ 权限不足: 无权限为Lambda函数添加EventBridge调用权限,但继续部署");
723                            Ok(()) // 继续执行而不是失败
724                        } else if error_str.contains("ResourceNotFoundException") {
725                            warn!("⚠️ 资源未找到: Lambda函数或EventBridge规则不存在,但继续部署");
726                            Ok(()) // 继续执行而不是失败
727                        } else if error_str.contains("service error") {
728                            warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
729                            Ok(()) // 继续执行而不是失败
730                        } else {
731                            warn!("Lambda EventBridge调用权限添加失败: {},但继续部署", e);
732                            Ok(()) // 继续执行而不是失败
733                        }
734                    }
735                    None => {
736                        info!("EventBridge权限已存在,无需添加");
737                        Ok(())
738                    }
739                }
740            }
741            Err(e) => {
742                error!("Lambda目标添加失败: {}", e);
743                Err(anyhow!("添加Lambda目标失败: {}", e))
744            }
745        }
746    }
747
748    pub async fn test_lambda_function(&self, payload: &str) -> Result<String> {
749        info!("测试Lambda函数调用,payload: {}", payload);
750
751        match self.lambda_client.invoke()
752            .function_name("bedrock-monitor-function")
753            .payload(aws_sdk_lambda::primitives::Blob::new(payload.as_bytes()))
754            .send()
755            .await {
756            Ok(response) => {
757                info!("Lambda调用成功");
758                let mut result = String::new();
759
760                if let Some(payload) = response.payload() {
761                    let payload_str = String::from_utf8(payload.as_ref().to_vec())?;
762                    info!("响应: {}", payload_str);
763                    result.push_str(&format!("Response: {}\n", payload_str));
764                }
765
766                if let Some(log_result) = response.log_result() {
767                    info!("日志: {}", log_result);
768                    result.push_str(&format!("Logs: {}\n", log_result));
769                }
770
771                if let Some(function_error) = response.function_error() {
772                    let error_str = std::str::from_utf8(function_error.as_ref())
773                        .unwrap_or("无法解析错误信息");
774                    error!("函数错误: {}", error_str);
775                    result.push_str(&format!("Error: {}\n", error_str));
776                }
777
778                Ok(result)
779            }
780            Err(e) => {
781                error!("Lambda调用失败: {}", e);
782                Err(anyhow!("Lambda调用失败: {}", e))
783            }
784        }
785    }
786
787    pub async fn create_lambda_function_url(&self) -> Result<String> {
788        let function_name = "bedrock-monitor-function";
789        info!("创建Lambda Function URL for: {}", function_name);
790
791        // 获取Lambda函数配置
792        let _function_config = self.lambda_client.get_function_configuration()
793            .function_name(function_name)
794            .send()
795            .await?;
796
797        // 创建Function URL配置
798        let _create_url_config_result = self.lambda_client.create_function_url_config()
799            .function_name(function_name)
800            .auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
801            .invoke_mode(aws_sdk_lambda::types::InvokeMode::Buffered)
802            .send()
803            .await?;
804
805        // 添加资源策略以允许公共访问
806        let account_id = self.get_account_id().await?;
807
808        let _ = self.lambda_client.add_permission()
809            .function_name(function_name)
810            .statement_id("FunctionURLAllowPublicAccess")
811            .action("lambda:InvokeFunctionUrl")
812            .principal("*")
813            .function_url_auth_type(aws_sdk_lambda::types::FunctionUrlAuthType::None)
814            .source_arn(&format!("arn:aws:lambda:{}:{}:function:{}",
815                                self.region, account_id, function_name))
816            .send()
817            .await?;
818
819        // 构建Function URL
820        let function_url = format!(
821            "https://{}.lambda-url.{}.on.aws",
822            function_name, self.region
823        );
824
825        info!("Lambda Function URL创建成功: {}", function_url);
826        Ok(function_url)
827    }
828
829    pub async fn setup_cloudtrail_for_bedrock(&self) -> Result<String> {
830        let trail_name = "bedrock-api-monitoring-trail";
831        let account_id = self.get_account_id().await?;
832        let s3_bucket_name = format!("bedrock-cloudtrail-logs-{}", account_id);
833
834        info!("设置CloudTrail以监控Bedrock API调用: {}", trail_name);
835
836        // 创建S3存储桶(如果不存在)
837        self.create_s3_bucket_if_not_exists(&s3_bucket_name).await?;
838
839        // 创建CloudTrail
840        let create_trail_result = self.cloudtrail_client.create_trail()
841            .name(trail_name)
842            .s3_bucket_name(s3_bucket_name)
843            .include_global_service_events(true)
844            .is_multi_region_trail(true)
845            .enable_log_file_validation(true)
846            .send()
847            .await;
848
849        let trail_arn = match create_trail_result {
850            Ok(result) => {
851                let arn = result.trail_arn().unwrap().to_string();
852                info!("CloudTrail创建成功: {}", arn);
853                arn
854            }
855            Err(e) => {
856                error!("CloudTrail创建失败: {}", e);
857                return Err(anyhow!("创建CloudTrail失败: {}", e));
858            }
859        };
860
861        // 开启数据事件监控
862        info!("配置CloudTrail数据事件...");
863        self.configure_cloudtrail_data_events(trail_name).await?;
864
865        // 开启Trail
866        let _ = self.cloudtrail_client.start_logging()
867            .name(trail_name)
868            .send()
869            .await?;
870
871        info!("CloudTrail启动成功");
872        Ok(trail_arn)
873    }
874
875    async fn create_s3_bucket_if_not_exists(&self, bucket_name: &str) -> Result<()> {
876        info!("检查S3存储桶: {}", bucket_name);
877
878        // 检查存储桶是否已存在
879        match self.s3_client.head_bucket().bucket(bucket_name).send().await {
880            Ok(_) => {
881                info!("S3存储桶已存在: {}", bucket_name);
882                return Ok(());
883            }
884            Err(_) => {
885                info!("创建新的S3存储桶: {}", bucket_name);
886            }
887        }
888
889        // 创建存储桶
890        let create_bucket_result = self.s3_client.create_bucket()
891            .bucket(bucket_name)
892            .send()
893            .await;
894
895        match create_bucket_result {
896            Ok(_) => {
897                info!("S3存储桶创建成功: {}", bucket_name);
898            }
899            Err(e) => {
900                error!("S3存储桶创建失败: {}", e);
901                return Err(anyhow!("创建S3存储桶失败: {}", e));
902            }
903        }
904
905        Ok(())
906    }
907
908    async fn configure_cloudtrail_data_events(&self, trail_name: &str) -> Result<()> {
909        info!("配置CloudTrail数据事件以监控Bedrock");
910
911        // 获取账户ID
912        let account_id = self.get_account_id().await?;
913
914        // 配置数据事件,监控Bedrock服务
915        // 配置数据事件,监控Bedrock服务
916        let data_resource = aws_sdk_cloudtrail::types::DataResource::builder()
917            .r#type("AWS::Bedrock::Model")
918            .values(&format!("arn:aws:bedrock:{}:{}:model/*", self.region, account_id))
919            .build();
920
921        let event_selector = aws_sdk_cloudtrail::types::EventSelector::builder()
922            .read_write_type(aws_sdk_cloudtrail::types::ReadWriteType::All)
923            .include_management_events(true)
924            .data_resources(data_resource)
925            .build();
926
927        // 更新Trail以包含数据事件
928        let _ = self.cloudtrail_client.put_event_selectors()
929            .trail_name(trail_name)
930            .event_selectors(event_selector)
931            .send()
932            .await?;
933
934        
935        info!("CloudTrail数据事件配置完成");
936        Ok(())
937    }
938
939    pub async fn add_eventbridge_lambda_permission(&self) -> Result<()> {
940        let function_name = "bedrock-monitor-function";
941        let statement_id = "EventBridgeInvoke";
942        let action = "lambda:InvokeFunction";
943        let principal = "events.amazonaws.com";
944
945        let account_id = self.get_account_id().await?;
946        let source_arn = format!("arn:aws:events:{}:{}:rule/bedrock-api-monitor-rule",
947                                self.region, account_id);
948
949        info!("添加EventBridge调用Lambda权限: {} -> {}", source_arn, function_name);
950
951        let result = self.lambda_client.add_permission()
952            .function_name(function_name)
953            .statement_id(statement_id)
954            .action(action)
955            .principal(principal)
956            .source_arn(&source_arn)
957            .send()
958            .await;
959
960        match result {
961            Ok(_) => {
962                info!("EventBridge Lambda权限添加成功");
963                Ok(())
964            }
965            Err(e) => {
966                error!("EventBridge Lambda权限添加失败: {}", e);
967                Err(anyhow!("添加Lambda EventBridge调用权限失败: {}", e))
968            }
969        }
970    }
971
972    async fn get_account_id(&self) -> Result<String> {
973        let sts_client = aws_sdk_sts::Client::new(&self.sdk_config);
974        let identity = sts_client.get_caller_identity().send().await?;
975        Ok(identity.account().unwrap().to_string())
976    }
977
978    /// 创建SNS主题用于Bedrock告警通知
979    pub async fn create_sns_topic(&self) -> Result<String> {
980        let topic_name = "bedrock-throttling-alerts";
981
982        info!("创建SNS主题: {}", topic_name);
983
984        let result = self.sns_client.create_topic()
985            .name(topic_name)
986            .send()
987            .await;
988
989        match result {
990            Ok(response) => {
991                let topic_arn = response.topic_arn().unwrap().to_string();
992                info!("SNS主题创建成功: {}", topic_arn);
993                Ok(topic_arn)
994            }
995            Err(e) => {
996                error!("SNS主题创建失败: {}", e);
997                Err(anyhow!("创建SNS主题失败: {}", e))
998            }
999        }
1000    }
1001
1002    /// 创建完整的Bedrock监控CloudWatch告警
1003    pub async fn create_bedrock_cloudwatch_alarms(&self, sns_topic_arn: &str) -> Result<()> {
1004        info!("开始创建Bedrock监控CloudWatch告警...");
1005
1006        // let account_id = self.get_account_id().await?;
1007
1008        // 1. 立即响应告警 - 检测到1次429就触发
1009        self.create_single_alarm(
1010            "bedrock-InvocationThrottles-Immediate",
1011            "立即响应告警 - 检测到1次429时就触发AK/SK关闭",
1012            "AWS/Bedrock",
1013            "InvocationThrottles",
1014            10, // 10秒周期
1015            1,   // 1个评估周期
1016            1.0, // 阈值1次
1017            Some(sns_topic_arn),
1018            None,
1019        ).await?;
1020
1021        // 2. 高频限流告警 - 2分钟内超过10次
1022        self.create_single_alarm(
1023            "bedrock-InvocationThrottles-HighFrequency",
1024            "高频Bedrock API限流告警 - 当Bedrock API限流频率异常时触发",
1025            "AWS/Bedrock",
1026            "InvocationThrottles",
1027            60, // 60秒周期
1028            2,   // 2个评估周期
1029            10.0, // 阈值10次
1030            Some(sns_topic_arn),
1031            None,
1032        ).await?;
1033
1034        // 3. 通用限流告警 - 任何模型3分钟内超过5次
1035        let general_dimension = aws_sdk_cloudwatch::types::Dimension::builder()
1036            .name("ModelId")
1037            .value("*")
1038            .build();
1039
1040        self.create_single_alarm(
1041            "bedrock-InvocationThrottles-General",
1042            "通用Bedrock API限流告警 - 对任何模型限流超过3次时告警",
1043            "AWS/Bedrock",
1044            "InvocationThrottles",
1045            60, // 60秒周期
1046            3,   // 3个评估周期
1047            5.0, // 阈值5次
1048            Some(sns_topic_arn),
1049            Some(vec![general_dimension]),
1050        ).await?;
1051
1052        // 4. Claude 3.5 Sonnet 专门告警
1053        self.create_model_specific_alarm(
1054            "anthropic.claude-3-5-sonnet-20240620-v1:0",
1055            sns_topic_arn,
1056        ).await?;
1057
1058        // 5. Claude 3 Sonnet 专门告警
1059        self.create_model_specific_alarm(
1060            "anthropic.claude-3-sonnet-20240229-v1:0",
1061            sns_topic_arn,
1062        ).await?;
1063
1064        // 6. Claude 3.5 Haiku 专门告警
1065        self.create_model_specific_alarm(
1066            "anthropic.claude-3-5-haiku-20241022-v1:0",
1067            sns_topic_arn,
1068        ).await?;
1069
1070        // 7. 客户端错误告警
1071        self.create_single_alarm(
1072            "bedrock-InvocationClientErrors",
1073            "Bedrock客户端错误告警 - 当客户端错误超过5次时告警",
1074            "AWS/Bedrock",
1075            "InvocationClientErrors",
1076            60, // 60秒周期
1077            2,   // 2个评估周期
1078            5.0, // 阈值5次
1079            Some(sns_topic_arn),
1080            None,
1081        ).await?;
1082
1083        // 8. 服务器错误告警
1084        self.create_single_alarm(
1085            "bedrock-InvocationServerErrors",
1086            "Bedrock服务器错误告警 - 当服务器错误超过3次时告警",
1087            "AWS/Bedrock",
1088            "InvocationServerErrors",
1089            60, // 60秒周期
1090            1,   // 1个评估周期
1091            3.0, // 阈值3次
1092            Some(sns_topic_arn),
1093            None,
1094        ).await?;
1095
1096        info!("✅ 所有Bedrock监控CloudWatch告警创建完成");
1097        Ok(())
1098    }
1099
1100    /// 创建单个CloudWatch告警
1101    async fn create_single_alarm(
1102        &self,
1103        alarm_name: &str,
1104        description: &str,
1105        namespace: &str,
1106        metric_name: &str,
1107        period: i64,
1108        evaluation_periods: i32,
1109        threshold: f64,
1110        sns_topic_arn: Option<&str>,
1111        dimensions: Option<Vec<aws_sdk_cloudwatch::types::Dimension>>,
1112    ) -> Result<()> {
1113        info!("创建CloudWatch告警: {}", alarm_name);
1114
1115        let mut alarm_request = self.cloudwatch_client.put_metric_alarm()
1116            .alarm_name(alarm_name)
1117            .alarm_description(description)
1118            .namespace(namespace)
1119            .metric_name(metric_name)
1120            .statistic(aws_sdk_cloudwatch::types::Statistic::Sum)
1121            .period(period.try_into()?)
1122            .evaluation_periods(evaluation_periods)
1123            .threshold(threshold)
1124            .comparison_operator(aws_sdk_cloudwatch::types::ComparisonOperator::GreaterThanThreshold);
1125
1126        // 添加SNS通知
1127        if let Some(topic_arn) = sns_topic_arn {
1128            alarm_request = alarm_request.alarm_actions(topic_arn);
1129        }
1130
1131        // 添加维度
1132        if let Some(dims) = dimensions {
1133            for dim in dims {
1134                alarm_request = alarm_request.dimensions(dim);
1135            }
1136        }
1137
1138        match alarm_request.send().await {
1139            Ok(_) => {
1140                info!("CloudWatch告警创建成功: {}", alarm_name);
1141                Ok(())
1142            }
1143            Err(e) => {
1144                error!("CloudWatch告警创建失败 {}: {}", alarm_name, e);
1145                Err(anyhow!("创建CloudWatch告警失败: {}", e))
1146            }
1147        }
1148    }
1149
1150    /// 为特定模型创建告警
1151    async fn create_model_specific_alarm(&self, model_id: &str, sns_topic_arn: &str) -> Result<()> {
1152        let alarm_name = format!("bedrock-InvocationThrottles-{}", model_id.replace(':', "-"));
1153        let description = format!("Claude模型 {} 限流告警 - 当模型1分钟内限流超过3次时告警", model_id);
1154
1155        let dimension = aws_sdk_cloudwatch::types::Dimension::builder()
1156            .name("ModelId")
1157            .value(model_id)
1158            .build();
1159
1160        self.create_single_alarm(
1161            &alarm_name,
1162            &description,
1163            "AWS/Bedrock",
1164            "InvocationThrottles",
1165            60,  // 60秒周期
1166            1,   // 1个评估周期
1167            3.0, // 阈值3次
1168            Some(sns_topic_arn),
1169            Some(vec![dimension]),
1170        ).await
1171    }
1172
1173    /// 配置SNS到Lambda的触发器
1174    pub async fn configure_sns_lambda_trigger(&self, sns_topic_arn: &str) -> Result<()> {
1175        let function_name = "bedrock-monitor-function";
1176
1177        info!("配置SNS到Lambda的触发器: {} -> {}", sns_topic_arn, function_name);
1178
1179        // 获取Lambda函数ARN
1180        let function_config = self.lambda_client.get_function_configuration()
1181            .function_name(function_name)
1182            .send()
1183            .await?;
1184
1185        let function_arn = function_config.function_arn().unwrap();
1186
1187        // 订阅SNS主题到Lambda函数
1188        let result = self.sns_client.subscribe()
1189            .topic_arn(sns_topic_arn)
1190            .protocol("lambda")
1191            .endpoint(function_arn)
1192            .return_subscription_arn(true)
1193            .send()
1194            .await;
1195
1196        match result {
1197            Ok(response) => {
1198                let subscription_arn = response.subscription_arn().unwrap().to_string();
1199                info!("SNS订阅创建成功: {}", subscription_arn);
1200
1201                // 添加SNS调用Lambda的权限
1202                match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1203                    Ok(_) => info!("SNS Lambda权限添加成功"),
1204                    Err(e) => {
1205                        // 由于add_sns_lambda_permission内部已经处理了"已存在"的情况,
1206                        // 这里的错误都是真正的失败,但仍然继续部署
1207                        warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1208                    }
1209                }
1210
1211                Ok(())
1212            }
1213            Err(e) => {
1214                let error_str = e.to_string();
1215                error!("SNS订阅创建失败: {}", e);
1216
1217                // 提供更详细的错误分析
1218                if error_str.contains("already exists") ||
1219                   error_str.contains("ResourceConflictException") ||
1220                   error_str.contains("Duplicate subscription") ||
1221                   error_str.contains("already a subscription") {
1222                    info!("SNS订阅已存在,跳过创建");
1223                    // 即使订阅已存在,仍然尝试添加权限
1224                    match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1225                        Ok(_) => info!("SNS Lambda权限添加成功"),
1226                        Err(e) => {
1227                            warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1228                        }
1229                    }
1230                    Ok(())
1231                } else if error_str.contains("dispatch failure") {
1232                    warn!("⚠️ SNS订阅可能已存在或网络问题,但继续部署");
1233                    // 对于dispatch failure,仍然尝试添加权限
1234                    match self.add_sns_lambda_permission(function_name, sns_topic_arn).await {
1235                        Ok(_) => info!("SNS Lambda权限添加成功"),
1236                        Err(e) => {
1237                            warn!("SNS Lambda权限配置失败,但继续部署: {}", e);
1238                        }
1239                    }
1240                    Ok(())
1241                } else if error_str.contains("InvalidParameterException") {
1242                    error!("❌ SNS订阅参数无效: 检查主题ARN或Lambda函数ARN格式");
1243                    Err(anyhow!("SNS订阅参数无效: {}", error_str))
1244                } else if error_str.contains("AccessDeniedException") {
1245                    error!("❌ 权限不足: 无权限创建SNS订阅");
1246                    Err(anyhow!("权限不足: {}", error_str))
1247                } else if error_str.contains("NotFoundException") {
1248                    error!("❌ 资源未找到: SNS主题或Lambda函数不存在");
1249                    Err(anyhow!("资源未找到: {}", error_str))
1250                } else {
1251                    error!("❌ 未知SNS订阅错误: {}", error_str);
1252                    Err(anyhow!("创建SNS订阅失败: {}", e))
1253                }
1254            }
1255        }
1256    }
1257
1258    /// 添加SNS调用Lambda的权限
1259    async fn add_sns_lambda_permission(&self, function_name: &str, sns_topic_arn: &str) -> Result<()> {
1260        let statement_id = "sns-invoke-permission";
1261
1262        info!("添加SNS调用Lambda权限: {} -> {}", sns_topic_arn, function_name);
1263
1264        // 首先检查SNS权限是否已存在
1265        let sns_permission_exists = match self.lambda_client.get_policy()
1266            .function_name(function_name)
1267            .send()
1268            .await {
1269            Ok(policy_result) => {
1270                if let Some(policy) = policy_result.policy() {
1271                    policy.contains("sns-invoke-permission") ||
1272                    (policy.contains("sns.amazonaws.com") && policy.contains(sns_topic_arn))
1273                } else {
1274                    false
1275                }
1276            }
1277            Err(_) => false
1278        };
1279
1280        if sns_permission_exists {
1281            info!("✅ SNS权限已存在,跳过添加");
1282            return Ok(());
1283        }
1284
1285        let result = self.lambda_client.add_permission()
1286            .function_name(function_name)
1287            .statement_id(statement_id)
1288            .action("lambda:InvokeFunction")
1289            .principal("sns.amazonaws.com")
1290            .source_arn(sns_topic_arn)
1291            .send()
1292            .await;
1293
1294        match result {
1295            Ok(response) => {
1296                info!("SNS Lambda权限添加成功");
1297                if let Some(statement) = response.statement() {
1298                    info!("权限声明: {}", statement);
1299                }
1300                Ok(())
1301            }
1302            Err(e) => {
1303                let error_str = e.to_string();
1304                let raw_error = format!("{:?}", e);
1305                error!("SNS Lambda权限添加失败");
1306                error!("错误信息: {}", error_str);
1307                error!("原始错误: {}", raw_error);
1308
1309                // 检查权限是否实际上已经存在
1310                match self.lambda_client.get_policy()
1311                    .function_name(function_name)
1312                    .send()
1313                    .await {
1314                    Ok(policy_result) => {
1315                        if let Some(policy) = policy_result.policy() {
1316                            info!("当前Lambda函数策略已存在");
1317                            if policy.contains("sns-invoke-permission") {
1318                                info!("✅ SNS权限已存在于策略中,无需重复添加");
1319                                return Ok(());
1320                            }
1321                        }
1322                    }
1323                    Err(_) => {
1324                        warn!("无法检查Lambda函数策略状态");
1325                    }
1326                }
1327
1328                // 提供更详细的错误分析
1329                if error_str.contains("ResourceConflictException") ||
1330                   error_str.contains("already exists") ||
1331                   error_str.contains("The resource-based policy") ||
1332                   error_str.contains("already has a statement") {
1333                    info!("SNS Lambda权限已存在,跳过添加");
1334                    Ok(())
1335                } else if error_str.contains("InvalidParameterValueException") {
1336                    error!("❌ SNS权限参数无效: 检查函数名和SNS主题ARN格式");
1337                    Err(anyhow!("SNS权限参数无效: {}", error_str))
1338                } else if error_str.contains("AccessDeniedException") {
1339                    error!("❌ 权限不足: 无权限为Lambda函数添加SNS调用权限");
1340                    Err(anyhow!("权限不足: {}", error_str))
1341                } else if error_str.contains("ResourceNotFoundException") {
1342                    error!("❌ 资源未找到: Lambda函数或SNS主题不存在");
1343                    Err(anyhow!("资源未找到: {}", error_str))
1344                } else if error_str.contains("service error") {
1345                    warn!("⚠️ AWS服务错误,但权限可能已成功添加,继续部署");
1346                    Ok(())
1347                } else {
1348                    error!("❌ 未知SNS权限错误: {}", error_str);
1349                    Err(anyhow!("添加SNS Lambda权限失败: {}", e))
1350                }
1351            }
1352        }
1353    }
1354
1355    /// 部署完整的监控基础设施
1356    pub async fn deploy_complete_monitoring_stack(&self, zip_content: Vec<u8>) -> Result<()> {
1357        info!("🚀 开始部署完整的Bedrock监控基础设施...");
1358
1359        // 1. 部署Lambda函数
1360        let function_arn = self.deploy_lambda_function(zip_content).await?;
1361        info!("✅ Lambda函数部署成功: {}", function_arn);
1362
1363        // 1.5. 确保环境变量正确设置(带重试机制)
1364        let max_retries = 3;
1365
1366        for attempt in 1..=max_retries {
1367            match self.update_lambda_environment().await {
1368                Ok(_) => {
1369                    info!("✅ Lambda环境变量更新成功 (尝试 {})", attempt);
1370                    break;
1371                }
1372                Err(e) => {
1373                    warn!("⚠️ Lambda环境变量更新失败 (尝试 {}/{}): {}", attempt, max_retries, e);
1374                    if attempt < max_retries {
1375                        info!("等待 {} 秒后重试...", attempt * 5);
1376                        tokio::time::sleep(tokio::time::Duration::from_secs(attempt as u64 * 5)).await;
1377                    } else {
1378                        error!("❌ Lambda环境变量更新最终失败,但继续部署流程");
1379                    }
1380                }
1381            }
1382        }
1383
1384        // 2. 创建SNS主题
1385        let sns_topic_arn = self.create_sns_topic().await?;
1386        info!("✅ SNS主题创建成功: {}", sns_topic_arn);
1387
1388        // 3. 创建CloudWatch告警
1389        self.create_bedrock_cloudwatch_alarms(&sns_topic_arn).await?;
1390        info!("✅ CloudWatch告警创建成功");
1391
1392        // 4. 创建EventBridge规则
1393        let rule_arn = self.create_bedrock_eventbridge_rule().await?;
1394        info!("✅ EventBridge规则创建成功: {}", rule_arn);
1395
1396        // 5. 配置EventBridge到Lambda的触发器
1397        self.add_lambda_target_to_eventbridge().await?;
1398        info!("✅ EventBridge到Lambda触发器配置成功");
1399
1400        // 6. 配置SNS到Lambda的触发器
1401        self.configure_sns_lambda_trigger(&sns_topic_arn).await?;
1402        info!("✅ SNS到Lambda触发器配置成功");
1403
1404        // 7. 创建Lambda错误告警
1405        self.create_cloudwatch_alarm("bedrock-monitor-function").await?;
1406        info!("✅ Lambda错误告警创建成功");
1407
1408        info!("🎉 完整的Bedrock监控基础设施部署成功!");
1409        info!("📊 监控功能包括:");
1410        info!("   - 429错误检测和AK/SK自动关闭");
1411        info!("   - 8个CloudWatch告警");
1412        info!("   - EventBridge API调用监控");
1413        info!("   - SNS告警通知");
1414        info!("   - 实时CloudWatch日志记录");
1415
1416        Ok(())
1417    }
1418
1419    /// 更新Lambda函数环境变量
1420    pub async fn update_lambda_environment(&self) -> Result<()> {
1421        let function_name = "bedrock-monitor-function";
1422
1423        info!("更新Lambda函数环境变量: {}", function_name);
1424
1425        // 首先检查函数是否存在和可用
1426        match self.lambda_client.get_function_configuration()
1427            .function_name(function_name)
1428            .send()
1429            .await {
1430            Ok(function_config) => {
1431                info!("✅ Lambda函数存在,状态: {:?}", function_config.state());
1432
1433                // 检查函数状态
1434                match function_config.state() {
1435                    Some(aws_sdk_lambda::types::State::Pending) => {
1436                        warn!("⚠️ Lambda函数正在更新中,等待完成...");
1437                        // 等待函数状态稳定
1438                        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1439                    }
1440                    Some(aws_sdk_lambda::types::State::Active) => {
1441                        info!("✅ Lambda函数状态正常");
1442                    }
1443                    Some(aws_sdk_lambda::types::State::Failed) | Some(aws_sdk_lambda::types::State::Inactive) => {
1444                        return Err(anyhow!("❌ Lambda函数状态异常: {:?}", function_config.state()));
1445                    }
1446                    None => {
1447                        warn!("⚠️ 无法获取Lambda函数状态,继续尝试更新");
1448                    }
1449                    _ => {
1450                        warn!("⚠️ Lambda函数处于未知状态: {:?}, 继续尝试更新", function_config.state());
1451                    }
1452                }
1453            }
1454            Err(e) => {
1455                error!("❌ 无法获取Lambda函数配置: {}", e);
1456                return Err(anyhow!("Lambda函数不存在或无权限访问: {}", e));
1457            }
1458        }
1459
1460        let env_vars = HashMap::from([
1461            ("RUST_LOG".to_string(), "info".to_string()),
1462            ("BEDROCK_AUTO_CREDENTIAL_DISABLE".to_string(), "true".to_string()),
1463            ("BEDROCK_DRY_RUN_MODE".to_string(), "false".to_string()), // 关键:关闭试运行模式
1464            // 注意:AWS_REGION是Lambda保留变量,不能设置
1465        ]);
1466
1467        // 检查环境变量总大小(AWS Lambda限制为4KB)
1468        let total_size: usize = env_vars.iter()
1469            .map(|(k, v)| k.len() + v.len() + 2) // key + value + "=" separator
1470            .sum();
1471
1472        info!("环境变量总大小: {} bytes (限制: 4096 bytes)", total_size);
1473        if total_size > 4096 {
1474            return Err(anyhow!("环境变量总大小超过AWS Lambda限制(4KB): {} bytes", total_size));
1475        }
1476
1477        let environment = Environment::builder()
1478            .set_variables(Some(env_vars))
1479            .build();
1480
1481        info!("正在发送更新请求...");
1482        match self.lambda_client.update_function_configuration()
1483            .function_name(function_name)
1484            .environment(environment)
1485            .send()
1486            .await {
1487            Ok(response) => {
1488                info!("✅ Lambda环境变量更新请求已发送");
1489                info!("函数状态: {:?}", response.state());
1490                info!("最后更新时间: {:?}", response.last_modified());
1491
1492                // 等待配置更新生效
1493                info!("等待配置更新生效...");
1494                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
1495
1496                // 验证更新是否成功
1497                match self.lambda_client.get_function_configuration()
1498                    .function_name(function_name)
1499                    .send()
1500                    .await {
1501                    Ok(updated_config) => {
1502                        if let Some(env) = updated_config.environment() {
1503                            if let Some(vars) = env.variables() {
1504                                info!("✅ 环境变量验证成功:");
1505                                for (key, value) in vars {
1506                                    info!("  {} = {}", key, value);
1507                                }
1508                            }
1509                        }
1510                        info!("✅ Lambda环境变量更新完成");
1511                    }
1512                    Err(e) => {
1513                        warn!("⚠️ 无法验证环境变量更新: {}", e);
1514                    }
1515                }
1516
1517                Ok(())
1518            }
1519            Err(e) => {
1520                let error_str = e.to_string();
1521                let raw_error = format!("{:?}", e);
1522
1523                // 首先检查环境变量是否已经正确设置
1524                match self.lambda_client.get_function_configuration()
1525                    .function_name(function_name)
1526                    .send()
1527                    .await {
1528                    Ok(config) => {
1529                        if let Some(env) = config.environment() {
1530                            if let Some(vars) = env.variables() {
1531                                let has_correct_vars = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string()) &&
1532                                                       vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string()) &&
1533                                                       vars.get("RUST_LOG") == Some(&"info".to_string());
1534
1535                                if has_correct_vars {
1536                                    info!("✅ Lambda环境变量已正确设置,无需重复更新");
1537                                    if error_str.contains("ResourceConflictException") {
1538                                        info!("ℹ️ 检测到Lambda函数正在更新,但环境变量已正确配置");
1539                                    } else if error_str.contains("service error") {
1540                                        info!("ℹ️ 检测到AWS服务错误,但环境变量已正确配置");
1541                                    }
1542                                    return Ok(());
1543                                }
1544                            }
1545                        }
1546                    }
1547                    Err(_) => {
1548                        // 只有在无法获取配置时才显示警告
1549                        warn!("无法验证当前环境变量状态");
1550                    }
1551                }
1552
1553                // 如果环境变量未正确设置,才显示错误信息
1554                error!("❌ Lambda环境变量更新失败");
1555                error!("错误信息: {}", error_str);
1556                error!("原始错误: {}", raw_error);
1557
1558                // 提供更详细的错误分析
1559                if error_str.contains("ResourceConflictException") {
1560                    error!("❌ 资源冲突: Lambda函数正在被其他操作更新");
1561                } else if error_str.contains("InvalidParameterValueException") {
1562                    error!("❌ 参数无效: 环境变量格式或内容有问题");
1563                } else if error_str.contains("TooManyRequestsException") {
1564                    error!("❌ 请求过多: AWS API调用频率超限");
1565                } else if error_str.contains("AccessDeniedException") {
1566                    error!("❌ 权限不足: 无权限更新Lambda函数配置");
1567                } else if error_str.contains("ResourceNotFoundException") {
1568                    error!("❌ 资源未找到: Lambda函数不存在");
1569                } else if error_str.contains("service error") {
1570                    warn!("⚠️ AWS服务错误,可能环境变量已更新,将进行验证");
1571                    // 对于service error,检查是否环境变量已经正确设置
1572                    // 这会由上面的验证逻辑处理
1573                    return Err(anyhow!("更新Lambda环境变量失败: {}", e));
1574                } else {
1575                    error!("❌ 未知错误类型: {}", error_str);
1576                }
1577
1578                Err(anyhow!("更新Lambda环境变量失败: {}", e))
1579            }
1580        }
1581    }
1582
1583    /// 更新IAM角色权限以支持us-east-1
1584    pub async fn update_iam_credentials_policy(&self) -> Result<()> {
1585        let role_name = "lambda-bedrock-monitor-role";
1586
1587        info!("更新IAM角色AK/SK管理权限: {}", role_name);
1588
1589        // 支持所有区域的AK/SK管理权限
1590        let credential_management_policy = r#"{
1591            "Version": "2012-10-17",
1592            "Statement": [
1593                {
1594                    "Effect": "Allow",
1595                    "Action": [
1596                        "iam:UpdateAccessKey",
1597                        "iam:ListAccessKeys",
1598                        "iam:GetAccessKeyLastUsed",
1599                        "iam:GetUser"
1600                    ],
1601                    "Resource": "*"
1602                }
1603            ]
1604        }"#;
1605
1606        match self.iam_client.put_role_policy()
1607            .role_name(role_name)
1608            .policy_name("BedrockCredentialManagement")
1609            .policy_document(credential_management_policy.to_string())
1610            .send()
1611            .await {
1612            Ok(_) => {
1613                info!("✅ IAM角色AK/SK管理权限更新成功");
1614                Ok(())
1615            }
1616            Err(e) => {
1617                error!("❌ IAM角色AK/SK管理权限更新失败: {}", e);
1618                Err(anyhow!("更新IAM角色权限失败: {}", e))
1619            }
1620        }
1621    }
1622
1623    /// 修复AK/SK自动关闭功能
1624    pub async fn fix_ak_sk_auto_disable(&self) -> Result<()> {
1625        info!("🔧 开始修复AK/SK自动关闭功能...");
1626
1627        // 1. 更新IAM角色权限
1628        self.update_iam_credentials_policy().await?;
1629        info!("✅ IAM权限已更新支持多区域");
1630
1631        // 2. 更新Lambda环境变量
1632        self.update_lambda_environment().await?;
1633        info!("✅ Lambda环境变量已更新(关闭试运行模式)");
1634
1635        info!("🎉 AK/SK自动关闭功能修复完成!");
1636        info!("📊 修复内容:");
1637        info!("   - IAM权限现在支持所有区域");
1638        info!("   - BEDROCK_DRY_RUN_MODE=false(正式运行模式)");
1639        info!("   - BEDROCK_AUTO_CREDENTIAL_DISABLE=true(启用自动禁用)");
1640
1641        Ok(())
1642    }
1643}
1644
1645/// 多区域部署管理器
1646pub struct MultiRegionDeployer {
1647    regions: Vec<String>,
1648}
1649
1650impl MultiRegionDeployer {
1651    /// 从环境变量创建多区域部署器
1652    pub fn from_env() -> Result<Self> {
1653        dotenv::dotenv().ok();
1654
1655        let regions_str = std::env::var("AWS_REGION")
1656            .map_err(|_| anyhow!("❌ 错误: AWS_REGION 必须在.env文件中设置"))?;
1657
1658        let regions: Vec<String> = regions_str
1659            .split(',')
1660            .map(|s| s.trim().to_string())
1661            .filter(|s| !s.is_empty())
1662            .collect();
1663
1664        if regions.is_empty() {
1665            return Err(anyhow!("❌ 错误: AWS_REGION 不能为空"));
1666        }
1667
1668        info!("📍 配置的AWS区域: {}", regions.join(", "));
1669        info!("🌍 总计 {} 个区域", regions.len());
1670
1671        Ok(MultiRegionDeployer { regions })
1672    }
1673
1674    /// 在所有区域部署完整的监控堆栈
1675    pub async fn deploy_to_all_regions(&self) -> Result<Vec<RegionDeployResult>> {
1676        info!("🚀 开始在所有 {} 个区域部署Bedrock监控系统", self.regions.len());
1677
1678        let zip_content = create_lambda_zip()?;
1679        info!("✅ Lambda函数包创建完成,大小: {} bytes", zip_content.len());
1680
1681        let mut results = Vec::new();
1682        let mut success_count = 0;
1683        let mut failed_regions = Vec::new();
1684
1685        for (index, region) in self.regions.iter().enumerate() {
1686            info!("\n📍 [{}/{}] 部署到区域: {}", index + 1, self.regions.len(), region);
1687            info!("{}", "=".repeat(60));
1688
1689            let start_time = std::time::Instant::now();
1690            let result = self.deploy_to_single_region(region, &zip_content).await;
1691            let duration = start_time.elapsed();
1692
1693            match result {
1694                Ok(deploy_result) => {
1695                    success_count += 1;
1696                    info!("✅ 区域 {} 部署成功 (耗时: {:?})", region, duration);
1697                    results.push(RegionDeployResult {
1698                        region: region.clone(),
1699                        status: DeployStatus::Success,
1700                        duration,
1701                        details: deploy_result,
1702                    });
1703                }
1704                Err(e) => {
1705                    failed_regions.push((region.clone(), anyhow::anyhow!("{}", e)));
1706                    error!("❌ 区域 {} 部署失败 (耗时: {:?}): {}", region, duration, e);
1707                    results.push(RegionDeployResult {
1708                        region: region.clone(),
1709                        status: DeployStatus::Failed(e),
1710                        duration,
1711                        details: String::new(),
1712                    });
1713                }
1714            }
1715
1716            // 添加区域间延迟,避免AWS API限制
1717            if index < self.regions.len() - 1 {
1718                info!("⏳ 等待 3 秒后部署下一个区域...");
1719                tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
1720            }
1721        }
1722
1723        // 显示部署总结
1724        self.print_deployment_summary(&results, success_count, &failed_regions).await;
1725
1726        if success_count == 0 {
1727            return Err(anyhow!("❌ 所有区域部署失败"));
1728        } else if failed_regions.len() > 0 {
1729            warn!("⚠️ {} 个区域部署失败,请检查错误信息", failed_regions.len());
1730        }
1731
1732        Ok(results)
1733    }
1734
1735    /// 部署到单个区域
1736    async fn deploy_to_single_region(&self, region: &str, zip_content: &[u8]) -> Result<String> {
1737        let aws_manager = AwsManager::new(region).await?;
1738
1739        // 1. 部署完整的监控基础设施
1740        aws_manager.deploy_complete_monitoring_stack(zip_content.to_vec()).await?;
1741
1742        // 2. 修复AK/SK自动关闭功能
1743        aws_manager.fix_ak_sk_auto_disable().await?;
1744
1745        Ok(format!("区域 {} 部署完成", region))
1746    }
1747
1748    /// 打印部署总结
1749    async fn print_deployment_summary(&self, results: &[RegionDeployResult], success_count: usize, failed_regions: &[(String, anyhow::Error)]) {
1750        info!("\n{}", "=".repeat(80));
1751        info!("🎉 多区域部署完成总结");
1752        info!("{}", "=".repeat(80).as_str());
1753
1754        info!("📊 部署统计:");
1755        info!("   ✅ 成功区域: {}/{}", success_count, self.regions.len());
1756        info!("   ❌ 失败区域: {}/{}", failed_regions.len(), self.regions.len());
1757        info!("   ⏱️ 总耗时: {:?}",
1758            results.iter().map(|r| r.duration).sum::<std::time::Duration>());
1759
1760        if success_count > 0 {
1761            info!("\n✅ 成功部署的区域:");
1762            for result in results.iter().filter(|r| matches!(r.status, DeployStatus::Success)) {
1763                info!("   🌍 {} (耗时: {:?})", result.region, result.duration);
1764            }
1765        }
1766
1767        if !failed_regions.is_empty() {
1768            info!("\n❌ 失败的区域:");
1769            for (region, error) in failed_regions {
1770                info!("   🌍 {}: {}", region, error);
1771            }
1772        }
1773
1774        if success_count > 0 {
1775            info!("\n🚨 429检测和AK/SK自动关闭机制已激活:");
1776            info!("   🔍 监控范围: {} 个AWS区域", success_count);
1777            info!("   ⚡ 检测速度: 实时检测429错误");
1778            info!("   🔒 自动响应: 立即禁用相关AK/SK");
1779            info!("   📊 告警覆盖: 每个区域8个CloudWatch告警");
1780
1781            info!("\n📋 每个区域部署的组件:");
1782            info!("   ✅ Lambda函数: bedrock-monitor-function");
1783            info!("   ✅ SNS主题: bedrock-throttling-alerts");
1784            info!("   ✅ CloudWatch告警: 8个专业告警");
1785            info!("   ✅ EventBridge规则: API调用监控");
1786            info!("   ✅ 触发器配置: SNS+EventBridge");
1787            info!("   ✅ 日志系统: 标准日志+详细日志");
1788        }
1789
1790        info!("\n🔍 验证建议:");
1791        info!("   1. 检查各区域的CloudWatch告警状态");
1792        info!("   2. 查看Lambda函数日志输出");
1793        info!("   3. 监控Bedrock API调用情况");
1794        info!("   4. 测试429错误响应机制");
1795        info!("   5. 验证SNS告警通知系统");
1796    }
1797}
1798
1799/// 区域部署结果
1800#[derive(Debug)]
1801pub struct RegionDeployResult {
1802    pub region: String,
1803    pub status: DeployStatus,
1804    pub duration: std::time::Duration,
1805    pub details: String,
1806}
1807
1808/// 部署状态
1809#[derive(Debug)]
1810pub enum DeployStatus {
1811    Success,
1812    Failed(anyhow::Error),
1813}
1814
1815/// 部署状态检查器
1816impl MultiRegionDeployer {
1817    /// 检查所有区域的部署状态
1818    pub async fn check_deployment_status(&self) -> Result<Vec<RegionStatus>> {
1819        info!("🔍 开始检查所有 {} 个区域的部署状态", self.regions.len());
1820
1821        let mut region_statuses = Vec::new();
1822
1823        for (index, region) in self.regions.iter().enumerate() {
1824            info!("\n📍 [{}/{}] 检查区域: {}", index + 1, self.regions.len(), region);
1825
1826            let status = self.check_single_region_status(region).await;
1827            region_statuses.push(status);
1828
1829            // 添加区域间延迟,避免AWS API限制
1830            if index < self.regions.len() - 1 {
1831                tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1832            }
1833        }
1834
1835        self.print_status_summary(&region_statuses).await;
1836        Ok(region_statuses)
1837    }
1838
1839    /// 检查单个区域的部署状态
1840    async fn check_single_region_status(&self, region: &str) -> RegionStatus {
1841        let start_time = std::time::Instant::now();
1842        let mut status = RegionStatus {
1843            region: region.to_string(),
1844            lambda_function: false,
1845            sns_topic: false,
1846            eventbridge_rule: false,
1847            cloudwatch_alarms: 0,
1848            lambda_environment: false,
1849            iam_permissions: false,
1850            total_duration: std::time::Duration::default(),
1851            is_healthy: false,
1852        };
1853
1854        // 尝试创建AWS管理器
1855        if let Ok(aws_manager) = AwsManager::new(region).await {
1856            // 检查Lambda函数
1857            status.lambda_function = self.check_lambda_function(&aws_manager).await;
1858
1859            // 检查SNS主题
1860            status.sns_topic = self.check_sns_topic(&aws_manager).await;
1861
1862            // 检查EventBridge规则
1863            status.eventbridge_rule = self.check_eventbridge_rule(&aws_manager).await;
1864
1865            // 检查CloudWatch告警
1866            status.cloudwatch_alarms = self.count_cloudwatch_alarms(&aws_manager).await;
1867
1868            // 检查Lambda环境变量
1869            if status.lambda_function {
1870                status.lambda_environment = self.check_lambda_environment(&aws_manager).await;
1871            }
1872
1873            // 检查IAM权限
1874            status.iam_permissions = self.check_iam_permissions(&aws_manager).await;
1875
1876            // 计算整体健康状态
1877            status.is_healthy = status.lambda_function &&
1878                              status.sns_topic &&
1879                              status.eventbridge_rule &&
1880                              status.cloudwatch_alarms >= 5 && // 至少5个告警
1881                              status.lambda_environment &&
1882                              status.iam_permissions;
1883        }
1884
1885        status.total_duration = start_time.elapsed();
1886
1887        // 打印区域状态
1888        self.print_region_status(&status).await;
1889
1890        status
1891    }
1892
1893    /// 检查Lambda函数状态
1894    async fn check_lambda_function(&self, aws_manager: &AwsManager) -> bool {
1895        match aws_manager.lambda_client.get_function_configuration()
1896            .function_name("bedrock-monitor-function")
1897            .send()
1898            .await {
1899            Ok(config) => {
1900                let is_active = matches!(config.state(), Some(aws_sdk_lambda::types::State::Active));
1901                info!("   ✅ Lambda函数: 存在且状态: {:?}", config.state());
1902                is_active
1903            }
1904            Err(_) => {
1905                info!("   ❌ Lambda函数: 不存在或无法访问");
1906                false
1907            }
1908        }
1909    }
1910
1911    /// 检查SNS主题状态
1912    async fn check_sns_topic(&self, aws_manager: &AwsManager) -> bool {
1913        match aws_manager.sns_client.list_topics().send().await {
1914            Ok(response) => {
1915                let exists = response.topics()
1916                    .iter()
1917                    .any(|topic| topic.topic_arn()
1918                        .unwrap_or("")
1919                        .contains("bedrock-throttling-alerts"));
1920
1921                if exists {
1922                    info!("   ✅ SNS主题: bedrock-throttling-alerts 存在");
1923                } else {
1924                    info!("   ❌ SNS主题: bedrock-throttling-alerts 不存在");
1925                }
1926                exists
1927            }
1928            Err(_) => {
1929                info!("   ❌ SNS主题: 无法访问");
1930                false
1931            }
1932        }
1933    }
1934
1935    /// 检查EventBridge规则状态
1936    async fn check_eventbridge_rule(&self, aws_manager: &AwsManager) -> bool {
1937        match aws_manager.eventbridge_client.list_rules()
1938            .name_prefix("bedrock-api-monitor-rule")
1939            .send()
1940            .await {
1941            Ok(response) => {
1942                let exists = response.rules()
1943                    .iter()
1944                    .any(|rule| rule.name() == Some("bedrock-api-monitor-rule"));
1945
1946                if exists {
1947                    info!("   ✅ EventBridge规则: bedrock-api-monitor-rule 存在");
1948                } else {
1949                    info!("   ❌ EventBridge规则: bedrock-api-monitor-rule 不存在");
1950                }
1951                exists
1952            }
1953            Err(_) => {
1954                info!("   ❌ EventBridge规则: 无法访问");
1955                false
1956            }
1957        }
1958    }
1959
1960    /// 统计CloudWatch告警数量
1961    async fn count_cloudwatch_alarms(&self, aws_manager: &AwsManager) -> i32 {
1962        match aws_manager.cloudwatch_client.describe_alarms()
1963            .alarm_name_prefix("bedrock")
1964            .send()
1965            .await {
1966            Ok(response) => {
1967                let count = response.metric_alarms().len() as i32;
1968                info!("   📊 CloudWatch告警: {} 个", count);
1969                count
1970            }
1971            Err(_) => {
1972                info!("   ❌ CloudWatch告警: 无法访问");
1973                0
1974            }
1975        }
1976    }
1977
1978    /// 检查Lambda环境变量配置
1979    async fn check_lambda_environment(&self, aws_manager: &AwsManager) -> bool {
1980        match aws_manager.lambda_client.get_function_configuration()
1981            .function_name("bedrock-monitor-function")
1982            .send()
1983            .await {
1984            Ok(config) => {
1985                if let Some(env) = config.environment() {
1986                    if let Some(vars) = env.variables() {
1987                        let has_dry_run_false = vars.get("BEDROCK_DRY_RUN_MODE") == Some(&"false".to_string());
1988                        let has_auto_disable_true = vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE") == Some(&"true".to_string());
1989
1990                        if has_dry_run_false && has_auto_disable_true {
1991                            info!("   ✅ Lambda环境变量: 正确配置 (429检测已启用)");
1992                            return true;
1993                        } else {
1994                            info!("   ⚠️ Lambda环境变量: 配置不完整 (DRY_RUN={}, AUTO_DISABLE={})",
1995                                vars.get("BEDROCK_DRY_RUN_MODE").unwrap_or(&"未设置".to_string()),
1996                                vars.get("BEDROCK_AUTO_CREDENTIAL_DISABLE").unwrap_or(&"未设置".to_string()));
1997                        }
1998                    }
1999                }
2000                info!("   ❌ Lambda环境变量: 未正确配置");
2001                false
2002            }
2003            Err(_) => {
2004                info!("   ❌ Lambda环境变量: 无法检查");
2005                false
2006            }
2007        }
2008    }
2009
2010    /// 检查IAM权限
2011    async fn check_iam_permissions(&self, aws_manager: &AwsManager) -> bool {
2012        match aws_manager.iam_client.get_role()
2013            .role_name("lambda-bedrock-monitor-role")
2014            .send()
2015            .await {
2016            Ok(_) => {
2017                info!("   ✅ IAM权限: lambda-bedrock-monitor-role 存在");
2018                true
2019            }
2020            Err(_) => {
2021                info!("   ❌ IAM权限: lambda-bedrock-monitor-role 不存在");
2022                false
2023            }
2024        }
2025    }
2026
2027    /// 打印单个区域状态
2028    async fn print_region_status(&self, status: &RegionStatus) {
2029        let health_emoji = if status.is_healthy { "✅" } else { "❌" };
2030        info!("{} 区域 {} (耗时: {:?})", health_emoji, status.region, status.total_duration);
2031
2032        info!("   - Lambda函数: {}", if status.lambda_function { "✅" } else { "❌" });
2033        info!("   - SNS主题: {}", if status.sns_topic { "✅" } else { "❌" });
2034        info!("   - EventBridge规则: {}", if status.eventbridge_rule { "✅" } else { "❌" });
2035        info!("   - CloudWatch告警: {} 个 {}", if status.cloudwatch_alarms >= 5 { "✅" } else { "⚠️" }, status.cloudwatch_alarms);
2036        info!("   - 环境变量: {}", if status.lambda_environment { "✅" } else { "❌" });
2037        info!("   - IAM权限: {}", if status.iam_permissions { "✅" } else { "❌" });
2038
2039        if status.is_healthy {
2040            info!("   🚨 429 throttling限制: ✅ 已激活");
2041        } else {
2042            info!("   🚨 429 throttling限制: ❌ 未激活");
2043        }
2044    }
2045
2046    /// 打印状态总结
2047    async fn print_status_summary(&self, region_statuses: &[RegionStatus]) {
2048        info!("\n{}", "=".repeat(80));
2049        info!("🎉 多区域部署状态检查完成");
2050        info!("{}", "=".repeat(80));
2051
2052        let healthy_count = region_statuses.iter().filter(|r| r.is_healthy).count();
2053        let total_count = region_statuses.len();
2054
2055        info!("📊 部署状态统计:");
2056        info!("   ✅ 完全健康的区域: {}/{}", healthy_count, total_count);
2057        info!("   ❌ 有问题的区域: {}/{}", total_count - healthy_count, total_count);
2058
2059        // 详细统计
2060        let lambda_count = region_statuses.iter().filter(|r| r.lambda_function).count();
2061        let sns_count = region_statuses.iter().filter(|r| r.sns_topic).count();
2062        let eventbridge_count = region_statuses.iter().filter(|r| r.eventbridge_rule).count();
2063        let env_count = region_statuses.iter().filter(|r| r.lambda_environment).count();
2064        let iam_count = region_statuses.iter().filter(|r| r.iam_permissions).count();
2065
2066        info!("\n📋 组件部署统计:");
2067        info!("   ✅ Lambda函数: {}/{} 区域", lambda_count, total_count);
2068        info!("   ✅ SNS主题: {}/{} 区域", sns_count, total_count);
2069        info!("   ✅ EventBridge规则: {}/{} 区域", eventbridge_count, total_count);
2070        info!("   ✅ Lambda环境变量: {}/{} 区域", env_count, total_count);
2071        info!("   ✅ IAM权限: {}/{} 区域", iam_count, total_count);
2072
2073        // 成功区域列表
2074        if healthy_count > 0 {
2075            info!("\n✅ 完全部署成功的区域:");
2076            for status in region_statuses.iter().filter(|r| r.is_healthy) {
2077                info!("   🌍 {} (耗时: {:?})", status.region, status.total_duration);
2078            }
2079        }
2080
2081        // 失败区域列表
2082        if healthy_count < total_count {
2083            info!("\n❌ 需要修复的区域:");
2084            for status in region_statuses.iter().filter(|r| !r.is_healthy) {
2085                let issues = vec![
2086                    (!status.lambda_function, "Lambda函数"),
2087                    (!status.sns_topic, "SNS主题"),
2088                    (!status.eventbridge_rule, "EventBridge规则"),
2089                    (!status.lambda_environment, "环境变量"),
2090                    (!status.iam_permissions, "IAM权限"),
2091                ].into_iter()
2092                    .filter_map(|(missing, component)| if missing { Some(component) } else { None })
2093                    .collect::<Vec<_>>();
2094
2095                info!("   🌍 {}: 缺少 {}", status.region, issues.join(", "));
2096            }
2097        }
2098
2099        if healthy_count > 0 {
2100            info!("\n🚨 429 throttling限制状态:");
2101            info!("   ✅ 已激活区域: {}/{}", healthy_count, total_count);
2102            info!("   ⚡ 检测机制: 实时检测429错误");
2103            info!("   🔒 自动响应: 立即禁用相关AK/SK");
2104
2105            info!("\n💡 验证建议:");
2106            info!("   1. 在成功部署的区域测试Bedrock API调用");
2107            info!("   2. 检查CloudWatch告警是否正常触发");
2108            info!("   3. 验证SNS告警通知系统");
2109            info!("   4. 测试429错误AK/SK自动禁用功能");
2110        }
2111    }
2112}
2113
2114/// 区域状态信息
2115#[derive(Debug)]
2116pub struct RegionStatus {
2117    pub region: String,
2118    pub lambda_function: bool,
2119    pub sns_topic: bool,
2120    pub eventbridge_rule: bool,
2121    pub cloudwatch_alarms: i32,
2122    pub lambda_environment: bool,
2123    pub iam_permissions: bool,
2124    pub total_duration: std::time::Duration,
2125    pub is_healthy: bool,
2126}
2127
2128pub fn create_lambda_zip() -> Result<Vec<u8>> {
2129    use std::io::{Cursor, Write};
2130    use zip::write::FileOptions;
2131
2132    let mut buffer = Vec::new();
2133    {
2134        let mut zip = zip::ZipWriter::new(Cursor::new(&mut buffer));
2135        let options = FileOptions::default()
2136            .compression_method(zip::CompressionMethod::Deflated);
2137
2138        // Enhanced Python Lambda function that records detailed request and response information
2139        let python_content = r#"import json
2140import os
2141import time
2142import re
2143import hashlib
2144from datetime import datetime
2145
2146# Redis connection setup (已移除,现在使用CloudWatch日志)
2147# REDIS_HOST = os.environ.get('REDIS_HOST', 'localhost')
2148# REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379))
2149# REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD', None)
2150
2151# CloudWatch Logs setup
2152CLOUDWATCH_LOG_GROUP = '/aws/lambda/bedrock-monitor-function/detailed'
2153CLOUDWATCH_LOG_STREAM = f"detailed-logs-{int(time.time())}"
2154
2155# CloudWatch Logs client
2156try:
2157    import boto3
2158    cloudwatch_logs = boto3.client('logs')
2159    CLOUDWATCH_AVAILABLE = True
2160except ImportError:
2161    CLOUDWATCH_AVAILABLE = False
2162    print("Warning: boto3 not available, CloudWatch logging disabled")
2163
2164def get_log_stream_name():
2165    """Generate unique log stream name"""
2166    return CLOUDWATCH_LOG_STREAM
2167
2168def ensure_log_group_and_stream():
2169    """Ensure log group and stream exist"""
2170    if not CLOUDWATCH_AVAILABLE:
2171        return
2172
2173    try:
2174        # Create log group if it doesn't exist
2175        try:
2176            cloudwatch_logs.create_log_group(logGroupName=CLOUDWATCH_LOG_GROUP)
2177            print(f"Created CloudWatch log group: {CLOUDWATCH_LOG_GROUP}")
2178        except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
2179            pass  # Log group already exists
2180
2181        # Create log stream
2182        log_stream_name = get_log_stream_name()
2183        try:
2184            cloudwatch_logs.create_log_stream(
2185                logGroupName=CLOUDWATCH_LOG_GROUP,
2186                logStreamName=log_stream_name
2187            )
2188            print(f"Created CloudWatch log stream: {log_stream_name}")
2189        except cloudwatch_logs.exceptions.ResourceAlreadyExistsException:
2190            pass  # Log stream already exists
2191
2192    except Exception as e:
2193        print(f"Error setting up CloudWatch Logs: {e}")
2194
2195
2196def sanitize_data(data, level='partial'):
2197    """Sanitize sensitive data from request/response content"""
2198    if not isinstance(data, (dict, str)):
2199        return data
2200
2201    if isinstance(data, str):
2202        # Remove potential API keys, tokens, and sensitive patterns
2203        sanitized = data
2204
2205        # Remove AWS access keys
2206        sanitized = re.sub(r'AKIA[0-9A-Z]{16}', 'AKIAXXXXXXXXXXXXXXXX', sanitized)
2207
2208        # Remove potential tokens and secrets
2209        sanitized = re.sub(r'[Bb]earer\s+[A-Za-z0-9\-._~+\/]+=*', 'bearer XXXXXXXX', sanitized)
2210        sanitized = re.sub(r'[Tt]oken\s*[:=]\s*[A-Za-z0-9\-._~+\/]+=*', 'token: XXXXXXXX', sanitized)
2211
2212        # Remove email addresses
2213        sanitized = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 'email@domain.com', sanitized)
2214
2215        # Remove phone numbers (basic pattern)
2216        sanitized = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 'XXX-XXX-XXXX', sanitized)
2217
2218        # Remove credit card numbers (basic pattern)
2219        sanitized = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', 'XXXX-XXXX-XXXX-XXXX', sanitized)
2220
2221        # Remove IP addresses for privacy (optional, based on level)
2222        if level == 'full':
2223            sanitized = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', 'X.X.X.X', sanitized)
2224
2225        return sanitized
2226
2227    if isinstance(data, dict):
2228        sanitized_dict = {}
2229        for key, value in data.items():
2230            # Check for sensitive key patterns
2231            if any(pattern in key.lower() for pattern in
2232                   ['password', 'token', 'key', 'secret', 'auth', 'credential', 'session']):
2233                sanitized_dict[key] = 'XXXXXXXX'
2234            elif any(pattern in key.lower() for pattern in ['email', 'mail']):
2235                sanitized_dict[key] = 'email@domain.com'
2236            elif key.lower() in ['client_ip', 'source_ip'] and level == 'full':
2237                sanitized_dict[key] = 'X.X.X.X'
2238            else:
2239                sanitized_dict[key] = sanitize_data(value, level)
2240        return sanitized_dict
2241
2242    return data
2243
2244def should_log_detailed_content(status_code, event_type='unknown'):
2245    """Determine if we should log detailed content based on status and event type"""
2246    # Always log errors and throttling in detail
2247    if status_code in [429, 500, 502, 503, 504]:
2248        return 'full'
2249
2250    # Log SNS alarms in detail
2251    if event_type == 'CloudWatchAlarm':
2252        return 'full'
2253
2254    # For successful requests, log summary only
2255    if status_code == 200:
2256        return 'summary'
2257
2258    # Default to partial for other cases
2259    return 'partial'
2260
2261def log_to_cloudwatch_detailed(log_entry, log_level='INFO'):
2262    """Send detailed log entry to CloudWatch Logs"""
2263    if not CLOUDWATCH_AVAILABLE:
2264        print(f"CloudWatch unavailable, logging to console: {log_level} - {json.dumps(log_entry)}")
2265        return
2266
2267    try:
2268        timestamp = int(time.time() * 1000)
2269
2270        log_event = {
2271            'timestamp': timestamp,
2272            'message': json.dumps(log_entry, default=str)
2273        }
2274
2275        cloudwatch_logs.put_log_events(
2276            logGroupName=CLOUDWATCH_LOG_GROUP,
2277            logStreamName=get_log_stream_name(),
2278            logEvents=[log_event]
2279        )
2280
2281    except Exception as e:
2282        print(f"Failed to send to CloudWatch Logs: {e}")
2283        # Fallback to console logging
2284        print(f"{log_level} - {json.dumps(log_entry, default=str)}")
2285
2286def create_detailed_log_entry(event_data, request_data, response_data, status_code, log_level='summary'):
2287    """Create a detailed log entry with appropriate level of detail"""
2288    timestamp = datetime.now().isoformat()
2289
2290    # Base log entry with always-included fields
2291    log_entry = {
2292        'timestamp': timestamp,
2293        'status_code': status_code,
2294        'log_level': log_level,
2295        'request_id': request_data.get('request_id', 'unknown'),
2296        'model': request_data.get('model', 'unknown'),
2297        'client_ip': request_data.get('client_ip', 'unknown'),
2298        'aws_region': request_data.get('aws_region', 'unknown'),
2299        'event_type': event_data.get('event_type', 'unknown')
2300    }
2301
2302    # Add detailed information based on log level
2303    if log_level == 'full':
2304        # Include complete sanitized data
2305        log_entry.update({
2306            'full_request': sanitize_data(request_data, 'partial'),
2307            'full_response': sanitize_data(response_data, 'partial'),
2308            'error_details': request_data.get('error_details', {}),
2309            'user_agent': request_data.get('user_agent', 'unknown'),
2310            'caller_identity': request_data.get('caller_identity', {}),
2311            'request_parameters': sanitize_data(request_data.get('request_parameters', {}), 'partial'),
2312            'response_elements': sanitize_data(response_data.get('response_elements', {}), 'partial')
2313        })
2314    elif log_level == 'summary':
2315        # Include summary information only
2316        log_entry.update({
2317            'api_action': request_data.get('api_action', 'unknown'),
2318            'processing_time_ms': request_data.get('processing_time_ms', 0),
2319            'content_type': response_data.get('content_type', 'unknown'),
2320            'error_code': request_data.get('error_code', None)
2321        })
2322    else:  # partial
2323        # Include some detail but sanitize heavily
2324        log_entry.update({
2325            'api_action': request_data.get('api_action', 'unknown'),
2326            'partial_request': sanitize_data(request_data, 'full'),
2327            'error_code': request_data.get('error_code', None)
2328        })
2329
2330    return log_entry
2331
2332def handle_throttle_event(request_data):
2333    """处理429错误事件,立即禁用AK/SK(简化版,无Redis依赖)"""
2334    try:
2335        import boto3
2336        import os
2337
2338        # 检查是否为试运行模式
2339        dry_run_mode = os.environ.get('BEDROCK_DRY_RUN_MODE', 'false').lower() == 'true'
2340
2341        # 提取关键信息
2342        timestamp = request_data.get('timestamp', int(datetime.now().timestamp()))
2343        request_id = request_data.get('request_id', 'unknown')
2344        model_id = request_data.get('model', 'unknown')
2345        client_ip = request_data.get('client_ip', 'unknown')
2346        aws_region = request_data.get('aws_region', 'unknown')
2347
2348        # 从调用者身份中提取AK/SK信息
2349        caller_identity = request_data.get('caller_identity', {})
2350        username = caller_identity.get('userName', 'unknown')
2351        access_key_id = caller_identity.get('accessKeyId', 'unknown')
2352
2353        # 如果没有找到AK/SK,尝试从事件详情中获取
2354        if not access_key_id or access_key_id == 'unknown':
2355            event_details = request_data.get('event_details', {})
2356            access_key_id = event_details.get('accessKeyId', 'unknown')
2357            if not access_key_id or access_key_id == 'unknown':
2358                username = event_details.get('userName', 'unknown')
2359                access_key_id = event_details.get('accessKeyId', 'unknown')
2360
2361        print(f"🚨 429错误检测:")
2362        print(f"   - 时间: {datetime.fromtimestamp(timestamp).isoformat()}")
2363        print(f"   - 用户: {username}")
2364        print(f"   - AK/SK: {access_key_id}")
2365        print(f"   - 模型: {model_id}")
2366        print(f"   - 源IP: {client_ip}")
2367        print(f"   - 区域: {aws_region}")
2368        print(f"   - 模式: {'试运行' if dry_run_mode else '正式运行'}")
2369
2370        # 创建详细的事件记录
2371        disable_record = {
2372            "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
2373            "username": username,
2374            "access_key_id": access_key_id,
2375            "model_id": model_id,
2376            "source_ip": client_ip,
2377            "request_id": request_id,
2378            "aws_region": aws_region,
2379            "event_type": "throttle_immediate_disable",
2380            "dry_run": dry_run_mode,
2381            "action_reason": "429_rate_limit_error"
2382        }
2383
2384        # 记录到CloudWatch日志
2385        print(f"📝 429错误记录: {json.dumps(disable_record)}")
2386
2387        # 如果无法获取必要信息,跳过禁用操作
2388        if access_key_id == 'unknown' or username == 'unknown':
2389            print(f"⚠️ 无法获取有效的用户或AK/SK信息,跳过禁用操作")
2390            return False
2391
2392        # 立即禁用AK/SK(无需Redis检查)
2393        if not dry_run_mode:
2394            try:
2395                # 初始化IAM客户端
2396                iam_client = boto3.client('iam')
2397
2398                # 调用IAM禁用访问密钥
2399                response = iam_client.update_access_key(
2400                    UserName=username,
2401                    AccessKeyId=access_key_id,
2402                    Status='Inactive'
2403                )
2404
2405                print(f"✅ 成功禁用AK/SK: {access_key_id}")
2406                print(f"🔒 用户 {username} 的访问密钥已被禁用")
2407
2408                # 更新记录
2409                disable_record.update({
2410                    "action": "access_key_disabled",
2411                    "status": "success",
2412                    "iam_response": str(response)
2413                })
2414
2415            except Exception as iam_error:
2416                print(f"❌ IAM禁用AK/SK失败: {iam_error}")
2417
2418                # 记录失败信息
2419                disable_record.update({
2420                    "action": "access_key_disable_failed",
2421                    "status": "error",
2422                    "error": str(iam_error)
2423                })
2424
2425                return False
2426        else:
2427            print(f"🧪 试运行模式:AK/SK {access_key_id} 将被禁用(未实际执行)")
2428            disable_record["action"] = "dry_run_would_disable"
2429
2430        # 最终记录到CloudWatch
2431        print(f"📋 最终操作记录: {json.dumps(disable_record)}")
2432
2433        return True
2434
2435    except Exception as e:
2436        print(f"❌ 处理429错误时发生异常: {e}")
2437        return False
2438
2439
2440def record_detailed_request(request_data):
2441    """记录详细的请求信息到CloudWatch日志(简化版,无Redis依赖)"""
2442    try:
2443        timestamp = request_data.get('timestamp', int(datetime.now().timestamp()))
2444        status = request_data.get('status_code', 0)
2445        model = request_data.get('model', 'unknown')
2446        client_ip = request_data.get('client_ip', 'unknown')
2447        request_id = request_data.get('request_id', 'unknown')
2448
2449        # 创建日志记录
2450        log_entry = {
2451            "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
2452            "status_code": status,
2453            "model": model,
2454            "client_ip": client_ip,
2455            "request_id": request_id,
2456            "api_action": request_data.get('api_action', 'unknown'),
2457            "aws_region": request_data.get('aws_region', 'unknown'),
2458            "user_agent": request_data.get('user_agent', 'unknown'),
2459            "log_type": "bedrock_api_call"
2460        }
2461
2462        # 记录到CloudWatch日志
2463        print(f"📊 Bedrock API调用记录: {json.dumps(log_entry)}")
2464
2465        # 如果是429错误,立即处理AK/SK禁用
2466        if status == 429:
2467            handle_throttle_event(request_data)
2468
2469        return True
2470
2471    except Exception as e:
2472        print(f"❌ 记录请求信息失败: {e}")
2473        return False
2474
2475def record_basic_status(status_code, timestamp):
2476    """记录基本状态到CloudWatch日志(替代Redis记录)"""
2477    try:
2478        # 记录基本计数到CloudWatch日志
2479        log_entry = {
2480            "timestamp": datetime.fromtimestamp(timestamp).isoformat(),
2481            "status_code": status_code,
2482            "log_type": "bedrock_status_count"
2483        }
2484
2485        print(f"📈 Bedrock状态计数: {json.dumps(log_entry)}")
2486        return True
2487
2488    except Exception as e:
2489        print(f"❌ 记录状态失败: {e}")
2490        return False
2491
2492def extract_bedrock_details_from_event(event):
2493    """从EventBridge CloudTrail事件中提取详细的请求和响应信息"""
2494    try:
2495        if 'detail' not in event:
2496            return None
2497
2498        detail = event['detail']
2499
2500        # 提取请求信息
2501        request_params = detail.get('requestParameters', {})
2502        response_elements = detail.get('responseElements', {})
2503        user_identity = detail.get('userIdentity', {})
2504
2505        # 增强的429错误检测逻辑 - 处理responseElements可能为None的情况
2506        status_code = response_elements.get('httpStatusCode', 0) if response_elements else 0
2507        error_code = detail.get('errorCode', '')
2508        response_error = response_elements.get('error', '') if response_elements else ''
2509
2510        # 检查多个字段判断是否为429错误
2511        is_throttled = (
2512            status_code == 429 or
2513            error_code in ['ThrottlingException', 'ServiceQuotaExceededException'] or
2514            (isinstance(response_error, dict) and response_error.get('code') in ['ThrottlingException', 'ServiceQuotaExceededException']) or
2515            (isinstance(response_error, str) and ('ThrottlingException' in response_error or 'ServiceQuotaExceededException' in response_error))
2516        )
2517
2518        # 如果检测到429相关错误,强制设置status_code为429
2519        if is_throttled:
2520            status_code = 429
2521            print(f"🚨 检测到429/限流错误: status={status_code}, errorCode={error_code}, responseError={response_error}")
2522
2523        request_data = {
2524            'timestamp': int(datetime.now().timestamp()),
2525            'event_time': detail.get('eventTime', ''),
2526            'request_id': response_elements.get('requestId', detail.get('requestID', 'unknown')) if response_elements else detail.get('requestID', 'unknown'),
2527            'model': request_params.get('modelId', 'unknown'),
2528            'api_action': detail.get('eventName', 'unknown'),
2529            'aws_region': detail.get('awsRegion', ''),
2530            'status_code': status_code,  # 使用增强检测后的状态码
2531            'client_ip': detail.get('sourceIPAddress', 'unknown'),
2532            'user_agent': detail.get('userAgent', 'unknown'),
2533            'error_code': error_code,
2534            'content_type': response_elements.get('contentType', None) if response_elements else None,
2535            'caller_identity': {
2536                'account_id': user_identity.get('accountId', ''),
2537                'principal_id': user_identity.get('principalId', ''),
2538                'type': user_identity.get('type', ''),
2539                'arn': user_identity.get('arn', ''),
2540                'userName': user_identity.get('userName', ''),
2541                'accessKeyId': user_identity.get('accessKeyId', '')
2542            },
2543            'request_parameters': {
2544                'model_id': request_params.get('modelId', ''),
2545                # 注意:实际请求内容不会记录在CloudTrail中,只记录参数
2546            },
2547            'response_elements': {
2548                'http_status': response_elements.get('httpStatusCode', 0) if response_elements else 0,
2549                'content_type': response_elements.get('contentType', '') if response_elements else '',
2550                'request_id': response_elements.get('requestId', '') if response_elements else '',
2551                'error': response_elements.get('error', None) if response_elements else None
2552            },
2553            'raw_error_code': error_code,
2554            'raw_response_error': response_error,
2555            'is_throttled': is_throttled
2556        }
2557
2558        print(f"📋 提取的请求数据: status_code={status_code}, is_throttled={is_throttled}")
2559        print(f"📊 调试信息: errorCode={error_code}, responseError={response_error}")
2560
2561        return request_data
2562
2563    except Exception as e:
2564        print(f"❌ 从EventBridge事件提取详情时出错: {e}")
2565        return None
2566
2567def extract_bedrock_status_from_event(event):
2568    """Extract status code from EventBridge CloudTrail event (backwards compatibility)"""
2569    try:
2570        request_data = extract_bedrock_details_from_event(event)
2571        if request_data:
2572            return request_data['status_code']
2573        return None
2574    except Exception as e:
2575        print(f"Error extracting status from EventBridge event: {e}")
2576        return 200
2577
2578def handle_sns_alarm(event):
2579    """Handle CloudWatch alarm notifications from SNS"""
2580    try:
2581        print(f"🚨 处理CloudWatch SNS告警事件: {json.dumps(event)}")
2582
2583        # Extract SNS message
2584        if 'Records' not in event:
2585            print("❌ SNS事件中没有找到记录")
2586            return None
2587
2588        alarm_count = 0
2589        for record in event['Records']:
2590            if 'Sns' not in record:
2591                continue
2592
2593            sns_message = json.loads(record['Sns']['Message'])
2594            print(f"📋 SNS消息: {json.dumps(sns_message)}")
2595
2596            # Extract CloudWatch alarm information
2597            alarm_name = sns_message.get('AlarmName', 'unknown')
2598            alarm_arn = sns_message.get('AlarmArn', 'unknown')
2599            state_value = sns_message.get('NewStateValue', 'unknown')
2600            state_reason = sns_message.get('NewStateReason', 'unknown')
2601
2602            # Check if this is a 429-related alarm
2603            is_429_alarm = (
2604                '429' in alarm_name or
2605                'throttle' in alarm_name.lower() or
2606                'InvocationThrottles' in alarm_name or
2607                'Bedrock' in alarm_name
2608            )
2609
2610            # Extract alarm configuration details
2611            trigger_info = sns_message.get('Trigger', {})
2612            metric_name = trigger_info.get('MetricName', 'unknown')
2613            namespace = trigger_info.get('Namespace', 'unknown')
2614            threshold = trigger_info.get('Threshold', 0)
2615
2616            print(f"🔔 CloudWatch告警详情:")
2617            print(f"   - 名称: {alarm_name}")
2618            print(f"   - 状态: {state_value}")
2619            print(f"   - 原因: {state_reason}")
2620            print(f"   - 指标: {metric_name}")
2621            print(f"   - 命名空间: {namespace}")
2622            print(f"   - 阈值: {threshold}")
2623            print(f"   - 是否429相关: {is_429_alarm}")
2624
2625            # Create alarm data for logging
2626            alarm_data = {
2627                'timestamp': datetime.now().isoformat(),
2628                'alarm_name': alarm_name,
2629                'alarm_arn': alarm_arn,
2630                'state_value': state_value,
2631                'state_reason': state_reason,
2632                'region': sns_message.get('Region', os.environ.get('AWS_REGION', '')),
2633                'event_type': 'CloudWatchAlarm',
2634                'is_429_related': is_429_alarm,
2635                'alarm_configuration': {
2636                    'metric': metric_name,
2637                    'namespace': namespace,
2638                    'threshold': threshold,
2639                    'comparison_operator': trigger_info.get('ComparisonOperator', 'unknown'),
2640                    'evaluation_periods': trigger_info.get('EvaluationPeriods', 0)
2641                },
2642                'dimensions': trigger_info.get('Dimensions', []),
2643                'raw_sns_message': sns_message
2644            }
2645
2646            # Log to CloudWatch
2647            print(f"📝 记录CloudWatch告警: {json.dumps(alarm_data)}")
2648
2649            # If this is a 429 alarm in ALARM state, trigger immediate action
2650            if is_429_alarm and state_value == 'ALARM':
2651                print(f"🚨 检测到429相关告警触发!")
2652                print(f"   - 告警名称: {alarm_name}")
2653                print(f"   - 阈值: {threshold} 次")
2654                print(f"   - 状态: {state_value}")
2655
2656                # Create a mock 429 event to trigger the same logic as direct 429 detection
2657                mock_429_event = {
2658                    "timestamp": datetime.now().isoformat(),
2659                    "event_type": "SNS_Alarm_429_Detection",
2660                    "alarm_name": alarm_name,
2661                    "metric_value": threshold,
2662                    "threshold": threshold,
2663                    "state_reason": state_reason,
2664                    "trigger_source": "CloudWatch_Metric_Alarm"
2665                }
2666
2667                # Handle this as a 429 event (will trigger AK/SK disabling)
2668                handle_sns_triggered_429(mock_429_event)
2669
2670            alarm_count += 1
2671
2672        print(f"✅ 成功处理了 {alarm_count} 个SNS告警事件")
2673
2674        return {
2675            'status': 'success',
2676            'message': f'SNS告警处理成功,共处理 {alarm_count} 个告警',
2677            'alarms_processed': alarm_count,
2678            'recorded': True
2679        }
2680
2681    except Exception as e:
2682        print(f"❌ 处理SNS告警时出错: {e}")
2683        return {
2684            'status': 'error',
2685            'message': f'处理SNS告警失败: {str(e)}',
2686            'recorded': False
2687        }
2688
2689def handle_sns_triggered_429(alarm_event):
2690    """处理由SNS告警触发的429检测"""
2691    try:
2692        print(f"🔥 处理SNS告警触发的429事件: {json.dumps(alarm_event)}")
2693
2694        # Since this is a CloudWatch alarm, we don't have specific user/AK/SK info
2695        # But we can still log and potentially implement account-level policies
2696
2697        alarm_name = alarm_event.get('alarm_name', 'unknown')
2698        metric_value = alarm_event.get('metric_value', 0)
2699        threshold = alarm_event.get('threshold', 0)
2700
2701        # Create a log entry
2702        log_entry = {
2703            'timestamp': datetime.now().isoformat(),
2704            'event_type': 'SNS_Alarm_429_Response',
2705            'alarm_name': alarm_name,
2706            'metric_value': metric_value,
2707            'threshold': threshold,
2708            'state_reason': alarm_event.get('state_reason', 'CloudWatch alarm triggered'),
2709            'trigger_source': alarm_event.get('trigger_source', 'CloudWatch'),
2710            'action_taken': 'Alarm logged and monitored',
2711            'recommendation': 'Check recent Bedrock API usage and consider rate limiting'
2712        }
2713
2714        print(f"📝 SNS触发的429日志: {json.dumps(log_entry)}")
2715
2716        # Note: Since SNS alarms don't contain specific AK/SK information,
2717        # we can't disable specific credentials here.
2718        # However, we can implement account-level monitoring and policies
2719
2720        print(f"💡 注意: SNS告警不包含具体的AK/SK信息")
2721        print(f"   - 建议检查CloudTrail日志获取具体的用户信息")
2722        print(f"   - 考虑实施账户级别的限流策略")
2723
2724        return True
2725
2726    except Exception as e:
2727        print(f"❌ 处理SNS触发的429事件时出错: {e}")
2728        return False
2729
2730
2731def lambda_handler(event, context):
2732    try:
2733        print(f"Received event: {json.dumps(event)}")
2734
2735        # Initialize CloudWatch Logs if available
2736        if CLOUDWATCH_AVAILABLE:
2737            ensure_log_group_and_stream()
2738
2739        # Check if this is an SNS event (CloudWatch alarm)
2740        if 'Records' in event and len(event['Records']) > 0 and 'Sns' in event['Records'][0]:
2741            print("Detected SNS event, processing as CloudWatch alarm")
2742            sns_result = handle_sns_alarm(event)
2743            return {
2744                'statusCode': 200,
2745                'body': json.dumps(sns_result),
2746                'headers': {'Content-Type': 'application/json'}
2747            }
2748
2749        # 尝试从EventBridge事件中提取详细信息
2750        request_data = extract_bedrock_details_from_event(event)
2751        status_code = None
2752
2753        if request_data:
2754            status_code = request_data['status_code']
2755            print(f"Extracted from EventBridge - Status: {status_code}, Model: {request_data['model']}, Client IP: {request_data['client_ip']}")
2756        else:
2757            # 如果不是EventBridge事件,使用直接调用数据
2758            status_code = event.get('status_code', 200)
2759            request_data = {
2760                'timestamp': int(datetime.now().timestamp()),
2761                'request_id': event.get('request_id', context.aws_request_id if context else 'unknown'),
2762                'model': event.get('model_id', 'unknown'),
2763                'api_action': 'DirectInvoke',
2764                'aws_region': os.environ.get('AWS_REGION', ''),
2765                'status_code': status_code,
2766                'client_ip': event.get('client_ip', 'lambda-direct'),
2767                'user_agent': event.get('user_agent', 'lambda-function'),
2768                'error_code': event.get('error', None),
2769                'content_type': None,
2770                'caller_identity': {
2771                    'account_id': '',
2772                    'principal_id': 'lambda',
2773                    'type': 'Lambda',
2774                    'arn': context.invoked_function_arn if context else ''
2775                },
2776                'request_parameters': event.get('request_parameters', {}),
2777                'response_elements': event.get('response_elements', {})
2778            }
2779            print(f"Using direct call data - Status: {status_code}, Model: {request_data['model']}")
2780
2781        # Log the status (this goes to CloudWatch)
2782        print(f"Bedrock API Status: {status_code}")
2783
2784        # Determine logging level based on status and event type
2785        event_type = "CloudWatchAlarm" if 'Records' in event and 'Sns' in event.get('Records', [{}])[0] else ("EventBridge" if 'detail' in event else "Direct")
2786        log_level = should_log_detailed_content(status_code, event_type)
2787
2788        # Create response data for logging
2789        response_data = {
2790            'content_type': request_data.get('content_type'),
2791            'response_elements': request_data.get('response_elements', {}),
2792            'processing_time_ms': request_data.get('processing_time_ms', 0)
2793        }
2794
2795        # 记录详细信息(扩展到所有状态码,但使用不同的日志级别)
2796        print(f"Recording information for status {status_code} with log level: {log_level}")
2797
2798        # 记录到CloudWatch日志(简化版,无Redis依赖)
2799        if status_code in [200, 429]:
2800            record_detailed_request(request_data)
2801            # 记录基本计数(替代Redis)
2802            timestamp = int(datetime.now().timestamp())
2803            record_basic_status(status_code, timestamp)
2804        else:
2805            # 记录基本计数
2806            timestamp = int(datetime.now().timestamp())
2807            record_basic_status(status_code, timestamp)
2808
2809        # 🔥 关键修复:检测429错误并禁用AK/SK
2810        if status_code in [429] or request_data.get('error_code') in ['ThrottlingException', 'ServiceQuotaExceededException']:
2811            print(f"🚨 检测到429/Throttling错误!状态码: {status_code}, 错误码: {request_data.get('error_code')}")
2812            try:
2813                handle_throttle_event(request_data)
2814            except Exception as e:
2815                print(f"❌ 处理429事件失败: {e}")
2816        elif status_code == 200 and request_data.get('error_code') in ['ThrottlingException', 'ServiceQuotaExceededException']:
2817            print(f"🚨 检测到状态码200但包含Throttling错误!错误码: {request_data.get('error_code')}")
2818            try:
2819                handle_throttle_event(request_data)
2820            except Exception as e:
2821                print(f"❌ 处理Throttling事件失败: {e}")
2822
2823        # Log to CloudWatch Logs with appropriate detail level
2824        try:
2825            detailed_log_entry = create_detailed_log_entry(
2826                event_data={'event_type': event_type, 'raw_event': sanitize_data(event, 'full')},
2827                request_data=request_data,
2828                response_data=response_data,
2829                status_code=status_code,
2830                log_level=log_level
2831            )
2832
2833            log_to_cloudwatch_detailed(detailed_log_entry, 'INFO' if status_code == 200 else 'ERROR')
2834            print(f"Logged to CloudWatch with level: {log_level}")
2835
2836        except Exception as e:
2837            print(f"Failed to log to CloudWatch: {e}")
2838
2839        # Create appropriate response based on status code
2840        if status_code == 200:
2841            response_body = {
2842                "status": "success",
2843                "message": "Bedrock request completed successfully",
2844                "recorded": True,
2845                "event_type": "EventBridge" if 'detail' in event else "Direct",
2846                "request_id": request_data['request_id'],
2847                "model": request_data['model'],
2848                "client_ip": request_data['client_ip']
2849            }
2850        elif status_code == 429:
2851            response_body = {
2852                "status": "rate_limit",
2853                "message": "Bedrock rate limit exceeded",
2854                "recorded": True,
2855                "event_type": "EventBridge" if 'detail' in event else "Direct",
2856                "request_id": request_data['request_id'],
2857                "model": request_data['model'],
2858                "client_ip": request_data['client_ip']
2859            }
2860        elif status_code == 400:
2861            response_body = {
2862                "status": "bad_request",
2863                "message": "Bedrock bad request",
2864                "recorded": False,  # Not recorded per requirement
2865                "event_type": "EventBridge" if 'detail' in event else "Direct"
2866            }
2867        elif status_code == 500:
2868            response_body = {
2869                "status": "server_error",
2870                "message": "Bedrock server error",
2871                "recorded": False,  # Not recorded per requirement
2872                "event_type": "EventBridge" if 'detail' in event else "Direct"
2873            }
2874        else:
2875            response_body = {
2876                "status": "processed",
2877                "message": f"Bedrock status {status_code} received",
2878                "recorded": False,
2879                "event_type": "EventBridge" if 'detail' in event else "Direct"
2880            }
2881
2882        return {
2883            'statusCode': 200,  # Always return 200 for EventBridge
2884            'body': json.dumps(response_body)
2885        }
2886
2887    except Exception as e:
2888        print(f"Error processing Lambda function: {e}")
2889        return {
2890            'statusCode': 500,
2891            'body': json.dumps({
2892                'status': 'error',
2893                'message': f'Internal server error: {str(e)}',
2894                'recorded': False
2895            })
2896        }
2897"#;
2898
2899        zip.start_file("lambda_function.py", options)?;
2900        zip.write_all(python_content.as_bytes())?;
2901
2902        zip.finish()?;
2903    }
2904
2905    Ok(buffer)
2906}